diff options
-rw-r--r-- | src/Network.hs | 60 | ||||
-rw-r--r-- | src/Service.hs | 10 | ||||
-rw-r--r-- | src/Sync.hs | 28 |
3 files changed, 55 insertions, 43 deletions
diff --git a/src/Network.hs b/src/Network.hs index 14adad6..5d3c0ec 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -54,7 +54,6 @@ import Service import State import Storage import Storage.Merge -import Sync discoveryPort :: PortNumber @@ -254,7 +253,6 @@ startServer opt origHead logd' services = do svcStates <- newTMVarIO M.empty peers <- newMVar M.empty midentity <- newMVar $ headLocalIdentity origHead - mshared <- newMVar $ lsShared $ load $ headRef origHead ssocket <- newEmptyMVar errlog <- newTQueueIO @@ -316,28 +314,20 @@ startServer opt origHead logd' services = do writeTQueue (peerOutQueue peer) (True, ackedBy, packet) _ -> return () - let shareState self shared peer = do - let refs = map (partialRef (peerInStorage peer) . storedRef) shared - hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ] - packet = TransportPacket (TransportHeader hitems) $ - map storedRef shared - atomically $ readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do - sendToPeerS peer ackedBy packet - _ -> return () - void $ watchHead origHead $ \h -> do let idt = headLocalIdentity h changedId <- modifyMVar midentity $ \cur -> return (idt, cur /= idt) when changedId $ announceUpdate idt - let shared = lsShared $ load $ headRef h - changedShared <- modifyMVar mshared $ \cur -> - return (shared, cur /= shared) - when changedShared $ do - mapM_ (shareState idt shared) =<< readMVar peers + forM_ services $ \(SomeService service _) -> do + forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do + watchHeadWith origHead (sel . headStoredObject) $ \x -> do + withMVar peers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue ioActions $ do + runPeerService peer $ act x + _ -> return () void $ forkIO $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -370,7 +360,7 @@ startServer opt origHead logd' services = do prefs <- forM objs $ storeObject $ peerInStorage peer identity <- readMVar midentity let svcs = map someServiceID services - handlePacket origHead identity secure peer chanSvc svcs header prefs + handlePacket identity secure peer chanSvc svcs header prefs | otherwise -> atomically $ do logd $ show paddr ++ ": invalid objects" @@ -547,10 +537,10 @@ appendDistinct x (y:ys) | x == y = y : ys | otherwise = y : appendDistinct x ys appendDistinct x [] = [x] -handlePacket :: Head LocalState -> UnifiedIdentity -> Bool +handlePacket :: UnifiedIdentity -> Bool -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] -> TransportHeader -> [PartialRef] -> IO () -handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do +handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer processAcknowledgements peer headers ochannel <- readTVar $ peerChannel peer @@ -571,7 +561,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers readTVarP (peerChannel peer) >>= \case ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do writeTVarP (peerChannel peer) $ ChannelEstablished ch - liftSTM $ finalizedChannel peer origHead identity + liftSTM $ finalizedChannel peer identity _ -> return () Rejected ref -> do @@ -629,7 +619,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers TrChannelAccept accref -> do let process = do - handleChannelAccept origHead identity accref + handleChannelAccept identity accref readTVarP (peerChannel peer) >>= \case ChannelWait {} -> process ChannelOurRequest {} -> process @@ -705,8 +695,8 @@ handleChannelRequest peer identity req = do ] _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request" -handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler () -handleChannelAccept oh identity accref = do +handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler () +handleChannelAccept identity accref = do peer <- gets phPeer liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do withPeerIdentity peer $ \upid -> do @@ -716,13 +706,13 @@ handleChannelAccept oh identity accref = do liftIO $ atomically $ do sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) [] writeTVar (peerChannel peer) $ ChannelEstablished ch - finalizedChannel peer oh identity + finalizedChannel peer identity Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM () -finalizedChannel peer oh self = do +finalizedChannel :: Peer -> UnifiedIdentity -> STM () +finalizedChannel peer self = do let ist = peerInStorage peer -- Identity update @@ -732,17 +722,13 @@ finalizedChannel peer oh self = do ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs - -- Shared state + -- Notify services about new peer readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do + PeerIdentityFull _ -> writeTQueue (serverIOActions $ peerServer peer) $ do - Just h <- liftIO $ reloadHead oh - let shared = lsShared $ headObject h - let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs - srefs = map (partialRef ist . storedRef) shared - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ] - liftIO $ atomically $ sendToPeerS peer ackedBy $ - TransportPacket (TransportHeader hitems) $ map storedRef shared + forM_ (serverServices $ peerServer peer) $ \case + service@(SomeService _ attrs) -> + runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer _ -> return () -- Outstanding service packets diff --git a/src/Service.hs b/src/Service.hs index 4fc8335..a3a19a4 100644 --- a/src/Service.hs +++ b/src/Service.hs @@ -3,6 +3,7 @@ module Service ( SomeService(..), someService, someServiceAttr, someServiceID, SomeServiceState(..), fromServiceState, someServiceEmptyState, SomeServiceGlobalState(..), fromServiceGlobalState, someServiceEmptyGlobalState, + SomeStorageWatcher(..), ServiceID, mkServiceID, ServiceHandler, @@ -40,6 +41,9 @@ class (Typeable s, Storable s, Typeable (ServiceState s), Typeable (ServiceGloba serviceID :: proxy s -> ServiceID serviceHandler :: Stored s -> ServiceHandler s () + serviceNewPeer :: ServiceHandler s () + serviceNewPeer = return () + type ServiceAttributes s = attr | attr -> s type ServiceAttributes s = Proxy s defaultServiceAttributes :: proxy s -> ServiceAttributes s @@ -58,6 +62,9 @@ class (Typeable s, Storable s, Typeable (ServiceState s), Typeable (ServiceGloba default emptyServiceGlobalState :: ServiceGlobalState s ~ () => proxy s -> ServiceGlobalState s emptyServiceGlobalState _ = () + serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s] + serviceStorageWatchers _ = [] + data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s) @@ -87,6 +94,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p) +data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) + + newtype ServiceID = ServiceID UUID deriving (Eq, Ord, StorableUUID) diff --git a/src/Sync.hs b/src/Sync.hs index afb45e6..b1c0ab0 100644 --- a/src/Sync.hs +++ b/src/Sync.hs @@ -3,9 +3,11 @@ module Sync ( ) where import Control.Monad +import Control.Monad.Reader import Data.List +import Identity import Service import State import Storage @@ -18,13 +20,27 @@ instance Service SyncService where serviceHandler packet = do let SyncPacket added = fromStored packet - ls <- svcGetLocal - let st = storedStorage ls - current = sort $ lsShared $ fromStored ls - updated = filterAncestors (added : current) - when (current /= updated) $ do - svcSetLocal =<< wrappedStore st (fromStored ls) { lsShared = updated } + pid <- asks svcPeerIdentity + self <- svcSelf + when (finalOwner pid `sameIdentity` finalOwner self) $ do + updateLocalHead_ $ \ls -> do + let current = sort $ lsShared $ fromStored ls + updated = filterAncestors (added : current) + if current /= updated + then wrappedStore (storedStorage ls) (fromStored ls) { lsShared = updated } + else return ls + + serviceNewPeer = notifyPeer . lsShared . fromStored =<< svcGetLocal + serviceStorageWatchers _ = (:[]) $ SomeStorageWatcher (lsShared . fromStored) notifyPeer instance Storable SyncService where store' (SyncPacket smsg) = store' smsg load' = SyncPacket <$> load' + +notifyPeer :: [Stored SharedState] -> ServiceHandler SyncService () +notifyPeer shared = do + pid <- asks svcPeerIdentity + self <- svcSelf + when (finalOwner pid `sameIdentity` finalOwner self) $ do + forM_ shared $ \sh -> + replyStoredRef =<< (wrappedStore (storedStorage sh) . SyncPacket) sh |