summaryrefslogtreecommitdiff
path: root/src/Erebos/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r--src/Erebos/Network.hs60
1 files changed, 50 insertions, 10 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index d8e868a..fb2b5e9 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -24,6 +24,7 @@ module Erebos.Network (
sendToPeerStored, sendManyToPeerStored,
sendToPeerWith,
runPeerService,
+ modifyServiceGlobalState,
discoveryPort,
) where
@@ -300,13 +301,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do
announceUpdate idt
forM_ serverServices $ \(SomeService service _) -> do
- forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
- withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
- readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue serverIOActions $ do
- runPeerService peer $ act x
- _ -> return ()
+ forM_ (serviceStorageWatchers service) $ \case
+ SomeStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+ GlobalStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ atomically $ writeTQueue serverIOActions $ do
+ act server x
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -391,8 +397,21 @@ startServer serverOptions serverOrigHead logd' serverServices = do
return server
stopServer :: Server -> IO ()
-stopServer Server {..} = do
- mapM_ killThread =<< takeMVar serverThreads
+stopServer server@Server {..} = do
+ withMVar serverPeers $ \peers -> do
+ ( global, peerStates ) <- atomically $ (,)
+ <$> takeTMVar serverServiceStates
+ <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState)
+
+ forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do
+ ps <- forM peerStates $ \( peer, states ) ->
+ return $ ( peer, ) $ case M.lookup (serviceID proxy) states of
+ Just (SomeServiceState (_ :: Proxy ps) pstate)
+ | Just (Refl :: s :~: ps) <- eqT
+ -> pstate
+ _ -> emptyServiceState proxy
+ serviceStopServer proxy server gs ps
+ mapM_ killThread =<< takeMVar serverThreads
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
@@ -899,7 +918,7 @@ sendToPeerWith peer fobj = do
Left err -> throwError err
-lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
| Just (Refl :: s :~: t) <- eqT = Just (service, attr)
| otherwise = lookupService proxy rest
@@ -954,6 +973,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
_ -> atomically $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+modifyServiceGlobalState
+ :: forall s a m proxy. (Service s, MonadIO m, MonadError String m)
+ => Server -> proxy s
+ -> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
+ -> m a
+modifyServiceGlobalState server proxy f = do
+ let svc = serviceID proxy
+ case lookupService proxy (serverServices server) of
+ Just ( service, _ ) -> do
+ liftIO $ atomically $ do
+ global <- takeTMVar (serverServiceStates server)
+ ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of
+ SomeServiceGlobalState (_ :: Proxy gs) gs -> do
+ (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT
+ let ( gs', res ) = f gs
+ return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res )
+ putTMVar (serverServiceStates server) global'
+ return res
+ Nothing -> do
+ throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)