From a0c6c341ba1629a1c1070edf69855c745c6bd7eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 19 Jan 2020 21:36:40 +0100 Subject: Synchronization service --- erebos.cabal | 1 + src/Main.hs | 2 ++ src/Network.hs | 37 +++++++++++++++++++++++++++++++++++-- src/Storage.hs | 4 ++-- src/Sync.hs | 37 +++++++++++++++++++++++++++++++++++++ 5 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 src/Sync.hs diff --git a/erebos.cabal b/erebos.cabal index 65a7182..766d708 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -32,6 +32,7 @@ executable erebos Storage.List Storage.Key Storage.Merge + Sync Util default-extensions: ExistentialQuantification diff --git a/src/Main.hs b/src/Main.hs index e23e1b5..696b896 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -34,6 +34,7 @@ import Service import State import Storage import Storage.List +import Sync main :: IO () main = do @@ -96,6 +97,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do erebosHead <- loadLocalStateHead st startServer erebosHead extPrintLn bhost [ SomeService @AttachService Proxy + , SomeService @SyncService Proxy , SomeService @DirectMessageService Proxy ] diff --git a/src/Network.hs b/src/Network.hs index 339be7a..89034da 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -32,6 +32,7 @@ import PubKey import Service import State import Storage +import Sync discoveryPort :: ServiceName @@ -160,6 +161,7 @@ startServer origHead logd bhost services = do chanSvc <- newChan peers <- newMVar M.empty midentity <- newMVar $ headLocalIdentity origHead + mshared <- newMVar $ lsShared $ load $ headRef origHead let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) @@ -175,14 +177,29 @@ startServer origHead logd bhost services = do baddr:_ <- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just bhost) (Just discoveryPort) void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]) (addrAddress baddr) + let shareState self shared peer + | PeerIdentityFull pid <- peerIdentity peer + , finalOwner pid `sameIdentity` finalOwner self = do + forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case + Left err -> logd $ "failed to sync state with peer: " ++ show err + Right () -> return () + | otherwise = return () + void $ forkIO $ forever $ do announce =<< readMVar midentity threadDelay $ announceIntervalSeconds * 1000 * 1000 watchHead origHead $ \h -> do let idt = headLocalIdentity h - modifyMVar_ midentity $ \_ -> return idt - announce idt + changedId <- modifyMVar midentity $ \cur -> + return (idt, cur /= idt) + when changedId $ announce idt + + let shared = lsShared $ load $ headRef h + changedShared <- modifyMVar mshared $ \cur -> + return (shared, cur /= shared) + when changedShared $ do + mapM_ (shareState idt shared) =<< readMVar peers forever $ do (msg, paddr) <- recvFrom sock 4096 @@ -306,6 +323,8 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) } sendIdentityUpdate identity + sendSharedState identity . lsShared . fromStored =<< + liftIO (loadLocalState $ storedStorage $ idData identity) _ -> return () DataRequest ref @@ -493,6 +512,8 @@ handleChannelAccept identity accref = do , peerChannel = ChannelEstablished $ fromStored ch } sendIdentityUpdate identity + sendSharedState identity . lsShared . fromStored =<< + liftIO (loadLocalState $ storedStorage $ idData identity) Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) @@ -502,6 +523,18 @@ sendIdentityUpdate self = do mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self +sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler () +sendSharedState self shared = do + gets phPeer >>= \case + peer | PeerIdentityFull pid <- peerIdentity peer + , finalOwner pid `sameIdentity` finalOwner self -> do + ist <- gets $ peerInStorage . phPeer + addHeader $ ServiceType $ serviceID @SyncService Proxy + mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared + mapM_ (addBody . storedRef) shared + | otherwise -> return () + + handleIdentityUpdate :: PacketHandler () handleIdentityUpdate = do peer <- gets phPeer diff --git a/src/Storage.hs b/src/Storage.hs index 2e78c2f..5af34b7 100644 --- a/src/Storage.hs +++ b/src/Storage.hs @@ -758,8 +758,8 @@ storedRef (Stored ref _) = ref storedStorage :: Stored a -> Storage storedStorage (Stored (Ref st _) _) = st -wrappedStore :: Storable a => Storage -> a -> IO (Stored a) -wrappedStore st x = do ref <- store st x +wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a) +wrappedStore st x = do ref <- liftIO $ store st x return $ Stored ref x wrappedLoad :: Storable a => Ref -> Stored a diff --git a/src/Sync.hs b/src/Sync.hs new file mode 100644 index 0000000..e8edf33 --- /dev/null +++ b/src/Sync.hs @@ -0,0 +1,37 @@ +module Sync ( + SyncService, + ServicePacket(..), +) where + +import Control.Monad + +import Data.List + +import Service +import State +import Storage +import Storage.Merge + +data SyncService + +instance Service SyncService where + serviceID _ = mkServiceID "a4f538d0-4e50-4082-8e10-7e3ec2af175d" + + data ServiceState SyncService = SyncService + emptyServiceState = SyncService + + newtype ServicePacket SyncService = SyncPacket (Stored SharedState) + + serviceHandler packet = do + let SyncPacket added = fromStored packet + ls <- svcGetLocal + let st = storedStorage ls + current = sort $ lsShared $ fromStored ls + updated = filterAncestors (added : current) + when (current /= updated) $ do + svcSetLocal =<< wrappedStore st (fromStored ls) { lsShared = updated } + return Nothing + +instance Storable (ServicePacket SyncService) where + store' (SyncPacket smsg) = store' smsg + load' = SyncPacket <$> load' -- cgit v1.2.3