summaryrefslogtreecommitdiff
path: root/src/Erebos
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos')
-rw-r--r--src/Erebos/Network.hs41
1 files changed, 23 insertions, 18 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index e7b08a0..ebecbc0 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -241,14 +241,18 @@ newWaitingRef dgst act = do
return wref
-forkServerThread :: Server -> IO () -> IO ()
-forkServerThread server act = do
+forkServerThread :: Server -> String -> IO () -> IO ()
+forkServerThread server label act = do
modifyMVar_ (serverThreads server) $ \ts -> do
t <- forkIO $ do
t <- myThreadId
- act
+ catchAndLogError act
modifyMVar_ (serverThreads server) $ return . filter (/=t)
return (t:ts)
+ where
+ catchAndLogError = handle $ \(e :: SomeException) ->
+ atomically $ writeTQueue (serverErrorLog server) $ "server " <> label <> ": " <> show e
+
startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server
startServer serverOptions serverOrigHead logd' serverServices = do
@@ -269,20 +273,20 @@ startServer serverOptions serverOrigHead logd' serverServices = do
chanSvc <- newTQueueIO
let logd = writeTQueue serverErrorLog
- forkServerThread server $ forever $ do
+ forkServerThread server "logger" $ forever $ do
logd' . (serverErrorPrefix serverOptions <>) =<< atomically (readTQueue serverErrorLog)
logt <- if
| serverTestLog serverOptions -> do
serverTestLog <- newTQueueIO
- forkServerThread server $ forever $ do
+ forkServerThread server "test-logger" $ forever $ do
logd' =<< atomically (readTQueue serverTestLog)
return $ writeTQueue serverTestLog
| otherwise -> do
return $ \_ -> return ()
- forkServerThread server $ dataResponseWorker server
- forkServerThread server $ forever $ do
+ forkServerThread server "data-response-worker" $ dataResponseWorker server
+ forkServerThread server "io-action-worker" $ forever $ do
either (atomically . logd . showErebosError) return =<< runExceptT =<<
atomically (readTQueue serverIOActions)
@@ -296,7 +300,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
return sock
loop sock = do
- when (serverLocalDiscovery serverOptions) $ forkServerThread server $ do
+ when (serverLocalDiscovery serverOptions) $ forkServerThread server "discovery" $ do
announceAddreses <- fmap concat $ sequence $
[ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock
, getBroadcastAddresses discoveryPort
@@ -342,11 +346,11 @@ startServer serverOptions serverOrigHead logd' serverServices = do
atomically $ writeTQueue serverIOActions $ do
act server x
- forkServerThread server $ forever $ do
+ forkServerThread server "receiver" $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
writeFlowIO serverRawPath (DatagramAddress saddr, msg)
- forkServerThread server $ forever $ do
+ forkServerThread server "sender" $ forever $ do
( paddr, msg ) <- readFlowIO serverRawPath
let logAndDropAddress :: SomeException -> IO ()
logAndDropAddress e = do
@@ -357,7 +361,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
CustomPeerAddress addr -> sendBytesToAddress addr msg
DatagramAddress addr -> void $ S.sendTo sock msg addr
- forkServerThread server $ forever $ do
+ forkServerThread server "control-handler" $ forever $ do
readFlowIO serverControlFlow >>= \case
NewConnection conn mbpid -> do
let paddr = connAddress conn
@@ -368,7 +372,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
peer <- mkPeer server paddr
return (M.insert paddr peer pvalue, peer)
- forkServerThread server $ do
+ forkServerThread server "peer-handler" $ do
atomically $ do
readTVar (peerState peer) >>= \case
PeerInit packets -> do
@@ -402,7 +406,8 @@ startServer serverOptions serverOrigHead logd' serverServices = do
Nothing -> do
case paddr of
DatagramAddress _ -> return ()
- CustomPeerAddress caddr -> connectionToAddressClosed caddr
+ CustomPeerAddress caddr -> connectionToAddressClosed caddr `catch`
+ \(e :: SomeException) -> atomically $ logd $ "connectionToAddressClosed on " <> show paddr <> " failed: " <> show e
dropPeer peer
peerLoop
@@ -411,7 +416,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd logt protocolRawPath protocolControlFlow
- forkServerThread server $ withSocketsDo $ do
+ forkServerThread server "main-loop" $ withSocketsDo $ do
let hints = defaultHints
{ addrFlags = [AI_PASSIVE]
, addrFamily = AF_INET6
@@ -420,7 +425,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort serverOptions)
bracket (open addr) close loop
- forkServerThread server $ forever $ do
+ forkServerThread server "service-handler" $ forever $ do
( peer, paddr, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams paddr peer (serviceHandler $ wrappedLoad @s ref)
@@ -456,7 +461,7 @@ dataResponseWorker server = forever $ do
[] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
Right ref -> do
atomically (writeTVar tvar $ Right $ DataRequestFulfilled ref)
- forkServerThread server $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case
+ forkServerThread server "request-fulfilled" $ runExceptT (wrefAction wr $ DataRequestFulfilled ref) >>= \case
Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err)
Right () -> return ()
@@ -553,7 +558,7 @@ openStream = do
_ -> throwError "can't open stream without established connection"
(hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
- liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
+ liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ "stream-writer" handler)
addHeader hdr
return writer
@@ -943,7 +948,7 @@ sendToPeerList peer parts = do
(hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
- liftIO $ forkServerThread (peerServer peer) handler
+ liftIO $ forkServerThread (peerServer peer) "service-stream-writer" handler
return [ ( hdr, cb writer ) ]
_ -> return []
liftIO $ sequence_ $ map snd streamHeaders