diff options
Diffstat (limited to 'src/Erebos/Discovery.hs')
| -rw-r--r-- | src/Erebos/Discovery.hs | 386 |
1 files changed, 293 insertions, 93 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 2fb0ffe..5590e4c 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -1,4 +1,5 @@ {-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} module Erebos.Discovery ( DiscoveryService(..), @@ -6,6 +7,7 @@ module Erebos.Discovery ( DiscoveryConnection(..), discoverySearch, + discoverySetupTunnel, ) where import Control.Concurrent @@ -13,7 +15,6 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Reader -import Data.IP qualified as IP import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M @@ -25,15 +26,17 @@ import Data.Text (Text) import Data.Text qualified as T import Data.Word -import Network.Socket +import Text.Read #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE #endif import Erebos.Identity import Erebos.Network +import Erebos.Network.Address import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable @@ -45,18 +48,25 @@ type IceRemoteInfo = Stored Object data DiscoveryService - = DiscoverySelf [ Text ] (Maybe Int) - | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) + = DiscoverySelf [ DiscoveryAddress ] (Maybe Int) + | DiscoveryAcknowledged [ DiscoveryAddress ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) | DiscoverySearch (Either Ref RefDigest) - | DiscoveryResult (Either Ref RefDigest) [ Text ] + | DiscoveryResult (Either Ref RefDigest) [ DiscoveryAddress ] | DiscoveryConnectionRequest DiscoveryConnection | DiscoveryConnectionResponse DiscoveryConnection +data DiscoveryAddress + = DiscoveryIP InetAddress PortNumber + | DiscoveryICE + | DiscoveryTunnel + | DiscoveryOther Text + data DiscoveryAttributes = DiscoveryAttributes { discoveryStunPort :: Maybe Word16 , discoveryStunServer :: Maybe Text , discoveryTurnPort :: Maybe Word16 , discoveryTurnServer :: Maybe Text + , discoveryProvideTunnel :: Peer -> PeerAddress -> Bool } defaultDiscoveryAttributes :: DiscoveryAttributes @@ -65,12 +75,14 @@ defaultDiscoveryAttributes = DiscoveryAttributes , discoveryStunServer = Nothing , discoveryTurnPort = Nothing , discoveryTurnServer = Nothing + , discoveryProvideTunnel = \_ _ -> False } data DiscoveryConnection = DiscoveryConnection { dconnSource :: Either Ref RefDigest , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text + , dconnTunnel :: Bool , dconnIceInfo :: Maybe IceRemoteInfo } @@ -78,6 +90,7 @@ emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConn emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} where dconnAddress = Nothing + dconnTunnel = False dconnIceInfo = Nothing instance Storable DiscoveryService where @@ -101,11 +114,12 @@ instance Storable DiscoveryService where DiscoveryConnectionResponse conn -> storeConnection "response" conn where - storeConnection ctype DiscoveryConnection {..} = do + storeConnection (ctype :: Text) DiscoveryConnection {..} = do storeText "connection" $ ctype either (storeRawRef "source") (storeRawWeak "source") dconnSource either (storeRawRef "target") (storeRawWeak "target") dconnTarget storeMbText "address" dconnAddress + when dconnTunnel $ storeEmpty "tunnel" storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum @@ -138,7 +152,7 @@ instance Storable DiscoveryService where , loadConnection "response" DiscoveryConnectionResponse ] where - loadConnection ctype ctor = do + loadConnection (ctype :: Text) ctor = do ctype' <- loadText "connection" guard $ ctype == ctype' dconnSource <- msum @@ -150,13 +164,37 @@ instance Storable DiscoveryService where , Right <$> loadRawWeak "target" ] dconnAddress <- loadMbText "address" + dconnTunnel <- isJust <$> loadMbEmpty "tunnel" dconnIceInfo <- loadMbRef "ice-info" return $ ctor DiscoveryConnection {..} +instance StorableText DiscoveryAddress where + toText = \case + DiscoveryIP addr port -> T.unwords [ T.pack $ show addr, T.pack $ show port ] + DiscoveryICE -> "ICE" + DiscoveryTunnel -> "tunnel" + DiscoveryOther str -> str + + fromText str = return $ if + | [ addrStr, portStr ] <- T.words str + , Just addr <- readMaybe $ T.unpack addrStr + , Just port <- readMaybe $ T.unpack portStr + -> DiscoveryIP addr port + + | "ice" <- T.toLower str + -> DiscoveryICE + + | "tunnel" <- str + -> DiscoveryTunnel + + | otherwise + -> DiscoveryOther str + + data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer - , dpAddress :: [ Text ] + , dpAddress :: [ DiscoveryAddress ] , dpIceSession :: Maybe IceSession } @@ -169,7 +207,11 @@ emptyPeer = DiscoveryPeer } data DiscoveryPeerState = DiscoveryPeerState - { dpsStunServer :: Maybe ( Text, Word16 ) + { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ] + -- ( original target, our write stream ) + , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ] + -- ( original source, ( from source, to target )) + , dpsStunServer :: Maybe ( Text, Word16 ) , dpsTurnServer :: Maybe ( Text, Word16 ) , dpsIceConfig :: Maybe IceConfig } @@ -187,7 +229,9 @@ instance Service DiscoveryService where type ServiceState DiscoveryService = DiscoveryPeerState emptyServiceState _ = DiscoveryPeerState - { dpsStunServer = Nothing + { dpsOurTunnelRequests = [] + , dpsRelayedTunnelRequests = [] + , dpsStunServer = Nothing , dpsTurnServer = Nothing , dpsIceConfig = Nothing } @@ -202,26 +246,22 @@ instance Service DiscoveryService where DiscoverySelf addrs priority -> do pid <- asks svcPeerIdentity peer <- asks svcPeer + paddrs <- getPeerAddresses peer + let insertHelper new old | dpPriority new > dpPriority old = new | otherwise = old - matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if - | addr == T.pack "ICE" -> do - return $ Just addr - | [ ipaddr, port ] <- words (T.unpack addr) - , DatagramAddress paddr <- peerAddress peer -> do - saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - return $ if paddr == addrAddress saddr - then Just addr - else Nothing - - | otherwise -> return Nothing + let matchedAddrs = flip filter addrs $ \case + DiscoveryICE -> True + DiscoveryIP ipaddr port -> + DatagramAddress (inetToSockAddr ( ipaddr, port )) `elem` paddrs + _ -> False forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do let dp = DiscoveryPeer { dpPriority = fromMaybe 0 priority , dpPeer = Just peer - , dpAddress = addrs + , dpAddress = matchedAddrs , dpIceSession = Nothing } svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } @@ -233,14 +273,8 @@ instance Service DiscoveryService where (discoveryTurnPort attrs) DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do - paddr <- asks (peerAddress . svcPeer) >>= return . \case - (DatagramAddress saddr) -> case IP.fromSockAddr saddr of - Just (IP.IPv6 ipv6, _) - | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 - -> Just $ T.pack $ show (IP.toIPv4w ipv4) - Just (addr, _) - -> Just $ T.pack $ show addr - _ -> Nothing + paddr <- asks svcPeerAddress >>= return . \case + (DatagramAddress saddr) -> T.pack . show . fst <$> inetFromSockAddr saddr _ -> Nothing let toIceServer Nothing Nothing = Nothing @@ -255,10 +289,17 @@ instance Service DiscoveryService where DiscoverySearch edgst -> do dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal - replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer + peer <- asks svcPeer + paddr <- asks svcPeerAddress + attrs <- asks svcAttributes + let offerTunnel + | discoveryProvideTunnel attrs peer paddr = (++ [ DiscoveryTunnel ]) + | otherwise = id + replyPacket $ DiscoveryResult edgst $ maybe [] (offerTunnel . dpAddress) dpeer - DiscoveryResult edgst [] -> do - svcPrint $ "Discovery: " ++ show (either refDigest id edgst) ++ " not found" + DiscoveryResult _ [] -> do + -- not found + return () DiscoveryResult edgst addrs -> do let dgst = either refDigest id edgst @@ -269,56 +310,82 @@ instance Service DiscoveryService where discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer - forM_ addrs $ \addr -> if - | addr == T.pack "ICE" - -> do -#ifdef ENABLE_ICE_SUPPORT - getIceConfig >>= \case - Just config -> void $ liftIO $ forkIO $ do - ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - - -- Try to promote weak ref to normal one for older peers: - edgst' <- case edgst of - Left r -> return (Left r) - Right d -> refFromDigest st d >>= \case - Just r -> return (Left r) - Nothing -> return (Right d) - - res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - + let tryAddresses = \case + DiscoveryIP ipaddr port : _ -> do + void $ liftIO $ forkIO $ do + let saddr = inetToSockAddr ( ipaddr, port ) + peer <- serverPeer server saddr runAsService $ do - let upd dp = dp { dpIceSession = Just ice } + let upd dp = dp { dpPeer = Just peer } svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } - Nothing -> do - return () + DiscoveryICE : rest -> do +#ifdef ENABLE_ICE_SUPPORT + getIceConfig >>= \case + Just config -> do + void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + -- Try to promote weak ref to normal one for older peers: + edgst' <- case edgst of + Left r -> return (Left r) + Right d -> refFromDigest st d >>= \case + Just r -> return (Left r) + Nothing -> return (Right d) + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err + + runAsService $ do + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + Nothing -> do #endif - return () - - | [ ipaddr, port ] <- words (T.unpack addr) -> do - void $ liftIO $ forkIO $ do - saddr <- head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- serverPeer server (addrAddress saddr) - runAsService $ do - let upd dp = dp { dpPeer = Just peer } - svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + tryAddresses rest - | otherwise -> do - svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + DiscoveryTunnel : _ -> do + discoverySetupTunnelResponse dgst + + addr : rest -> do + svcPrint $ "Discovery: unsupported address in result: " ++ T.unpack (toText addr) + tryAddresses rest + + [] -> svcPrint $ "Discovery: no (supported) address received for " <> show dgst + + tryAddresses addrs DiscoveryConnectionRequest conn -> do self <- svcSelf + attrs <- asks svcAttributes let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) if either refDigest id (dconnTarget conn) `elem` identityDigests self then if + -- request for us, create ICE sesssion or tunnel + | dconnTunnel conn -> do + receivedStreams >>= \case + (tunnelReader : _) -> do + tunnelWriter <- openStream + replyPacket $ DiscoveryConnectionResponse rconn + { dconnTunnel = True + } + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + server <- asks svcServer + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + + [] -> do + svcPrint $ "Discovery: missing stream on tunnel request (endpoint)" + #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer @@ -338,31 +405,72 @@ instance Service DiscoveryService where svcPrint $ "Discovery: unsupported connection request" else do - -- request to some of our peers, relay - mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal - case mbdp of + -- request to some of our peers, relay + peer <- asks svcPeer + paddr <- asks svcPeerAddress + mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal + streams <- receivedStreams + case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn + | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn -> if + | not (discoveryProvideTunnel attrs peer paddr) -> do + replyPacket $ DiscoveryConnectionResponse rconn + | fromSource : _ <- streams -> do + void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do + toTarget <- openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest conn + | otherwise -> do + svcPrint $ "Discovery: missing stream on tunnel request (relay)" + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf + dps <- svcGet dpeers <- dgsPeers <$> svcGetGlobal + if either refDigest id (dconnSource conn) `elem` identityDigests self - then do + then do -- response to our request, try to connect to the peer server <- asks svcServer - if | Just addr <- dconnAddress conn - , [ipaddr, port] <- words (T.unpack addr) -> do - saddr <- liftIO $ head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- liftIO $ serverPeer server (addrAddress saddr) + if + | Just addr <- dconnAddress conn + , [ addrStr, portStr ] <- words (T.unpack addr) + , Just ipaddr <- readMaybe addrStr + , Just port <- readMaybe portStr + -> do + let saddr = inetToSockAddr ( ipaddr, port ) + peer <- liftIO $ serverPeer server saddr let upd dp = dp { dpPeer = Just peer } svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } + | dconnTunnel conn + , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + receivedStreams >>= \case + tunnelReader : _ -> do + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + [] -> do + svcPrint $ "Discovery: missing stream in tunnel response" + liftIO $ closeStream tunnelWriter + + | Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + svcPrint $ "Discovery: tunnel request failed" + liftIO $ closeStream tunnelWriter + #ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp @@ -371,24 +479,49 @@ instance Service DiscoveryService where #endif | otherwise -> svcPrint $ "Discovery: connection request failed" - else do - -- response to relayed request - case M.lookup (either refDigest id $ dconnSource conn) dpeers of - Just dp | Just dpeer <- dpPeer dp -> do + else do + -- response to relayed request + streams <- receivedStreams + svcModify $ \s -> s { dpsRelayedTunnelRequests = + filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) } + + case M.lookup (either refDigest id $ dconnSource conn) dpeers of + Just dp | Just dpeer <- dpPeer dp -> if + -- successful tunnel request + | dconnTunnel conn + , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + , fromTarget : _ <- streams + -> liftIO $ do + toSourceVar <- newEmptyMVar + void $ forkIO $ runPeerService @DiscoveryService dpeer $ do + liftIO . putMVar toSourceVar =<< openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionResponse conn + void $ forkIO $ do + relayStream fromSource toTarget + void $ forkIO $ do + toSource <- readMVar toSourceVar + relayStream fromTarget toSource + + -- failed tunnel request + | Just ( _, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + -> do + liftIO $ closeStream toTarget sendToPeer dpeer $ DiscoveryConnectionResponse conn - _ -> svcPrint $ "Discovery: failed to relay connection response" + + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionResponse conn + _ -> svcPrint $ "Discovery: failed to relay connection response" serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer - let addrToText saddr = do - ( addr, port ) <- IP.fromSockAddr saddr - Just $ T.pack $ show addr <> " " <> show port addrs <- concat <$> sequence - [ catMaybes . map addrToText <$> liftIO (getServerAddresses server) + [ catMaybes . map (fmap (uncurry DiscoveryIP) . inetFromSockAddr) <$> liftIO (getServerAddresses server) #ifdef ENABLE_ICE_SUPPORT - , return [ T.pack "ICE" ] + , return [ DiscoveryICE ] #endif ] @@ -437,7 +570,7 @@ discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> R discoverySearch server dgst = do peers <- liftIO $ getCurrentPeerList server match <- forM peers $ \peer -> do - peerIdentity peer >>= \case + getPeerIdentity peer >>= \case PeerIdentityFull pid -> do return $ dgst `elem` identityDigests pid _ -> return False @@ -447,3 +580,70 @@ discoverySearch server dgst = do } forM_ peers $ \peer -> do sendToPeer peer $ DiscoverySearch $ Right dgst + + +data TunnelAddress = TunnelAddress + { tunnelVia :: Peer + , tunnelIdentity :: UnifiedIdentity + , tunnelStreamNumber :: Int + , tunnelReader :: StreamReader + , tunnelWriter :: StreamWriter + } + +instance Eq TunnelAddress where + x == y = (==) + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Ord TunnelAddress where + compare x y = compare + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Show TunnelAddress where + show tunnel = concat + [ "tunnel@" + , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel + , "/" <> show (tunnelStreamNumber tunnel) + ] + +instance PeerAddressType TunnelAddress where + sendBytesToAddress TunnelAddress {..} bytes = do + writeStream tunnelWriter bytes + + connectionToAddressClosed TunnelAddress {..} = do + closeStream tunnelWriter + +relayStream :: StreamReader -> StreamWriter -> IO () +relayStream r w = do + p <- readStreamPacket r + writeStreamPacket w p + case p of + StreamClosed {} -> return () + _ -> relayStream r w + +receiveFromTunnel :: Server -> TunnelAddress -> IO () +receiveFromTunnel server taddr = do + p <- readStreamPacket (tunnelReader taddr) + case p of + StreamData {..} -> do + receivedFromCustomAddress server taddr stpData + receiveFromTunnel server taddr + StreamClosed {} -> do + return () + + +discoverySetupTunnel :: Peer -> RefDigest -> IO () +discoverySetupTunnel via target = do + runPeerService via $ do + discoverySetupTunnelResponse target + +discoverySetupTunnelResponse :: RefDigest -> ServiceHandler DiscoveryService () +discoverySetupTunnelResponse target = do + self <- refDigest . storedRef . idData <$> svcSelf + stream <- openStream + svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest + (emptyConnection (Right self) (Right target)) + { dconnTunnel = True + } |