diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-06-28 21:21:37 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-06-29 09:00:20 +0200 |
commit | d112701ce72b090b79f101f6a2d0ccac7440762e (patch) | |
tree | 9cf1f41f8baf8114c57372d05520e146ac0fe814 /src/Erebos/Discovery.hs | |
parent | 1355cf4d6bc293ab2c60f2fe4bb9bfae1caa5b7c (diff) |
Discovery tunnel
Changelog: Support tunnel for peers in discovery service
Diffstat (limited to 'src/Erebos/Discovery.hs')
-rw-r--r-- | src/Erebos/Discovery.hs | 161 |
1 files changed, 150 insertions, 11 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 2fb0ffe..5de9869 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -6,6 +6,7 @@ module Erebos.Discovery ( DiscoveryConnection(..), discoverySearch, + discoverySetupTunnel, ) where import Control.Concurrent @@ -34,6 +35,7 @@ import Erebos.Identity import Erebos.Network import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable @@ -71,6 +73,7 @@ data DiscoveryConnection = DiscoveryConnection { dconnSource :: Either Ref RefDigest , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text + , dconnTunnel :: Bool , dconnIceInfo :: Maybe IceRemoteInfo } @@ -78,6 +81,7 @@ emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConn emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} where dconnAddress = Nothing + dconnTunnel = False dconnIceInfo = Nothing instance Storable DiscoveryService where @@ -106,6 +110,7 @@ instance Storable DiscoveryService where either (storeRawRef "source") (storeRawWeak "source") dconnSource either (storeRawRef "target") (storeRawWeak "target") dconnTarget storeMbText "address" dconnAddress + when dconnTunnel $ storeEmpty "tunnel" storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum @@ -150,6 +155,7 @@ instance Storable DiscoveryService where , Right <$> loadRawWeak "target" ] dconnAddress <- loadMbText "address" + dconnTunnel <- isJust <$> loadMbEmpty "tunnel" dconnIceInfo <- loadMbRef "ice-info" return $ ctor DiscoveryConnection {..} @@ -169,7 +175,11 @@ emptyPeer = DiscoveryPeer } data DiscoveryPeerState = DiscoveryPeerState - { dpsStunServer :: Maybe ( Text, Word16 ) + { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ] + -- ( original target, our write stream ) + , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ] + -- ( original source, ( from source, to target )) + , dpsStunServer :: Maybe ( Text, Word16 ) , dpsTurnServer :: Maybe ( Text, Word16 ) , dpsIceConfig :: Maybe IceConfig } @@ -187,7 +197,9 @@ instance Service DiscoveryService where type ServiceState DiscoveryService = DiscoveryPeerState emptyServiceState _ = DiscoveryPeerState - { dpsStunServer = Nothing + { dpsOurTunnelRequests = [] + , dpsRelayedTunnelRequests = [] + , dpsStunServer = Nothing , dpsTurnServer = Nothing , dpsIceConfig = Nothing } @@ -317,8 +329,27 @@ instance Service DiscoveryService where let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) if either refDigest id (dconnTarget conn) `elem` identityDigests self then if + -- request for us, create ICE sesssion or tunnel + | dconnTunnel conn -> do + receivedStreams >>= \case + (tunnelReader : _) -> do + tunnelWriter <- openStream + replyPacket $ DiscoveryConnectionResponse rconn + { dconnTunnel = True + } + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + server <- asks svcServer + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + + [] -> do + svcPrint $ "Discovery: missing stream on tunnel request (endpoint)" + #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer @@ -340,18 +371,31 @@ instance Service DiscoveryService where else do -- request to some of our peers, relay mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal + streams <- receivedStreams case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn + | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn -> if + | fromSource : _ <- streams -> do + void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do + toTarget <- openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest conn + | otherwise -> do + svcPrint $ "Discovery: missing stream on tunnel request (relay)" + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf + dps <- svcGet dpeers <- dgsPeers <$> svcGetGlobal + if either refDigest id (dconnSource conn) `elem` identityDigests self - then do + then do -- response to our request, try to connect to the peer server <- asks svcServer if | Just addr <- dconnAddress conn @@ -363,6 +407,20 @@ instance Service DiscoveryService where svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } + | dconnTunnel conn + , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + receivedStreams >>= \case + tunnelReader : _ -> do + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + [] -> svcPrint $ "Discovery: missing stream in tunnel response" + #ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp @@ -371,12 +429,33 @@ instance Service DiscoveryService where #endif | otherwise -> svcPrint $ "Discovery: connection request failed" - else do - -- response to relayed request - case M.lookup (either refDigest id $ dconnSource conn) dpeers of - Just dp | Just dpeer <- dpPeer dp -> do + else do + -- response to relayed request + streams <- receivedStreams + svcModify $ \s -> s { dpsRelayedTunnelRequests = + filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) } + + case M.lookup (either refDigest id $ dconnSource conn) dpeers of + Just dp | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn + , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + , fromTarget : _ <- streams + -> liftIO $ do + toSourceVar <- newEmptyMVar + void $ forkIO $ runPeerService @DiscoveryService dpeer $ do + liftIO . putMVar toSourceVar =<< openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionResponse conn + void $ forkIO $ do + relayStream fromSource toTarget + void $ forkIO $ do + toSource <- readMVar toSourceVar + relayStream fromTarget toSource + + | otherwise -> do sendToPeer dpeer $ DiscoveryConnectionResponse conn - _ -> svcPrint $ "Discovery: failed to relay connection response" + _ -> svcPrint $ "Discovery: failed to relay connection response" serviceNewPeer = do server <- asks svcServer @@ -447,3 +526,63 @@ discoverySearch server dgst = do } forM_ peers $ \peer -> do sendToPeer peer $ DiscoverySearch $ Right dgst + + +data TunnelAddress = TunnelAddress + { tunnelVia :: Peer + , tunnelIdentity :: UnifiedIdentity + , tunnelStreamNumber :: Int + , tunnelReader :: StreamReader + , tunnelWriter :: StreamWriter + } + +instance Eq TunnelAddress where + x == y = (==) + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Ord TunnelAddress where + compare x y = compare + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Show TunnelAddress where + show tunnel = concat + [ "tunnel@" + , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel + , "/" <> show (tunnelStreamNumber tunnel) + ] + +instance PeerAddressType TunnelAddress where + sendBytesToAddress TunnelAddress {..} bytes = do + writeStream tunnelWriter bytes + +relayStream :: StreamReader -> StreamWriter -> IO () +relayStream r w = do + p <- readStreamPacket r + writeStreamPacket w p + case p of + StreamClosed {} -> return () + _ -> relayStream r w + +receiveFromTunnel :: Server -> TunnelAddress -> IO () +receiveFromTunnel server taddr = do + p <- readStreamPacket (tunnelReader taddr) + case p of + StreamData {..} -> do + receivedFromCustomAddress server taddr stpData + receiveFromTunnel server taddr + StreamClosed {} -> do + return () + + +discoverySetupTunnel :: Peer -> RefDigest -> IO () +discoverySetupTunnel via target = do + runPeerService via $ do + self <- refDigest . storedRef . idData <$> svcSelf + stream <- openStream + svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest + (emptyConnection (Right self) (Right target)) + { dconnTunnel = True + } |