diff options
Diffstat (limited to 'src/Network.hs')
-rw-r--r-- | src/Network.hs | 60 |
1 files changed, 23 insertions, 37 deletions
diff --git a/src/Network.hs b/src/Network.hs index 14adad6..5d3c0ec 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -54,7 +54,6 @@ import Service import State import Storage import Storage.Merge -import Sync discoveryPort :: PortNumber @@ -254,7 +253,6 @@ startServer opt origHead logd' services = do svcStates <- newTMVarIO M.empty peers <- newMVar M.empty midentity <- newMVar $ headLocalIdentity origHead - mshared <- newMVar $ lsShared $ load $ headRef origHead ssocket <- newEmptyMVar errlog <- newTQueueIO @@ -316,28 +314,20 @@ startServer opt origHead logd' services = do writeTQueue (peerOutQueue peer) (True, ackedBy, packet) _ -> return () - let shareState self shared peer = do - let refs = map (partialRef (peerInStorage peer) . storedRef) shared - hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ] - packet = TransportPacket (TransportHeader hitems) $ - map storedRef shared - atomically $ readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do - sendToPeerS peer ackedBy packet - _ -> return () - void $ watchHead origHead $ \h -> do let idt = headLocalIdentity h changedId <- modifyMVar midentity $ \cur -> return (idt, cur /= idt) when changedId $ announceUpdate idt - let shared = lsShared $ load $ headRef h - changedShared <- modifyMVar mshared $ \cur -> - return (shared, cur /= shared) - when changedShared $ do - mapM_ (shareState idt shared) =<< readMVar peers + forM_ services $ \(SomeService service _) -> do + forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do + watchHeadWith origHead (sel . headStoredObject) $ \x -> do + withMVar peers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue ioActions $ do + runPeerService peer $ act x + _ -> return () void $ forkIO $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -370,7 +360,7 @@ startServer opt origHead logd' services = do prefs <- forM objs $ storeObject $ peerInStorage peer identity <- readMVar midentity let svcs = map someServiceID services - handlePacket origHead identity secure peer chanSvc svcs header prefs + handlePacket identity secure peer chanSvc svcs header prefs | otherwise -> atomically $ do logd $ show paddr ++ ": invalid objects" @@ -547,10 +537,10 @@ appendDistinct x (y:ys) | x == y = y : ys | otherwise = y : appendDistinct x ys appendDistinct x [] = [x] -handlePacket :: Head LocalState -> UnifiedIdentity -> Bool +handlePacket :: UnifiedIdentity -> Bool -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] -> TransportHeader -> [PartialRef] -> IO () -handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do +handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer processAcknowledgements peer headers ochannel <- readTVar $ peerChannel peer @@ -571,7 +561,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers readTVarP (peerChannel peer) >>= \case ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do writeTVarP (peerChannel peer) $ ChannelEstablished ch - liftSTM $ finalizedChannel peer origHead identity + liftSTM $ finalizedChannel peer identity _ -> return () Rejected ref -> do @@ -629,7 +619,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers TrChannelAccept accref -> do let process = do - handleChannelAccept origHead identity accref + handleChannelAccept identity accref readTVarP (peerChannel peer) >>= \case ChannelWait {} -> process ChannelOurRequest {} -> process @@ -705,8 +695,8 @@ handleChannelRequest peer identity req = do ] _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request" -handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler () -handleChannelAccept oh identity accref = do +handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler () +handleChannelAccept identity accref = do peer <- gets phPeer liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do withPeerIdentity peer $ \upid -> do @@ -716,13 +706,13 @@ handleChannelAccept oh identity accref = do liftIO $ atomically $ do sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) [] writeTVar (peerChannel peer) $ ChannelEstablished ch - finalizedChannel peer oh identity + finalizedChannel peer identity Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM () -finalizedChannel peer oh self = do +finalizedChannel :: Peer -> UnifiedIdentity -> STM () +finalizedChannel peer self = do let ist = peerInStorage peer -- Identity update @@ -732,17 +722,13 @@ finalizedChannel peer oh self = do ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs - -- Shared state + -- Notify services about new peer readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do + PeerIdentityFull _ -> writeTQueue (serverIOActions $ peerServer peer) $ do - Just h <- liftIO $ reloadHead oh - let shared = lsShared $ headObject h - let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs - srefs = map (partialRef ist . storedRef) shared - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ] - liftIO $ atomically $ sendToPeerS peer ackedBy $ - TransportPacket (TransportHeader hitems) $ map storedRef shared + forM_ (serverServices $ peerServer peer) $ \case + service@(SomeService _ attrs) -> + runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer _ -> return () -- Outstanding service packets |