summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network.hs31
-rw-r--r--src/Test.hs44
2 files changed, 50 insertions, 25 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 4b2453e..6fae8c5 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -1,6 +1,7 @@
module Network (
Server,
startServer,
+ stopServer,
getNextPeerChange,
ServerOptions(..), serverIdentity, defaultServerOptions,
@@ -67,6 +68,7 @@ data Server = Server
{ serverStorage :: Storage
, serverOrigHead :: Head LocalState
, serverIdentity_ :: MVar UnifiedIdentity
+ , serverThreads :: MVar [ThreadId]
, serverSocket :: MVar Socket
, serverChanPacket :: Chan (PeerAddress, BC.ByteString)
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
@@ -242,10 +244,15 @@ newWaitingRef pref act = do
return wref
+forkServerThread :: Server -> IO () -> IO ()
+forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do
+ (:ts) <$> forkIO act
+
startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server
startServer opt serverOrigHead logd' serverServices = do
let serverStorage = refStorage $ headRef serverOrigHead
serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead
+ serverThreads <- newMVar []
serverSocket <- newEmptyMVar
serverChanPacket <- newChan
serverDataResponse <- newTQueueIO
@@ -259,11 +266,11 @@ startServer opt serverOrigHead logd' serverServices = do
chanSvc <- newTQueueIO
let logd = writeTQueue serverErrorLog
- void $ forkIO $ forever $ do
+ forkServerThread server $ forever $ do
logd' =<< atomically (readTQueue serverErrorLog)
- void $ forkIO $ dataResponseWorker server
- void $ forkIO $ forever $ do
+ forkServerThread server $ dataResponseWorker server
+ forkServerThread server $ forever $ do
either (atomically . logd) return =<< runExceptT =<<
atomically (readTQueue serverIOActions)
@@ -279,7 +286,7 @@ startServer opt serverOrigHead logd' serverServices = do
return sock
loop sock = do
- when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do
+ when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
readMVar serverIdentity_ >>= \identity -> do
st <- derivePartialStorage serverStorage
let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]
@@ -316,7 +323,7 @@ startServer opt serverOrigHead logd' serverServices = do
runPeerService peer $ act x
_ -> return ()
- void $ forkIO $ forever $ do
+ forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
writeChan serverChanPacket (DatagramAddress sock saddr, msg)
@@ -355,7 +362,7 @@ startServer opt serverOrigHead logd' serverServices = do
_ -> do atomically $ logd $ show paddr ++ ": invalid objects"
- void $ forkIO $ withSocketsDo $ do
+ forkServerThread server $ withSocketsDo $ do
let hints = defaultHints
{ addrFlags = [AI_PASSIVE]
, addrFamily = AF_INET6
@@ -364,7 +371,7 @@ startServer opt serverOrigHead logd' serverServices = do
addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt)
bracket (open addr) close loop
- void $ forkIO $ forever $ do
+ forkServerThread server $ forever $ do
(peer, svc, ref) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
@@ -372,13 +379,17 @@ startServer opt serverOrigHead logd' serverServices = do
return server
+stopServer :: Server -> IO ()
+stopServer Server {..} = do
+ mapM_ killThread =<< takeMVar serverThreads
+
sendWorker :: Peer -> IO ()
sendWorker peer = do
startTime <- getTime MonotonicRaw
nowVar <- newTVarIO startTime
waitVar <- newTVarIO startTime
- let waitTill time = void $ forkIO $ do
+ let waitTill time = forkServerThread (peerServer peer) $ do
now <- getTime MonotonicRaw
when (time > now) $
threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000
@@ -464,7 +475,7 @@ dataResponseWorker server = forever $ do
[] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
Right ref -> do
atomically (writeTVar tvar $ Right ref)
- void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case
+ forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case
Left err -> atomically $ writeTQueue (serverErrorLog server) err
Right () -> return ()
@@ -774,7 +785,7 @@ mkPeer server paddr = do
<*> newTMVarIO M.empty
<*> newTVarIO []
<*> newTMVarIO []
- void $ forkIO $ sendWorker peer
+ forkServerThread server $ sendWorker peer
return peer
serverPeer :: Server -> SockAddr -> IO Peer
diff --git a/src/Test.hs b/src/Test.hs
index 7694322..0778021 100644
--- a/src/Test.hs
+++ b/src/Test.hs
@@ -44,17 +44,21 @@ import Sync
data TestState = TestState
{ tsHead :: Maybe (Head LocalState)
- , tsServer :: Maybe Server
- , tsPeers :: Maybe (MVar (Int, [(Int, Peer)]))
+ , tsServer :: Maybe RunningServer
, tsWatchedLocalIdentity :: Maybe WatchedHead
, tsWatchedSharedIdentity :: Maybe WatchedHead
}
+data RunningServer = RunningServer
+ { rsServer :: Server
+ , rsPeers :: MVar (Int, [(Int, Peer)])
+ , rsPeerThread :: ThreadId
+ }
+
initTestState :: TestState
initTestState = TestState
{ tsHead = Nothing
, tsServer = Nothing
- , tsPeers = Nothing
, tsWatchedLocalIdentity = Nothing
, tsWatchedSharedIdentity = Nothing
}
@@ -115,8 +119,8 @@ cmdOut line = do
getPeer :: Text -> CommandM Peer
getPeer spidx = do
- Just pmvar <- gets tsPeers
- Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar pmvar)
+ Just RunningServer {..} <- gets tsServer
+ Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers)
return peer
getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int
@@ -233,6 +237,7 @@ commands = map (T.pack *** id)
, ("stored-set-list", cmdStoredSetList)
, ("create-identity", cmdCreateIdentity)
, ("start-server", cmdStartServer)
+ , ("stop-server", cmdStopServer)
, ("peer-add", cmdPeerAdd)
, ("shared-state-get", cmdSharedStateGet)
, ("shared-state-wait", cmdSharedStateWait)
@@ -323,16 +328,16 @@ cmdStartServer = do
out <- asks tiOutput
Just h <- gets tsHead
- peers <- liftIO $ newMVar (1, [])
- server <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr)
- [ someServiceAttr $ pairingAttributes (Proxy @AttachService) out peers "attach"
- , someServiceAttr $ pairingAttributes (Proxy @ContactService) out peers "contact"
+ rsPeers <- liftIO $ newMVar (1, [])
+ rsServer <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr)
+ [ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach"
+ , someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact"
, someServiceAttr $ directMessageAttributes out
, someService @SyncService Proxy
]
- void $ liftIO $ forkIO $ void $ forever $ do
- peer <- getNextPeerChange server
+ rsPeerThread <- liftIO $ forkIO $ void $ forever $ do
+ peer <- getNextPeerChange rsServer
let printPeer (idx, p) = do
params <- peerIdentity p >>= return . \case
@@ -344,19 +349,28 @@ cmdStartServer = do
update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur
| otherwise = fmap (p:) <$> update (nid, ps)
- modifyMVar_ peers update
+ modifyMVar_ rsPeers update
- modify $ \s -> s { tsServer = Just server, tsPeers = Just peers }
+ modify $ \s -> s { tsServer = Just RunningServer {..} }
+
+cmdStopServer :: Command
+cmdStopServer = do
+ Just RunningServer {..} <- gets tsServer
+ liftIO $ do
+ killThread rsPeerThread
+ stopServer rsServer
+ modify $ \s -> s { tsServer = Nothing }
+ cmdOut "stop-server-done"
cmdPeerAdd :: Command
cmdPeerAdd = do
- Just server <- gets tsServer
+ Just RunningServer {..} <- gets tsServer
host:rest <- map T.unpack <$> asks tiParams
let port = case rest of [] -> show discoveryPort
(p:_) -> p
addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just host) (Just port)
- void $ liftIO $ serverPeer server (addrAddress addr)
+ void $ liftIO $ serverPeer rsServer (addrAddress addr)
cmdSharedStateGet :: Command
cmdSharedStateGet = do