summaryrefslogtreecommitdiff
path: root/src/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network.hs')
-rw-r--r--src/Network.hs31
1 files changed, 21 insertions, 10 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 4b2453e..6fae8c5 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -1,6 +1,7 @@
module Network (
Server,
startServer,
+ stopServer,
getNextPeerChange,
ServerOptions(..), serverIdentity, defaultServerOptions,
@@ -67,6 +68,7 @@ data Server = Server
{ serverStorage :: Storage
, serverOrigHead :: Head LocalState
, serverIdentity_ :: MVar UnifiedIdentity
+ , serverThreads :: MVar [ThreadId]
, serverSocket :: MVar Socket
, serverChanPacket :: Chan (PeerAddress, BC.ByteString)
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
@@ -242,10 +244,15 @@ newWaitingRef pref act = do
return wref
+forkServerThread :: Server -> IO () -> IO ()
+forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do
+ (:ts) <$> forkIO act
+
startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server
startServer opt serverOrigHead logd' serverServices = do
let serverStorage = refStorage $ headRef serverOrigHead
serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead
+ serverThreads <- newMVar []
serverSocket <- newEmptyMVar
serverChanPacket <- newChan
serverDataResponse <- newTQueueIO
@@ -259,11 +266,11 @@ startServer opt serverOrigHead logd' serverServices = do
chanSvc <- newTQueueIO
let logd = writeTQueue serverErrorLog
- void $ forkIO $ forever $ do
+ forkServerThread server $ forever $ do
logd' =<< atomically (readTQueue serverErrorLog)
- void $ forkIO $ dataResponseWorker server
- void $ forkIO $ forever $ do
+ forkServerThread server $ dataResponseWorker server
+ forkServerThread server $ forever $ do
either (atomically . logd) return =<< runExceptT =<<
atomically (readTQueue serverIOActions)
@@ -279,7 +286,7 @@ startServer opt serverOrigHead logd' serverServices = do
return sock
loop sock = do
- when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do
+ when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
readMVar serverIdentity_ >>= \identity -> do
st <- derivePartialStorage serverStorage
let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]
@@ -316,7 +323,7 @@ startServer opt serverOrigHead logd' serverServices = do
runPeerService peer $ act x
_ -> return ()
- void $ forkIO $ forever $ do
+ forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
writeChan serverChanPacket (DatagramAddress sock saddr, msg)
@@ -355,7 +362,7 @@ startServer opt serverOrigHead logd' serverServices = do
_ -> do atomically $ logd $ show paddr ++ ": invalid objects"
- void $ forkIO $ withSocketsDo $ do
+ forkServerThread server $ withSocketsDo $ do
let hints = defaultHints
{ addrFlags = [AI_PASSIVE]
, addrFamily = AF_INET6
@@ -364,7 +371,7 @@ startServer opt serverOrigHead logd' serverServices = do
addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt)
bracket (open addr) close loop
- void $ forkIO $ forever $ do
+ forkServerThread server $ forever $ do
(peer, svc, ref) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
@@ -372,13 +379,17 @@ startServer opt serverOrigHead logd' serverServices = do
return server
+stopServer :: Server -> IO ()
+stopServer Server {..} = do
+ mapM_ killThread =<< takeMVar serverThreads
+
sendWorker :: Peer -> IO ()
sendWorker peer = do
startTime <- getTime MonotonicRaw
nowVar <- newTVarIO startTime
waitVar <- newTVarIO startTime
- let waitTill time = void $ forkIO $ do
+ let waitTill time = forkServerThread (peerServer peer) $ do
now <- getTime MonotonicRaw
when (time > now) $
threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000
@@ -464,7 +475,7 @@ dataResponseWorker server = forever $ do
[] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
Right ref -> do
atomically (writeTVar tvar $ Right ref)
- void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case
+ forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case
Left err -> atomically $ writeTQueue (serverErrorLog server) err
Right () -> return ()
@@ -774,7 +785,7 @@ mkPeer server paddr = do
<*> newTMVarIO M.empty
<*> newTVarIO []
<*> newTMVarIO []
- void $ forkIO $ sendWorker peer
+ forkServerThread server $ sendWorker peer
return peer
serverPeer :: Server -> SockAddr -> IO Peer