diff options
| -rw-r--r-- | erebos.cabal | 1 | ||||
| -rw-r--r-- | src/Main.hs | 2 | ||||
| -rw-r--r-- | src/Network.hs | 37 | ||||
| -rw-r--r-- | src/Storage.hs | 4 | ||||
| -rw-r--r-- | src/Sync.hs | 37 | 
5 files changed, 77 insertions, 4 deletions
| diff --git a/erebos.cabal b/erebos.cabal index 65a7182..766d708 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -32,6 +32,7 @@ executable erebos                         Storage.List                         Storage.Key                         Storage.Merge +                       Sync                         Util    default-extensions:  ExistentialQuantification diff --git a/src/Main.hs b/src/Main.hs index e23e1b5..696b896 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -34,6 +34,7 @@ import Service  import State  import Storage  import Storage.List +import Sync  main :: IO ()  main = do @@ -96,6 +97,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do          erebosHead <- loadLocalStateHead st          startServer erebosHead extPrintLn bhost              [ SomeService @AttachService Proxy +            , SomeService @SyncService Proxy              , SomeService @DirectMessageService Proxy              ] diff --git a/src/Network.hs b/src/Network.hs index 339be7a..89034da 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -32,6 +32,7 @@ import PubKey  import Service  import State  import Storage +import Sync  discoveryPort :: ServiceName @@ -160,6 +161,7 @@ startServer origHead logd bhost services = do      chanSvc <- newChan      peers <- newMVar M.empty      midentity <- newMVar $ headLocalIdentity origHead +    mshared <- newMVar $ lsShared $ load $ headRef origHead      let open addr = do              sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) @@ -175,14 +177,29 @@ startServer origHead logd bhost services = do                      baddr:_ <- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just bhost) (Just discoveryPort)                      void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]) (addrAddress baddr) +            let shareState self shared peer +                    | PeerIdentityFull pid <- peerIdentity peer +                    , finalOwner pid `sameIdentity` finalOwner self = do +                        forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case +                            Left err -> logd $ "failed to sync state with peer: " ++ show err +                            Right () -> return () +                    | otherwise = return () +              void $ forkIO $ forever $ do                  announce =<< readMVar midentity                  threadDelay $ announceIntervalSeconds * 1000 * 1000              watchHead origHead $ \h -> do                  let idt = headLocalIdentity h -                modifyMVar_ midentity $ \_ -> return idt -                announce idt +                changedId <- modifyMVar midentity $ \cur -> +                    return (idt, cur /= idt) +                when changedId $ announce idt + +                let shared = lsShared $ load $ headRef h +                changedShared <- modifyMVar mshared $ \cur -> +                    return (shared, cur /= shared) +                when changedShared $ do +                    mapM_ (shareState idt shared) =<< readMVar peers              forever $ do                  (msg, paddr) <- recvFrom sock 4096 @@ -306,6 +323,8 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do                      ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do                          updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) }                          sendIdentityUpdate identity +                        sendSharedState identity . lsShared . fromStored =<< +                            liftIO (loadLocalState $ storedStorage $ idData identity)                      _ -> return ()              DataRequest ref @@ -493,6 +512,8 @@ handleChannelAccept identity accref = do                  , peerChannel = ChannelEstablished $ fromStored ch                  }              sendIdentityUpdate identity +            sendSharedState identity . lsShared . fromStored =<< +                liftIO (loadLocalState $ storedStorage $ idData identity)          Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) @@ -502,6 +523,18 @@ sendIdentityUpdate self = do      mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self +sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler () +sendSharedState self shared = do +    gets phPeer >>= \case +        peer | PeerIdentityFull pid <- peerIdentity peer +             , finalOwner pid `sameIdentity` finalOwner self -> do +                 ist <- gets $ peerInStorage . phPeer +                 addHeader $ ServiceType $ serviceID @SyncService Proxy +                 mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared +                 mapM_ (addBody . storedRef) shared +             | otherwise -> return () + +  handleIdentityUpdate :: PacketHandler ()  handleIdentityUpdate = do      peer <- gets phPeer diff --git a/src/Storage.hs b/src/Storage.hs index 2e78c2f..5af34b7 100644 --- a/src/Storage.hs +++ b/src/Storage.hs @@ -758,8 +758,8 @@ storedRef (Stored ref _) = ref  storedStorage :: Stored a -> Storage  storedStorage (Stored (Ref st _) _) = st -wrappedStore :: Storable a => Storage -> a -> IO (Stored a) -wrappedStore st x = do ref <- store st x +wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a) +wrappedStore st x = do ref <- liftIO $ store st x                         return $ Stored ref x  wrappedLoad :: Storable a => Ref -> Stored a diff --git a/src/Sync.hs b/src/Sync.hs new file mode 100644 index 0000000..e8edf33 --- /dev/null +++ b/src/Sync.hs @@ -0,0 +1,37 @@ +module Sync ( +    SyncService, +    ServicePacket(..), +) where + +import Control.Monad + +import Data.List + +import Service +import State +import Storage +import Storage.Merge + +data SyncService + +instance Service SyncService where +    serviceID _ = mkServiceID "a4f538d0-4e50-4082-8e10-7e3ec2af175d" + +    data ServiceState SyncService = SyncService +    emptyServiceState = SyncService + +    newtype ServicePacket SyncService = SyncPacket (Stored SharedState) + +    serviceHandler packet = do +        let SyncPacket added = fromStored packet +        ls <- svcGetLocal +        let st = storedStorage ls +            current = sort $ lsShared $ fromStored ls +            updated = filterAncestors (added : current) +        when (current /= updated) $ do +            svcSetLocal =<< wrappedStore st (fromStored ls) { lsShared = updated } +        return Nothing + +instance Storable (ServicePacket SyncService) where +    store' (SyncPacket smsg) = store' smsg +    load' = SyncPacket <$> load' |