summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main/Test.hs8
-rw-r--r--src/Erebos/Discovery.hs161
-rw-r--r--test/discovery.test62
3 files changed, 219 insertions, 12 deletions
diff --git a/main/Test.hs b/main/Test.hs
index fa8501e..62c7229 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -316,6 +316,7 @@ commands = map (T.pack *** id)
, ("chatroom-leave", cmdChatroomLeave)
, ("chatroom-message-send", cmdChatroomMessageSend)
, ("discovery-connect", cmdDiscoveryConnect)
+ , ("discovery-tunnel", cmdDiscoveryTunnel)
]
cmdStore :: Command
@@ -970,3 +971,10 @@ cmdDiscoveryConnect = do
Just dgst <- return $ readRefDigest $ encodeUtf8 tref
Just RunningServer {..} <- gets tsServer
discoverySearch rsServer dgst
+
+cmdDiscoveryTunnel :: Command
+cmdDiscoveryTunnel = do
+ [ tvia, ttarget ] <- asks tiParams
+ via <- getPeer tvia
+ Just target <- return $ readRefDigest $ encodeUtf8 ttarget
+ liftIO $ discoverySetupTunnel via target
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index 2fb0ffe..5de9869 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -6,6 +6,7 @@ module Erebos.Discovery (
DiscoveryConnection(..),
discoverySearch,
+ discoverySetupTunnel,
) where
import Control.Concurrent
@@ -34,6 +35,7 @@ import Erebos.Identity
import Erebos.Network
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
@@ -71,6 +73,7 @@ data DiscoveryConnection = DiscoveryConnection
{ dconnSource :: Either Ref RefDigest
, dconnTarget :: Either Ref RefDigest
, dconnAddress :: Maybe Text
+ , dconnTunnel :: Bool
, dconnIceInfo :: Maybe IceRemoteInfo
}
@@ -78,6 +81,7 @@ emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConn
emptyConnection dconnSource dconnTarget = DiscoveryConnection {..}
where
dconnAddress = Nothing
+ dconnTunnel = False
dconnIceInfo = Nothing
instance Storable DiscoveryService where
@@ -106,6 +110,7 @@ instance Storable DiscoveryService where
either (storeRawRef "source") (storeRawWeak "source") dconnSource
either (storeRawRef "target") (storeRawWeak "target") dconnTarget
storeMbText "address" dconnAddress
+ when dconnTunnel $ storeEmpty "tunnel"
storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
@@ -150,6 +155,7 @@ instance Storable DiscoveryService where
, Right <$> loadRawWeak "target"
]
dconnAddress <- loadMbText "address"
+ dconnTunnel <- isJust <$> loadMbEmpty "tunnel"
dconnIceInfo <- loadMbRef "ice-info"
return $ ctor DiscoveryConnection {..}
@@ -169,7 +175,11 @@ emptyPeer = DiscoveryPeer
}
data DiscoveryPeerState = DiscoveryPeerState
- { dpsStunServer :: Maybe ( Text, Word16 )
+ { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ]
+ -- ( original target, our write stream )
+ , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ]
+ -- ( original source, ( from source, to target ))
+ , dpsStunServer :: Maybe ( Text, Word16 )
, dpsTurnServer :: Maybe ( Text, Word16 )
, dpsIceConfig :: Maybe IceConfig
}
@@ -187,7 +197,9 @@ instance Service DiscoveryService where
type ServiceState DiscoveryService = DiscoveryPeerState
emptyServiceState _ = DiscoveryPeerState
- { dpsStunServer = Nothing
+ { dpsOurTunnelRequests = []
+ , dpsRelayedTunnelRequests = []
+ , dpsStunServer = Nothing
, dpsTurnServer = Nothing
, dpsIceConfig = Nothing
}
@@ -317,8 +329,27 @@ instance Service DiscoveryService where
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
if either refDigest id (dconnTarget conn) `elem` identityDigests self
then if
+ -- request for us, create ICE sesssion or tunnel
+ | dconnTunnel conn -> do
+ receivedStreams >>= \case
+ (tunnelReader : _) -> do
+ tunnelWriter <- openStream
+ replyPacket $ DiscoveryConnectionResponse rconn
+ { dconnTunnel = True
+ }
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ server <- asks svcServer
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+
+ [] -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (endpoint)"
+
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
| Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
@@ -340,18 +371,31 @@ instance Service DiscoveryService where
else do
-- request to some of our peers, relay
mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
+ streams <- receivedStreams
case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
- | Just dpeer <- dpPeer dp -> do
- sendToPeer dpeer $ DiscoveryConnectionRequest conn
+ | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn -> if
+ | fromSource : _ <- streams -> do
+ void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ toTarget <- openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest conn
+ | otherwise -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (relay)"
+ | otherwise -> do
+ sendToPeer dpeer $ DiscoveryConnectionRequest conn
| otherwise -> svcPrint $ "Discovery: failed to relay connection request"
DiscoveryConnectionResponse conn -> do
self <- svcSelf
+ dps <- svcGet
dpeers <- dgsPeers <$> svcGetGlobal
+
if either refDigest id (dconnSource conn) `elem` identityDigests self
- then do
+ then do
-- response to our request, try to connect to the peer
server <- asks svcServer
if | Just addr <- dconnAddress conn
@@ -363,6 +407,20 @@ instance Service DiscoveryService where
svcModifyGlobal $ \s -> s
{ dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s }
+ | dconnTunnel conn
+ , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ receivedStreams >>= \case
+ tunnelReader : _ -> do
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+ [] -> svcPrint $ "Discovery: missing stream in tunnel response"
+
#ifdef ENABLE_ICE_SUPPORT
| Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
@@ -371,12 +429,33 @@ instance Service DiscoveryService where
#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
- else do
- -- response to relayed request
- case M.lookup (either refDigest id $ dconnSource conn) dpeers of
- Just dp | Just dpeer <- dpPeer dp -> do
+ else do
+ -- response to relayed request
+ streams <- receivedStreams
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) }
+
+ case M.lookup (either refDigest id $ dconnSource conn) dpeers of
+ Just dp | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn
+ , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ , fromTarget : _ <- streams
+ -> liftIO $ do
+ toSourceVar <- newEmptyMVar
+ void $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ liftIO . putMVar toSourceVar =<< openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionResponse conn
+ void $ forkIO $ do
+ relayStream fromSource toTarget
+ void $ forkIO $ do
+ toSource <- readMVar toSourceVar
+ relayStream fromTarget toSource
+
+ | otherwise -> do
sendToPeer dpeer $ DiscoveryConnectionResponse conn
- _ -> svcPrint $ "Discovery: failed to relay connection response"
+ _ -> svcPrint $ "Discovery: failed to relay connection response"
serviceNewPeer = do
server <- asks svcServer
@@ -447,3 +526,63 @@ discoverySearch server dgst = do
}
forM_ peers $ \peer -> do
sendToPeer peer $ DiscoverySearch $ Right dgst
+
+
+data TunnelAddress = TunnelAddress
+ { tunnelVia :: Peer
+ , tunnelIdentity :: UnifiedIdentity
+ , tunnelStreamNumber :: Int
+ , tunnelReader :: StreamReader
+ , tunnelWriter :: StreamWriter
+ }
+
+instance Eq TunnelAddress where
+ x == y = (==)
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Ord TunnelAddress where
+ compare x y = compare
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Show TunnelAddress where
+ show tunnel = concat
+ [ "tunnel@"
+ , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel
+ , "/" <> show (tunnelStreamNumber tunnel)
+ ]
+
+instance PeerAddressType TunnelAddress where
+ sendBytesToAddress TunnelAddress {..} bytes = do
+ writeStream tunnelWriter bytes
+
+relayStream :: StreamReader -> StreamWriter -> IO ()
+relayStream r w = do
+ p <- readStreamPacket r
+ writeStreamPacket w p
+ case p of
+ StreamClosed {} -> return ()
+ _ -> relayStream r w
+
+receiveFromTunnel :: Server -> TunnelAddress -> IO ()
+receiveFromTunnel server taddr = do
+ p <- readStreamPacket (tunnelReader taddr)
+ case p of
+ StreamData {..} -> do
+ receivedFromCustomAddress server taddr stpData
+ receiveFromTunnel server taddr
+ StreamClosed {} -> do
+ return ()
+
+
+discoverySetupTunnel :: Peer -> RefDigest -> IO ()
+discoverySetupTunnel via target = do
+ runPeerService via $ do
+ self <- refDigest . storedRef . idData <$> svcSelf
+ stream <- openStream
+ svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest
+ (emptyConnection (Right self) (Right target))
+ { dconnTunnel = True
+ }
diff --git a/test/discovery.test b/test/discovery.test
index 3be6275..9453e65 100644
--- a/test/discovery.test
+++ b/test/discovery.test
@@ -1,8 +1,9 @@
module discovery
+def refpat = /blake2#[0-9a-f]*/
+
test ManualDiscovery:
let services = "discovery"
- let refpat = /blake2#[0-9a-f]*/
subnet sd
subnet s1
@@ -89,3 +90,62 @@ test ManualDiscovery:
send "stop-server" to p
for p in [ pd, p1, p2 ]:
expect /stop-server-done/ from p
+
+
+test DiscoveryTunnel:
+ let services = "discovery"
+
+ subnet sd
+ subnet s1
+ subnet s2
+
+ spawn as pd on sd
+ spawn as p1 on s1
+ spawn as p2 on s2
+
+ for n in [ p1.node, p2.node ]:
+ shell on n:
+ nft add table inet filter
+ nft add chain inet filter input '{ type filter hook input priority filter ; policy drop; }'
+ nft add rule inet filter input 'ct state { established, related } accept'
+
+ send "create-identity Discovery" to pd
+ send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
+
+ expect /create-identity-done ref ($refpat).*/ from p1 capture p1id
+ send "identity-info $p1id" to p1
+ expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner
+ send "identity-info $p1owner" to p1
+ expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase
+
+ expect /create-identity-done ref $refpat.*/ from p2
+ expect /create-identity-done ref $refpat.*/ from pd
+
+ for id in [ p1obase ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services" to p
+
+ for p in [ p1, p2 ]:
+ with p:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p.node.ip} 29665/
+ /peer [12] id .*/
+
+ send "discovery-tunnel 1 $id" to p2
+
+ expect from p1:
+ /peer [0-9]+ addr tunnel@.*/
+ /peer [0-9]+ id Device2 Owner2/
+ expect from p2:
+ /peer [0-9]+ addr tunnel@.*/
+ /peer [0-9]+ id Device1 Owner1/
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p