summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Erebos/Conversation.hs2
-rw-r--r--src/Erebos/Message.hs21
-rw-r--r--src/Erebos/Network.hs19
-rw-r--r--src/Erebos/Service.hs4
4 files changed, 32 insertions, 14 deletions
diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs
index f0ffa70..c165343 100644
--- a/src/Erebos/Conversation.hs
+++ b/src/Erebos/Conversation.hs
@@ -71,7 +71,7 @@ directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Con
directMessageConversation peer = do
(find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case
Just thread -> return $ DirectMessageConversation thread
- Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] []
+ Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] []
chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation)
chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate)
diff --git a/src/Erebos/Message.hs b/src/Erebos/Message.hs
index 5ef27f3..78fb5e7 100644
--- a/src/Erebos/Message.hs
+++ b/src/Erebos/Message.hs
@@ -29,6 +29,7 @@ import qualified Data.Text as T
import Data.Time.Format
import Data.Time.LocalTime
+import Erebos.Discovery
import Erebos.Identity
import Erebos.Network
import Erebos.Service
@@ -103,8 +104,10 @@ instance Service DirectMessage where
serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal
- serviceStorageWatchers _ = (:[]) $
- SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ serviceStorageWatchers _ =
+ [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers
+ ]
data MessageState = MessageState
@@ -210,12 +213,19 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do
else do
return unchanged
+findMissingPeers :: Server -> DirectMessageThreads -> ExceptT String IO ()
+findMissingPeers server threads = do
+ forM_ (toThreadList threads) $ \thread -> do
+ when (msgHead thread /= msgReceived thread) $ do
+ mapM_ (discoverySearch server) $ map storedRef $ idDataF $ msgPeer thread
+
data DirectMessageThread = DirectMessageThread
{ msgPeer :: ComposedIdentity
- , msgHead :: [Stored DirectMessage]
- , msgSent :: [Stored DirectMessage]
- , msgSeen :: [Stored DirectMessage]
+ , msgHead :: [ Stored DirectMessage ]
+ , msgSent :: [ Stored DirectMessage ]
+ , msgSeen :: [ Stored DirectMessage ]
+ , msgReceived :: [ Stored DirectMessage ]
}
threadToList :: DirectMessageThread -> [DirectMessage]
@@ -249,6 +259,7 @@ messageThreadFor peer mss =
, msgHead = filterAncestors $ ready ++ received
, msgSent = filterAncestors $ sent ++ received
, msgSeen = filterAncestors $ ready ++ seen
+ , msgReceived = filterAncestors $ received
}
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index ed68ed4..fb2b5e9 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -301,13 +301,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do
announceUpdate idt
forM_ serverServices $ \(SomeService service _) -> do
- forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
- withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
- readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue serverIOActions $ do
- runPeerService peer $ act x
- _ -> return ()
+ forM_ (serviceStorageWatchers service) $ \case
+ SomeStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+ GlobalStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ atomically $ writeTQueue serverIOActions $ do
+ act server x
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs
index 9cf71e7..b5e52dd 100644
--- a/src/Erebos/Service.hs
+++ b/src/Erebos/Service.hs
@@ -103,7 +103,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState
someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p)
-data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+data SomeStorageWatcher s
+ = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+ | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT String IO ())
newtype ServiceID = ServiceID UUID