diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-06-01 17:39:58 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-06-12 20:53:10 +0200 |
commit | e8f3cbe08071c0507abafa76d0bf9d32908bbd7e (patch) | |
tree | 275347fffda7313b81ae3cfffc2df08d5cf20d17 | |
parent | 0cc2b3ee83e46608495f4fb92ea8c2ca48b4e306 (diff) |
Discovery search using global state to ask new peers
-rw-r--r-- | main/Main.hs | 4 | ||||
-rw-r--r-- | main/Test.hs | 5 | ||||
-rw-r--r-- | src/Erebos/Discovery.hs | 42 | ||||
-rw-r--r-- | src/Erebos/Network.hs | 24 | ||||
-rw-r--r-- | src/Erebos/Storage.hs | 2 | ||||
-rw-r--r-- | test/discovery.test | 37 |
6 files changed, 104 insertions, 10 deletions
diff --git a/main/Main.hs b/main/Main.hs index e055275..59ea7c3 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -893,14 +893,14 @@ cmdDiscoveryInit = void $ do cmdDiscovery :: Command cmdDiscovery = void $ do - Just peer <- gets csIcePeer + server <- asks ciServer st <- getStorage sref <- asks ciLine eprint <- asks ciPrint liftIO $ readRef st (BC.pack sref) >>= \case Nothing -> error "ref does not exist" Just ref -> do - res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref + res <- runExceptT $ discoverySearch server ref case res of Right _ -> return () Left err -> eprint err diff --git a/main/Test.hs b/main/Test.hs index 0181575..75eaaaf 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -880,8 +880,5 @@ cmdDiscoveryConnect = do st <- asks tiStorage [ tref ] <- asks tiParams Just ref <- liftIO $ readRef st $ encodeUtf8 tref - Just RunningServer {..} <- gets tsServer - peers <- liftIO $ getCurrentPeerList rsServer - forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch ref + discoverySearch rsServer ref diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 63400cb..1691ad9 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -4,6 +4,8 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), DiscoveryConnection(..), + + discoverySearch, ) where import Control.Concurrent @@ -12,9 +14,13 @@ import Control.Monad.Except import Control.Monad.Reader import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word @@ -152,6 +158,7 @@ data DiscoveryPeerState = DiscoveryPeerState data DiscoveryGlobalState = DiscoveryGlobalState { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -168,6 +175,7 @@ instance Service DiscoveryService where type ServiceGlobalState DiscoveryService = DiscoveryGlobalState emptyServiceGlobalState _ = DiscoveryGlobalState { dgsPeers = M.empty + , dgsSearchingFor = S.empty } serviceHandler msg = case fromStored msg of @@ -290,7 +298,7 @@ instance Service DiscoveryService where DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if refDigest (dconnTarget conn) `elem` identityDigests self then if #ifdef ENABLE_ICE_SUPPORT -- request for us, create ICE sesssion @@ -325,7 +333,7 @@ instance Service DiscoveryService where DiscoveryConnectionResponse conn -> do self <- svcSelf dpeers <- dgsPeers <$> svcGetGlobal - if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if refDigest (dconnSource conn) `elem` identityDigests self then do -- response to our request, try to connect to the peer server <- asks svcServer @@ -356,6 +364,7 @@ instance Service DiscoveryService where serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer + st <- getStorage let addrToText saddr = do ( addr, port ) <- IP.fromSockAddr saddr @@ -367,5 +376,34 @@ instance Service DiscoveryService where #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + liftIO (refFromDigest st dgst) >>= \case + Just ref -> sendToPeer peer $ DiscoverySearch ref + Nothing -> return () + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m () +discoverySearch server ref = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ refDigest ref `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch ref diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index d8e868a..32d06f2 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -24,6 +24,7 @@ module Erebos.Network ( sendToPeerStored, sendManyToPeerStored, sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -899,7 +900,7 @@ sendToPeerWith peer fobj = do Left err -> throwError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest @@ -954,6 +955,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +modifyServiceGlobalState + :: forall s a m proxy. (Service s, MonadIO m, MonadError String m) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs index c1e9664..9ccfdde 100644 --- a/src/Erebos/Storage.hs +++ b/src/Erebos/Storage.hs @@ -4,7 +4,7 @@ module Erebos.Storage ( deriveEphemeralStorage, derivePartialStorage, Ref, PartialRef, RefDigest, - refDigest, + refDigest, refFromDigest, readRef, showRef, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, diff --git a/test/discovery.test b/test/discovery.test index f2dddb7..5f6c443 100644 --- a/test/discovery.test +++ b/test/discovery.test @@ -73,3 +73,40 @@ test ManualDiscovery: send "stop-server" to p for p in [ pd, p1, p2 ]: expect /stop-server-done/ from p + + # Test delayed discovery with new peer + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + with p1: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p1.node.ip} 29665/ + /peer [12] id Device1 Owner1/ + + send "discovery-connect $id" to p2 + + with p2: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p2.node.ip} 29665/ + /peer [12] id Device2 Owner2/ + + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr ${p1.node.ip} 29665/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p |