summaryrefslogtreecommitdiff
path: root/src/Erebos
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-06-28 21:21:37 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2025-06-29 09:00:20 +0200
commitd112701ce72b090b79f101f6a2d0ccac7440762e (patch)
tree9cf1f41f8baf8114c57372d05520e146ac0fe814 /src/Erebos
parent1355cf4d6bc293ab2c60f2fe4bb9bfae1caa5b7c (diff)
Discovery tunnel
Changelog: Support tunnel for peers in discovery service
Diffstat (limited to 'src/Erebos')
-rw-r--r--src/Erebos/Discovery.hs161
1 files changed, 150 insertions, 11 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index 2fb0ffe..5de9869 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -6,6 +6,7 @@ module Erebos.Discovery (
DiscoveryConnection(..),
discoverySearch,
+ discoverySetupTunnel,
) where
import Control.Concurrent
@@ -34,6 +35,7 @@ import Erebos.Identity
import Erebos.Network
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
@@ -71,6 +73,7 @@ data DiscoveryConnection = DiscoveryConnection
{ dconnSource :: Either Ref RefDigest
, dconnTarget :: Either Ref RefDigest
, dconnAddress :: Maybe Text
+ , dconnTunnel :: Bool
, dconnIceInfo :: Maybe IceRemoteInfo
}
@@ -78,6 +81,7 @@ emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConn
emptyConnection dconnSource dconnTarget = DiscoveryConnection {..}
where
dconnAddress = Nothing
+ dconnTunnel = False
dconnIceInfo = Nothing
instance Storable DiscoveryService where
@@ -106,6 +110,7 @@ instance Storable DiscoveryService where
either (storeRawRef "source") (storeRawWeak "source") dconnSource
either (storeRawRef "target") (storeRawWeak "target") dconnTarget
storeMbText "address" dconnAddress
+ when dconnTunnel $ storeEmpty "tunnel"
storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
@@ -150,6 +155,7 @@ instance Storable DiscoveryService where
, Right <$> loadRawWeak "target"
]
dconnAddress <- loadMbText "address"
+ dconnTunnel <- isJust <$> loadMbEmpty "tunnel"
dconnIceInfo <- loadMbRef "ice-info"
return $ ctor DiscoveryConnection {..}
@@ -169,7 +175,11 @@ emptyPeer = DiscoveryPeer
}
data DiscoveryPeerState = DiscoveryPeerState
- { dpsStunServer :: Maybe ( Text, Word16 )
+ { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ]
+ -- ( original target, our write stream )
+ , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ]
+ -- ( original source, ( from source, to target ))
+ , dpsStunServer :: Maybe ( Text, Word16 )
, dpsTurnServer :: Maybe ( Text, Word16 )
, dpsIceConfig :: Maybe IceConfig
}
@@ -187,7 +197,9 @@ instance Service DiscoveryService where
type ServiceState DiscoveryService = DiscoveryPeerState
emptyServiceState _ = DiscoveryPeerState
- { dpsStunServer = Nothing
+ { dpsOurTunnelRequests = []
+ , dpsRelayedTunnelRequests = []
+ , dpsStunServer = Nothing
, dpsTurnServer = Nothing
, dpsIceConfig = Nothing
}
@@ -317,8 +329,27 @@ instance Service DiscoveryService where
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
if either refDigest id (dconnTarget conn) `elem` identityDigests self
then if
+ -- request for us, create ICE sesssion or tunnel
+ | dconnTunnel conn -> do
+ receivedStreams >>= \case
+ (tunnelReader : _) -> do
+ tunnelWriter <- openStream
+ replyPacket $ DiscoveryConnectionResponse rconn
+ { dconnTunnel = True
+ }
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ server <- asks svcServer
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+
+ [] -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (endpoint)"
+
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
| Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
@@ -340,18 +371,31 @@ instance Service DiscoveryService where
else do
-- request to some of our peers, relay
mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
+ streams <- receivedStreams
case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
- | Just dpeer <- dpPeer dp -> do
- sendToPeer dpeer $ DiscoveryConnectionRequest conn
+ | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn -> if
+ | fromSource : _ <- streams -> do
+ void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ toTarget <- openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest conn
+ | otherwise -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (relay)"
+ | otherwise -> do
+ sendToPeer dpeer $ DiscoveryConnectionRequest conn
| otherwise -> svcPrint $ "Discovery: failed to relay connection request"
DiscoveryConnectionResponse conn -> do
self <- svcSelf
+ dps <- svcGet
dpeers <- dgsPeers <$> svcGetGlobal
+
if either refDigest id (dconnSource conn) `elem` identityDigests self
- then do
+ then do
-- response to our request, try to connect to the peer
server <- asks svcServer
if | Just addr <- dconnAddress conn
@@ -363,6 +407,20 @@ instance Service DiscoveryService where
svcModifyGlobal $ \s -> s
{ dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s }
+ | dconnTunnel conn
+ , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ receivedStreams >>= \case
+ tunnelReader : _ -> do
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+ [] -> svcPrint $ "Discovery: missing stream in tunnel response"
+
#ifdef ENABLE_ICE_SUPPORT
| Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
@@ -371,12 +429,33 @@ instance Service DiscoveryService where
#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
- else do
- -- response to relayed request
- case M.lookup (either refDigest id $ dconnSource conn) dpeers of
- Just dp | Just dpeer <- dpPeer dp -> do
+ else do
+ -- response to relayed request
+ streams <- receivedStreams
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) }
+
+ case M.lookup (either refDigest id $ dconnSource conn) dpeers of
+ Just dp | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn
+ , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ , fromTarget : _ <- streams
+ -> liftIO $ do
+ toSourceVar <- newEmptyMVar
+ void $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ liftIO . putMVar toSourceVar =<< openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionResponse conn
+ void $ forkIO $ do
+ relayStream fromSource toTarget
+ void $ forkIO $ do
+ toSource <- readMVar toSourceVar
+ relayStream fromTarget toSource
+
+ | otherwise -> do
sendToPeer dpeer $ DiscoveryConnectionResponse conn
- _ -> svcPrint $ "Discovery: failed to relay connection response"
+ _ -> svcPrint $ "Discovery: failed to relay connection response"
serviceNewPeer = do
server <- asks svcServer
@@ -447,3 +526,63 @@ discoverySearch server dgst = do
}
forM_ peers $ \peer -> do
sendToPeer peer $ DiscoverySearch $ Right dgst
+
+
+data TunnelAddress = TunnelAddress
+ { tunnelVia :: Peer
+ , tunnelIdentity :: UnifiedIdentity
+ , tunnelStreamNumber :: Int
+ , tunnelReader :: StreamReader
+ , tunnelWriter :: StreamWriter
+ }
+
+instance Eq TunnelAddress where
+ x == y = (==)
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Ord TunnelAddress where
+ compare x y = compare
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Show TunnelAddress where
+ show tunnel = concat
+ [ "tunnel@"
+ , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel
+ , "/" <> show (tunnelStreamNumber tunnel)
+ ]
+
+instance PeerAddressType TunnelAddress where
+ sendBytesToAddress TunnelAddress {..} bytes = do
+ writeStream tunnelWriter bytes
+
+relayStream :: StreamReader -> StreamWriter -> IO ()
+relayStream r w = do
+ p <- readStreamPacket r
+ writeStreamPacket w p
+ case p of
+ StreamClosed {} -> return ()
+ _ -> relayStream r w
+
+receiveFromTunnel :: Server -> TunnelAddress -> IO ()
+receiveFromTunnel server taddr = do
+ p <- readStreamPacket (tunnelReader taddr)
+ case p of
+ StreamData {..} -> do
+ receivedFromCustomAddress server taddr stpData
+ receiveFromTunnel server taddr
+ StreamClosed {} -> do
+ return ()
+
+
+discoverySetupTunnel :: Peer -> RefDigest -> IO ()
+discoverySetupTunnel via target = do
+ runPeerService via $ do
+ self <- refDigest . storedRef . idData <$> svcSelf
+ stream <- openStream
+ svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest
+ (emptyConnection (Right self) (Right target))
+ { dconnTunnel = True
+ }