diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Network.hs | 31 | ||||
| -rw-r--r-- | src/Test.hs | 44 | 
2 files changed, 50 insertions, 25 deletions
| diff --git a/src/Network.hs b/src/Network.hs index 4b2453e..6fae8c5 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,6 +1,7 @@  module Network (      Server,      startServer, +    stopServer,      getNextPeerChange,      ServerOptions(..), serverIdentity, defaultServerOptions, @@ -67,6 +68,7 @@ data Server = Server      { serverStorage :: Storage      , serverOrigHead :: Head LocalState      , serverIdentity_ :: MVar UnifiedIdentity +    , serverThreads :: MVar [ThreadId]      , serverSocket :: MVar Socket      , serverChanPacket :: Chan (PeerAddress, BC.ByteString)      , serverDataResponse :: TQueue (Peer, Maybe PartialRef) @@ -242,10 +244,15 @@ newWaitingRef pref act = do      return wref +forkServerThread :: Server -> IO () -> IO () +forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do +    (:ts) <$> forkIO act +  startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server  startServer opt serverOrigHead logd' serverServices = do      let serverStorage = refStorage $ headRef serverOrigHead      serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead +    serverThreads <- newMVar []      serverSocket <- newEmptyMVar      serverChanPacket <- newChan      serverDataResponse <- newTQueueIO @@ -259,11 +266,11 @@ startServer opt serverOrigHead logd' serverServices = do      chanSvc <- newTQueueIO      let logd = writeTQueue serverErrorLog -    void $ forkIO $ forever $ do +    forkServerThread server $ forever $ do          logd' =<< atomically (readTQueue serverErrorLog) -    void $ forkIO $ dataResponseWorker server -    void $ forkIO $ forever $ do +    forkServerThread server $ dataResponseWorker server +    forkServerThread server $ forever $ do          either (atomically . logd) return =<< runExceptT =<<              atomically (readTQueue serverIOActions) @@ -279,7 +286,7 @@ startServer opt serverOrigHead logd' serverServices = do              return sock          loop sock = do -            when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do +            when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do                  readMVar serverIdentity_ >>= \identity -> do                      st <- derivePartialStorage serverStorage                      let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ] @@ -316,7 +323,7 @@ startServer opt serverOrigHead logd' serverServices = do                                      runPeerService peer $ act x                                  _ -> return () -            void $ forkIO $ forever $ do +            forkServerThread server $ forever $ do                  (msg, saddr) <- S.recvFrom sock 4096                  writeChan serverChanPacket (DatagramAddress sock saddr, msg) @@ -355,7 +362,7 @@ startServer opt serverOrigHead logd' serverServices = do                       _ -> do atomically $ logd $ show paddr ++ ": invalid objects" -    void $ forkIO $ withSocketsDo $ do +    forkServerThread server $ withSocketsDo $ do          let hints = defaultHints                { addrFlags = [AI_PASSIVE]                , addrFamily = AF_INET6 @@ -364,7 +371,7 @@ startServer opt serverOrigHead logd' serverServices = do          addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt)          bracket (open addr) close loop -    void $ forkIO $ forever $ do +    forkServerThread server $ forever $ do          (peer, svc, ref) <- atomically $ readTQueue chanSvc          case find ((svc ==) . someServiceID) serverServices of              Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) @@ -372,13 +379,17 @@ startServer opt serverOrigHead logd' serverServices = do      return server +stopServer :: Server -> IO () +stopServer Server {..} = do +    mapM_ killThread =<< takeMVar serverThreads +  sendWorker :: Peer -> IO ()  sendWorker peer = do      startTime <- getTime MonotonicRaw      nowVar <- newTVarIO startTime      waitVar <- newTVarIO startTime -    let waitTill time = void $ forkIO $ do +    let waitTill time = forkServerThread (peerServer peer) $ do              now <- getTime MonotonicRaw              when (time > now) $                  threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 @@ -464,7 +475,7 @@ dataResponseWorker server = forever $ do                  [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case                            Right ref -> do                                atomically (writeTVar tvar $ Right ref) -                              void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case +                              forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case                                    Left err -> atomically $ writeTQueue (serverErrorLog server) err                                    Right () -> return () @@ -774,7 +785,7 @@ mkPeer server paddr = do          <*> newTMVarIO M.empty          <*> newTVarIO []          <*> newTMVarIO [] -    void $ forkIO $ sendWorker peer +    forkServerThread server $ sendWorker peer      return peer  serverPeer :: Server -> SockAddr -> IO Peer diff --git a/src/Test.hs b/src/Test.hs index 7694322..0778021 100644 --- a/src/Test.hs +++ b/src/Test.hs @@ -44,17 +44,21 @@ import Sync  data TestState = TestState      { tsHead :: Maybe (Head LocalState) -    , tsServer :: Maybe Server -    , tsPeers :: Maybe (MVar (Int, [(Int, Peer)])) +    , tsServer :: Maybe RunningServer      , tsWatchedLocalIdentity :: Maybe WatchedHead      , tsWatchedSharedIdentity :: Maybe WatchedHead      } +data RunningServer = RunningServer +    { rsServer :: Server +    , rsPeers :: MVar (Int, [(Int, Peer)]) +    , rsPeerThread :: ThreadId +    } +  initTestState :: TestState  initTestState = TestState      { tsHead = Nothing      , tsServer = Nothing -    , tsPeers = Nothing      , tsWatchedLocalIdentity = Nothing      , tsWatchedSharedIdentity = Nothing      } @@ -115,8 +119,8 @@ cmdOut line = do  getPeer :: Text -> CommandM Peer  getPeer spidx = do -    Just pmvar <- gets tsPeers -    Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar pmvar) +    Just RunningServer {..} <- gets tsServer +    Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers)      return peer  getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int @@ -233,6 +237,7 @@ commands = map (T.pack *** id)      , ("stored-set-list", cmdStoredSetList)      , ("create-identity", cmdCreateIdentity)      , ("start-server", cmdStartServer) +    , ("stop-server", cmdStopServer)      , ("peer-add", cmdPeerAdd)      , ("shared-state-get", cmdSharedStateGet)      , ("shared-state-wait", cmdSharedStateWait) @@ -323,16 +328,16 @@ cmdStartServer = do      out <- asks tiOutput      Just h <- gets tsHead -    peers <- liftIO $ newMVar (1, []) -    server <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr) -        [ someServiceAttr $ pairingAttributes (Proxy @AttachService) out peers "attach" -        , someServiceAttr $ pairingAttributes (Proxy @ContactService) out peers "contact" +    rsPeers <- liftIO $ newMVar (1, []) +    rsServer <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr) +        [ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" +        , someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact"          , someServiceAttr $ directMessageAttributes out          , someService @SyncService Proxy          ] -    void $ liftIO $ forkIO $ void $ forever $ do -        peer <- getNextPeerChange server +    rsPeerThread <- liftIO $ forkIO $ void $ forever $ do +        peer <- getNextPeerChange rsServer          let printPeer (idx, p) = do                  params <- peerIdentity p >>= return . \case @@ -344,19 +349,28 @@ cmdStartServer = do              update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur                                     | otherwise = fmap (p:) <$> update (nid, ps) -        modifyMVar_ peers update +        modifyMVar_ rsPeers update -    modify $ \s -> s { tsServer = Just server, tsPeers = Just peers } +    modify $ \s -> s { tsServer = Just RunningServer {..} } + +cmdStopServer :: Command +cmdStopServer = do +    Just RunningServer {..} <- gets tsServer +    liftIO $ do +        killThread rsPeerThread +        stopServer rsServer +    modify $ \s -> s { tsServer = Nothing } +    cmdOut "stop-server-done"  cmdPeerAdd :: Command  cmdPeerAdd = do -    Just server <- gets tsServer +    Just RunningServer {..} <- gets tsServer      host:rest <- map T.unpack <$> asks tiParams      let port = case rest of [] -> show discoveryPort                              (p:_) -> p      addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just host) (Just port) -    void $ liftIO $ serverPeer server (addrAddress addr) +    void $ liftIO $ serverPeer rsServer (addrAddress addr)  cmdSharedStateGet :: Command  cmdSharedStateGet = do |