From b810f2dab0040e9c5a9f2344ecda67e5817893f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 1 Jun 2025 17:39:58 +0200 Subject: Discovery search using global state to ask new peers --- main/Main.hs | 9 ++------- main/Test.hs | 5 +---- src/Erebos/Discovery.hs | 39 +++++++++++++++++++++++++++++++++++++-- src/Erebos/Network.hs | 24 +++++++++++++++++++++++- test/discovery.test | 37 +++++++++++++++++++++++++++++++++++++ 5 files changed, 100 insertions(+), 14 deletions(-) diff --git a/main/Main.hs b/main/Main.hs index e9c0ae4..26f4b12 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -923,16 +923,11 @@ cmdDiscoveryInit = void $ do cmdDiscovery :: Command cmdDiscovery = void $ do - Just peer <- gets csIcePeer + server <- asks ciServer sref <- asks ciLine - eprint <- asks ciPrint case readRefDigest (BC.pack sref) of Nothing -> throwOtherError "failed to parse ref" - Just dgst -> liftIO $ do - res <- runExceptT $ sendToPeer peer $ DiscoverySearch $ Right dgst - case res of - Right _ -> return () - Left err -> eprint err + Just dgst -> discoverySearch server dgst #ifdef ENABLE_ICE_SUPPORT diff --git a/main/Test.hs b/main/Test.hs index a119b0f..c563291 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -959,8 +959,5 @@ cmdDiscoveryConnect :: Command cmdDiscoveryConnect = do [ tref ] <- asks tiParams Just dgst <- return $ readRefDigest $ encodeUtf8 tref - Just RunningServer {..} <- gets tsServer - peers <- liftIO $ getCurrentPeerList rsServer - forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch $ Right dgst + discoverySearch rsServer dgst diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 0f194a9..3cb55bd 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -4,6 +4,8 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), DiscoveryConnection(..), + + discoverySearch, ) where import Control.Concurrent @@ -12,9 +14,13 @@ import Control.Monad.Except import Control.Monad.Reader import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word @@ -165,6 +171,7 @@ data DiscoveryPeerState = DiscoveryPeerState data DiscoveryGlobalState = DiscoveryGlobalState { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -181,6 +188,7 @@ instance Service DiscoveryService where type ServiceGlobalState DiscoveryService = DiscoveryGlobalState emptyServiceGlobalState _ = DiscoveryGlobalState { dgsPeers = M.empty + , dgsSearchingFor = S.empty } serviceHandler msg = case fromStored msg of @@ -312,7 +320,7 @@ instance Service DiscoveryService where DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if either refDigest id (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if either refDigest id (dconnTarget conn) `elem` identityDigests self then if #ifdef ENABLE_ICE_SUPPORT -- request for us, create ICE sesssion @@ -347,7 +355,7 @@ instance Service DiscoveryService where DiscoveryConnectionResponse conn -> do self <- svcSelf dpeers <- dgsPeers <$> svcGetGlobal - if either refDigest id (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if either refDigest id (dconnSource conn) `elem` identityDigests self then do -- response to our request, try to connect to the peer server <- asks svcServer @@ -389,5 +397,32 @@ instance Service DiscoveryService where #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + sendToPeer peer $ DiscoverySearch (Right dgst) + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m () +discoverySearch server dgst = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ dgst `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch $ Right dgst diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 0baeeb1..9c96bab 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -29,6 +29,7 @@ module Erebos.Network ( sendToPeerStored, sendManyToPeerStored, sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -962,7 +963,7 @@ sendToPeerWith peer fobj = do Left err -> throwError $ fromErebosError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest @@ -1018,6 +1019,27 @@ runPeerServiceOn mbservice newStreams peer handler = liftIO $ do _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +modifyServiceGlobalState + :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) diff --git a/test/discovery.test b/test/discovery.test index 69d73df..3be6275 100644 --- a/test/discovery.test +++ b/test/discovery.test @@ -52,3 +52,40 @@ test ManualDiscovery: send "stop-server" to p for p in [ pd, p1, p2 ]: expect /stop-server-done/ from p + + # Test delayed discovery with new peer + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + with p1: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p1.node.ip} 29665/ + /peer [12] id Device1 Owner1/ + + send "discovery-connect $id" to p2 + + with p2: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p2.node.ip} 29665/ + /peer [12] id Device2 Owner2/ + + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr ${p1.node.ip} 29665/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p -- cgit v1.2.3