From b810f2dab0040e9c5a9f2344ecda67e5817893f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 1 Jun 2025 17:39:58 +0200 Subject: Discovery search using global state to ask new peers --- src/Erebos/Discovery.hs | 39 +++++++++++++++++++++++++++++++++++++-- src/Erebos/Network.hs | 24 +++++++++++++++++++++++- 2 files changed, 60 insertions(+), 3 deletions(-) (limited to 'src/Erebos') diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 0f194a9..3cb55bd 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -4,6 +4,8 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), DiscoveryConnection(..), + + discoverySearch, ) where import Control.Concurrent @@ -12,9 +14,13 @@ import Control.Monad.Except import Control.Monad.Reader import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word @@ -165,6 +171,7 @@ data DiscoveryPeerState = DiscoveryPeerState data DiscoveryGlobalState = DiscoveryGlobalState { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -181,6 +188,7 @@ instance Service DiscoveryService where type ServiceGlobalState DiscoveryService = DiscoveryGlobalState emptyServiceGlobalState _ = DiscoveryGlobalState { dgsPeers = M.empty + , dgsSearchingFor = S.empty } serviceHandler msg = case fromStored msg of @@ -312,7 +320,7 @@ instance Service DiscoveryService where DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if either refDigest id (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if either refDigest id (dconnTarget conn) `elem` identityDigests self then if #ifdef ENABLE_ICE_SUPPORT -- request for us, create ICE sesssion @@ -347,7 +355,7 @@ instance Service DiscoveryService where DiscoveryConnectionResponse conn -> do self <- svcSelf dpeers <- dgsPeers <$> svcGetGlobal - if either refDigest id (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if either refDigest id (dconnSource conn) `elem` identityDigests self then do -- response to our request, try to connect to the peer server <- asks svcServer @@ -389,5 +397,32 @@ instance Service DiscoveryService where #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + sendToPeer peer $ DiscoverySearch (Right dgst) + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m () +discoverySearch server dgst = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ dgst `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch $ Right dgst diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 0baeeb1..9c96bab 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -29,6 +29,7 @@ module Erebos.Network ( sendToPeerStored, sendManyToPeerStored, sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -962,7 +963,7 @@ sendToPeerWith peer fobj = do Left err -> throwError $ fromErebosError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest @@ -1018,6 +1019,27 @@ runPeerServiceOn mbservice newStreams peer handler = liftIO $ do _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +modifyServiceGlobalState + :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) -- cgit v1.2.3