diff options
Diffstat (limited to 'src/Network.hs')
-rw-r--r-- | src/Network.hs | 81 |
1 files changed, 34 insertions, 47 deletions
diff --git a/src/Network.hs b/src/Network.hs index 5d3c0ec..4b2453e 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -243,48 +243,35 @@ newWaitingRef pref act = do startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server -startServer opt origHead logd' services = do - let storage = refStorage $ headRef origHead - chanPacket <- newChan - dataResponse <- newTQueueIO - ioActions <- newTQueueIO - chanPeer <- newTChanIO +startServer opt serverOrigHead logd' serverServices = do + let serverStorage = refStorage $ headRef serverOrigHead + serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead + serverSocket <- newEmptyMVar + serverChanPacket <- newChan + serverDataResponse <- newTQueueIO + serverIOActions <- newTQueueIO + serverServiceStates <- newTMVarIO M.empty + serverPeers <- newMVar M.empty + serverChanPeer <- newTChanIO + serverErrorLog <- newTQueueIO + let server = Server {..} + chanSvc <- newTQueueIO - svcStates <- newTMVarIO M.empty - peers <- newMVar M.empty - midentity <- newMVar $ headLocalIdentity origHead - ssocket <- newEmptyMVar - errlog <- newTQueueIO - - let server = Server - { serverStorage = storage - , serverOrigHead = origHead - , serverIdentity_ = midentity - , serverSocket = ssocket - , serverChanPacket = chanPacket - , serverDataResponse = dataResponse - , serverIOActions = ioActions - , serverServices = services - , serverServiceStates = svcStates - , serverPeers = peers - , serverChanPeer = chanPeer - , serverErrorLog = errlog - } - - let logd = writeTQueue errlog + + let logd = writeTQueue serverErrorLog void $ forkIO $ forever $ do - logd' =<< atomically (readTQueue errlog) + logd' =<< atomically (readTQueue serverErrorLog) void $ forkIO $ dataResponseWorker server void $ forkIO $ forever $ do either (atomically . logd) return =<< runExceptT =<< - atomically (readTQueue $ serverIOActions server) + atomically (readTQueue serverIOActions) broadcastAddreses <- getBroadcastAddresses discoveryPort let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) - putMVar ssocket sock + putMVar serverSocket sock setSocketOption sock ReuseAddr 1 setSocketOption sock Broadcast 1 withFdSocket sock setCloseOnExecIfNeeded @@ -293,49 +280,49 @@ startServer opt origHead logd' services = do loop sock = do when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do - readMVar midentity >>= \identity -> do - st <- derivePartialStorage storage + readMVar serverIdentity_ >>= \identity -> do + st <- derivePartialStorage serverStorage let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ] mapM_ (void . S.sendTo sock packet) broadcastAddreses threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do - st <- derivePartialStorage storage + st <- derivePartialStorage serverStorage let selfRef = partialRef st $ storedRef $ idData identity updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity) ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] hitems = map AnnounceUpdate updateRefs packet = TransportPacket (TransportHeader $ hitems) [] - ps <- readMVar peers + ps <- readMVar serverPeers forM_ ps $ \peer -> atomically $ do ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case (PeerIdentityFull _, ChannelEstablished _) -> writeTQueue (peerOutQueue peer) (True, ackedBy, packet) _ -> return () - void $ watchHead origHead $ \h -> do + void $ watchHead serverOrigHead $ \h -> do let idt = headLocalIdentity h - changedId <- modifyMVar midentity $ \cur -> + changedId <- modifyMVar serverIdentity_ $ \cur -> return (idt, cur /= idt) when changedId $ announceUpdate idt - forM_ services $ \(SomeService service _) -> do + forM_ serverServices $ \(SomeService service _) -> do forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do - watchHeadWith origHead (sel . headStoredObject) $ \x -> do - withMVar peers $ mapM_ $ \peer -> atomically $ do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull _ -> writeTQueue ioActions $ do + PeerIdentityFull _ -> writeTQueue serverIOActions $ do runPeerService peer $ act x _ -> return () void $ forkIO $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 - writeChan chanPacket (DatagramAddress sock saddr, msg) + writeChan serverChanPacket (DatagramAddress sock saddr, msg) forever $ do - (paddr, msg) <- readChan chanPacket - (peer, content, secure) <- modifyMVar peers $ \pvalue -> do + (paddr, msg) <- readChan serverChanPacket + (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do case M.lookup paddr pvalue of Just peer -> do mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case @@ -358,8 +345,8 @@ startServer opt origHead logd' services = do Right (obj:objs) | Just header <- transportFromObject obj -> do prefs <- forM objs $ storeObject $ peerInStorage peer - identity <- readMVar midentity - let svcs = map someServiceID services + identity <- readMVar serverIdentity_ + let svcs = map someServiceID serverServices handlePacket identity secure peer chanSvc svcs header prefs | otherwise -> atomically $ do @@ -379,7 +366,7 @@ startServer opt origHead logd' services = do void $ forkIO $ forever $ do (peer, svc, ref) <- atomically $ readTQueue chanSvc - case find ((svc ==) . someServiceID) (serverServices server) of + case find ((svc ==) . someServiceID) serverServices of Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" |