From e3771b8da67fa86bcee8cd678dfb92f92ead488a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Thu, 20 Jul 2023 22:43:20 +0200 Subject: Network: stop server function --- src/Network.hs | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) (limited to 'src/Network.hs') diff --git a/src/Network.hs b/src/Network.hs index 4b2453e..6fae8c5 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,6 +1,7 @@ module Network ( Server, startServer, + stopServer, getNextPeerChange, ServerOptions(..), serverIdentity, defaultServerOptions, @@ -67,6 +68,7 @@ data Server = Server { serverStorage :: Storage , serverOrigHead :: Head LocalState , serverIdentity_ :: MVar UnifiedIdentity + , serverThreads :: MVar [ThreadId] , serverSocket :: MVar Socket , serverChanPacket :: Chan (PeerAddress, BC.ByteString) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) @@ -242,10 +244,15 @@ newWaitingRef pref act = do return wref +forkServerThread :: Server -> IO () -> IO () +forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do + (:ts) <$> forkIO act + startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server startServer opt serverOrigHead logd' serverServices = do let serverStorage = refStorage $ headRef serverOrigHead serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead + serverThreads <- newMVar [] serverSocket <- newEmptyMVar serverChanPacket <- newChan serverDataResponse <- newTQueueIO @@ -259,11 +266,11 @@ startServer opt serverOrigHead logd' serverServices = do chanSvc <- newTQueueIO let logd = writeTQueue serverErrorLog - void $ forkIO $ forever $ do + forkServerThread server $ forever $ do logd' =<< atomically (readTQueue serverErrorLog) - void $ forkIO $ dataResponseWorker server - void $ forkIO $ forever $ do + forkServerThread server $ dataResponseWorker server + forkServerThread server $ forever $ do either (atomically . logd) return =<< runExceptT =<< atomically (readTQueue serverIOActions) @@ -279,7 +286,7 @@ startServer opt serverOrigHead logd' serverServices = do return sock loop sock = do - when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do + when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do readMVar serverIdentity_ >>= \identity -> do st <- derivePartialStorage serverStorage let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ] @@ -316,7 +323,7 @@ startServer opt serverOrigHead logd' serverServices = do runPeerService peer $ act x _ -> return () - void $ forkIO $ forever $ do + forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 writeChan serverChanPacket (DatagramAddress sock saddr, msg) @@ -355,7 +362,7 @@ startServer opt serverOrigHead logd' serverServices = do _ -> do atomically $ logd $ show paddr ++ ": invalid objects" - void $ forkIO $ withSocketsDo $ do + forkServerThread server $ withSocketsDo $ do let hints = defaultHints { addrFlags = [AI_PASSIVE] , addrFamily = AF_INET6 @@ -364,7 +371,7 @@ startServer opt serverOrigHead logd' serverServices = do addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt) bracket (open addr) close loop - void $ forkIO $ forever $ do + forkServerThread server $ forever $ do (peer, svc, ref) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) @@ -372,13 +379,17 @@ startServer opt serverOrigHead logd' serverServices = do return server +stopServer :: Server -> IO () +stopServer Server {..} = do + mapM_ killThread =<< takeMVar serverThreads + sendWorker :: Peer -> IO () sendWorker peer = do startTime <- getTime MonotonicRaw nowVar <- newTVarIO startTime waitVar <- newTVarIO startTime - let waitTill time = void $ forkIO $ do + let waitTill time = forkServerThread (peerServer peer) $ do now <- getTime MonotonicRaw when (time > now) $ threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 @@ -464,7 +475,7 @@ dataResponseWorker server = forever $ do [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case Right ref -> do atomically (writeTVar tvar $ Right ref) - void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case + forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) err Right () -> return () @@ -774,7 +785,7 @@ mkPeer server paddr = do <*> newTMVarIO M.empty <*> newTVarIO [] <*> newTMVarIO [] - void $ forkIO $ sendWorker peer + forkServerThread server $ sendWorker peer return peer serverPeer :: Server -> SockAddr -> IO Peer -- cgit v1.2.3