diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Network.hs | 60 | ||||
| -rw-r--r-- | src/Service.hs | 10 | ||||
| -rw-r--r-- | src/Sync.hs | 28 | 
3 files changed, 55 insertions, 43 deletions
| diff --git a/src/Network.hs b/src/Network.hs index 14adad6..5d3c0ec 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -54,7 +54,6 @@ import Service  import State  import Storage  import Storage.Merge -import Sync  discoveryPort :: PortNumber @@ -254,7 +253,6 @@ startServer opt origHead logd' services = do      svcStates <- newTMVarIO M.empty      peers <- newMVar M.empty      midentity <- newMVar $ headLocalIdentity origHead -    mshared <- newMVar $ lsShared $ load $ headRef origHead      ssocket <- newEmptyMVar      errlog <- newTQueueIO @@ -316,28 +314,20 @@ startServer opt origHead logd' services = do                                  writeTQueue (peerOutQueue peer) (True, ackedBy, packet)                              _  -> return () -            let shareState self shared peer = do -                    let refs = map (partialRef (peerInStorage peer) . storedRef) shared -                        hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs -                        ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ] -                        packet = TransportPacket (TransportHeader hitems) $ -                            map storedRef shared -                    atomically $ readTVar (peerIdentityVar peer) >>= \case -                        PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do -                            sendToPeerS peer ackedBy packet -                        _  -> return () -              void $ watchHead origHead $ \h -> do                  let idt = headLocalIdentity h                  changedId <- modifyMVar midentity $ \cur ->                      return (idt, cur /= idt)                  when changedId $ announceUpdate idt -                let shared = lsShared $ load $ headRef h -                changedShared <- modifyMVar mshared $ \cur -> -                    return (shared, cur /= shared) -                when changedShared $ do -                    mapM_ (shareState idt shared) =<< readMVar peers +            forM_ services $ \(SomeService service _) -> do +                forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do +                    watchHeadWith origHead (sel . headStoredObject) $ \x -> do +                        withMVar peers $ mapM_ $ \peer -> atomically $ do +                            readTVar (peerIdentityVar peer) >>= \case +                                PeerIdentityFull _ -> writeTQueue ioActions $ do +                                    runPeerService peer $ act x +                                _ -> return ()              void $ forkIO $ forever $ do                  (msg, saddr) <- S.recvFrom sock 4096 @@ -370,7 +360,7 @@ startServer opt origHead logd' services = do                                 prefs <- forM objs $ storeObject $ peerInStorage peer                                 identity <- readMVar midentity                                 let svcs = map someServiceID services -                               handlePacket origHead identity secure peer chanSvc svcs header prefs +                               handlePacket identity secure peer chanSvc svcs header prefs                           | otherwise -> atomically $ do                                 logd $ show paddr ++ ": invalid objects" @@ -547,10 +537,10 @@ appendDistinct x (y:ys) | x == y    = y : ys                          | otherwise = y : appendDistinct x ys  appendDistinct x [] = [x] -handlePacket :: Head LocalState -> UnifiedIdentity -> Bool +handlePacket :: UnifiedIdentity -> Bool      -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]      -> TransportHeader -> [PartialRef] -> IO () -handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do +handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do      let server = peerServer peer      processAcknowledgements peer headers      ochannel <- readTVar $ peerChannel peer @@ -571,7 +561,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers                  readTVarP (peerChannel peer) >>= \case                      ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do                          writeTVarP (peerChannel peer) $ ChannelEstablished ch -                        liftSTM $ finalizedChannel peer origHead identity +                        liftSTM $ finalizedChannel peer identity                      _ -> return ()              Rejected ref -> do @@ -629,7 +619,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers              TrChannelAccept accref -> do                  let process = do -                        handleChannelAccept origHead identity accref +                        handleChannelAccept identity accref                  readTVarP (peerChannel peer) >>= \case                      ChannelWait {} -> process                      ChannelOurRequest {} -> process @@ -705,8 +695,8 @@ handleChannelRequest peer identity req = do                          ]                  _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request" -handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler () -handleChannelAccept oh identity accref = do +handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler () +handleChannelAccept identity accref = do      peer <- gets phPeer      liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do          withPeerIdentity peer $ \upid -> do @@ -716,13 +706,13 @@ handleChannelAccept oh identity accref = do                      liftIO $ atomically $ do                          sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) []                          writeTVar (peerChannel peer) $ ChannelEstablished ch -                        finalizedChannel peer oh identity +                        finalizedChannel peer identity                  Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM () -finalizedChannel peer oh self = do +finalizedChannel :: Peer -> UnifiedIdentity -> STM () +finalizedChannel peer self = do      let ist = peerInStorage peer      -- Identity update @@ -732,17 +722,13 @@ finalizedChannel peer oh self = do              ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]          sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs -    -- Shared state +    -- Notify services about new peer      readTVar (peerIdentityVar peer) >>= \case -        PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do +        PeerIdentityFull _ ->              writeTQueue (serverIOActions $ peerServer peer) $ do -                Just h <- liftIO $ reloadHead oh -                let shared = lsShared $ headObject h -                let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs -                    srefs = map (partialRef ist . storedRef) shared -                    ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ] -                liftIO $ atomically $ sendToPeerS peer ackedBy $ -                    TransportPacket (TransportHeader hitems) $ map storedRef shared +                forM_ (serverServices $ peerServer peer) $ \case +                    service@(SomeService _ attrs) -> +                        runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer          _ -> return ()      -- Outstanding service packets diff --git a/src/Service.hs b/src/Service.hs index 4fc8335..a3a19a4 100644 --- a/src/Service.hs +++ b/src/Service.hs @@ -3,6 +3,7 @@ module Service (      SomeService(..), someService, someServiceAttr, someServiceID,      SomeServiceState(..), fromServiceState, someServiceEmptyState,      SomeServiceGlobalState(..), fromServiceGlobalState, someServiceEmptyGlobalState, +    SomeStorageWatcher(..),      ServiceID, mkServiceID,      ServiceHandler, @@ -40,6 +41,9 @@ class (Typeable s, Storable s, Typeable (ServiceState s), Typeable (ServiceGloba      serviceID :: proxy s -> ServiceID      serviceHandler :: Stored s -> ServiceHandler s () +    serviceNewPeer :: ServiceHandler s () +    serviceNewPeer = return () +      type ServiceAttributes s = attr | attr -> s      type ServiceAttributes s = Proxy s      defaultServiceAttributes :: proxy s -> ServiceAttributes s @@ -58,6 +62,9 @@ class (Typeable s, Storable s, Typeable (ServiceState s), Typeable (ServiceGloba      default emptyServiceGlobalState :: ServiceGlobalState s ~ () => proxy s -> ServiceGlobalState s      emptyServiceGlobalState _ = () +    serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s] +    serviceStorageWatchers _ = [] +  data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s) @@ -87,6 +94,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState  someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p) +data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) + +  newtype ServiceID = ServiceID UUID      deriving (Eq, Ord, StorableUUID) diff --git a/src/Sync.hs b/src/Sync.hs index afb45e6..b1c0ab0 100644 --- a/src/Sync.hs +++ b/src/Sync.hs @@ -3,9 +3,11 @@ module Sync (  ) where  import Control.Monad +import Control.Monad.Reader  import Data.List +import Identity  import Service  import State  import Storage @@ -18,13 +20,27 @@ instance Service SyncService where      serviceHandler packet = do          let SyncPacket added = fromStored packet -        ls <- svcGetLocal -        let st = storedStorage ls -            current = sort $ lsShared $ fromStored ls -            updated = filterAncestors (added : current) -        when (current /= updated) $ do -            svcSetLocal =<< wrappedStore st (fromStored ls) { lsShared = updated } +        pid <- asks svcPeerIdentity +        self <- svcSelf +        when (finalOwner pid `sameIdentity` finalOwner self) $ do +            updateLocalHead_ $ \ls -> do +                let current = sort $ lsShared $ fromStored ls +                    updated = filterAncestors (added : current) +                if current /= updated +                   then wrappedStore (storedStorage ls) (fromStored ls) { lsShared = updated } +                   else return ls + +    serviceNewPeer = notifyPeer . lsShared . fromStored =<< svcGetLocal +    serviceStorageWatchers _ = (:[]) $ SomeStorageWatcher (lsShared . fromStored) notifyPeer  instance Storable SyncService where      store' (SyncPacket smsg) = store' smsg      load' = SyncPacket <$> load' + +notifyPeer :: [Stored SharedState] -> ServiceHandler SyncService () +notifyPeer shared = do +    pid <- asks svcPeerIdentity +    self <- svcSelf +    when (finalOwner pid `sameIdentity` finalOwner self) $ do +        forM_ shared $ \sh -> +            replyStoredRef =<< (wrappedStore (storedStorage sh) . SyncPacket) sh |