summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-07-20 21:13:11 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-07-20 21:13:11 +0200
commit1abd855b5fb7cbcac2e77efec4a392d6d940a91f (patch)
treec3040ee6f59ae6ef7b6d3aae8a5bea746a731dae
parentf626ae18bddc6b5d2ba2fab4984e2219968e197b (diff)
Network: refactor startServer with RecordWildCards
-rw-r--r--erebos.cabal1
-rw-r--r--src/Network.hs81
2 files changed, 35 insertions, 47 deletions
diff --git a/erebos.cabal b/erebos.cabal
index 2b22604..f5d3fb9 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -50,6 +50,7 @@ executable erebos
LambdaCase,
MultiWayIf,
RankNTypes,
+ RecordWildCards
ScopedTypeVariables,
StandaloneDeriving,
TypeOperators
diff --git a/src/Network.hs b/src/Network.hs
index 5d3c0ec..4b2453e 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -243,48 +243,35 @@ newWaitingRef pref act = do
startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server
-startServer opt origHead logd' services = do
- let storage = refStorage $ headRef origHead
- chanPacket <- newChan
- dataResponse <- newTQueueIO
- ioActions <- newTQueueIO
- chanPeer <- newTChanIO
+startServer opt serverOrigHead logd' serverServices = do
+ let serverStorage = refStorage $ headRef serverOrigHead
+ serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead
+ serverSocket <- newEmptyMVar
+ serverChanPacket <- newChan
+ serverDataResponse <- newTQueueIO
+ serverIOActions <- newTQueueIO
+ serverServiceStates <- newTMVarIO M.empty
+ serverPeers <- newMVar M.empty
+ serverChanPeer <- newTChanIO
+ serverErrorLog <- newTQueueIO
+ let server = Server {..}
+
chanSvc <- newTQueueIO
- svcStates <- newTMVarIO M.empty
- peers <- newMVar M.empty
- midentity <- newMVar $ headLocalIdentity origHead
- ssocket <- newEmptyMVar
- errlog <- newTQueueIO
-
- let server = Server
- { serverStorage = storage
- , serverOrigHead = origHead
- , serverIdentity_ = midentity
- , serverSocket = ssocket
- , serverChanPacket = chanPacket
- , serverDataResponse = dataResponse
- , serverIOActions = ioActions
- , serverServices = services
- , serverServiceStates = svcStates
- , serverPeers = peers
- , serverChanPeer = chanPeer
- , serverErrorLog = errlog
- }
-
- let logd = writeTQueue errlog
+
+ let logd = writeTQueue serverErrorLog
void $ forkIO $ forever $ do
- logd' =<< atomically (readTQueue errlog)
+ logd' =<< atomically (readTQueue serverErrorLog)
void $ forkIO $ dataResponseWorker server
void $ forkIO $ forever $ do
either (atomically . logd) return =<< runExceptT =<<
- atomically (readTQueue $ serverIOActions server)
+ atomically (readTQueue serverIOActions)
broadcastAddreses <- getBroadcastAddresses discoveryPort
let open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
- putMVar ssocket sock
+ putMVar serverSocket sock
setSocketOption sock ReuseAddr 1
setSocketOption sock Broadcast 1
withFdSocket sock setCloseOnExecIfNeeded
@@ -293,49 +280,49 @@ startServer opt origHead logd' services = do
loop sock = do
when (serverLocalDiscovery opt) $ void $ forkIO $ forever $ do
- readMVar midentity >>= \identity -> do
- st <- derivePartialStorage storage
+ readMVar serverIdentity_ >>= \identity -> do
+ st <- derivePartialStorage serverStorage
let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]
mapM_ (void . S.sendTo sock packet) broadcastAddreses
threadDelay $ announceIntervalSeconds * 1000 * 1000
let announceUpdate identity = do
- st <- derivePartialStorage storage
+ st <- derivePartialStorage serverStorage
let selfRef = partialRef st $ storedRef $ idData identity
updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity)
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
hitems = map AnnounceUpdate updateRefs
packet = TransportPacket (TransportHeader $ hitems) []
- ps <- readMVar peers
+ ps <- readMVar serverPeers
forM_ ps $ \peer -> atomically $ do
((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case
(PeerIdentityFull _, ChannelEstablished _) ->
writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
_ -> return ()
- void $ watchHead origHead $ \h -> do
+ void $ watchHead serverOrigHead $ \h -> do
let idt = headLocalIdentity h
- changedId <- modifyMVar midentity $ \cur ->
+ changedId <- modifyMVar serverIdentity_ $ \cur ->
return (idt, cur /= idt)
when changedId $ announceUpdate idt
- forM_ services $ \(SomeService service _) -> do
+ forM_ serverServices $ \(SomeService service _) -> do
forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith origHead (sel . headStoredObject) $ \x -> do
- withMVar peers $ mapM_ $ \peer -> atomically $ do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue ioActions $ do
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
runPeerService peer $ act x
_ -> return ()
void $ forkIO $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
- writeChan chanPacket (DatagramAddress sock saddr, msg)
+ writeChan serverChanPacket (DatagramAddress sock saddr, msg)
forever $ do
- (paddr, msg) <- readChan chanPacket
- (peer, content, secure) <- modifyMVar peers $ \pvalue -> do
+ (paddr, msg) <- readChan serverChanPacket
+ (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do
case M.lookup paddr pvalue of
Just peer -> do
mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case
@@ -358,8 +345,8 @@ startServer opt origHead logd' services = do
Right (obj:objs)
| Just header <- transportFromObject obj -> do
prefs <- forM objs $ storeObject $ peerInStorage peer
- identity <- readMVar midentity
- let svcs = map someServiceID services
+ identity <- readMVar serverIdentity_
+ let svcs = map someServiceID serverServices
handlePacket identity secure peer chanSvc svcs header prefs
| otherwise -> atomically $ do
@@ -379,7 +366,7 @@ startServer opt origHead logd' services = do
void $ forkIO $ forever $ do
(peer, svc, ref) <- atomically $ readTQueue chanSvc
- case find ((svc ==) . someServiceID) (serverServices server) of
+ case find ((svc ==) . someServiceID) serverServices of
Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"