diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2026-02-18 21:56:02 +0100 |
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2026-02-18 22:05:52 +0100 |
| commit | a2bca4901229efb09ea510fe31fb2be94a6b4a9c (patch) | |
| tree | 6b094d082459aa6abf52db77b28527b60034cd05 /src/Erebos | |
| parent | 1cc35a5258d1b2806627fa6ceb9fbc74ec2228f2 (diff) | |
Diffstat (limited to 'src/Erebos')
| -rw-r--r-- | src/Erebos/Network.hs | 41 |
1 files changed, 23 insertions, 18 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index e7b08a0..ebecbc0 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -241,14 +241,18 @@ newWaitingRef dgst act = do return wref -forkServerThread :: Server -> IO () -> IO () -forkServerThread server act = do +forkServerThread :: Server -> String -> IO () -> IO () +forkServerThread server label act = do modifyMVar_ (serverThreads server) $ \ts -> do t <- forkIO $ do t <- myThreadId - act + catchAndLogError act modifyMVar_ (serverThreads server) $ return . filter (/=t) return (t:ts) + where + catchAndLogError = handle $ \(e :: SomeException) -> + atomically $ writeTQueue (serverErrorLog server) $ "server " <> label <> ": " <> show e + startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server startServer serverOptions serverOrigHead logd' serverServices = do @@ -269,20 +273,20 @@ startServer serverOptions serverOrigHead logd' serverServices = do chanSvc <- newTQueueIO let logd = writeTQueue serverErrorLog - forkServerThread server $ forever $ do + forkServerThread server "logger" $ forever $ do logd' . (serverErrorPrefix serverOptions <>) =<< atomically (readTQueue serverErrorLog) logt <- if | serverTestLog serverOptions -> do serverTestLog <- newTQueueIO - forkServerThread server $ forever $ do + forkServerThread server "test-logger" $ forever $ do logd' =<< atomically (readTQueue serverTestLog) return $ writeTQueue serverTestLog | otherwise -> do return $ \_ -> return () - forkServerThread server $ dataResponseWorker server - forkServerThread server $ forever $ do + forkServerThread server "data-response-worker" $ dataResponseWorker server + forkServerThread server "io-action-worker" $ forever $ do either (atomically . logd . showErebosError) return =<< runExceptT =<< atomically (readTQueue serverIOActions) @@ -296,7 +300,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do return sock loop sock = do - when (serverLocalDiscovery serverOptions) $ forkServerThread server $ do + when (serverLocalDiscovery serverOptions) $ forkServerThread server "discovery" $ do announceAddreses <- fmap concat $ sequence $ [ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock , getBroadcastAddresses discoveryPort @@ -342,11 +346,11 @@ startServer serverOptions serverOrigHead logd' serverServices = do atomically $ writeTQueue serverIOActions $ do act server x - forkServerThread server $ forever $ do + forkServerThread server "receiver" $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 writeFlowIO serverRawPath (DatagramAddress saddr, msg) - forkServerThread server $ forever $ do + forkServerThread server "sender" $ forever $ do ( paddr, msg ) <- readFlowIO serverRawPath let logAndDropAddress :: SomeException -> IO () logAndDropAddress e = do @@ -357,7 +361,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do CustomPeerAddress addr -> sendBytesToAddress addr msg DatagramAddress addr -> void $ S.sendTo sock msg addr - forkServerThread server $ forever $ do + forkServerThread server "control-handler" $ forever $ do readFlowIO serverControlFlow >>= \case NewConnection conn mbpid -> do let paddr = connAddress conn @@ -368,7 +372,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do peer <- mkPeer server paddr return (M.insert paddr peer pvalue, peer) - forkServerThread server $ do + forkServerThread server "peer-handler" $ do atomically $ do readTVar (peerState peer) >>= \case PeerInit packets -> do @@ -402,7 +406,8 @@ startServer serverOptions serverOrigHead logd' serverServices = do Nothing -> do case paddr of DatagramAddress _ -> return () - CustomPeerAddress caddr -> connectionToAddressClosed caddr + CustomPeerAddress caddr -> connectionToAddressClosed caddr `catch` + \(e :: SomeException) -> atomically $ logd $ "connectionToAddressClosed on " <> show paddr <> " failed: " <> show e dropPeer peer peerLoop @@ -411,7 +416,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd logt protocolRawPath protocolControlFlow - forkServerThread server $ withSocketsDo $ do + forkServerThread server "main-loop" $ withSocketsDo $ do let hints = defaultHints { addrFlags = [AI_PASSIVE] , addrFamily = AF_INET6 @@ -420,7 +425,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort serverOptions) bracket (open addr) close loop - forkServerThread server $ forever $ do + forkServerThread server "service-handler" $ forever $ do ( peer, paddr, svc, ref, streams ) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams paddr peer (serviceHandler $ wrappedLoad @s ref) @@ -456,7 +461,7 @@ dataResponseWorker server = forever $ do [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case Right ref -> do atomically (writeTVar tvar $ Right $ DataRequestFulfilled ref) - forkServerThread server $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case + forkServerThread server "request-fulfilled" $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err) Right () -> return () @@ -553,7 +558,7 @@ openStream = do _ -> throwError "can't open stream without established connection" (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn) - liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler) + liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ "stream-writer" handler) addHeader hdr return writer @@ -943,7 +948,7 @@ sendToPeerList peer parts = do (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn) lift $ writeTQueue (serverIOActions (peerServer peer)) $ do - liftIO $ forkServerThread (peerServer peer) handler + liftIO $ forkServerThread (peerServer peer) "service-stream-writer" handler return [ ( hdr, cb writer ) ] _ -> return [] liftIO $ sequence_ $ map snd streamHeaders |