From e3771b8da67fa86bcee8cd678dfb92f92ead488a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Thu, 20 Jul 2023 22:43:20 +0200 Subject: Network: stop server function --- src/Network.hs | 31 +++++++++++++++++++++---------- src/Test.hs | 44 +++++++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/Network.hs b/src/Network.hs index 4b2453e..6fae8c5 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,6 +1,7 @@ module Network ( Server, startServer, + stopServer, getNextPeerChange, ServerOptions(..), serverIdentity, defaultServerOptions, @@ -67,6 +68,7 @@ data Server = Server { serverStorage :: Storage , serverOrigHead :: Head LocalState , serverIdentity_ :: MVar UnifiedIdentity + , serverThreads :: MVar [ThreadId] , serverSocket :: MVar Socket , serverChanPacket :: Chan (PeerAddress, BC.ByteString) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) @@ -242,10 +244,15 @@ newWaitingRef pref act = do return wref +forkServerThread :: Server -> IO () -> IO () +forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do + (:ts) <$> forkIO act + startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server startServer opt serverOrigHead logd' serverServices = do let serverStorage = refStorage $ headRef serverOrigHead serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead + serverThreads <- newMVar [] serverSocket <- newEmptyMVar serverChanPacket <- newChan serverDataResponse <- newTQueueIO @@ -259,11 +266,11 @@ startServer opt serverOrigHead logd' serverServices = do chanSvc <- newTQueueIO let logd = writeTQueue serverErrorLog - void $ forkIO $ forever $ do + forkServerThread server $ forever $ do logd' =<< atomically (readTQueue serverErrorLog) - void $ forkIO $ dataResponseWorker server - void $ forkIO $ forever $ do + forkServerThread server $ dataResponseWorker server + forkServerThread server $ forever $ do either (atomically . logd) return =<< runExceptT =<< atomically (readTQueue serverIOActions) @@ -279,7 +286,7 @@ startServer opt serverOrigHead logd' serverServices = do return sock loop sock = do - when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do + when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do readMVar serverIdentity_ >>= \identity -> do st <- derivePartialStorage serverStorage let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ] @@ -316,7 +323,7 @@ startServer opt serverOrigHead logd' serverServices = do runPeerService peer $ act x _ -> return () - void $ forkIO $ forever $ do + forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 writeChan serverChanPacket (DatagramAddress sock saddr, msg) @@ -355,7 +362,7 @@ startServer opt serverOrigHead logd' serverServices = do _ -> do atomically $ logd $ show paddr ++ ": invalid objects" - void $ forkIO $ withSocketsDo $ do + forkServerThread server $ withSocketsDo $ do let hints = defaultHints { addrFlags = [AI_PASSIVE] , addrFamily = AF_INET6 @@ -364,7 +371,7 @@ startServer opt serverOrigHead logd' serverServices = do addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt) bracket (open addr) close loop - void $ forkIO $ forever $ do + forkServerThread server $ forever $ do (peer, svc, ref) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) @@ -372,13 +379,17 @@ startServer opt serverOrigHead logd' serverServices = do return server +stopServer :: Server -> IO () +stopServer Server {..} = do + mapM_ killThread =<< takeMVar serverThreads + sendWorker :: Peer -> IO () sendWorker peer = do startTime <- getTime MonotonicRaw nowVar <- newTVarIO startTime waitVar <- newTVarIO startTime - let waitTill time = void $ forkIO $ do + let waitTill time = forkServerThread (peerServer peer) $ do now <- getTime MonotonicRaw when (time > now) $ threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 @@ -464,7 +475,7 @@ dataResponseWorker server = forever $ do [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case Right ref -> do atomically (writeTVar tvar $ Right ref) - void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case + forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) err Right () -> return () @@ -774,7 +785,7 @@ mkPeer server paddr = do <*> newTMVarIO M.empty <*> newTVarIO [] <*> newTMVarIO [] - void $ forkIO $ sendWorker peer + forkServerThread server $ sendWorker peer return peer serverPeer :: Server -> SockAddr -> IO Peer diff --git a/src/Test.hs b/src/Test.hs index 7694322..0778021 100644 --- a/src/Test.hs +++ b/src/Test.hs @@ -44,17 +44,21 @@ import Sync data TestState = TestState { tsHead :: Maybe (Head LocalState) - , tsServer :: Maybe Server - , tsPeers :: Maybe (MVar (Int, [(Int, Peer)])) + , tsServer :: Maybe RunningServer , tsWatchedLocalIdentity :: Maybe WatchedHead , tsWatchedSharedIdentity :: Maybe WatchedHead } +data RunningServer = RunningServer + { rsServer :: Server + , rsPeers :: MVar (Int, [(Int, Peer)]) + , rsPeerThread :: ThreadId + } + initTestState :: TestState initTestState = TestState { tsHead = Nothing , tsServer = Nothing - , tsPeers = Nothing , tsWatchedLocalIdentity = Nothing , tsWatchedSharedIdentity = Nothing } @@ -115,8 +119,8 @@ cmdOut line = do getPeer :: Text -> CommandM Peer getPeer spidx = do - Just pmvar <- gets tsPeers - Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar pmvar) + Just RunningServer {..} <- gets tsServer + Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers) return peer getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int @@ -233,6 +237,7 @@ commands = map (T.pack *** id) , ("stored-set-list", cmdStoredSetList) , ("create-identity", cmdCreateIdentity) , ("start-server", cmdStartServer) + , ("stop-server", cmdStopServer) , ("peer-add", cmdPeerAdd) , ("shared-state-get", cmdSharedStateGet) , ("shared-state-wait", cmdSharedStateWait) @@ -323,16 +328,16 @@ cmdStartServer = do out <- asks tiOutput Just h <- gets tsHead - peers <- liftIO $ newMVar (1, []) - server <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr) - [ someServiceAttr $ pairingAttributes (Proxy @AttachService) out peers "attach" - , someServiceAttr $ pairingAttributes (Proxy @ContactService) out peers "contact" + rsPeers <- liftIO $ newMVar (1, []) + rsServer <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr) + [ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" + , someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact" , someServiceAttr $ directMessageAttributes out , someService @SyncService Proxy ] - void $ liftIO $ forkIO $ void $ forever $ do - peer <- getNextPeerChange server + rsPeerThread <- liftIO $ forkIO $ void $ forever $ do + peer <- getNextPeerChange rsServer let printPeer (idx, p) = do params <- peerIdentity p >>= return . \case @@ -344,19 +349,28 @@ cmdStartServer = do update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur | otherwise = fmap (p:) <$> update (nid, ps) - modifyMVar_ peers update + modifyMVar_ rsPeers update - modify $ \s -> s { tsServer = Just server, tsPeers = Just peers } + modify $ \s -> s { tsServer = Just RunningServer {..} } + +cmdStopServer :: Command +cmdStopServer = do + Just RunningServer {..} <- gets tsServer + liftIO $ do + killThread rsPeerThread + stopServer rsServer + modify $ \s -> s { tsServer = Nothing } + cmdOut "stop-server-done" cmdPeerAdd :: Command cmdPeerAdd = do - Just server <- gets tsServer + Just RunningServer {..} <- gets tsServer host:rest <- map T.unpack <$> asks tiParams let port = case rest of [] -> show discoveryPort (p:_) -> p addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just host) (Just port) - void $ liftIO $ serverPeer server (addrAddress addr) + void $ liftIO $ serverPeer rsServer (addrAddress addr) cmdSharedStateGet :: Command cmdSharedStateGet = do -- cgit v1.2.3