diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-06-28 21:21:37 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-06-29 09:00:20 +0200 |
commit | d112701ce72b090b79f101f6a2d0ccac7440762e (patch) | |
tree | 9cf1f41f8baf8114c57372d05520e146ac0fe814 | |
parent | 1355cf4d6bc293ab2c60f2fe4bb9bfae1caa5b7c (diff) |
Discovery tunnel
Changelog: Support tunnel for peers in discovery service
-rw-r--r-- | main/Test.hs | 8 | ||||
-rw-r--r-- | src/Erebos/Discovery.hs | 161 | ||||
-rw-r--r-- | test/discovery.test | 62 |
3 files changed, 219 insertions, 12 deletions
diff --git a/main/Test.hs b/main/Test.hs index fa8501e..62c7229 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -316,6 +316,7 @@ commands = map (T.pack *** id) , ("chatroom-leave", cmdChatroomLeave) , ("chatroom-message-send", cmdChatroomMessageSend) , ("discovery-connect", cmdDiscoveryConnect) + , ("discovery-tunnel", cmdDiscoveryTunnel) ] cmdStore :: Command @@ -970,3 +971,10 @@ cmdDiscoveryConnect = do Just dgst <- return $ readRefDigest $ encodeUtf8 tref Just RunningServer {..} <- gets tsServer discoverySearch rsServer dgst + +cmdDiscoveryTunnel :: Command +cmdDiscoveryTunnel = do + [ tvia, ttarget ] <- asks tiParams + via <- getPeer tvia + Just target <- return $ readRefDigest $ encodeUtf8 ttarget + liftIO $ discoverySetupTunnel via target diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 2fb0ffe..5de9869 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -6,6 +6,7 @@ module Erebos.Discovery ( DiscoveryConnection(..), discoverySearch, + discoverySetupTunnel, ) where import Control.Concurrent @@ -34,6 +35,7 @@ import Erebos.Identity import Erebos.Network import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable @@ -71,6 +73,7 @@ data DiscoveryConnection = DiscoveryConnection { dconnSource :: Either Ref RefDigest , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text + , dconnTunnel :: Bool , dconnIceInfo :: Maybe IceRemoteInfo } @@ -78,6 +81,7 @@ emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConn emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} where dconnAddress = Nothing + dconnTunnel = False dconnIceInfo = Nothing instance Storable DiscoveryService where @@ -106,6 +110,7 @@ instance Storable DiscoveryService where either (storeRawRef "source") (storeRawWeak "source") dconnSource either (storeRawRef "target") (storeRawWeak "target") dconnTarget storeMbText "address" dconnAddress + when dconnTunnel $ storeEmpty "tunnel" storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum @@ -150,6 +155,7 @@ instance Storable DiscoveryService where , Right <$> loadRawWeak "target" ] dconnAddress <- loadMbText "address" + dconnTunnel <- isJust <$> loadMbEmpty "tunnel" dconnIceInfo <- loadMbRef "ice-info" return $ ctor DiscoveryConnection {..} @@ -169,7 +175,11 @@ emptyPeer = DiscoveryPeer } data DiscoveryPeerState = DiscoveryPeerState - { dpsStunServer :: Maybe ( Text, Word16 ) + { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ] + -- ( original target, our write stream ) + , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ] + -- ( original source, ( from source, to target )) + , dpsStunServer :: Maybe ( Text, Word16 ) , dpsTurnServer :: Maybe ( Text, Word16 ) , dpsIceConfig :: Maybe IceConfig } @@ -187,7 +197,9 @@ instance Service DiscoveryService where type ServiceState DiscoveryService = DiscoveryPeerState emptyServiceState _ = DiscoveryPeerState - { dpsStunServer = Nothing + { dpsOurTunnelRequests = [] + , dpsRelayedTunnelRequests = [] + , dpsStunServer = Nothing , dpsTurnServer = Nothing , dpsIceConfig = Nothing } @@ -317,8 +329,27 @@ instance Service DiscoveryService where let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) if either refDigest id (dconnTarget conn) `elem` identityDigests self then if + -- request for us, create ICE sesssion or tunnel + | dconnTunnel conn -> do + receivedStreams >>= \case + (tunnelReader : _) -> do + tunnelWriter <- openStream + replyPacket $ DiscoveryConnectionResponse rconn + { dconnTunnel = True + } + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + server <- asks svcServer + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + + [] -> do + svcPrint $ "Discovery: missing stream on tunnel request (endpoint)" + #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer @@ -340,18 +371,31 @@ instance Service DiscoveryService where else do -- request to some of our peers, relay mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal + streams <- receivedStreams case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn + | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn -> if + | fromSource : _ <- streams -> do + void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do + toTarget <- openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest conn + | otherwise -> do + svcPrint $ "Discovery: missing stream on tunnel request (relay)" + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf + dps <- svcGet dpeers <- dgsPeers <$> svcGetGlobal + if either refDigest id (dconnSource conn) `elem` identityDigests self - then do + then do -- response to our request, try to connect to the peer server <- asks svcServer if | Just addr <- dconnAddress conn @@ -363,6 +407,20 @@ instance Service DiscoveryService where svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } + | dconnTunnel conn + , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + receivedStreams >>= \case + tunnelReader : _ -> do + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + [] -> svcPrint $ "Discovery: missing stream in tunnel response" + #ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp @@ -371,12 +429,33 @@ instance Service DiscoveryService where #endif | otherwise -> svcPrint $ "Discovery: connection request failed" - else do - -- response to relayed request - case M.lookup (either refDigest id $ dconnSource conn) dpeers of - Just dp | Just dpeer <- dpPeer dp -> do + else do + -- response to relayed request + streams <- receivedStreams + svcModify $ \s -> s { dpsRelayedTunnelRequests = + filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) } + + case M.lookup (either refDigest id $ dconnSource conn) dpeers of + Just dp | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn + , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + , fromTarget : _ <- streams + -> liftIO $ do + toSourceVar <- newEmptyMVar + void $ forkIO $ runPeerService @DiscoveryService dpeer $ do + liftIO . putMVar toSourceVar =<< openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionResponse conn + void $ forkIO $ do + relayStream fromSource toTarget + void $ forkIO $ do + toSource <- readMVar toSourceVar + relayStream fromTarget toSource + + | otherwise -> do sendToPeer dpeer $ DiscoveryConnectionResponse conn - _ -> svcPrint $ "Discovery: failed to relay connection response" + _ -> svcPrint $ "Discovery: failed to relay connection response" serviceNewPeer = do server <- asks svcServer @@ -447,3 +526,63 @@ discoverySearch server dgst = do } forM_ peers $ \peer -> do sendToPeer peer $ DiscoverySearch $ Right dgst + + +data TunnelAddress = TunnelAddress + { tunnelVia :: Peer + , tunnelIdentity :: UnifiedIdentity + , tunnelStreamNumber :: Int + , tunnelReader :: StreamReader + , tunnelWriter :: StreamWriter + } + +instance Eq TunnelAddress where + x == y = (==) + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Ord TunnelAddress where + compare x y = compare + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Show TunnelAddress where + show tunnel = concat + [ "tunnel@" + , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel + , "/" <> show (tunnelStreamNumber tunnel) + ] + +instance PeerAddressType TunnelAddress where + sendBytesToAddress TunnelAddress {..} bytes = do + writeStream tunnelWriter bytes + +relayStream :: StreamReader -> StreamWriter -> IO () +relayStream r w = do + p <- readStreamPacket r + writeStreamPacket w p + case p of + StreamClosed {} -> return () + _ -> relayStream r w + +receiveFromTunnel :: Server -> TunnelAddress -> IO () +receiveFromTunnel server taddr = do + p <- readStreamPacket (tunnelReader taddr) + case p of + StreamData {..} -> do + receivedFromCustomAddress server taddr stpData + receiveFromTunnel server taddr + StreamClosed {} -> do + return () + + +discoverySetupTunnel :: Peer -> RefDigest -> IO () +discoverySetupTunnel via target = do + runPeerService via $ do + self <- refDigest . storedRef . idData <$> svcSelf + stream <- openStream + svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest + (emptyConnection (Right self) (Right target)) + { dconnTunnel = True + } diff --git a/test/discovery.test b/test/discovery.test index 3be6275..9453e65 100644 --- a/test/discovery.test +++ b/test/discovery.test @@ -1,8 +1,9 @@ module discovery +def refpat = /blake2#[0-9a-f]*/ + test ManualDiscovery: let services = "discovery" - let refpat = /blake2#[0-9a-f]*/ subnet sd subnet s1 @@ -89,3 +90,62 @@ test ManualDiscovery: send "stop-server" to p for p in [ pd, p1, p2 ]: expect /stop-server-done/ from p + + +test DiscoveryTunnel: + let services = "discovery" + + subnet sd + subnet s1 + subnet s2 + + spawn as pd on sd + spawn as p1 on s1 + spawn as p2 on s2 + + for n in [ p1.node, p2.node ]: + shell on n: + nft add table inet filter + nft add chain inet filter input '{ type filter hook input priority filter ; policy drop; }' + nft add rule inet filter input 'ct state { established, related } accept' + + send "create-identity Discovery" to pd + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + + expect /create-identity-done ref ($refpat).*/ from p1 capture p1id + send "identity-info $p1id" to p1 + expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner + send "identity-info $p1owner" to p1 + expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase + + expect /create-identity-done ref $refpat.*/ from p2 + expect /create-identity-done ref $refpat.*/ from pd + + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + for p in [ p1, p2 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p.node.ip} 29665/ + /peer [12] id .*/ + + send "discovery-tunnel 1 $id" to p2 + + expect from p1: + /peer [0-9]+ addr tunnel@.*/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr tunnel@.*/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p |