diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Network.hs | 81 | 
1 files changed, 34 insertions, 47 deletions
| diff --git a/src/Network.hs b/src/Network.hs index 5d3c0ec..4b2453e 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -243,48 +243,35 @@ newWaitingRef pref act = do  startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server -startServer opt origHead logd' services = do -    let storage = refStorage $ headRef origHead -    chanPacket <- newChan -    dataResponse <- newTQueueIO -    ioActions <- newTQueueIO -    chanPeer <- newTChanIO +startServer opt serverOrigHead logd' serverServices = do +    let serverStorage = refStorage $ headRef serverOrigHead +    serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead +    serverSocket <- newEmptyMVar +    serverChanPacket <- newChan +    serverDataResponse <- newTQueueIO +    serverIOActions <- newTQueueIO +    serverServiceStates <- newTMVarIO M.empty +    serverPeers <- newMVar M.empty +    serverChanPeer <- newTChanIO +    serverErrorLog <- newTQueueIO +    let server = Server {..} +      chanSvc <- newTQueueIO -    svcStates <- newTMVarIO M.empty -    peers <- newMVar M.empty -    midentity <- newMVar $ headLocalIdentity origHead -    ssocket <- newEmptyMVar -    errlog <- newTQueueIO - -    let server = Server -            { serverStorage = storage -            , serverOrigHead = origHead -            , serverIdentity_ = midentity -            , serverSocket = ssocket -            , serverChanPacket = chanPacket -            , serverDataResponse = dataResponse -            , serverIOActions = ioActions -            , serverServices = services -            , serverServiceStates = svcStates -            , serverPeers = peers -            , serverChanPeer = chanPeer -            , serverErrorLog = errlog -            } - -    let logd = writeTQueue errlog + +    let logd = writeTQueue serverErrorLog      void $ forkIO $ forever $ do -        logd' =<< atomically (readTQueue errlog) +        logd' =<< atomically (readTQueue serverErrorLog)      void $ forkIO $ dataResponseWorker server      void $ forkIO $ forever $ do          either (atomically . logd) return =<< runExceptT =<< -            atomically (readTQueue $ serverIOActions server) +            atomically (readTQueue serverIOActions)      broadcastAddreses <- getBroadcastAddresses discoveryPort      let open addr = do              sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) -            putMVar ssocket sock +            putMVar serverSocket sock              setSocketOption sock ReuseAddr 1              setSocketOption sock Broadcast 1              withFdSocket sock setCloseOnExecIfNeeded @@ -293,49 +280,49 @@ startServer opt origHead logd' services = do          loop sock = do              when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do -                readMVar midentity >>= \identity -> do -                    st <- derivePartialStorage storage +                readMVar serverIdentity_ >>= \identity -> do +                    st <- derivePartialStorage serverStorage                      let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]                      mapM_ (void . S.sendTo sock packet) broadcastAddreses                  threadDelay $ announceIntervalSeconds * 1000 * 1000              let announceUpdate identity = do -                    st <- derivePartialStorage storage +                    st <- derivePartialStorage serverStorage                      let selfRef = partialRef st $ storedRef $ idData identity                          updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity)                          ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]                          hitems = map AnnounceUpdate updateRefs                          packet = TransportPacket (TransportHeader $  hitems) [] -                    ps <- readMVar peers +                    ps <- readMVar serverPeers                      forM_ ps $ \peer -> atomically $ do                          ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case                              (PeerIdentityFull _, ChannelEstablished _) ->                                  writeTQueue (peerOutQueue peer) (True, ackedBy, packet)                              _  -> return () -            void $ watchHead origHead $ \h -> do +            void $ watchHead serverOrigHead $ \h -> do                  let idt = headLocalIdentity h -                changedId <- modifyMVar midentity $ \cur -> +                changedId <- modifyMVar serverIdentity_ $ \cur ->                      return (idt, cur /= idt)                  when changedId $ announceUpdate idt -            forM_ services $ \(SomeService service _) -> do +            forM_ serverServices $ \(SomeService service _) -> do                  forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do -                    watchHeadWith origHead (sel . headStoredObject) $ \x -> do -                        withMVar peers $ mapM_ $ \peer -> atomically $ do +                    watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do +                        withMVar serverPeers $ mapM_ $ \peer -> atomically $ do                              readTVar (peerIdentityVar peer) >>= \case -                                PeerIdentityFull _ -> writeTQueue ioActions $ do +                                PeerIdentityFull _ -> writeTQueue serverIOActions $ do                                      runPeerService peer $ act x                                  _ -> return ()              void $ forkIO $ forever $ do                  (msg, saddr) <- S.recvFrom sock 4096 -                writeChan chanPacket (DatagramAddress sock saddr, msg) +                writeChan serverChanPacket (DatagramAddress sock saddr, msg)              forever $ do -                (paddr, msg) <- readChan chanPacket -                (peer, content, secure) <- modifyMVar peers $ \pvalue -> do +                (paddr, msg) <- readChan serverChanPacket +                (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do                      case M.lookup paddr pvalue of                          Just peer -> do                              mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case @@ -358,8 +345,8 @@ startServer opt origHead logd' services = do                       Right (obj:objs)                           | Just header <- transportFromObject obj -> do                                 prefs <- forM objs $ storeObject $ peerInStorage peer -                               identity <- readMVar midentity -                               let svcs = map someServiceID services +                               identity <- readMVar serverIdentity_ +                               let svcs = map someServiceID serverServices                                 handlePacket identity secure peer chanSvc svcs header prefs                           | otherwise -> atomically $ do @@ -379,7 +366,7 @@ startServer opt origHead logd' services = do      void $ forkIO $ forever $ do          (peer, svc, ref) <- atomically $ readTQueue chanSvc -        case find ((svc ==) . someServiceID) (serverServices server) of +        case find ((svc ==) . someServiceID) serverServices of              Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)              _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" |