summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2020-01-19 21:36:40 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2020-01-19 21:36:40 +0100
commita0c6c341ba1629a1c1070edf69855c745c6bd7eb (patch)
treeba5000382f7f7a905c4011d991fb286ef7abfda5
parent25324026a1033c43652a058f966dfb3d255102ae (diff)
Synchronization service
-rw-r--r--erebos.cabal1
-rw-r--r--src/Main.hs2
-rw-r--r--src/Network.hs37
-rw-r--r--src/Storage.hs4
-rw-r--r--src/Sync.hs37
5 files changed, 77 insertions, 4 deletions
diff --git a/erebos.cabal b/erebos.cabal
index 65a7182..766d708 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -32,6 +32,7 @@ executable erebos
Storage.List
Storage.Key
Storage.Merge
+ Sync
Util
default-extensions: ExistentialQuantification
diff --git a/src/Main.hs b/src/Main.hs
index e23e1b5..696b896 100644
--- a/src/Main.hs
+++ b/src/Main.hs
@@ -34,6 +34,7 @@ import Service
import State
import Storage
import Storage.List
+import Sync
main :: IO ()
main = do
@@ -96,6 +97,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do
erebosHead <- loadLocalStateHead st
startServer erebosHead extPrintLn bhost
[ SomeService @AttachService Proxy
+ , SomeService @SyncService Proxy
, SomeService @DirectMessageService Proxy
]
diff --git a/src/Network.hs b/src/Network.hs
index 339be7a..89034da 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -32,6 +32,7 @@ import PubKey
import Service
import State
import Storage
+import Sync
discoveryPort :: ServiceName
@@ -160,6 +161,7 @@ startServer origHead logd bhost services = do
chanSvc <- newChan
peers <- newMVar M.empty
midentity <- newMVar $ headLocalIdentity origHead
+ mshared <- newMVar $ lsShared $ load $ headRef origHead
let open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
@@ -175,14 +177,29 @@ startServer origHead logd bhost services = do
baddr:_ <- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just bhost) (Just discoveryPort)
void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]) (addrAddress baddr)
+ let shareState self shared peer
+ | PeerIdentityFull pid <- peerIdentity peer
+ , finalOwner pid `sameIdentity` finalOwner self = do
+ forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case
+ Left err -> logd $ "failed to sync state with peer: " ++ show err
+ Right () -> return ()
+ | otherwise = return ()
+
void $ forkIO $ forever $ do
announce =<< readMVar midentity
threadDelay $ announceIntervalSeconds * 1000 * 1000
watchHead origHead $ \h -> do
let idt = headLocalIdentity h
- modifyMVar_ midentity $ \_ -> return idt
- announce idt
+ changedId <- modifyMVar midentity $ \cur ->
+ return (idt, cur /= idt)
+ when changedId $ announce idt
+
+ let shared = lsShared $ load $ headRef h
+ changedShared <- modifyMVar mshared $ \cur ->
+ return (shared, cur /= shared)
+ when changedShared $ do
+ mapM_ (shareState idt shared) =<< readMVar peers
forever $ do
(msg, paddr) <- recvFrom sock 4096
@@ -306,6 +323,8 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do
ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) }
sendIdentityUpdate identity
+ sendSharedState identity . lsShared . fromStored =<<
+ liftIO (loadLocalState $ storedStorage $ idData identity)
_ -> return ()
DataRequest ref
@@ -493,6 +512,8 @@ handleChannelAccept identity accref = do
, peerChannel = ChannelEstablished $ fromStored ch
}
sendIdentityUpdate identity
+ sendSharedState identity . lsShared . fromStored =<<
+ liftIO (loadLocalState $ storedStorage $ idData identity)
Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
@@ -502,6 +523,18 @@ sendIdentityUpdate self = do
mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self
+sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler ()
+sendSharedState self shared = do
+ gets phPeer >>= \case
+ peer | PeerIdentityFull pid <- peerIdentity peer
+ , finalOwner pid `sameIdentity` finalOwner self -> do
+ ist <- gets $ peerInStorage . phPeer
+ addHeader $ ServiceType $ serviceID @SyncService Proxy
+ mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared
+ mapM_ (addBody . storedRef) shared
+ | otherwise -> return ()
+
+
handleIdentityUpdate :: PacketHandler ()
handleIdentityUpdate = do
peer <- gets phPeer
diff --git a/src/Storage.hs b/src/Storage.hs
index 2e78c2f..5af34b7 100644
--- a/src/Storage.hs
+++ b/src/Storage.hs
@@ -758,8 +758,8 @@ storedRef (Stored ref _) = ref
storedStorage :: Stored a -> Storage
storedStorage (Stored (Ref st _) _) = st
-wrappedStore :: Storable a => Storage -> a -> IO (Stored a)
-wrappedStore st x = do ref <- store st x
+wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a)
+wrappedStore st x = do ref <- liftIO $ store st x
return $ Stored ref x
wrappedLoad :: Storable a => Ref -> Stored a
diff --git a/src/Sync.hs b/src/Sync.hs
new file mode 100644
index 0000000..e8edf33
--- /dev/null
+++ b/src/Sync.hs
@@ -0,0 +1,37 @@
+module Sync (
+ SyncService,
+ ServicePacket(..),
+) where
+
+import Control.Monad
+
+import Data.List
+
+import Service
+import State
+import Storage
+import Storage.Merge
+
+data SyncService
+
+instance Service SyncService where
+ serviceID _ = mkServiceID "a4f538d0-4e50-4082-8e10-7e3ec2af175d"
+
+ data ServiceState SyncService = SyncService
+ emptyServiceState = SyncService
+
+ newtype ServicePacket SyncService = SyncPacket (Stored SharedState)
+
+ serviceHandler packet = do
+ let SyncPacket added = fromStored packet
+ ls <- svcGetLocal
+ let st = storedStorage ls
+ current = sort $ lsShared $ fromStored ls
+ updated = filterAncestors (added : current)
+ when (current /= updated) $ do
+ svcSetLocal =<< wrappedStore st (fromStored ls) { lsShared = updated }
+ return Nothing
+
+instance Storable (ServicePacket SyncService) where
+ store' (SyncPacket smsg) = store' smsg
+ load' = SyncPacket <$> load'