summaryrefslogtreecommitdiff
path: root/src/Network.hs
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2020-01-19 21:36:40 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2020-01-19 21:36:40 +0100
commita0c6c341ba1629a1c1070edf69855c745c6bd7eb (patch)
treeba5000382f7f7a905c4011d991fb286ef7abfda5 /src/Network.hs
parent25324026a1033c43652a058f966dfb3d255102ae (diff)
Synchronization service
Diffstat (limited to 'src/Network.hs')
-rw-r--r--src/Network.hs37
1 files changed, 35 insertions, 2 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 339be7a..89034da 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -32,6 +32,7 @@ import PubKey
import Service
import State
import Storage
+import Sync
discoveryPort :: ServiceName
@@ -160,6 +161,7 @@ startServer origHead logd bhost services = do
chanSvc <- newChan
peers <- newMVar M.empty
midentity <- newMVar $ headLocalIdentity origHead
+ mshared <- newMVar $ lsShared $ load $ headRef origHead
let open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
@@ -175,14 +177,29 @@ startServer origHead logd bhost services = do
baddr:_ <- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just bhost) (Just discoveryPort)
void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]) (addrAddress baddr)
+ let shareState self shared peer
+ | PeerIdentityFull pid <- peerIdentity peer
+ , finalOwner pid `sameIdentity` finalOwner self = do
+ forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case
+ Left err -> logd $ "failed to sync state with peer: " ++ show err
+ Right () -> return ()
+ | otherwise = return ()
+
void $ forkIO $ forever $ do
announce =<< readMVar midentity
threadDelay $ announceIntervalSeconds * 1000 * 1000
watchHead origHead $ \h -> do
let idt = headLocalIdentity h
- modifyMVar_ midentity $ \_ -> return idt
- announce idt
+ changedId <- modifyMVar midentity $ \cur ->
+ return (idt, cur /= idt)
+ when changedId $ announce idt
+
+ let shared = lsShared $ load $ headRef h
+ changedShared <- modifyMVar mshared $ \cur ->
+ return (shared, cur /= shared)
+ when changedShared $ do
+ mapM_ (shareState idt shared) =<< readMVar peers
forever $ do
(msg, paddr) <- recvFrom sock 4096
@@ -306,6 +323,8 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do
ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) }
sendIdentityUpdate identity
+ sendSharedState identity . lsShared . fromStored =<<
+ liftIO (loadLocalState $ storedStorage $ idData identity)
_ -> return ()
DataRequest ref
@@ -493,6 +512,8 @@ handleChannelAccept identity accref = do
, peerChannel = ChannelEstablished $ fromStored ch
}
sendIdentityUpdate identity
+ sendSharedState identity . lsShared . fromStored =<<
+ liftIO (loadLocalState $ storedStorage $ idData identity)
Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
@@ -502,6 +523,18 @@ sendIdentityUpdate self = do
mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self
+sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler ()
+sendSharedState self shared = do
+ gets phPeer >>= \case
+ peer | PeerIdentityFull pid <- peerIdentity peer
+ , finalOwner pid `sameIdentity` finalOwner self -> do
+ ist <- gets $ peerInStorage . phPeer
+ addHeader $ ServiceType $ serviceID @SyncService Proxy
+ mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared
+ mapM_ (addBody . storedRef) shared
+ | otherwise -> return ()
+
+
handleIdentityUpdate :: PacketHandler ()
handleIdentityUpdate = do
peer <- gets phPeer