summaryrefslogtreecommitdiff
path: root/src/Erebos/Discovery.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Discovery.hs')
-rw-r--r--src/Erebos/Discovery.hs386
1 files changed, 293 insertions, 93 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index 2fb0ffe..5590e4c 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
module Erebos.Discovery (
DiscoveryService(..),
@@ -6,6 +7,7 @@ module Erebos.Discovery (
DiscoveryConnection(..),
discoverySearch,
+ discoverySetupTunnel,
) where
import Control.Concurrent
@@ -13,7 +15,6 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
-import Data.IP qualified as IP
import Data.List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as M
@@ -25,15 +26,17 @@ import Data.Text (Text)
import Data.Text qualified as T
import Data.Word
-import Network.Socket
+import Text.Read
#ifdef ENABLE_ICE_SUPPORT
import Erebos.ICE
#endif
import Erebos.Identity
import Erebos.Network
+import Erebos.Network.Address
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
@@ -45,18 +48,25 @@ type IceRemoteInfo = Stored Object
data DiscoveryService
- = DiscoverySelf [ Text ] (Maybe Int)
- | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
+ = DiscoverySelf [ DiscoveryAddress ] (Maybe Int)
+ | DiscoveryAcknowledged [ DiscoveryAddress ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
| DiscoverySearch (Either Ref RefDigest)
- | DiscoveryResult (Either Ref RefDigest) [ Text ]
+ | DiscoveryResult (Either Ref RefDigest) [ DiscoveryAddress ]
| DiscoveryConnectionRequest DiscoveryConnection
| DiscoveryConnectionResponse DiscoveryConnection
+data DiscoveryAddress
+ = DiscoveryIP InetAddress PortNumber
+ | DiscoveryICE
+ | DiscoveryTunnel
+ | DiscoveryOther Text
+
data DiscoveryAttributes = DiscoveryAttributes
{ discoveryStunPort :: Maybe Word16
, discoveryStunServer :: Maybe Text
, discoveryTurnPort :: Maybe Word16
, discoveryTurnServer :: Maybe Text
+ , discoveryProvideTunnel :: Peer -> PeerAddress -> Bool
}
defaultDiscoveryAttributes :: DiscoveryAttributes
@@ -65,12 +75,14 @@ defaultDiscoveryAttributes = DiscoveryAttributes
, discoveryStunServer = Nothing
, discoveryTurnPort = Nothing
, discoveryTurnServer = Nothing
+ , discoveryProvideTunnel = \_ _ -> False
}
data DiscoveryConnection = DiscoveryConnection
{ dconnSource :: Either Ref RefDigest
, dconnTarget :: Either Ref RefDigest
, dconnAddress :: Maybe Text
+ , dconnTunnel :: Bool
, dconnIceInfo :: Maybe IceRemoteInfo
}
@@ -78,6 +90,7 @@ emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConn
emptyConnection dconnSource dconnTarget = DiscoveryConnection {..}
where
dconnAddress = Nothing
+ dconnTunnel = False
dconnIceInfo = Nothing
instance Storable DiscoveryService where
@@ -101,11 +114,12 @@ instance Storable DiscoveryService where
DiscoveryConnectionResponse conn -> storeConnection "response" conn
where
- storeConnection ctype DiscoveryConnection {..} = do
+ storeConnection (ctype :: Text) DiscoveryConnection {..} = do
storeText "connection" $ ctype
either (storeRawRef "source") (storeRawWeak "source") dconnSource
either (storeRawRef "target") (storeRawWeak "target") dconnTarget
storeMbText "address" dconnAddress
+ when dconnTunnel $ storeEmpty "tunnel"
storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
@@ -138,7 +152,7 @@ instance Storable DiscoveryService where
, loadConnection "response" DiscoveryConnectionResponse
]
where
- loadConnection ctype ctor = do
+ loadConnection (ctype :: Text) ctor = do
ctype' <- loadText "connection"
guard $ ctype == ctype'
dconnSource <- msum
@@ -150,13 +164,37 @@ instance Storable DiscoveryService where
, Right <$> loadRawWeak "target"
]
dconnAddress <- loadMbText "address"
+ dconnTunnel <- isJust <$> loadMbEmpty "tunnel"
dconnIceInfo <- loadMbRef "ice-info"
return $ ctor DiscoveryConnection {..}
+instance StorableText DiscoveryAddress where
+ toText = \case
+ DiscoveryIP addr port -> T.unwords [ T.pack $ show addr, T.pack $ show port ]
+ DiscoveryICE -> "ICE"
+ DiscoveryTunnel -> "tunnel"
+ DiscoveryOther str -> str
+
+ fromText str = return $ if
+ | [ addrStr, portStr ] <- T.words str
+ , Just addr <- readMaybe $ T.unpack addrStr
+ , Just port <- readMaybe $ T.unpack portStr
+ -> DiscoveryIP addr port
+
+ | "ice" <- T.toLower str
+ -> DiscoveryICE
+
+ | "tunnel" <- str
+ -> DiscoveryTunnel
+
+ | otherwise
+ -> DiscoveryOther str
+
+
data DiscoveryPeer = DiscoveryPeer
{ dpPriority :: Int
, dpPeer :: Maybe Peer
- , dpAddress :: [ Text ]
+ , dpAddress :: [ DiscoveryAddress ]
, dpIceSession :: Maybe IceSession
}
@@ -169,7 +207,11 @@ emptyPeer = DiscoveryPeer
}
data DiscoveryPeerState = DiscoveryPeerState
- { dpsStunServer :: Maybe ( Text, Word16 )
+ { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ]
+ -- ( original target, our write stream )
+ , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ]
+ -- ( original source, ( from source, to target ))
+ , dpsStunServer :: Maybe ( Text, Word16 )
, dpsTurnServer :: Maybe ( Text, Word16 )
, dpsIceConfig :: Maybe IceConfig
}
@@ -187,7 +229,9 @@ instance Service DiscoveryService where
type ServiceState DiscoveryService = DiscoveryPeerState
emptyServiceState _ = DiscoveryPeerState
- { dpsStunServer = Nothing
+ { dpsOurTunnelRequests = []
+ , dpsRelayedTunnelRequests = []
+ , dpsStunServer = Nothing
, dpsTurnServer = Nothing
, dpsIceConfig = Nothing
}
@@ -202,26 +246,22 @@ instance Service DiscoveryService where
DiscoverySelf addrs priority -> do
pid <- asks svcPeerIdentity
peer <- asks svcPeer
+ paddrs <- getPeerAddresses peer
+
let insertHelper new old | dpPriority new > dpPriority old = new
| otherwise = old
- matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if
- | addr == T.pack "ICE" -> do
- return $ Just addr
- | [ ipaddr, port ] <- words (T.unpack addr)
- , DatagramAddress paddr <- peerAddress peer -> do
- saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- return $ if paddr == addrAddress saddr
- then Just addr
- else Nothing
-
- | otherwise -> return Nothing
+ let matchedAddrs = flip filter addrs $ \case
+ DiscoveryICE -> True
+ DiscoveryIP ipaddr port ->
+ DatagramAddress (inetToSockAddr ( ipaddr, port )) `elem` paddrs
+ _ -> False
forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do
let dp = DiscoveryPeer
{ dpPriority = fromMaybe 0 priority
, dpPeer = Just peer
- , dpAddress = addrs
+ , dpAddress = matchedAddrs
, dpIceSession = Nothing
}
svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s }
@@ -233,14 +273,8 @@ instance Service DiscoveryService where
(discoveryTurnPort attrs)
DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do
- paddr <- asks (peerAddress . svcPeer) >>= return . \case
- (DatagramAddress saddr) -> case IP.fromSockAddr saddr of
- Just (IP.IPv6 ipv6, _)
- | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
- -> Just $ T.pack $ show (IP.toIPv4w ipv4)
- Just (addr, _)
- -> Just $ T.pack $ show addr
- _ -> Nothing
+ paddr <- asks svcPeerAddress >>= return . \case
+ (DatagramAddress saddr) -> T.pack . show . fst <$> inetFromSockAddr saddr
_ -> Nothing
let toIceServer Nothing Nothing = Nothing
@@ -255,10 +289,17 @@ instance Service DiscoveryService where
DiscoverySearch edgst -> do
dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal
- replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer
+ peer <- asks svcPeer
+ paddr <- asks svcPeerAddress
+ attrs <- asks svcAttributes
+ let offerTunnel
+ | discoveryProvideTunnel attrs peer paddr = (++ [ DiscoveryTunnel ])
+ | otherwise = id
+ replyPacket $ DiscoveryResult edgst $ maybe [] (offerTunnel . dpAddress) dpeer
- DiscoveryResult edgst [] -> do
- svcPrint $ "Discovery: " ++ show (either refDigest id edgst) ++ " not found"
+ DiscoveryResult _ [] -> do
+ -- not found
+ return ()
DiscoveryResult edgst addrs -> do
let dgst = either refDigest id edgst
@@ -269,56 +310,82 @@ instance Service DiscoveryService where
discoveryPeer <- asks svcPeer
let runAsService = runPeerService @DiscoveryService discoveryPeer
- forM_ addrs $ \addr -> if
- | addr == T.pack "ICE"
- -> do
-#ifdef ENABLE_ICE_SUPPORT
- getIceConfig >>= \case
- Just config -> void $ liftIO $ forkIO $ do
- ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
- rinfo <- iceRemoteInfo ice
-
- -- Try to promote weak ref to normal one for older peers:
- edgst' <- case edgst of
- Left r -> return (Left r)
- Right d -> refFromDigest st d >>= \case
- Just r -> return (Left r)
- Nothing -> return (Right d)
-
- res <- runExceptT $ sendToPeer discoveryPeer $
- DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo }
- case res of
- Right _ -> return ()
- Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
-
+ let tryAddresses = \case
+ DiscoveryIP ipaddr port : _ -> do
+ void $ liftIO $ forkIO $ do
+ let saddr = inetToSockAddr ( ipaddr, port )
+ peer <- serverPeer server saddr
runAsService $ do
- let upd dp = dp { dpIceSession = Just ice }
+ let upd dp = dp { dpPeer = Just peer }
svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
- Nothing -> do
- return ()
+ DiscoveryICE : rest -> do
+#ifdef ENABLE_ICE_SUPPORT
+ getIceConfig >>= \case
+ Just config -> do
+ void $ liftIO $ forkIO $ do
+ ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
+ rinfo <- iceRemoteInfo ice
+
+ -- Try to promote weak ref to normal one for older peers:
+ edgst' <- case edgst of
+ Left r -> return (Left r)
+ Right d -> refFromDigest st d >>= \case
+ Just r -> return (Left r)
+ Nothing -> return (Right d)
+
+ res <- runExceptT $ sendToPeer discoveryPeer $
+ DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo }
+ case res of
+ Right _ -> return ()
+ Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
+
+ runAsService $ do
+ let upd dp = dp { dpIceSession = Just ice }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
+
+ Nothing -> do
#endif
- return ()
-
- | [ ipaddr, port ] <- words (T.unpack addr) -> do
- void $ liftIO $ forkIO $ do
- saddr <- head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- serverPeer server (addrAddress saddr)
- runAsService $ do
- let upd dp = dp { dpPeer = Just peer }
- svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
+ tryAddresses rest
- | otherwise -> do
- svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
+ DiscoveryTunnel : _ -> do
+ discoverySetupTunnelResponse dgst
+
+ addr : rest -> do
+ svcPrint $ "Discovery: unsupported address in result: " ++ T.unpack (toText addr)
+ tryAddresses rest
+
+ [] -> svcPrint $ "Discovery: no (supported) address received for " <> show dgst
+
+ tryAddresses addrs
DiscoveryConnectionRequest conn -> do
self <- svcSelf
+ attrs <- asks svcAttributes
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
if either refDigest id (dconnTarget conn) `elem` identityDigests self
then if
+ -- request for us, create ICE sesssion or tunnel
+ | dconnTunnel conn -> do
+ receivedStreams >>= \case
+ (tunnelReader : _) -> do
+ tunnelWriter <- openStream
+ replyPacket $ DiscoveryConnectionResponse rconn
+ { dconnTunnel = True
+ }
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ server <- asks svcServer
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+
+ [] -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (endpoint)"
+
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
| Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
@@ -338,31 +405,72 @@ instance Service DiscoveryService where
svcPrint $ "Discovery: unsupported connection request"
else do
- -- request to some of our peers, relay
- mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
- case mbdp of
+ -- request to some of our peers, relay
+ peer <- asks svcPeer
+ paddr <- asks svcPeerAddress
+ mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
+ streams <- receivedStreams
+ case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
- | Just dpeer <- dpPeer dp -> do
- sendToPeer dpeer $ DiscoveryConnectionRequest conn
+ | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn -> if
+ | not (discoveryProvideTunnel attrs peer paddr) -> do
+ replyPacket $ DiscoveryConnectionResponse rconn
+ | fromSource : _ <- streams -> do
+ void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ toTarget <- openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest conn
+ | otherwise -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (relay)"
+ | otherwise -> do
+ sendToPeer dpeer $ DiscoveryConnectionRequest conn
| otherwise -> svcPrint $ "Discovery: failed to relay connection request"
DiscoveryConnectionResponse conn -> do
self <- svcSelf
+ dps <- svcGet
dpeers <- dgsPeers <$> svcGetGlobal
+
if either refDigest id (dconnSource conn) `elem` identityDigests self
- then do
+ then do
-- response to our request, try to connect to the peer
server <- asks svcServer
- if | Just addr <- dconnAddress conn
- , [ipaddr, port] <- words (T.unpack addr) -> do
- saddr <- liftIO $ head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- liftIO $ serverPeer server (addrAddress saddr)
+ if
+ | Just addr <- dconnAddress conn
+ , [ addrStr, portStr ] <- words (T.unpack addr)
+ , Just ipaddr <- readMaybe addrStr
+ , Just port <- readMaybe portStr
+ -> do
+ let saddr = inetToSockAddr ( ipaddr, port )
+ peer <- liftIO $ serverPeer server saddr
let upd dp = dp { dpPeer = Just peer }
svcModifyGlobal $ \s -> s
{ dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s }
+ | dconnTunnel conn
+ , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ receivedStreams >>= \case
+ tunnelReader : _ -> do
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+ [] -> do
+ svcPrint $ "Discovery: missing stream in tunnel response"
+ liftIO $ closeStream tunnelWriter
+
+ | Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ svcPrint $ "Discovery: tunnel request failed"
+ liftIO $ closeStream tunnelWriter
+
#ifdef ENABLE_ICE_SUPPORT
| Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
@@ -371,24 +479,49 @@ instance Service DiscoveryService where
#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
- else do
- -- response to relayed request
- case M.lookup (either refDigest id $ dconnSource conn) dpeers of
- Just dp | Just dpeer <- dpPeer dp -> do
+ else do
+ -- response to relayed request
+ streams <- receivedStreams
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) }
+
+ case M.lookup (either refDigest id $ dconnSource conn) dpeers of
+ Just dp | Just dpeer <- dpPeer dp -> if
+ -- successful tunnel request
+ | dconnTunnel conn
+ , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ , fromTarget : _ <- streams
+ -> liftIO $ do
+ toSourceVar <- newEmptyMVar
+ void $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ liftIO . putMVar toSourceVar =<< openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionResponse conn
+ void $ forkIO $ do
+ relayStream fromSource toTarget
+ void $ forkIO $ do
+ toSource <- readMVar toSourceVar
+ relayStream fromTarget toSource
+
+ -- failed tunnel request
+ | Just ( _, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ -> do
+ liftIO $ closeStream toTarget
sendToPeer dpeer $ DiscoveryConnectionResponse conn
- _ -> svcPrint $ "Discovery: failed to relay connection response"
+
+ | otherwise -> do
+ sendToPeer dpeer $ DiscoveryConnectionResponse conn
+ _ -> svcPrint $ "Discovery: failed to relay connection response"
serviceNewPeer = do
server <- asks svcServer
peer <- asks svcPeer
- let addrToText saddr = do
- ( addr, port ) <- IP.fromSockAddr saddr
- Just $ T.pack $ show addr <> " " <> show port
addrs <- concat <$> sequence
- [ catMaybes . map addrToText <$> liftIO (getServerAddresses server)
+ [ catMaybes . map (fmap (uncurry DiscoveryIP) . inetFromSockAddr) <$> liftIO (getServerAddresses server)
#ifdef ENABLE_ICE_SUPPORT
- , return [ T.pack "ICE" ]
+ , return [ DiscoveryICE ]
#endif
]
@@ -437,7 +570,7 @@ discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> R
discoverySearch server dgst = do
peers <- liftIO $ getCurrentPeerList server
match <- forM peers $ \peer -> do
- peerIdentity peer >>= \case
+ getPeerIdentity peer >>= \case
PeerIdentityFull pid -> do
return $ dgst `elem` identityDigests pid
_ -> return False
@@ -447,3 +580,70 @@ discoverySearch server dgst = do
}
forM_ peers $ \peer -> do
sendToPeer peer $ DiscoverySearch $ Right dgst
+
+
+data TunnelAddress = TunnelAddress
+ { tunnelVia :: Peer
+ , tunnelIdentity :: UnifiedIdentity
+ , tunnelStreamNumber :: Int
+ , tunnelReader :: StreamReader
+ , tunnelWriter :: StreamWriter
+ }
+
+instance Eq TunnelAddress where
+ x == y = (==)
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Ord TunnelAddress where
+ compare x y = compare
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Show TunnelAddress where
+ show tunnel = concat
+ [ "tunnel@"
+ , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel
+ , "/" <> show (tunnelStreamNumber tunnel)
+ ]
+
+instance PeerAddressType TunnelAddress where
+ sendBytesToAddress TunnelAddress {..} bytes = do
+ writeStream tunnelWriter bytes
+
+ connectionToAddressClosed TunnelAddress {..} = do
+ closeStream tunnelWriter
+
+relayStream :: StreamReader -> StreamWriter -> IO ()
+relayStream r w = do
+ p <- readStreamPacket r
+ writeStreamPacket w p
+ case p of
+ StreamClosed {} -> return ()
+ _ -> relayStream r w
+
+receiveFromTunnel :: Server -> TunnelAddress -> IO ()
+receiveFromTunnel server taddr = do
+ p <- readStreamPacket (tunnelReader taddr)
+ case p of
+ StreamData {..} -> do
+ receivedFromCustomAddress server taddr stpData
+ receiveFromTunnel server taddr
+ StreamClosed {} -> do
+ return ()
+
+
+discoverySetupTunnel :: Peer -> RefDigest -> IO ()
+discoverySetupTunnel via target = do
+ runPeerService via $ do
+ discoverySetupTunnelResponse target
+
+discoverySetupTunnelResponse :: RefDigest -> ServiceHandler DiscoveryService ()
+discoverySetupTunnelResponse target = do
+ self <- refDigest . storedRef . idData <$> svcSelf
+ stream <- openStream
+ svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest
+ (emptyConnection (Right self) (Right target))
+ { dconnTunnel = True
+ }