summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-07-03 20:21:04 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-07-03 22:58:13 +0200
commitba636680dc5fdd7d5db81248e4fa737d026f985f (patch)
treed6ebbd6166d8974bb9ded012f660583de84be8f4 /src
parent4bed9f9d88f4ce57b540e756e3d26ed708078604 (diff)
Handle Sync service outside of Network module
Diffstat (limited to 'src')
-rw-r--r--src/Network.hs60
-rw-r--r--src/Service.hs10
-rw-r--r--src/Sync.hs28
3 files changed, 55 insertions, 43 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 14adad6..5d3c0ec 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -54,7 +54,6 @@ import Service
import State
import Storage
import Storage.Merge
-import Sync
discoveryPort :: PortNumber
@@ -254,7 +253,6 @@ startServer opt origHead logd' services = do
svcStates <- newTMVarIO M.empty
peers <- newMVar M.empty
midentity <- newMVar $ headLocalIdentity origHead
- mshared <- newMVar $ lsShared $ load $ headRef origHead
ssocket <- newEmptyMVar
errlog <- newTQueueIO
@@ -316,28 +314,20 @@ startServer opt origHead logd' services = do
writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
_ -> return ()
- let shareState self shared peer = do
- let refs = map (partialRef (peerInStorage peer) . storedRef) shared
- hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ]
- packet = TransportPacket (TransportHeader hitems) $
- map storedRef shared
- atomically $ readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do
- sendToPeerS peer ackedBy packet
- _ -> return ()
-
void $ watchHead origHead $ \h -> do
let idt = headLocalIdentity h
changedId <- modifyMVar midentity $ \cur ->
return (idt, cur /= idt)
when changedId $ announceUpdate idt
- let shared = lsShared $ load $ headRef h
- changedShared <- modifyMVar mshared $ \cur ->
- return (shared, cur /= shared)
- when changedShared $ do
- mapM_ (shareState idt shared) =<< readMVar peers
+ forM_ services $ \(SomeService service _) -> do
+ forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
+ watchHeadWith origHead (sel . headStoredObject) $ \x -> do
+ withMVar peers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue ioActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
void $ forkIO $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -370,7 +360,7 @@ startServer opt origHead logd' services = do
prefs <- forM objs $ storeObject $ peerInStorage peer
identity <- readMVar midentity
let svcs = map someServiceID services
- handlePacket origHead identity secure peer chanSvc svcs header prefs
+ handlePacket identity secure peer chanSvc svcs header prefs
| otherwise -> atomically $ do
logd $ show paddr ++ ": invalid objects"
@@ -547,10 +537,10 @@ appendDistinct x (y:ys) | x == y = y : ys
| otherwise = y : appendDistinct x ys
appendDistinct x [] = [x]
-handlePacket :: Head LocalState -> UnifiedIdentity -> Bool
+handlePacket :: UnifiedIdentity -> Bool
-> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
-> TransportHeader -> [PartialRef] -> IO ()
-handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
+handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
processAcknowledgements peer headers
ochannel <- readTVar $ peerChannel peer
@@ -571,7 +561,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
readTVarP (peerChannel peer) >>= \case
ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
writeTVarP (peerChannel peer) $ ChannelEstablished ch
- liftSTM $ finalizedChannel peer origHead identity
+ liftSTM $ finalizedChannel peer identity
_ -> return ()
Rejected ref -> do
@@ -629,7 +619,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
TrChannelAccept accref -> do
let process = do
- handleChannelAccept origHead identity accref
+ handleChannelAccept identity accref
readTVarP (peerChannel peer) >>= \case
ChannelWait {} -> process
ChannelOurRequest {} -> process
@@ -705,8 +695,8 @@ handleChannelRequest peer identity req = do
]
_ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request"
-handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler ()
-handleChannelAccept oh identity accref = do
+handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler ()
+handleChannelAccept identity accref = do
peer <- gets phPeer
liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do
withPeerIdentity peer $ \upid -> do
@@ -716,13 +706,13 @@ handleChannelAccept oh identity accref = do
liftIO $ atomically $ do
sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) []
writeTVar (peerChannel peer) $ ChannelEstablished ch
- finalizedChannel peer oh identity
+ finalizedChannel peer identity
Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
-finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM ()
-finalizedChannel peer oh self = do
+finalizedChannel :: Peer -> UnifiedIdentity -> STM ()
+finalizedChannel peer self = do
let ist = peerInStorage peer
-- Identity update
@@ -732,17 +722,13 @@ finalizedChannel peer oh self = do
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs
- -- Shared state
+ -- Notify services about new peer
readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do
+ PeerIdentityFull _ ->
writeTQueue (serverIOActions $ peerServer peer) $ do
- Just h <- liftIO $ reloadHead oh
- let shared = lsShared $ headObject h
- let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs
- srefs = map (partialRef ist . storedRef) shared
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ]
- liftIO $ atomically $ sendToPeerS peer ackedBy $
- TransportPacket (TransportHeader hitems) $ map storedRef shared
+ forM_ (serverServices $ peerServer peer) $ \case
+ service@(SomeService _ attrs) ->
+ runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
_ -> return ()
-- Outstanding service packets
diff --git a/src/Service.hs b/src/Service.hs
index 4fc8335..a3a19a4 100644
--- a/src/Service.hs
+++ b/src/Service.hs
@@ -3,6 +3,7 @@ module Service (
SomeService(..), someService, someServiceAttr, someServiceID,
SomeServiceState(..), fromServiceState, someServiceEmptyState,
SomeServiceGlobalState(..), fromServiceGlobalState, someServiceEmptyGlobalState,
+ SomeStorageWatcher(..),
ServiceID, mkServiceID,
ServiceHandler,
@@ -40,6 +41,9 @@ class (Typeable s, Storable s, Typeable (ServiceState s), Typeable (ServiceGloba
serviceID :: proxy s -> ServiceID
serviceHandler :: Stored s -> ServiceHandler s ()
+ serviceNewPeer :: ServiceHandler s ()
+ serviceNewPeer = return ()
+
type ServiceAttributes s = attr | attr -> s
type ServiceAttributes s = Proxy s
defaultServiceAttributes :: proxy s -> ServiceAttributes s
@@ -58,6 +62,9 @@ class (Typeable s, Storable s, Typeable (ServiceState s), Typeable (ServiceGloba
default emptyServiceGlobalState :: ServiceGlobalState s ~ () => proxy s -> ServiceGlobalState s
emptyServiceGlobalState _ = ()
+ serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s]
+ serviceStorageWatchers _ = []
+
data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s)
@@ -87,6 +94,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState
someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p)
+data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+
+
newtype ServiceID = ServiceID UUID
deriving (Eq, Ord, StorableUUID)
diff --git a/src/Sync.hs b/src/Sync.hs
index afb45e6..b1c0ab0 100644
--- a/src/Sync.hs
+++ b/src/Sync.hs
@@ -3,9 +3,11 @@ module Sync (
) where
import Control.Monad
+import Control.Monad.Reader
import Data.List
+import Identity
import Service
import State
import Storage
@@ -18,13 +20,27 @@ instance Service SyncService where
serviceHandler packet = do
let SyncPacket added = fromStored packet
- ls <- svcGetLocal
- let st = storedStorage ls
- current = sort $ lsShared $ fromStored ls
- updated = filterAncestors (added : current)
- when (current /= updated) $ do
- svcSetLocal =<< wrappedStore st (fromStored ls) { lsShared = updated }
+ pid <- asks svcPeerIdentity
+ self <- svcSelf
+ when (finalOwner pid `sameIdentity` finalOwner self) $ do
+ updateLocalHead_ $ \ls -> do
+ let current = sort $ lsShared $ fromStored ls
+ updated = filterAncestors (added : current)
+ if current /= updated
+ then wrappedStore (storedStorage ls) (fromStored ls) { lsShared = updated }
+ else return ls
+
+ serviceNewPeer = notifyPeer . lsShared . fromStored =<< svcGetLocal
+ serviceStorageWatchers _ = (:[]) $ SomeStorageWatcher (lsShared . fromStored) notifyPeer
instance Storable SyncService where
store' (SyncPacket smsg) = store' smsg
load' = SyncPacket <$> load'
+
+notifyPeer :: [Stored SharedState] -> ServiceHandler SyncService ()
+notifyPeer shared = do
+ pid <- asks svcPeerIdentity
+ self <- svcSelf
+ when (finalOwner pid `sameIdentity` finalOwner self) $ do
+ forM_ shared $ \sh ->
+ replyStoredRef =<< (wrappedStore (storedStorage sh) . SyncPacket) sh