summaryrefslogtreecommitdiff
path: root/src/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network.hs')
-rw-r--r--src/Network.hs60
1 files changed, 23 insertions, 37 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 14adad6..5d3c0ec 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -54,7 +54,6 @@ import Service
import State
import Storage
import Storage.Merge
-import Sync
discoveryPort :: PortNumber
@@ -254,7 +253,6 @@ startServer opt origHead logd' services = do
svcStates <- newTMVarIO M.empty
peers <- newMVar M.empty
midentity <- newMVar $ headLocalIdentity origHead
- mshared <- newMVar $ lsShared $ load $ headRef origHead
ssocket <- newEmptyMVar
errlog <- newTQueueIO
@@ -316,28 +314,20 @@ startServer opt origHead logd' services = do
writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
_ -> return ()
- let shareState self shared peer = do
- let refs = map (partialRef (peerInStorage peer) . storedRef) shared
- hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ]
- packet = TransportPacket (TransportHeader hitems) $
- map storedRef shared
- atomically $ readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do
- sendToPeerS peer ackedBy packet
- _ -> return ()
-
void $ watchHead origHead $ \h -> do
let idt = headLocalIdentity h
changedId <- modifyMVar midentity $ \cur ->
return (idt, cur /= idt)
when changedId $ announceUpdate idt
- let shared = lsShared $ load $ headRef h
- changedShared <- modifyMVar mshared $ \cur ->
- return (shared, cur /= shared)
- when changedShared $ do
- mapM_ (shareState idt shared) =<< readMVar peers
+ forM_ services $ \(SomeService service _) -> do
+ forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
+ watchHeadWith origHead (sel . headStoredObject) $ \x -> do
+ withMVar peers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue ioActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
void $ forkIO $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -370,7 +360,7 @@ startServer opt origHead logd' services = do
prefs <- forM objs $ storeObject $ peerInStorage peer
identity <- readMVar midentity
let svcs = map someServiceID services
- handlePacket origHead identity secure peer chanSvc svcs header prefs
+ handlePacket identity secure peer chanSvc svcs header prefs
| otherwise -> atomically $ do
logd $ show paddr ++ ": invalid objects"
@@ -547,10 +537,10 @@ appendDistinct x (y:ys) | x == y = y : ys
| otherwise = y : appendDistinct x ys
appendDistinct x [] = [x]
-handlePacket :: Head LocalState -> UnifiedIdentity -> Bool
+handlePacket :: UnifiedIdentity -> Bool
-> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
-> TransportHeader -> [PartialRef] -> IO ()
-handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
+handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
processAcknowledgements peer headers
ochannel <- readTVar $ peerChannel peer
@@ -571,7 +561,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
readTVarP (peerChannel peer) >>= \case
ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
writeTVarP (peerChannel peer) $ ChannelEstablished ch
- liftSTM $ finalizedChannel peer origHead identity
+ liftSTM $ finalizedChannel peer identity
_ -> return ()
Rejected ref -> do
@@ -629,7 +619,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
TrChannelAccept accref -> do
let process = do
- handleChannelAccept origHead identity accref
+ handleChannelAccept identity accref
readTVarP (peerChannel peer) >>= \case
ChannelWait {} -> process
ChannelOurRequest {} -> process
@@ -705,8 +695,8 @@ handleChannelRequest peer identity req = do
]
_ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request"
-handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler ()
-handleChannelAccept oh identity accref = do
+handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler ()
+handleChannelAccept identity accref = do
peer <- gets phPeer
liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do
withPeerIdentity peer $ \upid -> do
@@ -716,13 +706,13 @@ handleChannelAccept oh identity accref = do
liftIO $ atomically $ do
sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) []
writeTVar (peerChannel peer) $ ChannelEstablished ch
- finalizedChannel peer oh identity
+ finalizedChannel peer identity
Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
-finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM ()
-finalizedChannel peer oh self = do
+finalizedChannel :: Peer -> UnifiedIdentity -> STM ()
+finalizedChannel peer self = do
let ist = peerInStorage peer
-- Identity update
@@ -732,17 +722,13 @@ finalizedChannel peer oh self = do
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs
- -- Shared state
+ -- Notify services about new peer
readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do
+ PeerIdentityFull _ ->
writeTQueue (serverIOActions $ peerServer peer) $ do
- Just h <- liftIO $ reloadHead oh
- let shared = lsShared $ headObject h
- let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs
- srefs = map (partialRef ist . storedRef) shared
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ]
- liftIO $ atomically $ sendToPeerS peer ackedBy $
- TransportPacket (TransportHeader hitems) $ map storedRef shared
+ forM_ (serverServices $ peerServer peer) $ \case
+ service@(SomeService _ attrs) ->
+ runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
_ -> return ()
-- Outstanding service packets