From a0c6c341ba1629a1c1070edf69855c745c6bd7eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 19 Jan 2020 21:36:40 +0100 Subject: Synchronization service --- src/Network.hs | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) (limited to 'src/Network.hs') diff --git a/src/Network.hs b/src/Network.hs index 339be7a..89034da 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -32,6 +32,7 @@ import PubKey import Service import State import Storage +import Sync discoveryPort :: ServiceName @@ -160,6 +161,7 @@ startServer origHead logd bhost services = do chanSvc <- newChan peers <- newMVar M.empty midentity <- newMVar $ headLocalIdentity origHead + mshared <- newMVar $ lsShared $ load $ headRef origHead let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) @@ -175,14 +177,29 @@ startServer origHead logd bhost services = do baddr:_ <- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just bhost) (Just discoveryPort) void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]) (addrAddress baddr) + let shareState self shared peer + | PeerIdentityFull pid <- peerIdentity peer + , finalOwner pid `sameIdentity` finalOwner self = do + forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case + Left err -> logd $ "failed to sync state with peer: " ++ show err + Right () -> return () + | otherwise = return () + void $ forkIO $ forever $ do announce =<< readMVar midentity threadDelay $ announceIntervalSeconds * 1000 * 1000 watchHead origHead $ \h -> do let idt = headLocalIdentity h - modifyMVar_ midentity $ \_ -> return idt - announce idt + changedId <- modifyMVar midentity $ \cur -> + return (idt, cur /= idt) + when changedId $ announce idt + + let shared = lsShared $ load $ headRef h + changedShared <- modifyMVar mshared $ \cur -> + return (shared, cur /= shared) + when changedShared $ do + mapM_ (shareState idt shared) =<< readMVar peers forever $ do (msg, paddr) <- recvFrom sock 4096 @@ -306,6 +323,8 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) } sendIdentityUpdate identity + sendSharedState identity . lsShared . fromStored =<< + liftIO (loadLocalState $ storedStorage $ idData identity) _ -> return () DataRequest ref @@ -493,6 +512,8 @@ handleChannelAccept identity accref = do , peerChannel = ChannelEstablished $ fromStored ch } sendIdentityUpdate identity + sendSharedState identity . lsShared . fromStored =<< + liftIO (loadLocalState $ storedStorage $ idData identity) Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) @@ -502,6 +523,18 @@ sendIdentityUpdate self = do mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self +sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler () +sendSharedState self shared = do + gets phPeer >>= \case + peer | PeerIdentityFull pid <- peerIdentity peer + , finalOwner pid `sameIdentity` finalOwner self -> do + ist <- gets $ peerInStorage . phPeer + addHeader $ ServiceType $ serviceID @SyncService Proxy + mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared + mapM_ (addBody . storedRef) shared + | otherwise -> return () + + handleIdentityUpdate :: PacketHandler () handleIdentityUpdate = do peer <- gets phPeer -- cgit v1.2.3