summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-06-01 17:39:58 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2025-06-12 20:53:10 +0200
commite8f3cbe08071c0507abafa76d0bf9d32908bbd7e (patch)
tree275347fffda7313b81ae3cfffc2df08d5cf20d17
parent0cc2b3ee83e46608495f4fb92ea8c2ca48b4e306 (diff)
Discovery search using global state to ask new peers
-rw-r--r--main/Main.hs4
-rw-r--r--main/Test.hs5
-rw-r--r--src/Erebos/Discovery.hs42
-rw-r--r--src/Erebos/Network.hs24
-rw-r--r--src/Erebos/Storage.hs2
-rw-r--r--test/discovery.test37
6 files changed, 104 insertions, 10 deletions
diff --git a/main/Main.hs b/main/Main.hs
index e055275..59ea7c3 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -893,14 +893,14 @@ cmdDiscoveryInit = void $ do
cmdDiscovery :: Command
cmdDiscovery = void $ do
- Just peer <- gets csIcePeer
+ server <- asks ciServer
st <- getStorage
sref <- asks ciLine
eprint <- asks ciPrint
liftIO $ readRef st (BC.pack sref) >>= \case
Nothing -> error "ref does not exist"
Just ref -> do
- res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref
+ res <- runExceptT $ discoverySearch server ref
case res of
Right _ -> return ()
Left err -> eprint err
diff --git a/main/Test.hs b/main/Test.hs
index 0181575..75eaaaf 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -880,8 +880,5 @@ cmdDiscoveryConnect = do
st <- asks tiStorage
[ tref ] <- asks tiParams
Just ref <- liftIO $ readRef st $ encodeUtf8 tref
-
Just RunningServer {..} <- gets tsServer
- peers <- liftIO $ getCurrentPeerList rsServer
- forM_ peers $ \peer -> do
- sendToPeer peer $ DiscoverySearch ref
+ discoverySearch rsServer ref
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index 63400cb..1691ad9 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -4,6 +4,8 @@ module Erebos.Discovery (
DiscoveryService(..),
DiscoveryAttributes(..),
DiscoveryConnection(..),
+
+ discoverySearch,
) where
import Control.Concurrent
@@ -12,9 +14,13 @@ import Control.Monad.Except
import Control.Monad.Reader
import Data.IP qualified as IP
+import Data.List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as M
import Data.Maybe
+import Data.Proxy
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
import Data.Text qualified as T
import Data.Word
@@ -152,6 +158,7 @@ data DiscoveryPeerState = DiscoveryPeerState
data DiscoveryGlobalState = DiscoveryGlobalState
{ dgsPeers :: Map RefDigest DiscoveryPeer
+ , dgsSearchingFor :: Set RefDigest
}
instance Service DiscoveryService where
@@ -168,6 +175,7 @@ instance Service DiscoveryService where
type ServiceGlobalState DiscoveryService = DiscoveryGlobalState
emptyServiceGlobalState _ = DiscoveryGlobalState
{ dgsPeers = M.empty
+ , dgsSearchingFor = S.empty
}
serviceHandler msg = case fromStored msg of
@@ -290,7 +298,7 @@ instance Service DiscoveryService where
DiscoveryConnectionRequest conn -> do
self <- svcSelf
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
- if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
+ if refDigest (dconnTarget conn) `elem` identityDigests self
then if
#ifdef ENABLE_ICE_SUPPORT
-- request for us, create ICE sesssion
@@ -325,7 +333,7 @@ instance Service DiscoveryService where
DiscoveryConnectionResponse conn -> do
self <- svcSelf
dpeers <- dgsPeers <$> svcGetGlobal
- if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
+ if refDigest (dconnSource conn) `elem` identityDigests self
then do
-- response to our request, try to connect to the peer
server <- asks svcServer
@@ -356,6 +364,7 @@ instance Service DiscoveryService where
serviceNewPeer = do
server <- asks svcServer
peer <- asks svcPeer
+ st <- getStorage
let addrToText saddr = do
( addr, port ) <- IP.fromSockAddr saddr
@@ -367,5 +376,34 @@ instance Service DiscoveryService where
#endif
]
+ pid <- asks svcPeerIdentity
+ gs <- svcGetGlobal
+ let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid)
+ svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor }
+
when (not $ null addrs) $ do
sendToPeer peer $ DiscoverySelf addrs Nothing
+ forM_ searchingFor $ \dgst -> do
+ liftIO (refFromDigest st dgst) >>= \case
+ Just ref -> sendToPeer peer $ DiscoverySearch ref
+ Nothing -> return ()
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
+
+
+discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m ()
+discoverySearch server ref = do
+ peers <- liftIO $ getCurrentPeerList server
+ match <- forM peers $ \peer -> do
+ peerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ refDigest ref `elem` identityDigests pid
+ _ -> return False
+ when (not $ or match) $ do
+ modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s
+ { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s
+ }
+ forM_ peers $ \peer -> do
+ sendToPeer peer $ DiscoverySearch ref
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index d8e868a..32d06f2 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -24,6 +24,7 @@ module Erebos.Network (
sendToPeerStored, sendManyToPeerStored,
sendToPeerWith,
runPeerService,
+ modifyServiceGlobalState,
discoveryPort,
) where
@@ -899,7 +900,7 @@ sendToPeerWith peer fobj = do
Left err -> throwError err
-lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
| Just (Refl :: s :~: t) <- eqT = Just (service, attr)
| otherwise = lookupService proxy rest
@@ -954,6 +955,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
_ -> atomically $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+modifyServiceGlobalState
+ :: forall s a m proxy. (Service s, MonadIO m, MonadError String m)
+ => Server -> proxy s
+ -> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
+ -> m a
+modifyServiceGlobalState server proxy f = do
+ let svc = serviceID proxy
+ case lookupService proxy (serverServices server) of
+ Just ( service, _ ) -> do
+ liftIO $ atomically $ do
+ global <- takeTMVar (serverServiceStates server)
+ ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of
+ SomeServiceGlobalState (_ :: Proxy gs) gs -> do
+ (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT
+ let ( gs', res ) = f gs
+ return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res )
+ putTMVar (serverServiceStates server) global'
+ return res
+ Nothing -> do
+ throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs
index c1e9664..9ccfdde 100644
--- a/src/Erebos/Storage.hs
+++ b/src/Erebos/Storage.hs
@@ -4,7 +4,7 @@ module Erebos.Storage (
deriveEphemeralStorage, derivePartialStorage,
Ref, PartialRef, RefDigest,
- refDigest,
+ refDigest, refFromDigest,
readRef, showRef, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
diff --git a/test/discovery.test b/test/discovery.test
index f2dddb7..5f6c443 100644
--- a/test/discovery.test
+++ b/test/discovery.test
@@ -73,3 +73,40 @@ test ManualDiscovery:
send "stop-server" to p
for p in [ pd, p1, p2 ]:
expect /stop-server-done/ from p
+
+ # Test delayed discovery with new peer
+ for id in [ p1obase ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services" to p
+
+ with p1:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p1.node.ip} 29665/
+ /peer [12] id Device1 Owner1/
+
+ send "discovery-connect $id" to p2
+
+ with p2:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p2.node.ip} 29665/
+ /peer [12] id Device2 Owner2/
+
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ expect from p2:
+ /peer [0-9]+ addr ${p1.node.ip} 29665/
+ /peer [0-9]+ id Device1 Owner1/
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p