diff options
Diffstat (limited to 'src/Erebos/Discovery.hs')
-rw-r--r-- | src/Erebos/Discovery.hs | 484 |
1 files changed, 355 insertions, 129 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 48df9c3..2fb0ffe 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -1,6 +1,11 @@ +{-# LANGUAGE CPP #-} + module Erebos.Discovery ( DiscoveryService(..), - DiscoveryConnection(..) + DiscoveryAttributes(..), + DiscoveryConnection(..), + + discoverySearch, ) where import Control.Concurrent @@ -8,196 +13,344 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Reader +import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) -import qualified Data.Map.Strict as M +import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) -import qualified Data.Text as T +import Data.Text qualified as T +import Data.Word import Network.Socket +#ifdef ENABLE_ICE_SUPPORT import Erebos.ICE +#endif import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.Service -import Erebos.Storage +import Erebos.Storable -keepaliveSeconds :: Int -keepaliveSeconds = 20 +#ifndef ENABLE_ICE_SUPPORT +type IceConfig = () +type IceSession = () +type IceRemoteInfo = Stored Object +#endif -data DiscoveryService = DiscoverySelf Text Int - | DiscoveryAcknowledged Text - | DiscoverySearch Ref - | DiscoveryResult Ref (Maybe Text) - | DiscoveryConnectionRequest DiscoveryConnection - | DiscoveryConnectionResponse DiscoveryConnection +data DiscoveryService + = DiscoverySelf [ Text ] (Maybe Int) + | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) + | DiscoverySearch (Either Ref RefDigest) + | DiscoveryResult (Either Ref RefDigest) [ Text ] + | DiscoveryConnectionRequest DiscoveryConnection + | DiscoveryConnectionResponse DiscoveryConnection + +data DiscoveryAttributes = DiscoveryAttributes + { discoveryStunPort :: Maybe Word16 + , discoveryStunServer :: Maybe Text + , discoveryTurnPort :: Maybe Word16 + , discoveryTurnServer :: Maybe Text + } + +defaultDiscoveryAttributes :: DiscoveryAttributes +defaultDiscoveryAttributes = DiscoveryAttributes + { discoveryStunPort = Nothing + , discoveryStunServer = Nothing + , discoveryTurnPort = Nothing + , discoveryTurnServer = Nothing + } data DiscoveryConnection = DiscoveryConnection - { dconnSource :: Ref - , dconnTarget :: Ref + { dconnSource :: Either Ref RefDigest + , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text - , dconnIceSession :: Maybe IceRemoteInfo + , dconnIceInfo :: Maybe IceRemoteInfo } -emptyConnection :: Ref -> Ref -> DiscoveryConnection -emptyConnection source target = DiscoveryConnection source target Nothing Nothing +emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection +emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} + where + dconnAddress = Nothing + dconnIceInfo = Nothing instance Storable DiscoveryService where store' x = storeRec $ do case x of - DiscoverySelf addr priority -> do - storeText "self" addr - storeInt "priority" priority - DiscoveryAcknowledged addr -> do - storeText "ack" addr - DiscoverySearch ref -> storeRawRef "search" ref - DiscoveryResult ref addr -> do - storeRawRef "result" ref - storeMbText "address" addr + DiscoverySelf addrs priority -> do + mapM_ (storeText "self") addrs + mapM_ (storeInt "priority") priority + DiscoveryAcknowledged addrs stunServer stunPort turnServer turnPort -> do + if null addrs then storeEmpty "ack" + else mapM_ (storeText "ack") addrs + storeMbText "stun-server" stunServer + storeMbInt "stun-port" stunPort + storeMbText "turn-server" turnServer + storeMbInt "turn-port" turnPort + DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst + DiscoveryResult edgst addr -> do + either (storeRawRef "result") (storeRawWeak "result") edgst + mapM_ (storeText "address") addr DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn - where storeConnection ctype conn = do - storeText "connection" $ ctype - storeRawRef "source" $ dconnSource conn - storeRawRef "target" $ dconnTarget conn - storeMbText "address" $ dconnAddress conn - storeMbRef "ice-session" $ dconnIceSession conn + where + storeConnection ctype DiscoveryConnection {..} = do + storeText "connection" $ ctype + either (storeRawRef "source") (storeRawWeak "source") dconnSource + either (storeRawRef "target") (storeRawWeak "target") dconnTarget + storeMbText "address" dconnAddress + storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum - [ DiscoverySelf - <$> loadText "self" - <*> loadInt "priority" - , DiscoveryAcknowledged - <$> loadText "ack" - , DiscoverySearch <$> loadRawRef "search" + [ do + addrs <- loadTexts "self" + guard (not $ null addrs) + DiscoverySelf addrs + <$> loadMbInt "priority" + , do + addrs <- loadTexts "ack" + mbEmpty <- loadMbEmpty "ack" + guard (not (null addrs) || isJust mbEmpty) + DiscoveryAcknowledged + <$> pure addrs + <*> loadMbText "stun-server" + <*> loadMbInt "stun-port" + <*> loadMbText "turn-server" + <*> loadMbInt "turn-port" + , DiscoverySearch <$> msum + [ Left <$> loadRawRef "search" + , Right <$> loadRawWeak "search" + ] , DiscoveryResult - <$> loadRawRef "result" - <*> loadMbText "address" + <$> msum + [ Left <$> loadRawRef "result" + , Right <$> loadRawWeak "result" + ] + <*> loadTexts "address" , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse ] - where loadConnection ctype ctor = do - ctype' <- loadText "connection" - guard $ ctype == ctype' - return . ctor =<< DiscoveryConnection - <$> loadRawRef "source" - <*> loadRawRef "target" - <*> loadMbText "address" - <*> loadMbRef "ice-session" + where + loadConnection ctype ctor = do + ctype' <- loadText "connection" + guard $ ctype == ctype' + dconnSource <- msum + [ Left <$> loadRawRef "source" + , Right <$> loadRawWeak "source" + ] + dconnTarget <- msum + [ Left <$> loadRawRef "target" + , Right <$> loadRawWeak "target" + ] + dconnAddress <- loadMbText "address" + dconnIceInfo <- loadMbRef "ice-info" + return $ ctor DiscoveryConnection {..} data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer - , dpAddress :: Maybe Text + , dpAddress :: [ Text ] , dpIceSession :: Maybe IceSession } +emptyPeer :: DiscoveryPeer +emptyPeer = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Nothing + } + +data DiscoveryPeerState = DiscoveryPeerState + { dpsStunServer :: Maybe ( Text, Word16 ) + , dpsTurnServer :: Maybe ( Text, Word16 ) + , dpsIceConfig :: Maybe IceConfig + } + +data DiscoveryGlobalState = DiscoveryGlobalState + { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest + } + instance Service DiscoveryService where - serviceID _ = mkServiceID "dd59c89c-69cc-4703-b75b-4ddcd4b3c23b" + serviceID _ = mkServiceID "dd59c89c-69cc-4703-b75b-4ddcd4b3c23c" + + type ServiceAttributes DiscoveryService = DiscoveryAttributes + defaultServiceAttributes _ = defaultDiscoveryAttributes - type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer - emptyServiceGlobalState _ = M.empty + type ServiceState DiscoveryService = DiscoveryPeerState + emptyServiceState _ = DiscoveryPeerState + { dpsStunServer = Nothing + , dpsTurnServer = Nothing + , dpsIceConfig = Nothing + } + + type ServiceGlobalState DiscoveryService = DiscoveryGlobalState + emptyServiceGlobalState _ = DiscoveryGlobalState + { dgsPeers = M.empty + , dgsSearchingFor = S.empty + } serviceHandler msg = case fromStored msg of - DiscoverySelf addr priority -> do + DiscoverySelf addrs priority -> do pid <- asks svcPeerIdentity peer <- asks svcPeer let insertHelper new old | dpPriority new > dpPriority old = new | otherwise = old - mbaddr <- case words (T.unpack addr) of - [ipaddr, port] | DatagramAddress paddr <- peerAddress peer -> do + matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if + | addr == T.pack "ICE" -> do + return $ Just addr + + | [ ipaddr, port ] <- words (T.unpack addr) + , DatagramAddress paddr <- peerAddress peer -> do saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) return $ if paddr == addrAddress saddr then Just addr else Nothing - _ -> return Nothing - forM_ (idDataF =<< unfoldOwners pid) $ \s -> - svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) $ - DiscoveryPeer priority (Just peer) mbaddr Nothing - replyPacket $ DiscoveryAcknowledged $ fromMaybe (T.pack "ICE") mbaddr - - DiscoveryAcknowledged addr -> do - when (addr == T.pack "ICE") $ do - -- keep-alive packet from behind NAT - peer <- asks svcPeer - liftIO $ void $ forkIO $ do - threadDelay (keepaliveSeconds * 1000 * 1000) - res <- runExceptT $ sendToPeer peer $ DiscoverySelf addr 0 - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send keep-alive: " ++ err - - DiscoverySearch ref -> do - addr <- M.lookup (refDigest ref) <$> svcGetGlobal - replyPacket $ DiscoveryResult ref $ fromMaybe (T.pack "ICE") . dpAddress <$> addr - - DiscoveryResult ref Nothing -> do - svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" - - DiscoveryResult ref (Just addr) -> do + + | otherwise -> return Nothing + + forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do + let dp = DiscoveryPeer + { dpPriority = fromMaybe 0 priority + , dpPeer = Just peer + , dpAddress = addrs + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } + attrs <- asks svcAttributes + replyPacket $ DiscoveryAcknowledged matchedAddrs + (discoveryStunServer attrs) + (discoveryStunPort attrs) + (discoveryTurnServer attrs) + (discoveryTurnPort attrs) + + DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do + paddr <- asks (peerAddress . svcPeer) >>= return . \case + (DatagramAddress saddr) -> case IP.fromSockAddr saddr of + Just (IP.IPv6 ipv6, _) + | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 + -> Just $ T.pack $ show (IP.toIPv4w ipv4) + Just (addr, _) + -> Just $ T.pack $ show addr + _ -> Nothing + _ -> Nothing + + let toIceServer Nothing Nothing = Nothing + toIceServer Nothing (Just port) = ( , port) <$> paddr + toIceServer (Just server) Nothing = Just ( server, 0 ) + toIceServer (Just server) (Just port) = Just ( server, port ) + + svcModify $ \s -> s + { dpsStunServer = toIceServer stunServer stunPort + , dpsTurnServer = toIceServer turnServer turnPort + } + + DiscoverySearch edgst -> do + dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal + replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer + + DiscoveryResult edgst [] -> do + svcPrint $ "Discovery: " ++ show (either refDigest id edgst) ++ " not found" + + DiscoveryResult edgst addrs -> do + let dgst = either refDigest id edgst -- TODO: check if we really requested that server <- asks svcServer - if addr == T.pack "ICE" - then do - self <- svcSelf - peer <- asks svcPeer - ice <- liftIO $ iceCreate PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer peer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceSession = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - - svcModifyGlobal $ M.insert (refDigest ref) $ - DiscoveryPeer 0 Nothing Nothing (Just ice) - else do - case words (T.unpack addr) of - [ipaddr, port] -> do - saddr <- liftIO $ head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest ref) $ - DiscoveryPeer 0 (Just peer) Nothing Nothing + st <- getStorage + self <- svcSelf + discoveryPeer <- asks svcPeer + let runAsService = runPeerService @DiscoveryService discoveryPeer + + forM_ addrs $ \addr -> if + | addr == T.pack "ICE" + -> do +#ifdef ENABLE_ICE_SUPPORT + getIceConfig >>= \case + Just config -> void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + -- Try to promote weak ref to normal one for older peers: + edgst' <- case edgst of + Left r -> return (Left r) + Right d -> refFromDigest st d >>= \case + Just r -> return (Left r) + Nothing -> return (Right d) + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - _ -> svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + runAsService $ do + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + Nothing -> do + return () +#endif + return () + + | [ ipaddr, port ] <- words (T.unpack addr) -> do + void $ liftIO $ forkIO $ do + saddr <- head <$> + getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) + peer <- serverPeer server (addrAddress saddr) + runAsService $ do + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + | otherwise -> do + svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do - -- request for us, create ICE sesssion + if either refDigest id (dconnTarget conn) `elem` identityDigests self + then if +#ifdef ENABLE_ICE_SUPPORT + -- request for us, create ICE sesssion + | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer - liftIO $ void $ iceCreate PjIceSessRoleControlled $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceSession = Just rinfo } - case res of - Right _ -> do - case dconnIceSession conn of - Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice - Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" - Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err + getIceConfig >>= \case + Just config -> do + liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do + rinfo <- iceRemoteInfo ice + res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo } + case res of + Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice + Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err + Nothing -> do + return () +#endif - else do + | otherwise -> do + svcPrint $ "Discovery: unsupported connection request" + + else do -- request to some of our peers, relay - mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal + mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn - Just dp | Just addr <- dpAddress dp -> do - replyPacket $ DiscoveryConnectionResponse rconn { dconnAddress = Just addr } - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn - | otherwise -> svcPrint $ "Discovery: failed to relay connection request" + Just dp + | Just dpeer <- dpPeer dp -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn + | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf - dpeers <- svcGetGlobal - if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + dpeers <- dgsPeers <$> svcGetGlobal + if either refDigest id (dconnSource conn) `elem` identityDigests self then do -- response to our request, try to connect to the peer server <- asks svcServer @@ -206,18 +359,91 @@ instance Service DiscoveryService where saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $ - DiscoveryPeer 0 (Just peer) Nothing Nothing + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s + { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } - | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers +#ifdef ENABLE_ICE_SUPPORT + | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp - , Just rinfo <- dconnIceSession conn -> do + , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice +#endif | otherwise -> svcPrint $ "Discovery: connection request failed" else do -- response to relayed request - case M.lookup (refDigest $ dconnSource conn) dpeers of + case M.lookup (either refDigest id $ dconnSource conn) dpeers of Just dp | Just dpeer <- dpPeer dp -> do sendToPeer dpeer $ DiscoveryConnectionResponse conn _ -> svcPrint $ "Discovery: failed to relay connection response" + + serviceNewPeer = do + server <- asks svcServer + peer <- asks svcPeer + + let addrToText saddr = do + ( addr, port ) <- IP.fromSockAddr saddr + Just $ T.pack $ show addr <> " " <> show port + addrs <- concat <$> sequence + [ catMaybes . map addrToText <$> liftIO (getServerAddresses server) +#ifdef ENABLE_ICE_SUPPORT + , return [ T.pack "ICE" ] +#endif + ] + + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + + when (not $ null addrs) $ do + sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + sendToPeer peer $ DiscoverySearch (Right dgst) + +#ifdef ENABLE_ICE_SUPPORT + serviceStopServer _ _ _ pstates = do + forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do + mapM_ iceStopThread dpsIceConfig +#endif + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig) +getIceConfig = do + dpsIceConfig <$> svcGet >>= \case + Just cfg -> return $ Just cfg + Nothing -> do +#ifdef ENABLE_ICE_SUPPORT + stun <- dpsStunServer <$> svcGet + turn <- dpsTurnServer <$> svcGet + liftIO (iceCreateConfig stun turn) >>= \case + Just cfg -> do + svcModify $ \s -> s { dpsIceConfig = Just cfg } + return $ Just cfg + Nothing -> do + svcPrint $ "Discovery: failed to create ICE config" + return Nothing +#else + return Nothing +#endif + + +discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m () +discoverySearch server dgst = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ dgst `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch $ Right dgst |