diff options
Diffstat (limited to 'src/Erebos/Discovery.hs')
-rw-r--r-- | src/Erebos/Discovery.hs | 259 |
1 files changed, 211 insertions, 48 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index ef289cb..ede9cc9 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -6,6 +6,7 @@ module Erebos.Discovery ( DiscoveryConnection(..), discoverySearch, + discoverySetupTunnel, ) where import Control.Concurrent @@ -32,8 +33,10 @@ import Erebos.ICE #endif import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.Service -import Erebos.Storage +import Erebos.Service.Stream +import Erebos.Storable #ifndef ENABLE_ICE_SUPPORT @@ -46,8 +49,8 @@ type IceRemoteInfo = Stored Object data DiscoveryService = DiscoverySelf [ Text ] (Maybe Int) | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) - | DiscoverySearch Ref - | DiscoveryResult Ref [ Text ] + | DiscoverySearch (Either Ref RefDigest) + | DiscoveryResult (Either Ref RefDigest) [ Text ] | DiscoveryConnectionRequest DiscoveryConnection | DiscoveryConnectionResponse DiscoveryConnection @@ -56,6 +59,7 @@ data DiscoveryAttributes = DiscoveryAttributes , discoveryStunServer :: Maybe Text , discoveryTurnPort :: Maybe Word16 , discoveryTurnServer :: Maybe Text + , discoveryProvideTunnel :: Peer -> Bool } defaultDiscoveryAttributes :: DiscoveryAttributes @@ -64,19 +68,22 @@ defaultDiscoveryAttributes = DiscoveryAttributes , discoveryStunServer = Nothing , discoveryTurnPort = Nothing , discoveryTurnServer = Nothing + , discoveryProvideTunnel = const False } data DiscoveryConnection = DiscoveryConnection - { dconnSource :: Ref - , dconnTarget :: Ref + { dconnSource :: Either Ref RefDigest + , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text + , dconnTunnel :: Bool , dconnIceInfo :: Maybe IceRemoteInfo } -emptyConnection :: Ref -> Ref -> DiscoveryConnection +emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} where dconnAddress = Nothing + dconnTunnel = False dconnIceInfo = Nothing instance Storable DiscoveryService where @@ -92,9 +99,9 @@ instance Storable DiscoveryService where storeMbInt "stun-port" stunPort storeMbText "turn-server" turnServer storeMbInt "turn-port" turnPort - DiscoverySearch ref -> storeRawRef "search" ref - DiscoveryResult ref addr -> do - storeRawRef "result" ref + DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst + DiscoveryResult edgst addr -> do + either (storeRawRef "result") (storeRawWeak "result") edgst mapM_ (storeText "address") addr DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn @@ -102,9 +109,10 @@ instance Storable DiscoveryService where where storeConnection ctype DiscoveryConnection {..} = do storeText "connection" $ ctype - storeRawRef "source" dconnSource - storeRawRef "target" dconnTarget + either (storeRawRef "source") (storeRawWeak "source") dconnSource + either (storeRawRef "target") (storeRawWeak "target") dconnTarget storeMbText "address" dconnAddress + when dconnTunnel $ storeEmpty "tunnel" storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum @@ -123,9 +131,15 @@ instance Storable DiscoveryService where <*> loadMbInt "stun-port" <*> loadMbText "turn-server" <*> loadMbInt "turn-port" - , DiscoverySearch <$> loadRawRef "search" + , DiscoverySearch <$> msum + [ Left <$> loadRawRef "search" + , Right <$> loadRawWeak "search" + ] , DiscoveryResult - <$> loadRawRef "result" + <$> msum + [ Left <$> loadRawRef "result" + , Right <$> loadRawWeak "result" + ] <*> loadTexts "address" , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse @@ -134,9 +148,16 @@ instance Storable DiscoveryService where loadConnection ctype ctor = do ctype' <- loadText "connection" guard $ ctype == ctype' - dconnSource <- loadRawRef "source" - dconnTarget <- loadRawRef "target" + dconnSource <- msum + [ Left <$> loadRawRef "source" + , Right <$> loadRawWeak "source" + ] + dconnTarget <- msum + [ Left <$> loadRawRef "target" + , Right <$> loadRawWeak "target" + ] dconnAddress <- loadMbText "address" + dconnTunnel <- isJust <$> loadMbEmpty "tunnel" dconnIceInfo <- loadMbRef "ice-info" return $ ctor DiscoveryConnection {..} @@ -156,7 +177,11 @@ emptyPeer = DiscoveryPeer } data DiscoveryPeerState = DiscoveryPeerState - { dpsStunServer :: Maybe ( Text, Word16 ) + { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ] + -- ( original target, our write stream ) + , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ] + -- ( original source, ( from source, to target )) + , dpsStunServer :: Maybe ( Text, Word16 ) , dpsTurnServer :: Maybe ( Text, Word16 ) , dpsIceConfig :: Maybe IceConfig } @@ -174,7 +199,9 @@ instance Service DiscoveryService where type ServiceState DiscoveryService = DiscoveryPeerState emptyServiceState _ = DiscoveryPeerState - { dpsStunServer = Nothing + { dpsOurTunnelRequests = [] + , dpsRelayedTunnelRequests = [] + , dpsStunServer = Nothing , dpsTurnServer = Nothing , dpsIceConfig = Nothing } @@ -240,18 +267,19 @@ instance Service DiscoveryService where , dpsTurnServer = toIceServer turnServer turnPort } - DiscoverySearch ref -> do - dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal - replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer + DiscoverySearch edgst -> do + dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal + replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer DiscoveryResult _ [] -> do -- not found return () - DiscoveryResult ref addrs -> do - let dgst = refDigest ref + DiscoveryResult edgst addrs -> do + let dgst = either refDigest id edgst -- TODO: check if we really requested that server <- asks svcServer + st <- getStorage self <- svcSelf discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer @@ -265,8 +293,15 @@ instance Service DiscoveryService where ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do rinfo <- iceRemoteInfo ice + -- Try to promote weak ref to normal one for older peers: + edgst' <- case edgst of + Left r -> return (Left r) + Right d -> refFromDigest st d >>= \case + Just r -> return (Left r) + Nothing -> return (Right d) + res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } + DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } case res of Right _ -> return () Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err @@ -294,11 +329,31 @@ instance Service DiscoveryService where DiscoveryConnectionRequest conn -> do self <- svcSelf + attrs <- asks svcAttributes let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` identityDigests self + if either refDigest id (dconnTarget conn) `elem` identityDigests self then if + -- request for us, create ICE sesssion or tunnel + | dconnTunnel conn -> do + receivedStreams >>= \case + (tunnelReader : _) -> do + tunnelWriter <- openStream + replyPacket $ DiscoveryConnectionResponse rconn + { dconnTunnel = True + } + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + server <- asks svcServer + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + + [] -> do + svcPrint $ "Discovery: missing stream on tunnel request (endpoint)" + #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer @@ -318,20 +373,36 @@ instance Service DiscoveryService where svcPrint $ "Discovery: unsupported connection request" else do - -- request to some of our peers, relay - mbdp <- M.lookup (refDigest $ dconnTarget conn) . dgsPeers <$> svcGetGlobal - case mbdp of + -- request to some of our peers, relay + peer <- asks svcPeer + mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal + streams <- receivedStreams + case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn + | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn -> if + | not (discoveryProvideTunnel attrs peer) -> do + replyPacket $ DiscoveryConnectionResponse rconn + | fromSource : _ <- streams -> do + void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do + toTarget <- openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest conn + | otherwise -> do + svcPrint $ "Discovery: missing stream on tunnel request (relay)" + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf + dps <- svcGet dpeers <- dgsPeers <$> svcGetGlobal - if refDigest (dconnSource conn) `elem` identityDigests self - then do + + if either refDigest id (dconnSource conn) `elem` identityDigests self + then do -- response to our request, try to connect to the peer server <- asks svcServer if | Just addr <- dconnAddress conn @@ -341,27 +412,61 @@ instance Service DiscoveryService where peer <- liftIO $ serverPeer server (addrAddress saddr) let upd dp = dp { dpPeer = Just peer } svcModifyGlobal $ \s -> s - { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (refDigest $ dconnTarget conn) $ dgsPeers s } + { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } + + | dconnTunnel conn + , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + receivedStreams >>= \case + tunnelReader : _ -> do + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + [] -> svcPrint $ "Discovery: missing stream in tunnel response" #ifdef ENABLE_ICE_SUPPORT - | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers + | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice #endif | otherwise -> svcPrint $ "Discovery: connection request failed" - else do - -- response to relayed request - case M.lookup (refDigest $ dconnSource conn) dpeers of - Just dp | Just dpeer <- dpPeer dp -> do + else do + -- response to relayed request + streams <- receivedStreams + svcModify $ \s -> s { dpsRelayedTunnelRequests = + filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) } + + case M.lookup (either refDigest id $ dconnSource conn) dpeers of + Just dp | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn + , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + , fromTarget : _ <- streams + -> liftIO $ do + toSourceVar <- newEmptyMVar + void $ forkIO $ runPeerService @DiscoveryService dpeer $ do + liftIO . putMVar toSourceVar =<< openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionResponse conn + void $ forkIO $ do + relayStream fromSource toTarget + void $ forkIO $ do + toSource <- readMVar toSourceVar + relayStream fromTarget toSource + + | otherwise -> do sendToPeer dpeer $ DiscoveryConnectionResponse conn - _ -> svcPrint $ "Discovery: failed to relay connection response" + _ -> svcPrint $ "Discovery: failed to relay connection response" serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer - st <- getStorage let addrToText saddr = do ( addr, port ) <- IP.fromSockAddr saddr @@ -381,9 +486,7 @@ instance Service DiscoveryService where when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing forM_ searchingFor $ \dgst -> do - liftIO (refFromDigest st dgst) >>= \case - Just ref -> sendToPeer peer $ DiscoverySearch ref - Nothing -> return () + sendToPeer peer $ DiscoverySearch (Right dgst) #ifdef ENABLE_ICE_SUPPORT serviceStopServer _ _ _ pstates = do @@ -416,17 +519,77 @@ getIceConfig = do #endif -discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m () -discoverySearch server ref = do +discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m () +discoverySearch server dgst = do peers <- liftIO $ getCurrentPeerList server match <- forM peers $ \peer -> do peerIdentity peer >>= \case PeerIdentityFull pid -> do - return $ refDigest ref `elem` identityDigests pid + return $ dgst `elem` identityDigests pid _ -> return False when (not $ or match) $ do modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s - { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s + { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s } forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch ref + sendToPeer peer $ DiscoverySearch $ Right dgst + + +data TunnelAddress = TunnelAddress + { tunnelVia :: Peer + , tunnelIdentity :: UnifiedIdentity + , tunnelStreamNumber :: Int + , tunnelReader :: StreamReader + , tunnelWriter :: StreamWriter + } + +instance Eq TunnelAddress where + x == y = (==) + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Ord TunnelAddress where + compare x y = compare + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Show TunnelAddress where + show tunnel = concat + [ "tunnel@" + , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel + , "/" <> show (tunnelStreamNumber tunnel) + ] + +instance PeerAddressType TunnelAddress where + sendBytesToAddress TunnelAddress {..} bytes = do + writeStream tunnelWriter bytes + +relayStream :: StreamReader -> StreamWriter -> IO () +relayStream r w = do + p <- readStreamPacket r + writeStreamPacket w p + case p of + StreamClosed {} -> return () + _ -> relayStream r w + +receiveFromTunnel :: Server -> TunnelAddress -> IO () +receiveFromTunnel server taddr = do + p <- readStreamPacket (tunnelReader taddr) + case p of + StreamData {..} -> do + receivedFromCustomAddress server taddr stpData + receiveFromTunnel server taddr + StreamClosed {} -> do + return () + + +discoverySetupTunnel :: Peer -> RefDigest -> IO () +discoverySetupTunnel via target = do + runPeerService via $ do + self <- refDigest . storedRef . idData <$> svcSelf + stream <- openStream + svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest + (emptyConnection (Right self) (Right target)) + { dconnTunnel = True + } |