diff options
Diffstat (limited to 'src/Erebos/DirectMessage.hs')
| -rw-r--r-- | src/Erebos/DirectMessage.hs | 211 |
1 files changed, 162 insertions, 49 deletions
diff --git a/src/Erebos/DirectMessage.hs b/src/Erebos/DirectMessage.hs index 05da865..dd10d35 100644 --- a/src/Erebos/DirectMessage.hs +++ b/src/Erebos/DirectMessage.hs @@ -1,43 +1,62 @@ module Erebos.DirectMessage ( DirectMessage(..), sendDirectMessage, + dmMarkAsSeen, + updateDirectMessagePeer, + createOrUpdateDirectMessagePeer, DirectMessageAttributes(..), defaultDirectMessageAttributes, DirectMessageThreads, - toThreadList, + dmThreadList, DirectMessageThread(..), - threadToList, - messageThreadView, + dmThreadToList, dmThreadToListSince, dmThreadToListUnread, dmThreadToListSinceUnread, + dmThreadView, - watchReceivedMessages, + watchDirectMessageThreads, formatDirectMessage, ) where +import Control.Concurrent.MVar import Control.Monad +import Control.Monad.Except import Control.Monad.Reader import Data.List import Data.Ord -import qualified Data.Set as S +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) -import qualified Data.Text as T +import Data.Text qualified as T import Data.Time.Format import Data.Time.LocalTime +import Erebos.Conversation.Class +import Erebos.Discovery import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.Service import Erebos.State import Erebos.Storable import Erebos.Storage.Head import Erebos.Storage.Merge + +instance ConversationType DirectMessageThread DirectMessage where + convMessageFrom = msgFrom + convMessageTime = msgTime + convMessageText = Just . msgText + + convMessageListSince mbSince thread = + threadToListHelper (msgSeen thread) (maybe S.empty (S.fromAscList . msgHead) mbSince) (msgHead thread) + + data DirectMessage = DirectMessage { msgFrom :: ComposedIdentity - , msgPrev :: [Stored DirectMessage] + , msgPrev :: [ Stored DirectMessage ] , msgTime :: ZonedTime , msgText :: Text } @@ -74,7 +93,6 @@ instance Service DirectMessage where let msg = fromStored smsg powner <- asks $ finalOwner . svcPeerIdentity erb <- svcGetLocal - st <- getStorage let DirectMessageThreads prev _ = lookupSharedValue $ lsShared $ fromStored erb sent = findMsgProperty powner msSent prev received = findMsgProperty powner msReceived prev @@ -83,7 +101,7 @@ instance Service DirectMessage where filterAncestors sent == filterAncestors (smsg : sent) then do when (received' /= received) $ do - next <- wrappedStore st $ MessageState + next <- mstore MessageState { msPrev = prev , msPeer = powner , msReady = [] @@ -91,37 +109,43 @@ instance Service DirectMessage where , msReceived = received' , msSeen = [] } - let threads = DirectMessageThreads [next] (messageThreadView [next]) - shared <- makeSharedStateUpdate st threads (lsShared $ fromStored erb) - svcSetLocal =<< wrappedStore st (fromStored erb) { lsShared = [shared] } + let threads = DirectMessageThreads [ next ] (dmThreadView [ next ]) + shared <- makeSharedStateUpdate threads (lsShared $ fromStored erb) + svcSetLocal =<< mstore (fromStored erb) { lsShared = [ shared ] } when (powner `sameIdentity` msgFrom msg) $ do replyStoredRef smsg else join $ asks $ dmOwnerMismatch . svcAttributes - serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal + serviceNewPeer = do + syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal - serviceStorageWatchers _ = (:[]) $ - SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + serviceUpdatedPeer = do + updateDirectMessagePeer . finalOwner =<< asks svcPeerIdentity + + serviceStorageWatchers _ = + [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers + ] data MessageState = MessageState - { msPrev :: [Stored MessageState] + { msPrev :: [ Stored MessageState ] , msPeer :: ComposedIdentity - , msReady :: [Stored DirectMessage] - , msSent :: [Stored DirectMessage] - , msReceived :: [Stored DirectMessage] - , msSeen :: [Stored DirectMessage] + , msReady :: [ Stored DirectMessage ] + , msSent :: [ Stored DirectMessage ] + , msReceived :: [ Stored DirectMessage ] + , msSeen :: [ Stored DirectMessage ] } -data DirectMessageThreads = DirectMessageThreads [Stored MessageState] [DirectMessageThread] +data DirectMessageThreads = DirectMessageThreads [ Stored MessageState ] [ DirectMessageThread ] instance Eq DirectMessageThreads where DirectMessageThreads mss _ == DirectMessageThreads mss' _ = mss == mss' -toThreadList :: DirectMessageThreads -> [DirectMessageThread] -toThreadList (DirectMessageThreads _ threads) = threads +dmThreadList :: DirectMessageThreads -> [ DirectMessageThread ] +dmThreadList (DirectMessageThreads _ threads) = threads instance Storable MessageState where store' MessageState {..} = storeRec $ do @@ -143,13 +167,13 @@ instance Storable MessageState where instance Mergeable DirectMessageThreads where type Component DirectMessageThreads = MessageState - mergeSorted mss = DirectMessageThreads mss (messageThreadView mss) + mergeSorted mss = DirectMessageThreads mss (dmThreadView mss) toComponents (DirectMessageThreads mss _) = mss instance SharedType DirectMessageThreads where sharedTypeID _ = mkSharedTypeID "ee793681-5976-466a-b0f0-4e1907d3fade" -findMsgProperty :: Foldable m => Identity m -> (MessageState -> [a]) -> [Stored MessageState] -> [a] +findMsgProperty :: Foldable m => Identity m -> (MessageState -> [ a ]) -> [ Stored MessageState ] -> [ a ] findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do guard $ msPeer x `sameIdentity` pid guard $ not $ null $ sel x @@ -157,11 +181,11 @@ findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do sendDirectMessage :: (Foldable f, Applicative f, MonadHead LocalState m) - => Identity f -> Text -> m (Stored DirectMessage) -sendDirectMessage pid text = updateLocalState $ \ls -> do + => Identity f -> Text -> m () +sendDirectMessage pid text = updateLocalState_ $ \ls -> do let self = localIdentity $ fromStored ls powner = finalOwner pid - flip updateSharedState ls $ \(DirectMessageThreads prev _) -> do + flip updateSharedState_ ls $ \(DirectMessageThreads prev _) -> do let ready = findMsgProperty powner msReady prev received = findMsgProperty powner msReceived prev @@ -175,12 +199,69 @@ sendDirectMessage pid text = updateLocalState $ \ls -> do next <- mstore MessageState { msPrev = prev , msPeer = powner - , msReady = [smsg] + , msReady = [ smsg ] , msSent = [] , msReceived = [] , msSeen = [] } - return (DirectMessageThreads [next] (messageThreadView [next]), smsg) + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + +dmMarkAsSeen + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +dmMarkAsSeen pid = do + updateLocalState_ $ updateSharedState_ $ \(DirectMessageThreads prev _) -> do + let powner = finalOwner pid + received = findMsgProperty powner msReceived prev + next <- mstore MessageState + { msPrev = prev + , msPeer = powner + , msReady = [] + , msSent = [] + , msReceived = [] + , msSeen = received + } + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + +updateDirectMessagePeer + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +updateDirectMessagePeer = createOrUpdateDirectMessagePeer' False + +createOrUpdateDirectMessagePeer + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +createOrUpdateDirectMessagePeer = createOrUpdateDirectMessagePeer' True + +createOrUpdateDirectMessagePeer' + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Bool -> Identity f -> m () +createOrUpdateDirectMessagePeer' create pid = do + let powner = finalOwner pid + updateLocalState_ $ updateSharedState_ $ \old@(DirectMessageThreads prev threads) -> do + let updatePeerThread = do + next <- mstore MessageState + { msPrev = prev + , msPeer = powner + , msReady = [] + , msSent = [] + , msReceived = [] + , msSeen = [] + } + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + case find (sameIdentity powner . msgPeer) threads of + Nothing + | create + -> updatePeerThread + + Just thread + | oldPeer <- msgPeer thread + , newPeer <- updateIdentity (idExtDataF powner) oldPeer + , oldPeer /= newPeer + -> updatePeerThread + + _ -> return old + syncDirectMessageToPeer :: DirectMessageThreads -> ServiceHandler DirectMessage () syncDirectMessageToPeer (DirectMessageThreads mss _) = do @@ -205,28 +286,47 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do , msReceived = [] , msSeen = [] } - return $ DirectMessageThreads [next] (messageThreadView [next]) + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) else do return unchanged +findMissingPeers :: Server -> DirectMessageThreads -> ExceptT ErebosError IO () +findMissingPeers server threads = do + forM_ (dmThreadList threads) $ \thread -> do + when (msgHead thread /= msgReceived thread) $ do + mapM_ (discoverySearch server) $ map (refDigest . storedRef) $ idDataF $ msgPeer thread + data DirectMessageThread = DirectMessageThread { msgPeer :: ComposedIdentity - , msgHead :: [Stored DirectMessage] - , msgSent :: [Stored DirectMessage] - , msgSeen :: [Stored DirectMessage] + , msgHead :: [ Stored DirectMessage ] + , msgSent :: [ Stored DirectMessage ] + , msgSeen :: [ Stored DirectMessage ] + , msgReceived :: [ Stored DirectMessage ] } -threadToList :: DirectMessageThread -> [DirectMessage] -threadToList thread = helper S.empty $ msgHead thread - where helper seen msgs - | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs = - fromStored msg : helper (S.insert msg seen) (msgs' ++ msgPrev (fromStored msg)) - | otherwise = [] - cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg) +dmThreadToList :: DirectMessageThread -> [ DirectMessage ] +dmThreadToList thread = map fst $ threadToListHelper (msgSeen thread) S.empty $ msgHead thread + +dmThreadToListSince :: DirectMessageThread -> DirectMessageThread -> [ DirectMessage ] +dmThreadToListSince since thread = map fst $ threadToListHelper (msgSeen thread) (S.fromAscList $ msgHead since) (msgHead thread) + +dmThreadToListUnread :: DirectMessageThread -> [ ( DirectMessage, Bool ) ] +dmThreadToListUnread thread = threadToListHelper (msgSeen thread) S.empty $ msgHead thread + +dmThreadToListSinceUnread :: DirectMessageThread -> DirectMessageThread -> [ ( DirectMessage, Bool ) ] +dmThreadToListSinceUnread since thread = threadToListHelper (msgSeen thread) (S.fromAscList $ msgHead since) (msgHead thread) + +threadToListHelper :: [ Stored DirectMessage ] -> Set (Stored DirectMessage) -> [ Stored DirectMessage ] -> [ ( DirectMessage, Bool ) ] +threadToListHelper seen used msgs + | msg : msgs' <- filter (`S.notMember` used) $ reverse $ sortBy (comparing cmpView) msgs = + ( fromStored msg, not $ any (msg `precedesOrEquals`) seen ) : threadToListHelper seen (S.insert msg used) (msgs' ++ msgPrev (fromStored msg)) + | otherwise = [] + where + cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg) -messageThreadView :: [Stored MessageState] -> [DirectMessageThread] -messageThreadView = helper [] +dmThreadView :: [ Stored MessageState ] -> [ DirectMessageThread ] +dmThreadView = helper [] where helper used ms' = case filterAncestors ms' of mss@(sms : rest) | any (sameIdentity $ msPeer $ fromStored sms) used -> @@ -236,7 +336,7 @@ messageThreadView = helper [] in messageThreadFor peer mss : helper (peer : used) (msPrev (fromStored sms) ++ rest) _ -> [] -messageThreadFor :: ComposedIdentity -> [Stored MessageState] -> DirectMessageThread +messageThreadFor :: ComposedIdentity -> [ Stored MessageState ] -> DirectMessageThread messageThreadFor peer mss = let ready = findMsgProperty peer msReady mss sent = findMsgProperty peer msSent mss @@ -248,15 +348,28 @@ messageThreadFor peer mss = , msgHead = filterAncestors $ ready ++ received , msgSent = filterAncestors $ sent ++ received , msgSeen = filterAncestors $ ready ++ seen + , msgReceived = filterAncestors $ received } -watchReceivedMessages :: Head LocalState -> (Stored DirectMessage -> IO ()) -> IO WatchedHead -watchReceivedMessages h f = do - let self = finalOwner $ localIdentity $ headObject h +watchDirectMessageThreads :: Head LocalState -> (DirectMessageThread -> DirectMessageThread -> IO ()) -> IO WatchedHead +watchDirectMessageThreads h f = do + prevVar <- newMVar Nothing watchHeadWith h (lookupSharedValue . lsShared . headObject) $ \(DirectMessageThreads sms _) -> do - forM_ (map fromStored sms) $ \ms -> do - mapM_ f $ filter (not . sameIdentity self . msgFrom . fromStored) $ msReceived ms + modifyMVar_ prevVar $ \case + Just prev -> do + let addPeer (p : ps) p' + | p `sameIdentity` p' = p : ps + | otherwise = p : addPeer ps p' + addPeer [] p' = [ p' ] + + let peers = foldl' addPeer [] $ map (msPeer . fromStored) $ storedDifference prev sms + forM_ peers $ \peer -> do + f (messageThreadFor peer prev) (messageThreadFor peer sms) + return (Just sms) + + Nothing -> do + return (Just sms) formatDirectMessage :: TimeZone -> DirectMessage -> String formatDirectMessage tzone msg = concat |