diff options
Diffstat (limited to 'src/Erebos/Discovery.hs')
-rw-r--r-- | src/Erebos/Discovery.hs | 278 |
1 files changed, 177 insertions, 101 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index f156c85..d098f98 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -3,7 +3,9 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), - DiscoveryConnection(..) + DiscoveryConnection(..), + + discoverySearch, ) where import Control.Concurrent @@ -12,9 +14,13 @@ import Control.Monad.Except import Control.Monad.Reader import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word @@ -30,6 +36,13 @@ import Erebos.Service import Erebos.Storage +#ifndef ENABLE_ICE_SUPPORT +type IceConfig = () +type IceSession = () +type IceRemoteInfo = Stored Object +#endif + + data DiscoveryService = DiscoverySelf [ Text ] (Maybe Int) | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) @@ -57,11 +70,7 @@ data DiscoveryConnection = DiscoveryConnection { dconnSource :: Ref , dconnTarget :: Ref , dconnAddress :: Maybe Text -#ifdef ENABLE_ICE_SUPPORT , dconnIceInfo :: Maybe IceRemoteInfo -#else - , dconnIceInfo :: Maybe (Stored Object) -#endif } emptyConnection :: Ref -> Ref -> DiscoveryConnection @@ -90,12 +99,13 @@ instance Storable DiscoveryService where DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn - where storeConnection ctype conn = do - storeText "connection" $ ctype - storeRawRef "source" $ dconnSource conn - storeRawRef "target" $ dconnTarget conn - storeMbText "address" $ dconnAddress conn - storeMbRef "ice-info" $ dconnIceInfo conn + where + storeConnection ctype DiscoveryConnection {..} = do + storeText "connection" $ ctype + storeRawRef "source" dconnSource + storeRawRef "target" dconnTarget + storeMbText "address" dconnAddress + storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum [ do @@ -120,22 +130,40 @@ instance Storable DiscoveryService where , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse ] - where loadConnection ctype ctor = do - ctype' <- loadText "connection" - guard $ ctype == ctype' - return . ctor =<< DiscoveryConnection - <$> loadRawRef "source" - <*> loadRawRef "target" - <*> loadMbText "address" - <*> loadMbRef "ice-info" + where + loadConnection ctype ctor = do + ctype' <- loadText "connection" + guard $ ctype == ctype' + dconnSource <- loadRawRef "source" + dconnTarget <- loadRawRef "target" + dconnAddress <- loadMbText "address" + dconnIceInfo <- loadMbRef "ice-info" + return $ ctor DiscoveryConnection {..} data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer , dpAddress :: [ Text ] -#ifdef ENABLE_ICE_SUPPORT , dpIceSession :: Maybe IceSession -#endif + } + +emptyPeer :: DiscoveryPeer +emptyPeer = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Nothing + } + +data DiscoveryPeerState = DiscoveryPeerState + { dpsStunServer :: Maybe ( Text, Word16 ) + , dpsTurnServer :: Maybe ( Text, Word16 ) + , dpsIceConfig :: Maybe IceConfig + } + +data DiscoveryGlobalState = DiscoveryGlobalState + { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -144,13 +172,18 @@ instance Service DiscoveryService where type ServiceAttributes DiscoveryService = DiscoveryAttributes defaultServiceAttributes _ = defaultDiscoveryAttributes -#ifdef ENABLE_ICE_SUPPORT - type ServiceState DiscoveryService = Maybe IceConfig - emptyServiceState _ = Nothing -#endif + type ServiceState DiscoveryService = DiscoveryPeerState + emptyServiceState _ = DiscoveryPeerState + { dpsStunServer = Nothing + , dpsTurnServer = Nothing + , dpsIceConfig = Nothing + } - type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer - emptyServiceGlobalState _ = M.empty + type ServiceGlobalState DiscoveryService = DiscoveryGlobalState + emptyServiceGlobalState _ = DiscoveryGlobalState + { dgsPeers = M.empty + , dgsSearchingFor = S.empty + } serviceHandler msg = case fromStored msg of DiscoverySelf addrs priority -> do @@ -171,15 +204,14 @@ instance Service DiscoveryService where | otherwise -> return Nothing - forM_ (idDataF =<< unfoldOwners pid) $ \s -> - svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer - { dpPriority = fromMaybe 0 priority - , dpPeer = Just peer - , dpAddress = addrs -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do + let dp = DiscoveryPeer + { dpPriority = fromMaybe 0 priority + , dpPeer = Just peer + , dpAddress = addrs + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } attrs <- asks svcAttributes replyPacket $ DiscoveryAcknowledged matchedAddrs (discoveryStunServer attrs) @@ -188,7 +220,6 @@ instance Service DiscoveryService where (discoveryTurnPort attrs) DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do -#ifdef ENABLE_ICE_SUPPORT paddr <- asks (peerAddress . svcPeer) >>= return . \case (DatagramAddress saddr) -> case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, _) @@ -204,100 +235,90 @@ instance Service DiscoveryService where toIceServer (Just server) Nothing = Just ( server, 0 ) toIceServer (Just server) (Just port) = Just ( server, port ) - cfg <- liftIO $ iceCreateConfig - (toIceServer stunServer stunPort) - (toIceServer turnServer turnPort) - svcSet cfg -#endif - return () + svcModify $ \s -> s + { dpsStunServer = toIceServer stunServer stunPort + , dpsTurnServer = toIceServer turnServer turnPort + } DiscoverySearch ref -> do - dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal + dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer DiscoveryResult ref [] -> do svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" DiscoveryResult ref addrs -> do + let dgst = refDigest ref -- TODO: check if we really requested that server <- asks svcServer self <- svcSelf - mbIceConfig <- svcGet discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer - liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if + forM_ addrs $ \addr -> if | addr == T.pack "ICE" -#ifdef ENABLE_ICE_SUPPORT - , Just config <- mbIceConfig -> do - ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Nothing - , dpAddress = [] - , dpIceSession = Just ice - } -#else - -> do - return () +#ifdef ENABLE_ICE_SUPPORT + getIceConfig >>= \case + Just config -> void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err + + runAsService $ do + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + Nothing -> do + return () #endif + return () | [ ipaddr, port ] <- words (T.unpack addr) -> do - saddr <- head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- serverPeer server (addrAddress saddr) - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Just peer - , dpAddress = [] -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + void $ liftIO $ forkIO $ do + saddr <- head <$> + getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) + peer <- serverPeer server (addrAddress saddr) + runAsService $ do + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } | otherwise -> do - runAsService $ do - svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do + if refDigest (dconnTarget conn) `elem` identityDigests self + then if #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion + -- request for us, create ICE sesssion + | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer - svcGet >>= \case + getIceConfig >>= \case Just config -> do liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do rinfo <- iceRemoteInfo ice res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo } case res of - Right _ -> do - case dconnIceInfo conn of - Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice - Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" + Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err Nothing -> do - svcPrint $ "Discovery: ICE request from peer without ICE configuration" -#else - return () + return () #endif - else do + | otherwise -> do + svcPrint $ "Discovery: unsupported connection request" + + else do -- request to some of our peers, relay - mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal + mbdp <- M.lookup (refDigest $ dconnTarget conn) . dgsPeers <$> svcGetGlobal case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp @@ -307,29 +328,28 @@ instance Service DiscoveryService where DiscoveryConnectionResponse conn -> do self <- svcSelf - dpeers <- svcGetGlobal - if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + dpeers <- dgsPeers <$> svcGetGlobal + if refDigest (dconnSource conn) `elem` identityDigests self then do -- response to our request, try to connect to the peer -#ifdef ENABLE_ICE_SUPPORT server <- asks svcServer if | Just addr <- dconnAddress conn , [ipaddr, port] <- words (T.unpack addr) -> do saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $ - DiscoveryPeer 0 (Just peer) [] Nothing + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s + { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (refDigest $ dconnTarget conn) $ dgsPeers s } +#ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice +#endif | otherwise -> svcPrint $ "Discovery: connection request failed" -#else - return () -#endif else do -- response to relayed request case M.lookup (refDigest $ dconnSource conn) dpeers of @@ -340,6 +360,7 @@ instance Service DiscoveryService where serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer + st <- getStorage let addrToText saddr = do ( addr, port ) <- IP.fromSockAddr saddr @@ -351,5 +372,60 @@ instance Service DiscoveryService where #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + liftIO (refFromDigest st dgst) >>= \case + Just ref -> sendToPeer peer $ DiscoverySearch ref + Nothing -> return () + +#ifdef ENABLE_ICE_SUPPORT + serviceStopServer _ _ _ pstates = do + forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do + mapM_ iceStopThread dpsIceConfig +#endif + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig) +getIceConfig = do + dpsIceConfig <$> svcGet >>= \case + Just cfg -> return $ Just cfg + Nothing -> do +#ifdef ENABLE_ICE_SUPPORT + stun <- dpsStunServer <$> svcGet + turn <- dpsTurnServer <$> svcGet + liftIO (iceCreateConfig stun turn) >>= \case + Just cfg -> do + svcModify $ \s -> s { dpsIceConfig = Just cfg } + return $ Just cfg + Nothing -> do + svcPrint $ "Discovery: failed to create ICE config" + return Nothing +#else + return Nothing +#endif + + +discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m () +discoverySearch server ref = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ refDigest ref `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch ref |