diff options
Diffstat (limited to 'src/Erebos')
32 files changed, 1433 insertions, 608 deletions
diff --git a/src/Erebos/Attach.hs b/src/Erebos/Attach.hs index fad6197..b7c642f 100644 --- a/src/Erebos/Attach.hs +++ b/src/Erebos/Attach.hs @@ -59,7 +59,7 @@ instance PairingResult AttachIdentity where liftIO $ mapM_ storeKey $ catMaybes [ keyFromData sec pub | sec <- keys, pub <- pkeys ] identity' <- mergeIdentity $ updateIdentity [ lsIdentity $ fromStored slocal ] identity - shared <- makeSharedStateUpdate st (Just owner) (lsShared $ fromStored slocal) + shared <- makeSharedStateUpdate (Just owner) (lsShared $ fromStored slocal) mstore (fromStored slocal) { lsIdentity = idExtData identity' , lsShared = [ shared ] diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs index 74456ff..f9bf545 100644 --- a/src/Erebos/Chatroom.hs +++ b/src/Erebos/Chatroom.hs @@ -17,6 +17,7 @@ module Erebos.Chatroom ( joinChatroomAs, joinChatroomAsByStateData, leaveChatroom, leaveChatroomByStateData, getMessagesSinceState, + isSameChatroom, ChatroomSetChange(..), watchChatrooms, @@ -294,8 +295,7 @@ createChatroom rdName rdDescription = do } updateLocalState $ updateSharedState $ \rooms -> do - st <- getStorage - (, cstate) <$> storeSetAdd st cstate rooms + (, cstate) <$> storeSetAdd cstate rooms findAndUpdateChatroomState :: (MonadStorage m, MonadHead LocalState m) @@ -309,8 +309,7 @@ findAndUpdateChatroomState f = do upd <- act if roomStateData orig /= roomStateData upd then do - st <- getStorage - roomSet' <- storeSetAdd st upd roomSet + roomSet' <- storeSetAdd upd roomSet return (roomSet', Just upd) else do return (roomSet, Just upd) @@ -422,6 +421,11 @@ leaveChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupDa getMessagesSinceState :: ChatroomState -> ChatroomState -> [ChatMessage] getMessagesSinceState cur old = threadToListSince (roomStateMessageData old) (roomStateMessageData cur) +isSameChatroom :: ChatroomState -> ChatroomState -> Bool +isSameChatroom rstate rstate' = + let roots = filterAncestors . concatMap storedRoots . roomStateData + in intersectsSorted (roots rstate) (roots rstate') + data ChatroomSetChange = AddedChatroom ChatroomState | RemovedChatroom ChatroomState diff --git a/src/Erebos/Contact.hs b/src/Erebos/Contact.hs index 88e6c44..b081ddb 100644 --- a/src/Erebos/Contact.hs +++ b/src/Erebos/Contact.hs @@ -83,13 +83,12 @@ contactName c = fromJust $ msum contactSetName :: MonadHead LocalState m => Contact -> Text -> Set Contact -> m (Set Contact) contactSetName contact name set = do - st <- getStorage - cdata <- wrappedStore st ContactData + cdata <- mstore ContactData { cdPrev = toComponents contact , cdIdentity = [] , cdName = Just name } - storeSetAdd st (mergeSorted @Contact [cdata]) set + storeSetAdd (mergeSorted @Contact [cdata]) set type ContactService = PairingService ContactAccepted @@ -166,10 +165,9 @@ contactReject = pairingReject @ContactAccepted Proxy finalizeContact :: MonadHead LocalState m => UnifiedIdentity -> m () finalizeContact identity = updateLocalState_ $ updateSharedState_ $ \contacts -> do - st <- getStorage - cdata <- wrappedStore st ContactData + cdata <- mstore ContactData { cdPrev = [] , cdIdentity = idExtDataF $ finalOwner identity , cdName = Nothing } - storeSetAdd st (mergeSorted @Contact [cdata]) contacts + storeSetAdd (mergeSorted @Contact [cdata]) contacts diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs index dee6faa..2d007c9 100644 --- a/src/Erebos/Conversation.hs +++ b/src/Erebos/Conversation.hs @@ -7,9 +7,11 @@ module Erebos.Conversation ( formatMessage, Conversation, + isSameConversation, directMessageConversation, chatroomConversation, chatroomConversationByStateData, + isChatroomStateConversation, reloadConversation, lookupConversations, @@ -64,14 +66,22 @@ formatMessage tzone msg = concat ] -data Conversation = DirectMessageConversation DirectMessageThread - | ChatroomConversation ChatroomState +data Conversation + = DirectMessageConversation DirectMessageThread + | ChatroomConversation ChatroomState + +isSameConversation :: Conversation -> Conversation -> Bool +isSameConversation (DirectMessageConversation t) (DirectMessageConversation t') + = sameIdentity (msgPeer t) (msgPeer t') +isSameConversation (ChatroomConversation rstate) (ChatroomConversation rstate') = isSameChatroom rstate rstate' +isSameConversation _ _ = False directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Conversation directMessageConversation peer = do - (find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case + createOrUpdateDirectMessagePeer peer + (find (sameIdentity peer . msgPeer) . dmThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case Just thread -> return $ DirectMessageConversation thread - Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] + Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] [] chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation) chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate) @@ -79,13 +89,17 @@ chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateD chatroomConversationByStateData :: MonadHead LocalState m => Stored ChatroomStateData -> m (Maybe Conversation) chatroomConversationByStateData sdata = fmap ChatroomConversation <$> findChatroomByStateData sdata +isChatroomStateConversation :: ChatroomState -> Conversation -> Bool +isChatroomStateConversation rstate (ChatroomConversation rstate') = isSameChatroom rstate rstate' +isChatroomStateConversation _ _ = False + reloadConversation :: MonadHead LocalState m => Conversation -> m Conversation reloadConversation (DirectMessageConversation thread) = directMessageConversation (msgPeer thread) reloadConversation cur@(ChatroomConversation rstate) = fromMaybe cur <$> chatroomConversation rstate -lookupConversations :: MonadHead LocalState m => m [Conversation] -lookupConversations = map DirectMessageConversation . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead +lookupConversations :: MonadHead LocalState m => m [ Conversation ] +lookupConversations = map DirectMessageConversation . dmThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead conversationName :: Conversation -> Text @@ -96,14 +110,14 @@ conversationPeer :: Conversation -> Maybe ComposedIdentity conversationPeer (DirectMessageConversation thread) = Just $ msgPeer thread conversationPeer (ChatroomConversation _) = Nothing -conversationHistory :: Conversation -> [Message] -conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ threadToList thread +conversationHistory :: Conversation -> [ Message ] +conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ dmThreadToList thread conversationHistory (ChatroomConversation rstate) = map (\msg -> ChatroomMessage msg False) $ roomStateMessages rstate -sendMessage :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> Text -> m (Maybe Message) -sendMessage (DirectMessageConversation thread) text = fmap Just $ DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False -sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text >> return Nothing +sendMessage :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> Text -> m () +sendMessage (DirectMessageConversation thread) text = sendDirectMessage (msgPeer thread) text +sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text deleteConversation :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> m () deleteConversation (DirectMessageConversation _) = throwOtherError "deleting direct message conversation is not supported" diff --git a/src/Erebos/DirectMessage.hs b/src/Erebos/DirectMessage.hs index 05da865..f518b57 100644 --- a/src/Erebos/DirectMessage.hs +++ b/src/Erebos/DirectMessage.hs @@ -1,34 +1,41 @@ module Erebos.DirectMessage ( DirectMessage(..), sendDirectMessage, + updateDirectMessagePeer, + createOrUpdateDirectMessagePeer, DirectMessageAttributes(..), defaultDirectMessageAttributes, DirectMessageThreads, - toThreadList, + dmThreadList, DirectMessageThread(..), - threadToList, - messageThreadView, + dmThreadToList, dmThreadToListSince, + dmThreadView, - watchReceivedMessages, + watchDirectMessageThreads, formatDirectMessage, ) where +import Control.Concurrent.MVar import Control.Monad +import Control.Monad.Except import Control.Monad.Reader import Data.List import Data.Ord -import qualified Data.Set as S +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) -import qualified Data.Text as T +import Data.Text qualified as T import Data.Time.Format import Data.Time.LocalTime +import Erebos.Discovery import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.Service import Erebos.State import Erebos.Storable @@ -37,7 +44,7 @@ import Erebos.Storage.Merge data DirectMessage = DirectMessage { msgFrom :: ComposedIdentity - , msgPrev :: [Stored DirectMessage] + , msgPrev :: [ Stored DirectMessage ] , msgTime :: ZonedTime , msgText :: Text } @@ -74,7 +81,6 @@ instance Service DirectMessage where let msg = fromStored smsg powner <- asks $ finalOwner . svcPeerIdentity erb <- svcGetLocal - st <- getStorage let DirectMessageThreads prev _ = lookupSharedValue $ lsShared $ fromStored erb sent = findMsgProperty powner msSent prev received = findMsgProperty powner msReceived prev @@ -83,7 +89,7 @@ instance Service DirectMessage where filterAncestors sent == filterAncestors (smsg : sent) then do when (received' /= received) $ do - next <- wrappedStore st $ MessageState + next <- mstore MessageState { msPrev = prev , msPeer = powner , msReady = [] @@ -91,37 +97,43 @@ instance Service DirectMessage where , msReceived = received' , msSeen = [] } - let threads = DirectMessageThreads [next] (messageThreadView [next]) - shared <- makeSharedStateUpdate st threads (lsShared $ fromStored erb) - svcSetLocal =<< wrappedStore st (fromStored erb) { lsShared = [shared] } + let threads = DirectMessageThreads [ next ] (dmThreadView [ next ]) + shared <- makeSharedStateUpdate threads (lsShared $ fromStored erb) + svcSetLocal =<< mstore (fromStored erb) { lsShared = [ shared ] } when (powner `sameIdentity` msgFrom msg) $ do replyStoredRef smsg else join $ asks $ dmOwnerMismatch . svcAttributes - serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal + serviceNewPeer = do + syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal - serviceStorageWatchers _ = (:[]) $ - SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + serviceUpdatedPeer = do + updateDirectMessagePeer . finalOwner =<< asks svcPeerIdentity + + serviceStorageWatchers _ = + [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers + ] data MessageState = MessageState - { msPrev :: [Stored MessageState] + { msPrev :: [ Stored MessageState ] , msPeer :: ComposedIdentity - , msReady :: [Stored DirectMessage] - , msSent :: [Stored DirectMessage] - , msReceived :: [Stored DirectMessage] - , msSeen :: [Stored DirectMessage] + , msReady :: [ Stored DirectMessage ] + , msSent :: [ Stored DirectMessage ] + , msReceived :: [ Stored DirectMessage ] + , msSeen :: [ Stored DirectMessage ] } -data DirectMessageThreads = DirectMessageThreads [Stored MessageState] [DirectMessageThread] +data DirectMessageThreads = DirectMessageThreads [ Stored MessageState ] [ DirectMessageThread ] instance Eq DirectMessageThreads where DirectMessageThreads mss _ == DirectMessageThreads mss' _ = mss == mss' -toThreadList :: DirectMessageThreads -> [DirectMessageThread] -toThreadList (DirectMessageThreads _ threads) = threads +dmThreadList :: DirectMessageThreads -> [ DirectMessageThread ] +dmThreadList (DirectMessageThreads _ threads) = threads instance Storable MessageState where store' MessageState {..} = storeRec $ do @@ -143,13 +155,13 @@ instance Storable MessageState where instance Mergeable DirectMessageThreads where type Component DirectMessageThreads = MessageState - mergeSorted mss = DirectMessageThreads mss (messageThreadView mss) + mergeSorted mss = DirectMessageThreads mss (dmThreadView mss) toComponents (DirectMessageThreads mss _) = mss instance SharedType DirectMessageThreads where sharedTypeID _ = mkSharedTypeID "ee793681-5976-466a-b0f0-4e1907d3fade" -findMsgProperty :: Foldable m => Identity m -> (MessageState -> [a]) -> [Stored MessageState] -> [a] +findMsgProperty :: Foldable m => Identity m -> (MessageState -> [ a ]) -> [ Stored MessageState ] -> [ a ] findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do guard $ msPeer x `sameIdentity` pid guard $ not $ null $ sel x @@ -157,11 +169,11 @@ findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do sendDirectMessage :: (Foldable f, Applicative f, MonadHead LocalState m) - => Identity f -> Text -> m (Stored DirectMessage) -sendDirectMessage pid text = updateLocalState $ \ls -> do + => Identity f -> Text -> m () +sendDirectMessage pid text = updateLocalState_ $ \ls -> do let self = localIdentity $ fromStored ls powner = finalOwner pid - flip updateSharedState ls $ \(DirectMessageThreads prev _) -> do + flip updateSharedState_ ls $ \(DirectMessageThreads prev _) -> do let ready = findMsgProperty powner msReady prev received = findMsgProperty powner msReceived prev @@ -175,12 +187,52 @@ sendDirectMessage pid text = updateLocalState $ \ls -> do next <- mstore MessageState { msPrev = prev , msPeer = powner - , msReady = [smsg] + , msReady = [ smsg ] , msSent = [] , msReceived = [] , msSeen = [] } - return (DirectMessageThreads [next] (messageThreadView [next]), smsg) + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + +updateDirectMessagePeer + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +updateDirectMessagePeer = createOrUpdateDirectMessagePeer' False + +createOrUpdateDirectMessagePeer + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +createOrUpdateDirectMessagePeer = createOrUpdateDirectMessagePeer' True + +createOrUpdateDirectMessagePeer' + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Bool -> Identity f -> m () +createOrUpdateDirectMessagePeer' create pid = do + let powner = finalOwner pid + updateLocalState_ $ updateSharedState_ $ \old@(DirectMessageThreads prev threads) -> do + let updatePeerThread = do + next <- mstore MessageState + { msPrev = prev + , msPeer = powner + , msReady = [] + , msSent = [] + , msReceived = [] + , msSeen = [] + } + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + case find (sameIdentity powner . msgPeer) threads of + Nothing + | create + -> updatePeerThread + + Just thread + | oldPeer <- msgPeer thread + , newPeer <- updateIdentity (idExtDataF powner) oldPeer + , oldPeer /= newPeer + -> updatePeerThread + + _ -> return old + syncDirectMessageToPeer :: DirectMessageThreads -> ServiceHandler DirectMessage () syncDirectMessageToPeer (DirectMessageThreads mss _) = do @@ -205,28 +257,41 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do , msReceived = [] , msSeen = [] } - return $ DirectMessageThreads [next] (messageThreadView [next]) + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) else do return unchanged +findMissingPeers :: Server -> DirectMessageThreads -> ExceptT ErebosError IO () +findMissingPeers server threads = do + forM_ (dmThreadList threads) $ \thread -> do + when (msgHead thread /= msgReceived thread) $ do + mapM_ (discoverySearch server) $ map (refDigest . storedRef) $ idDataF $ msgPeer thread + data DirectMessageThread = DirectMessageThread { msgPeer :: ComposedIdentity - , msgHead :: [Stored DirectMessage] - , msgSent :: [Stored DirectMessage] - , msgSeen :: [Stored DirectMessage] + , msgHead :: [ Stored DirectMessage ] + , msgSent :: [ Stored DirectMessage ] + , msgSeen :: [ Stored DirectMessage ] + , msgReceived :: [ Stored DirectMessage ] } -threadToList :: DirectMessageThread -> [DirectMessage] -threadToList thread = helper S.empty $ msgHead thread - where helper seen msgs - | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs = - fromStored msg : helper (S.insert msg seen) (msgs' ++ msgPrev (fromStored msg)) - | otherwise = [] - cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg) +dmThreadToList :: DirectMessageThread -> [ DirectMessage ] +dmThreadToList thread = threadToListHelper S.empty $ msgHead thread + +dmThreadToListSince :: DirectMessageThread -> DirectMessageThread -> [ DirectMessage ] +dmThreadToListSince since thread = threadToListHelper (S.fromAscList $ msgHead since) (msgHead thread) + +threadToListHelper :: Set (Stored DirectMessage) -> [ Stored DirectMessage ] -> [ DirectMessage ] +threadToListHelper seen msgs + | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs = + fromStored msg : threadToListHelper (S.insert msg seen) (msgs' ++ msgPrev (fromStored msg)) + | otherwise = [] + where + cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg) -messageThreadView :: [Stored MessageState] -> [DirectMessageThread] -messageThreadView = helper [] +dmThreadView :: [ Stored MessageState ] -> [ DirectMessageThread ] +dmThreadView = helper [] where helper used ms' = case filterAncestors ms' of mss@(sms : rest) | any (sameIdentity $ msPeer $ fromStored sms) used -> @@ -236,7 +301,7 @@ messageThreadView = helper [] in messageThreadFor peer mss : helper (peer : used) (msPrev (fromStored sms) ++ rest) _ -> [] -messageThreadFor :: ComposedIdentity -> [Stored MessageState] -> DirectMessageThread +messageThreadFor :: ComposedIdentity -> [ Stored MessageState ] -> DirectMessageThread messageThreadFor peer mss = let ready = findMsgProperty peer msReady mss sent = findMsgProperty peer msSent mss @@ -248,15 +313,28 @@ messageThreadFor peer mss = , msgHead = filterAncestors $ ready ++ received , msgSent = filterAncestors $ sent ++ received , msgSeen = filterAncestors $ ready ++ seen + , msgReceived = filterAncestors $ received } -watchReceivedMessages :: Head LocalState -> (Stored DirectMessage -> IO ()) -> IO WatchedHead -watchReceivedMessages h f = do - let self = finalOwner $ localIdentity $ headObject h +watchDirectMessageThreads :: Head LocalState -> (DirectMessageThread -> DirectMessageThread -> IO ()) -> IO WatchedHead +watchDirectMessageThreads h f = do + prevVar <- newMVar Nothing watchHeadWith h (lookupSharedValue . lsShared . headObject) $ \(DirectMessageThreads sms _) -> do - forM_ (map fromStored sms) $ \ms -> do - mapM_ f $ filter (not . sameIdentity self . msgFrom . fromStored) $ msReceived ms + modifyMVar_ prevVar $ \case + Just prev -> do + let addPeer (p : ps) p' + | p `sameIdentity` p' = p : ps + | otherwise = p : addPeer ps p' + addPeer [] p' = [ p' ] + + let peers = foldl' addPeer [] $ map (msPeer . fromStored) $ storedDifference prev sms + forM_ peers $ \peer -> do + f (messageThreadFor peer prev) (messageThreadFor peer sms) + return (Just sms) + + Nothing -> do + return (Just sms) formatDirectMessage :: TimeZone -> DirectMessage -> String formatDirectMessage tzone msg = concat diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 48500d7..5590e4c 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -1,9 +1,13 @@ {-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), - DiscoveryConnection(..) + DiscoveryConnection(..), + + discoverySearch, + discoverySetupTunnel, ) where import Control.Concurrent @@ -11,39 +15,58 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Reader -import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word -import Network.Socket +import Text.Read #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE #endif import Erebos.Identity import Erebos.Network +import Erebos.Network.Address import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable +#ifndef ENABLE_ICE_SUPPORT +type IceConfig = () +type IceSession = () +type IceRemoteInfo = Stored Object +#endif + + data DiscoveryService - = DiscoverySelf [ Text ] (Maybe Int) - | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) - | DiscoverySearch Ref - | DiscoveryResult Ref [ Text ] + = DiscoverySelf [ DiscoveryAddress ] (Maybe Int) + | DiscoveryAcknowledged [ DiscoveryAddress ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) + | DiscoverySearch (Either Ref RefDigest) + | DiscoveryResult (Either Ref RefDigest) [ DiscoveryAddress ] | DiscoveryConnectionRequest DiscoveryConnection | DiscoveryConnectionResponse DiscoveryConnection +data DiscoveryAddress + = DiscoveryIP InetAddress PortNumber + | DiscoveryICE + | DiscoveryTunnel + | DiscoveryOther Text + data DiscoveryAttributes = DiscoveryAttributes { discoveryStunPort :: Maybe Word16 , discoveryStunServer :: Maybe Text , discoveryTurnPort :: Maybe Word16 , discoveryTurnServer :: Maybe Text + , discoveryProvideTunnel :: Peer -> PeerAddress -> Bool } defaultDiscoveryAttributes :: DiscoveryAttributes @@ -52,23 +75,22 @@ defaultDiscoveryAttributes = DiscoveryAttributes , discoveryStunServer = Nothing , discoveryTurnPort = Nothing , discoveryTurnServer = Nothing + , discoveryProvideTunnel = \_ _ -> False } data DiscoveryConnection = DiscoveryConnection - { dconnSource :: Ref - , dconnTarget :: Ref + { dconnSource :: Either Ref RefDigest + , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text -#ifdef ENABLE_ICE_SUPPORT + , dconnTunnel :: Bool , dconnIceInfo :: Maybe IceRemoteInfo -#else - , dconnIceInfo :: Maybe (Stored Object) -#endif } -emptyConnection :: Ref -> Ref -> DiscoveryConnection +emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} where dconnAddress = Nothing + dconnTunnel = False dconnIceInfo = Nothing instance Storable DiscoveryService where @@ -84,19 +106,21 @@ instance Storable DiscoveryService where storeMbInt "stun-port" stunPort storeMbText "turn-server" turnServer storeMbInt "turn-port" turnPort - DiscoverySearch ref -> storeRawRef "search" ref - DiscoveryResult ref addr -> do - storeRawRef "result" ref + DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst + DiscoveryResult edgst addr -> do + either (storeRawRef "result") (storeRawWeak "result") edgst mapM_ (storeText "address") addr DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn - where storeConnection ctype conn = do - storeText "connection" $ ctype - storeRawRef "source" $ dconnSource conn - storeRawRef "target" $ dconnTarget conn - storeMbText "address" $ dconnAddress conn - storeMbRef "ice-info" $ dconnIceInfo conn + where + storeConnection (ctype :: Text) DiscoveryConnection {..} = do + storeText "connection" $ ctype + either (storeRawRef "source") (storeRawWeak "source") dconnSource + either (storeRawRef "target") (storeRawWeak "target") dconnTarget + storeMbText "address" dconnAddress + when dconnTunnel $ storeEmpty "tunnel" + storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum [ do @@ -114,29 +138,87 @@ instance Storable DiscoveryService where <*> loadMbInt "stun-port" <*> loadMbText "turn-server" <*> loadMbInt "turn-port" - , DiscoverySearch <$> loadRawRef "search" + , DiscoverySearch <$> msum + [ Left <$> loadRawRef "search" + , Right <$> loadRawWeak "search" + ] , DiscoveryResult - <$> loadRawRef "result" + <$> msum + [ Left <$> loadRawRef "result" + , Right <$> loadRawWeak "result" + ] <*> loadTexts "address" , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse ] - where loadConnection ctype ctor = do - ctype' <- loadText "connection" - guard $ ctype == ctype' - return . ctor =<< DiscoveryConnection - <$> loadRawRef "source" - <*> loadRawRef "target" - <*> loadMbText "address" - <*> loadMbRef "ice-info" + where + loadConnection (ctype :: Text) ctor = do + ctype' <- loadText "connection" + guard $ ctype == ctype' + dconnSource <- msum + [ Left <$> loadRawRef "source" + , Right <$> loadRawWeak "source" + ] + dconnTarget <- msum + [ Left <$> loadRawRef "target" + , Right <$> loadRawWeak "target" + ] + dconnAddress <- loadMbText "address" + dconnTunnel <- isJust <$> loadMbEmpty "tunnel" + dconnIceInfo <- loadMbRef "ice-info" + return $ ctor DiscoveryConnection {..} + +instance StorableText DiscoveryAddress where + toText = \case + DiscoveryIP addr port -> T.unwords [ T.pack $ show addr, T.pack $ show port ] + DiscoveryICE -> "ICE" + DiscoveryTunnel -> "tunnel" + DiscoveryOther str -> str + + fromText str = return $ if + | [ addrStr, portStr ] <- T.words str + , Just addr <- readMaybe $ T.unpack addrStr + , Just port <- readMaybe $ T.unpack portStr + -> DiscoveryIP addr port + + | "ice" <- T.toLower str + -> DiscoveryICE + + | "tunnel" <- str + -> DiscoveryTunnel + + | otherwise + -> DiscoveryOther str + data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer - , dpAddress :: [ Text ] -#ifdef ENABLE_ICE_SUPPORT + , dpAddress :: [ DiscoveryAddress ] , dpIceSession :: Maybe IceSession -#endif + } + +emptyPeer :: DiscoveryPeer +emptyPeer = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Nothing + } + +data DiscoveryPeerState = DiscoveryPeerState + { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ] + -- ( original target, our write stream ) + , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ] + -- ( original source, ( from source, to target )) + , dpsStunServer :: Maybe ( Text, Word16 ) + , dpsTurnServer :: Maybe ( Text, Word16 ) + , dpsIceConfig :: Maybe IceConfig + } + +data DiscoveryGlobalState = DiscoveryGlobalState + { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -145,42 +227,44 @@ instance Service DiscoveryService where type ServiceAttributes DiscoveryService = DiscoveryAttributes defaultServiceAttributes _ = defaultDiscoveryAttributes -#ifdef ENABLE_ICE_SUPPORT - type ServiceState DiscoveryService = Maybe IceConfig - emptyServiceState _ = Nothing -#endif - - type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer - emptyServiceGlobalState _ = M.empty + type ServiceState DiscoveryService = DiscoveryPeerState + emptyServiceState _ = DiscoveryPeerState + { dpsOurTunnelRequests = [] + , dpsRelayedTunnelRequests = [] + , dpsStunServer = Nothing + , dpsTurnServer = Nothing + , dpsIceConfig = Nothing + } + + type ServiceGlobalState DiscoveryService = DiscoveryGlobalState + emptyServiceGlobalState _ = DiscoveryGlobalState + { dgsPeers = M.empty + , dgsSearchingFor = S.empty + } serviceHandler msg = case fromStored msg of DiscoverySelf addrs priority -> do pid <- asks svcPeerIdentity peer <- asks svcPeer + paddrs <- getPeerAddresses peer + let insertHelper new old | dpPriority new > dpPriority old = new | otherwise = old - matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if - | addr == T.pack "ICE" -> do - return $ Just addr - - | [ ipaddr, port ] <- words (T.unpack addr) - , DatagramAddress paddr <- peerAddress peer -> do - saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - return $ if paddr == addrAddress saddr - then Just addr - else Nothing - - | otherwise -> return Nothing - - forM_ (idDataF =<< unfoldOwners pid) $ \s -> - svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer - { dpPriority = fromMaybe 0 priority - , dpPeer = Just peer - , dpAddress = addrs -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + + let matchedAddrs = flip filter addrs $ \case + DiscoveryICE -> True + DiscoveryIP ipaddr port -> + DatagramAddress (inetToSockAddr ( ipaddr, port )) `elem` paddrs + _ -> False + + forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do + let dp = DiscoveryPeer + { dpPriority = fromMaybe 0 priority + , dpPeer = Just peer + , dpAddress = matchedAddrs + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } attrs <- asks svcAttributes replyPacket $ DiscoveryAcknowledged matchedAddrs (discoveryStunServer attrs) @@ -189,15 +273,8 @@ instance Service DiscoveryService where (discoveryTurnPort attrs) DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do -#ifdef ENABLE_ICE_SUPPORT - paddr <- asks (peerAddress . svcPeer) >>= return . \case - (DatagramAddress saddr) -> case IP.fromSockAddr saddr of - Just (IP.IPv6 ipv6, _) - | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 - -> Just $ T.pack $ show (IP.toIPv4w ipv4) - Just (addr, _) - -> Just $ T.pack $ show addr - _ -> Nothing + paddr <- asks svcPeerAddress >>= return . \case + (DatagramAddress saddr) -> T.pack . show . fst <$> inetFromSockAddr saddr _ -> Nothing let toIceServer Nothing Nothing = Nothing @@ -205,152 +282,368 @@ instance Service DiscoveryService where toIceServer (Just server) Nothing = Just ( server, 0 ) toIceServer (Just server) (Just port) = Just ( server, port ) - cfg <- liftIO $ iceCreateConfig - (toIceServer stunServer stunPort) - (toIceServer turnServer turnPort) - svcSet cfg -#endif - return () + svcModify $ \s -> s + { dpsStunServer = toIceServer stunServer stunPort + , dpsTurnServer = toIceServer turnServer turnPort + } - DiscoverySearch ref -> do - dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal - replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer + DiscoverySearch edgst -> do + dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal + peer <- asks svcPeer + paddr <- asks svcPeerAddress + attrs <- asks svcAttributes + let offerTunnel + | discoveryProvideTunnel attrs peer paddr = (++ [ DiscoveryTunnel ]) + | otherwise = id + replyPacket $ DiscoveryResult edgst $ maybe [] (offerTunnel . dpAddress) dpeer - DiscoveryResult ref [] -> do - svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" + DiscoveryResult _ [] -> do + -- not found + return () - DiscoveryResult ref addrs -> do + DiscoveryResult edgst addrs -> do + let dgst = either refDigest id edgst -- TODO: check if we really requested that server <- asks svcServer + st <- getStorage self <- svcSelf - mbIceConfig <- svcGet discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer - liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if - | addr == T.pack "ICE" -#ifdef ENABLE_ICE_SUPPORT - , Just config <- mbIceConfig - -> do - ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Nothing - , dpAddress = [] - , dpIceSession = Just ice - } -#else - -> do - return () -#endif + let tryAddresses = \case + DiscoveryIP ipaddr port : _ -> do + void $ liftIO $ forkIO $ do + let saddr = inetToSockAddr ( ipaddr, port ) + peer <- serverPeer server saddr + runAsService $ do + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } - | [ ipaddr, port ] <- words (T.unpack addr) -> do - saddr <- head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- serverPeer server (addrAddress saddr) - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Just peer - , dpAddress = [] + DiscoveryICE : rest -> do #ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing + getIceConfig >>= \case + Just config -> do + void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + -- Try to promote weak ref to normal one for older peers: + edgst' <- case edgst of + Left r -> return (Left r) + Right d -> refFromDigest st d >>= \case + Just r -> return (Left r) + Nothing -> return (Right d) + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err + + runAsService $ do + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + Nothing -> do #endif - } + tryAddresses rest - | otherwise -> do - runAsService $ do - svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + DiscoveryTunnel : _ -> do + discoverySetupTunnelResponse dgst + + addr : rest -> do + svcPrint $ "Discovery: unsupported address in result: " ++ T.unpack (toText addr) + tryAddresses rest + + [] -> svcPrint $ "Discovery: no (supported) address received for " <> show dgst + + tryAddresses addrs DiscoveryConnectionRequest conn -> do self <- svcSelf + attrs <- asks svcAttributes let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do + if either refDigest id (dconnTarget conn) `elem` identityDigests self + then if + -- request for us, create ICE sesssion or tunnel + | dconnTunnel conn -> do + receivedStreams >>= \case + (tunnelReader : _) -> do + tunnelWriter <- openStream + replyPacket $ DiscoveryConnectionResponse rconn + { dconnTunnel = True + } + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + server <- asks svcServer + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + + [] -> do + svcPrint $ "Discovery: missing stream on tunnel request (endpoint)" + #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion + | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer - svcGet >>= \case + getIceConfig >>= \case Just config -> do liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do rinfo <- iceRemoteInfo ice res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo } case res of - Right _ -> do - case dconnIceInfo conn of - Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice - Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" + Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err Nothing -> do - svcPrint $ "Discovery: ICE request from peer without ICE configuration" -#else - return () + return () #endif - else do - -- request to some of our peers, relay - mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal - case mbdp of + | otherwise -> do + svcPrint $ "Discovery: unsupported connection request" + + else do + -- request to some of our peers, relay + peer <- asks svcPeer + paddr <- asks svcPeerAddress + mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal + streams <- receivedStreams + case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn + | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn -> if + | not (discoveryProvideTunnel attrs peer paddr) -> do + replyPacket $ DiscoveryConnectionResponse rconn + | fromSource : _ <- streams -> do + void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do + toTarget <- openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest conn + | otherwise -> do + svcPrint $ "Discovery: missing stream on tunnel request (relay)" + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf - dpeers <- svcGetGlobal - if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do + dps <- svcGet + dpeers <- dgsPeers <$> svcGetGlobal + + if either refDigest id (dconnSource conn) `elem` identityDigests self + then do -- response to our request, try to connect to the peer -#ifdef ENABLE_ICE_SUPPORT server <- asks svcServer - if | Just addr <- dconnAddress conn - , [ipaddr, port] <- words (T.unpack addr) -> do - saddr <- liftIO $ head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $ - DiscoveryPeer 0 (Just peer) [] Nothing - - | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers + if + | Just addr <- dconnAddress conn + , [ addrStr, portStr ] <- words (T.unpack addr) + , Just ipaddr <- readMaybe addrStr + , Just port <- readMaybe portStr + -> do + let saddr = inetToSockAddr ( ipaddr, port ) + peer <- liftIO $ serverPeer server saddr + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s + { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } + + | dconnTunnel conn + , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + receivedStreams >>= \case + tunnelReader : _ -> do + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + [] -> do + svcPrint $ "Discovery: missing stream in tunnel response" + liftIO $ closeStream tunnelWriter + + | Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + svcPrint $ "Discovery: tunnel request failed" + liftIO $ closeStream tunnelWriter + +#ifdef ENABLE_ICE_SUPPORT + | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice +#endif | otherwise -> svcPrint $ "Discovery: connection request failed" -#else - return () -#endif - else do - -- response to relayed request - case M.lookup (refDigest $ dconnSource conn) dpeers of - Just dp | Just dpeer <- dpPeer dp -> do + else do + -- response to relayed request + streams <- receivedStreams + svcModify $ \s -> s { dpsRelayedTunnelRequests = + filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) } + + case M.lookup (either refDigest id $ dconnSource conn) dpeers of + Just dp | Just dpeer <- dpPeer dp -> if + -- successful tunnel request + | dconnTunnel conn + , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + , fromTarget : _ <- streams + -> liftIO $ do + toSourceVar <- newEmptyMVar + void $ forkIO $ runPeerService @DiscoveryService dpeer $ do + liftIO . putMVar toSourceVar =<< openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionResponse conn + void $ forkIO $ do + relayStream fromSource toTarget + void $ forkIO $ do + toSource <- readMVar toSourceVar + relayStream fromTarget toSource + + -- failed tunnel request + | Just ( _, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + -> do + liftIO $ closeStream toTarget + sendToPeer dpeer $ DiscoveryConnectionResponse conn + + | otherwise -> do sendToPeer dpeer $ DiscoveryConnectionResponse conn - _ -> svcPrint $ "Discovery: failed to relay connection response" + _ -> svcPrint $ "Discovery: failed to relay connection response" serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer - let addrToText saddr = do - ( addr, port ) <- IP.fromSockAddr saddr - Just $ T.pack $ show addr <> " " <> show port addrs <- concat <$> sequence - [ catMaybes . map addrToText <$> liftIO (getServerAddresses server) + [ catMaybes . map (fmap (uncurry DiscoveryIP) . inetFromSockAddr) <$> liftIO (getServerAddresses server) #ifdef ENABLE_ICE_SUPPORT - , return [ T.pack "ICE" ] + , return [ DiscoveryICE ] #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + sendToPeer peer $ DiscoverySearch (Right dgst) + +#ifdef ENABLE_ICE_SUPPORT + serviceStopServer _ _ _ pstates = do + forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do + mapM_ iceStopThread dpsIceConfig +#endif + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig) +getIceConfig = do + dpsIceConfig <$> svcGet >>= \case + Just cfg -> return $ Just cfg + Nothing -> do +#ifdef ENABLE_ICE_SUPPORT + stun <- dpsStunServer <$> svcGet + turn <- dpsTurnServer <$> svcGet + liftIO (iceCreateConfig stun turn) >>= \case + Just cfg -> do + svcModify $ \s -> s { dpsIceConfig = Just cfg } + return $ Just cfg + Nothing -> do + svcPrint $ "Discovery: failed to create ICE config" + return Nothing +#else + return Nothing +#endif + + +discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m () +discoverySearch server dgst = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + getPeerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ dgst `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch $ Right dgst + + +data TunnelAddress = TunnelAddress + { tunnelVia :: Peer + , tunnelIdentity :: UnifiedIdentity + , tunnelStreamNumber :: Int + , tunnelReader :: StreamReader + , tunnelWriter :: StreamWriter + } + +instance Eq TunnelAddress where + x == y = (==) + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Ord TunnelAddress where + compare x y = compare + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Show TunnelAddress where + show tunnel = concat + [ "tunnel@" + , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel + , "/" <> show (tunnelStreamNumber tunnel) + ] + +instance PeerAddressType TunnelAddress where + sendBytesToAddress TunnelAddress {..} bytes = do + writeStream tunnelWriter bytes + + connectionToAddressClosed TunnelAddress {..} = do + closeStream tunnelWriter + +relayStream :: StreamReader -> StreamWriter -> IO () +relayStream r w = do + p <- readStreamPacket r + writeStreamPacket w p + case p of + StreamClosed {} -> return () + _ -> relayStream r w + +receiveFromTunnel :: Server -> TunnelAddress -> IO () +receiveFromTunnel server taddr = do + p <- readStreamPacket (tunnelReader taddr) + case p of + StreamData {..} -> do + receivedFromCustomAddress server taddr stpData + receiveFromTunnel server taddr + StreamClosed {} -> do + return () + + +discoverySetupTunnel :: Peer -> RefDigest -> IO () +discoverySetupTunnel via target = do + runPeerService via $ do + discoverySetupTunnelResponse target + +discoverySetupTunnelResponse :: RefDigest -> ServiceHandler DiscoveryService () +discoverySetupTunnelResponse target = do + self <- refDigest . storedRef . idData <$> svcSelf + stream <- openStream + svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest + (emptyConnection (Right self) (Right target)) + { dconnTunnel = True + } diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs index ba2607a..1e1a521 100644 --- a/src/Erebos/Flow.hs +++ b/src/Erebos/Flow.hs @@ -11,54 +11,53 @@ module Erebos.Flow ( import Control.Concurrent.STM -data Flow r w = Flow (TMVar [r]) (TMVar [w]) - | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') +data Flow r w + = Flow (TBQueue r) (TBQueue w) + | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') type SymFlow a = Flow a a newFlow :: STM (Flow a b, Flow b a) newFlow = do - x <- newEmptyTMVar - y <- newEmptyTMVar + x <- newTBQueue 16 + y <- newTBQueue 16 return (Flow x y, Flow y x) newFlowIO :: IO (Flow a b, Flow b a) newFlowIO = atomically newFlow readFlow :: Flow r w -> STM r -readFlow (Flow rvar _) = takeTMVar rvar >>= \case - (x:[]) -> return x - (x:xs) -> putTMVar rvar xs >> return x - [] -> error "Flow: empty list" +readFlow (Flow rvar _) = readTBQueue rvar readFlow (MappedFlow f _ up) = f <$> readFlow up tryReadFlow :: Flow r w -> STM (Maybe r) -tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case - Just (x:[]) -> return (Just x) - Just (x:xs) -> putTMVar rvar xs >> return (Just x) - Just [] -> error "Flow: empty list" - Nothing -> return Nothing +tryReadFlow (Flow rvar _) = tryReadTBQueue rvar tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up canReadFlow :: Flow r w -> STM Bool -canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar +canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar canReadFlow (MappedFlow _ _ up) = canReadFlow up writeFlow :: Flow r w -> w -> STM () -writeFlow (Flow _ wvar) = putTMVar wvar . (:[]) +writeFlow (Flow _ wvar) = writeTBQueue wvar writeFlow (MappedFlow _ f up) = writeFlow up . f writeFlowBulk :: Flow r w -> [w] -> STM () writeFlowBulk _ [] = return () -writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs +writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs tryWriteFlow :: Flow r w -> w -> STM Bool -tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[]) -tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f +tryWriteFlow (Flow _ wvar) x = do + isFullTBQueue wvar >>= \case + True -> return False + False -> do + writeTBQueue wvar x + return True +tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x canWriteFlow :: Flow r w -> STM Bool -canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar +canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar canWriteFlow (MappedFlow _ _ up) = canWriteFlow up readFlowIO :: Flow r w -> IO r diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs index 2c6f500..a3dd9bc 100644 --- a/src/Erebos/ICE.chs +++ b/src/Erebos/ICE.chs @@ -8,6 +8,7 @@ module Erebos.ICE ( IceRemoteInfo, iceCreateConfig, + iceStopThread, iceCreateSession, iceDestroy, iceRemoteInfo, @@ -15,7 +16,7 @@ module Erebos.ICE ( iceConnect, iceSend, - iceSetChan, + serverPeerIce, ) where import Control.Arrow @@ -31,7 +32,6 @@ import Data.Text (Text) import Data.Text qualified as T import Data.Text.Encoding qualified as T import Data.Text.Read qualified as T -import Data.Void import Data.Word import Foreign.C.String @@ -42,7 +42,7 @@ import Foreign.Marshal.Array import Foreign.Ptr import Foreign.StablePtr -import Erebos.Flow +import Erebos.Network import Erebos.Object import Erebos.Storable import Erebos.Storage @@ -52,7 +52,7 @@ import Erebos.Storage data IceSession = IceSession { isStrans :: PjIceStrans , _isConfig :: IceConfig - , isChan :: MVar (Either [ByteString] (Flow Void ByteString)) + , isChan :: MVar (Either [ ByteString ] (ByteString -> IO ())) } instance Eq IceSession where @@ -64,6 +64,10 @@ instance Ord IceSession where instance Show IceSession where show _ = "<ICE>" +instance PeerAddressType IceSession where + sendBytesToAddress = iceSend + connectionToAddressClosed = iceDestroy + data IceRemoteInfo = IceRemoteInfo { iriUsernameFrament :: Text @@ -125,9 +129,9 @@ instance StorableText IceCandidate where data PjIceStransCfg newtype IceConfig = IceConfig (ForeignPtr PjIceStransCfg) -foreign import ccall unsafe "pjproject.h &ice_cfg_free" +foreign import ccall unsafe "pjproject.h &erebos_ice_cfg_free" ice_cfg_free :: FunPtr (Ptr PjIceStransCfg -> IO ()) -foreign import ccall unsafe "pjproject.h ice_cfg_create" +foreign import ccall unsafe "pjproject.h erebos_ice_cfg_create" ice_cfg_create :: CString -> Word16 -> CString -> Word16 -> IO (Ptr PjIceStransCfg) iceCreateConfig :: Maybe ( Text, Word16 ) -> Maybe ( Text, Word16 ) -> IO (Maybe IceConfig) @@ -139,6 +143,12 @@ iceCreateConfig stun turn = then return Nothing else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg +foreign import ccall unsafe "pjproject.h erebos_ice_cfg_stop_thread" + ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO () + +iceStopThread :: IceConfig -> IO () +iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread + {#pointer *pj_ice_strans as ^ #} iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession @@ -151,13 +161,13 @@ iceCreateSession icfg@(IceConfig fcfg) role cb = do forkIO $ cb sess sess <- IceSession <$> (withForeignPtr fcfg $ \cfg -> - {#call ice_create #} (castPtr cfg) (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr) + {#call erebos_ice_create #} (castPtr cfg) (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr) ) <*> pure icfg <*> (newMVar $ Left []) return $ sess -{#fun ice_destroy as ^ { isStrans `IceSession' } -> `()' #} +{#fun erebos_ice_destroy as iceDestroy { isStrans `IceSession' } -> `()' #} iceRemoteInfo :: IceSession -> IO IceRemoteInfo iceRemoteInfo sess = do @@ -172,7 +182,7 @@ iceRemoteInfo sess = do let cptrs = take maxcand $ iterate (`plusPtr` maxlen) bytes pokeArray carr $ take maxcand cptrs - ncand <- {#call ice_encode_session #} (isStrans sess) ufrag pass def carr (fromIntegral maxlen) (fromIntegral maxcand) + ncand <- {#call erebos_ice_encode_session #} (isStrans sess) ufrag pass def carr (fromIntegral maxlen) (fromIntegral maxcand) if ncand < 0 then fail "failed to generate ICE remote info" else IceRemoteInfo <$> (T.pack <$> peekCString ufrag) @@ -189,13 +199,13 @@ iceShow sess = do iceConnect :: IceSession -> IceRemoteInfo -> (IO ()) -> IO () iceConnect sess remote cb = do cbptr <- newStablePtr $ cb - ice_connect sess cbptr + erebos_ice_connect sess cbptr (iriUsernameFrament remote) (iriPassword remote) (iriDefaultCandidate remote) (iriCandidates remote) -{#fun ice_connect { isStrans `IceSession', castStablePtrToPtr `StablePtr (IO ())', +{#fun erebos_ice_connect { isStrans `IceSession', castStablePtrToPtr `StablePtr (IO ())', withText* `Text', withText* `Text', withText* `Text', withTextArray* `[Text]'& } -> `()' #} withText :: Text -> (Ptr CChar -> IO a) -> IO a @@ -211,19 +221,19 @@ withTextArray tsAll f = helper tsAll [] withByteStringLen :: Num n => ByteString -> ((Ptr CChar, n) -> IO a) -> IO a withByteStringLen t f = unsafeUseAsCStringLen t (f . (id *** fromIntegral)) -{#fun ice_send as ^ { isStrans `IceSession', withByteStringLen* `ByteString'& } -> `()' #} +{#fun erebos_ice_send as iceSend { isStrans `IceSession', withByteStringLen* `ByteString'& } -> `()' #} foreign export ccall ice_call_cb :: StablePtr (IO ()) -> IO () ice_call_cb :: StablePtr (IO ()) -> IO () ice_call_cb = join . deRefStablePtr -iceSetChan :: IceSession -> Flow Void ByteString -> IO () -iceSetChan sess chan = do +iceSetServer :: IceSession -> Server -> IO () +iceSetServer sess server = do modifyMVar_ (isChan sess) $ \orig -> do case orig of - Left buf -> mapM_ (writeFlowIO chan) $ reverse buf + Left buf -> mapM_ (receivedFromCustomAddress server sess) $ reverse buf Right _ -> return () - return $ Right chan + return $ Right $ receivedFromCustomAddress server sess foreign export ccall ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO () ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO () @@ -231,5 +241,12 @@ ice_rx_data sptr buf len = do sess <- deRefStablePtr sptr bs <- packCStringLen (buf, len) modifyMVar_ (isChan sess) $ \case - mc@(Right chan) -> writeFlowIO chan bs >> return mc - Left bss -> return $ Left (bs:bss) + mc@(Right sendToServer) -> sendToServer bs >> return mc + Left bss -> return $ Left (bs : bss) + + +serverPeerIce :: Server -> IceSession -> IO Peer +serverPeerIce server ice = do + peer <- serverPeerCustom server ice + iceSetServer ice server + return peer diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index e79fb9d..8d91eac 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -1,6 +1,7 @@ #include "pjproject.h" #include "Erebos/ICE_stub.h" +#include <stdatomic.h> #include <stdio.h> #include <stdlib.h> #include <stdbool.h> @@ -15,6 +16,13 @@ static struct pj_sockaddr def_addr; } ice; +struct erebos_ice_cfg +{ + pj_ice_strans_cfg cfg; + pj_thread_t * thread; + atomic_bool exit; +}; + struct user_data { pj_ice_sess_role role; @@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status) fprintf(stderr, "ICE: %s: %s\n", msg, err); } -static int ice_worker_thread(void * vcfg) +static int ice_worker_thread( void * vcfg ) { - pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg; + struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg ); - while (true) { + while( ! ecfg->exit ){ pj_time_val max_timeout = { 0, 0 }; pj_time_val timeout = { 0, 0 }; max_timeout.msec = 500; - pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout); + pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout ); pj_assert(timeout.sec >= 0 && timeout.msec >= 0); if (timeout.msec >= 1000) @@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg) if (PJ_TIME_VAL_GT(timeout, max_timeout)) timeout = max_timeout; - int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout); + int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout ); if (c < 0) pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); } @@ -70,7 +78,7 @@ static void cb_on_ice_complete(pj_ice_strans * strans, { if (status != PJ_SUCCESS) { ice_perror("cb_on_ice_complete", status); - ice_destroy(strans); + erebos_ice_destroy(strans); return; } @@ -131,80 +139,91 @@ exit: pthread_mutex_unlock(&mutex); } -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * erebos_ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ) { ice_init(); - pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) ); - pj_ice_strans_cfg_default( cfg ); + struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) ); + pj_ice_strans_cfg_default( &ecfg->cfg ); + ecfg->exit = false; + ecfg->thread = NULL; - cfg->stun_cfg.pf = &ice.cp.factory; + ecfg->cfg.stun_cfg.pf = &ice.cp.factory; if( pj_timer_heap_create( ice.pool, 100, - &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){ + &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){ fprintf( stderr, "pj_timer_heap_create failed\n" ); goto fail; } - if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){ + if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){ fprintf( stderr, "pj_ioqueue_create failed\n" ); goto fail; } - pj_thread_t * thread; if( pj_thread_create( ice.pool, NULL, &ice_worker_thread, - cfg, 0, 0, &thread ) != PJ_SUCCESS ){ + ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){ fprintf( stderr, "pj_thread_create failed\n" ); goto fail; } - cfg->af = pj_AF_INET(); - cfg->opt.aggressive = PJ_TRUE; + ecfg->cfg.af = pj_AF_INET(); + ecfg->cfg.opt.aggressive = PJ_TRUE; if( stun_server ){ - cfg->stun.server.ptr = malloc( strlen( stun_server )); - pj_strcpy2( &cfg->stun.server, stun_server ); + ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server )); + pj_strcpy2( &ecfg->cfg.stun.server, stun_server ); if( stun_port ) - cfg->stun.port = stun_port; + ecfg->cfg.stun.port = stun_port; } if( turn_server ){ - cfg->turn.server.ptr = malloc( strlen( turn_server )); - pj_strcpy2( &cfg->turn.server, turn_server ); + ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server )); + pj_strcpy2( &ecfg->cfg.turn.server, turn_server ); if( turn_port ) - cfg->turn.port = turn_port; - cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; - cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; - cfg->turn.conn_type = PJ_TURN_TP_UDP; + ecfg->cfg.turn.port = turn_port; + ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; + ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP; } - return cfg; + return ecfg; fail: - ice_cfg_free( cfg ); + erebos_ice_cfg_free( ecfg ); return NULL; } -void ice_cfg_free( pj_ice_strans_cfg * cfg ) +void erebos_ice_cfg_free( struct erebos_ice_cfg * ecfg ) { - if( ! cfg ) + if( ! ecfg ) return; - if( cfg->turn.server.ptr ) - free( cfg->turn.server.ptr ); + ecfg->exit = true; + pj_thread_join( ecfg->thread ); - if( cfg->stun.server.ptr ) - free( cfg->stun.server.ptr ); + if( ecfg->cfg.turn.server.ptr ) + free( ecfg->cfg.turn.server.ptr ); - if( cfg->stun_cfg.ioqueue ) - pj_ioqueue_destroy( cfg->stun_cfg.ioqueue ); + if( ecfg->cfg.stun.server.ptr ) + free( ecfg->cfg.stun.server.ptr ); - if( cfg->stun_cfg.timer_heap ) - pj_timer_heap_destroy( cfg->stun_cfg.timer_heap ); + if( ecfg->cfg.stun_cfg.ioqueue ) + pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue ); - free( cfg ); + if( ecfg->cfg.stun_cfg.timer_heap ) + pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap ); + + free( ecfg ); +} + +void erebos_ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg ) +{ + if( ! ecfg ) + return; + ecfg->exit = true; } -pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role, +pj_ice_strans * erebos_ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ) { ice_init(); @@ -221,7 +240,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role .on_ice_complete = cb_on_ice_complete, }; - pj_status_t status = pj_ice_strans_create( NULL, cfg, 1, + pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1, udata, &icecb, &res ); if (status != PJ_SUCCESS) @@ -230,7 +249,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role return res; } -void ice_destroy(pj_ice_strans * strans) +void erebos_ice_destroy(pj_ice_strans * strans) { struct user_data * udata = pj_ice_strans_get_user_data(strans); if (udata->sptr) @@ -245,7 +264,7 @@ void ice_destroy(pj_ice_strans * strans) pj_ice_strans_destroy(strans); } -ssize_t ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass, +ssize_t erebos_ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass, char * def, char * candidates[], size_t maxlen, size_t maxcand) { int n; @@ -299,7 +318,7 @@ ssize_t ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass, return cand_cnt; } -void ice_connect(pj_ice_strans * strans, HsStablePtr cb, +void erebos_ice_connect(pj_ice_strans * strans, HsStablePtr cb, const char * ufrag, const char * pass, const char * defcand, const char * tcandidates[], size_t ncand) { @@ -390,7 +409,7 @@ void ice_connect(pj_ice_strans * strans, HsStablePtr cb, } } -void ice_send(pj_ice_strans * strans, const char * data, size_t len) +void erebos_ice_send(pj_ice_strans * strans, const char * data, size_t len) { if (!pj_ice_strans_sess_is_complete(strans)) { fprintf(stderr, "ICE: negotiation has not been started or is in progress\n"); diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index e4fcbdb..7a1b96d 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -3,17 +3,18 @@ #include <pjnath.h> #include <HsFFI.h> -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * erebos_ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ); -void ice_cfg_free( pj_ice_strans_cfg * cfg ); +void erebos_ice_cfg_free( struct erebos_ice_cfg * cfg ); +void erebos_ice_cfg_stop_thread( struct erebos_ice_cfg * cfg ); -pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role, +pj_ice_strans * erebos_ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ); -void ice_destroy(pj_ice_strans * strans); +void erebos_ice_destroy(pj_ice_strans * strans); -ssize_t ice_encode_session(pj_ice_strans *, char * ufrag, char * pass, +ssize_t erebos_ice_encode_session(pj_ice_strans *, char * ufrag, char * pass, char * def, char * candidates[], size_t maxlen, size_t maxcand); -void ice_connect(pj_ice_strans * strans, HsStablePtr cb, +void erebos_ice_connect(pj_ice_strans * strans, HsStablePtr cb, const char * ufrag, const char * pass, const char * defcand, const char * candidates[], size_t ncand); -void ice_send(pj_ice_strans *, const char * data, size_t len); +void erebos_ice_send(pj_ice_strans *, const char * data, size_t len); diff --git a/src/Erebos/Identity.hs b/src/Erebos/Identity.hs index a3f17b5..bd5acb3 100644 --- a/src/Erebos/Identity.hs +++ b/src/Erebos/Identity.hs @@ -214,29 +214,33 @@ isExtension x = case fromSigned x of BaseIdentityData {} -> False _ -> True -createIdentity :: Storage -> Maybe Text -> Maybe UnifiedIdentity -> IO UnifiedIdentity -createIdentity st name owner = do - (secret, public) <- generateKeys st - (_secretMsg, publicMsg) <- generateKeys st - - let signOwner :: Signed a -> ReaderT Storage IO (Signed a) +createIdentity + :: forall m e. (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m) + => Maybe Text -> Maybe UnifiedIdentity -> m UnifiedIdentity +createIdentity name owner = do + st <- getStorage + ( secret, public ) <- liftIO $ generateKeys st + ( _secretMsg, publicMsg ) <- liftIO $ generateKeys st + + let signOwner :: Signed a -> m (Signed a) signOwner idd | Just o <- owner = do - Just ownerSecret <- loadKeyMb (iddKeyIdentity $ fromSigned $ idData o) + ownerSecret <- maybe (throwOtherError "failed to load private key") return =<< + loadKeyMb (iddKeyIdentity $ fromSigned $ idData o) signAdd ownerSecret idd | otherwise = return idd - Just identity <- flip runReaderT st $ do - baseData <- mstore =<< signOwner =<< sign secret =<< - mstore (emptyIdentityData public) - { iddOwner = idData <$> owner - , iddKeyMessage = Just publicMsg - } - let extOwner = do - odata <- idExtData <$> owner - guard $ isExtension odata - return odata - + baseData <- mstore =<< signOwner =<< sign secret =<< + mstore (emptyIdentityData public) + { iddOwner = idData <$> owner + , iddKeyMessage = Just publicMsg + } + let extOwner = do + odata <- idExtData <$> owner + guard $ isExtension odata + return odata + + maybe (throwOtherError "created invalid identity") return =<< do validateExtendedIdentityF . I.Identity <$> if isJust name || isJust extOwner then mstore =<< signOwner =<< sign secret =<< @@ -245,7 +249,6 @@ createIdentity st name owner = do , ideOwner = extOwner } else return $ baseToExtended baseData - return identity validateIdentity :: Stored (Signed IdentityData) -> Maybe UnifiedIdentity validateIdentity = validateIdentityF . I.Identity diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 54658de..6265bbf 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE CPP #-} - module Erebos.Network ( Server, startServer, @@ -10,20 +8,24 @@ module Erebos.Network ( ServerOptions(..), serverIdentity, defaultServerOptions, Peer, peerServer, peerStorage, - PeerAddress(..), peerAddress, - PeerIdentity(..), peerIdentity, + PeerAddress(..), getPeerAddress, getPeerAddresses, + PeerIdentity(..), getPeerIdentity, WaitingRef, wrDigest, Service(..), + + PeerAddressType(..), + receivedFromCustomAddress, + serverPeer, -#ifdef ENABLE_ICE_SUPPORT - serverPeerIce, -#endif + serverPeerCustom, + findPeer, dropPeer, isPeerDropped, sendToPeer, sendManyToPeer, sendToPeerStored, sendManyToPeerStored, sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -36,13 +38,14 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State +import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL import Data.Function import Data.IP qualified as IP import Data.List import Data.Map (Map) -import qualified Data.Map as M +import Data.Map qualified as M import Data.Maybe import Data.Typeable import Data.Word @@ -56,13 +59,11 @@ import Foreign.Storable as F import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) -import qualified Network.Socket.ByteString as S +import Network.Socket.ByteString qualified as S import Erebos.Error -#ifdef ENABLE_ICE_SUPPORT -import Erebos.ICE -#endif import Erebos.Identity +import Erebos.Network.Address import Erebos.Network.Channel import Erebos.Network.Protocol import Erebos.Object.Internal @@ -114,12 +115,16 @@ getNextPeerChange = atomically . readTChan . serverChanPeer data ServerOptions = ServerOptions { serverPort :: PortNumber , serverLocalDiscovery :: Bool + , serverErrorPrefix :: String + , serverTestLog :: Bool } defaultServerOptions :: ServerOptions defaultServerOptions = ServerOptions { serverPort = discoveryPort , serverLocalDiscovery = True + , serverErrorPrefix = "" + , serverTestLog = False } @@ -134,6 +139,14 @@ data Peer = Peer , peerWaitingRefs :: TMVar [WaitingRef] } +-- | Get current main address of the peer (used to send new packets). +getPeerAddress :: MonadIO m => Peer -> m PeerAddress +getPeerAddress = liftIO . return . peerAddress + +-- | Get all known addresses of given peer. +getPeerAddresses :: MonadIO m => Peer -> m [ PeerAddress ] +getPeerAddresses = fmap (: []) . getPeerAddress + peerServer :: Peer -> Server peerServer = peerServer_ @@ -157,50 +170,52 @@ setPeerChannel Peer {..} ch = do instance Eq Peer where (==) = (==) `on` peerIdentityVar -data PeerAddress = DatagramAddress SockAddr -#ifdef ENABLE_ICE_SUPPORT - | PeerIceSession IceSession -#endif +class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where + sendBytesToAddress :: addr -> ByteString -> IO () + connectionToAddressClosed :: addr -> IO () + +data PeerAddress + = forall addr. PeerAddressType addr => CustomPeerAddress addr + | DatagramAddress SockAddr instance Show PeerAddress where - show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of - Just (IP.IPv6 ipv6, port) - | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 - -> [show (IP.toIPv4w ipv4), show port] - Just (addr, port) - -> [show addr, show port] - _ -> [show saddr] -#ifdef ENABLE_ICE_SUPPORT - show (PeerIceSession ice) = show ice -#endif + show (CustomPeerAddress addr) = show addr + + show (DatagramAddress saddr) = + case inetFromSockAddr saddr of + Just ( addr, port ) -> unwords [ show addr, show port ] + _ -> show saddr instance Eq PeerAddress where + CustomPeerAddress addr == CustomPeerAddress addr' + | Just addr'' <- cast addr' = addr == addr'' DatagramAddress addr == DatagramAddress addr' = addr == addr' -#ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice == PeerIceSession ice' = ice == ice' _ == _ = False -#endif instance Ord PeerAddress where + compare (CustomPeerAddress addr) (CustomPeerAddress addr') + | Just addr'' <- cast addr' = compare addr addr'' + | otherwise = compare (typeOf addr) (typeOf addr') + compare (CustomPeerAddress _ ) _ = LT + compare _ (CustomPeerAddress _ ) = GT + compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr' -#ifdef ENABLE_ICE_SUPPORT - compare (DatagramAddress _ ) _ = LT - compare _ (DatagramAddress _ ) = GT - compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' -#endif -data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) - | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) - | PeerIdentityFull UnifiedIdentity +data PeerIdentity + = PeerIdentityUnknown (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ]) + | PeerIdentityRef WaitingRef (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ]) + | PeerIdentityFull UnifiedIdentity -peerIdentity :: MonadIO m => Peer -> m PeerIdentity -peerIdentity = liftIO . atomically . readTVar . peerIdentityVar +-- | Get currently known identity of the given peer +getPeerIdentity :: MonadIO m => Peer -> m PeerIdentity +getPeerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] - | PeerConnected (Connection PeerAddress) - | PeerDropped +data PeerState + = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ] + | PeerConnected (Connection PeerAddress) + | PeerDropped lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID @@ -252,7 +267,16 @@ startServer serverOptions serverOrigHead logd' serverServices = do let logd = writeTQueue serverErrorLog forkServerThread server $ forever $ do - logd' =<< atomically (readTQueue serverErrorLog) + logd' . (serverErrorPrefix serverOptions <>) =<< atomically (readTQueue serverErrorLog) + + logt <- if + | serverTestLog serverOptions -> do + serverTestLog <- newTQueueIO + forkServerThread server $ forever $ do + logd' =<< atomically (readTQueue serverTestLog) + return $ writeTQueue serverTestLog + | otherwise -> do + return $ \_ -> return () forkServerThread server $ dataResponseWorker server forkServerThread server $ forever $ do @@ -302,13 +326,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do announceUpdate idt forM_ serverServices $ \(SomeService service _) -> do - forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do - watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do - withMVar serverPeers $ mapM_ $ \peer -> atomically $ do - readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull _ -> writeTQueue serverIOActions $ do - runPeerService peer $ act x - _ -> return () + forM_ (serviceStorageWatchers service) $ \case + SomeStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue serverIOActions $ do + runPeerService peer $ act x + _ -> return () + GlobalStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + atomically $ writeTQueue serverIOActions $ do + act server x forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -316,12 +345,10 @@ startServer serverOptions serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do case paddr of + CustomPeerAddress addr -> sendBytesToAddress addr msg DatagramAddress addr -> void $ S.sendTo sock msg addr -#ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice -> iceSend ice msg -#endif forkServerThread server $ forever $ do readFlowIO serverControlFlow >>= \case @@ -363,9 +390,13 @@ startServer serverOptions serverOrigHead logd' serverServices = do prefs <- forM objs $ storeObject $ peerInStorage peer identity <- readMVar serverIdentity_ let svcs = map someServiceID serverServices - handlePacket identity secure peer chanSvc svcs header prefs + handlePacket paddr identity secure peer chanSvc svcs header prefs peerLoop Nothing -> do + case paddr of + DatagramAddress _ -> return () + CustomPeerAddress caddr -> connectionToAddressClosed caddr + dropPeer peer atomically $ writeTChan serverChanPeer peer peerLoop @@ -373,7 +404,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do ReceivedAnnounce addr _ -> do void $ serverPeer' server addr - erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow + erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd logt protocolRawPath protocolControlFlow forkServerThread server $ withSocketsDo $ do let hints = defaultHints @@ -385,16 +416,29 @@ startServer serverOptions serverOrigHead logd' serverServices = do bracket (open addr) close loop forkServerThread server $ forever $ do - (peer, svc, ref) <- atomically $ readTQueue chanSvc + ( peer, paddr, svc, ref, streams ) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of - Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams paddr peer (serviceHandler $ wrappedLoad @s ref) _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return server stopServer :: Server -> IO () -stopServer Server {..} = do - mapM_ killThread =<< takeMVar serverThreads +stopServer server@Server {..} = do + withMVar serverPeers $ \peers -> do + ( global, peerStates ) <- atomically $ (,) + <$> takeTMVar serverServiceStates + <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState) + + forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do + ps <- forM peerStates $ \( peer, states ) -> + return $ ( peer, ) $ case M.lookup (serviceID proxy) states of + Just (SomeServiceState (_ :: Proxy ps) pstate) + | Just (Refl :: s :~: ps) <- eqT + -> pstate + _ -> emptyServiceState proxy + serviceStopServer proxy server gs ps + mapM_ killThread =<< takeMVar serverThreads dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do @@ -502,9 +546,7 @@ openStream = do conn <- readTVarP peerState >>= \case PeerConnected conn -> return conn _ -> throwError "can't open stream without established connection" - (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case - Right res -> return res - Left err -> throwError err + (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn) liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler) addHeader hdr @@ -523,10 +565,10 @@ appendDistinct x (y:ys) | x == y = y : ys | otherwise = y : appendDistinct x ys appendDistinct x [] = [x] -handlePacket :: UnifiedIdentity -> Bool - -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] - -> TransportHeader -> [PartialRef] -> IO () -handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do +handlePacket :: PeerAddress -> UnifiedIdentity -> Bool + -> Peer -> TQueue ( Peer, PeerAddress, ServiceID, Ref, [ RawStreamReader ] ) -> [ ServiceID ] + -> TransportHeader -> [ PartialRef ] -> IO () +handlePacket paddr identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer ochannel <- getPeerChannel peer let sidentity = idData identity @@ -659,10 +701,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} - then do - void $ newWaitingRef dgst $ \ref -> - liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) - else throwError $ "missing service object " ++ show dgst + then do + streamReaders <- mapM acceptStream $ lookupNewStreams headers + void $ newWaitingRef dgst $ \ref -> + liftIO $ atomically $ writeTQueue chanSvc ( peer, paddr, svc, ref, streamReaders ) + else throwError $ "missing service object " ++ show dgst | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" @@ -741,7 +784,7 @@ finalizedChannel peer@Peer {..} ch self = do -- Notify services about new peer readTVar peerIdentityVar >>= \case - PeerIdentityFull _ -> notifyServicesOfPeer peer + PeerIdentityFull _ -> notifyServicesOfPeer True peer _ -> return () @@ -767,7 +810,7 @@ handleIdentityAnnounce self peer ref = liftIO $ atomically $ do PeerIdentityFull pid | idData pid `precedes` wrappedLoad ref -> validateAndUpdate (idUpdates pid) $ \_ -> do - notifyServicesOfPeer peer + notifyServicesOfPeer False peer _ -> return () @@ -779,16 +822,23 @@ handleIdentityUpdate peer ref = liftIO $ atomically $ do -> do writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid' writeTChan (serverChanPeer $ peerServer peer) peer - when (idData pid /= idData pid') $ notifyServicesOfPeer peer + when (pid /= pid') $ do + notifyServicesOfPeer False peer | otherwise -> return () -notifyServicesOfPeer :: Peer -> STM () -notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do +notifyServicesOfPeer :: Bool -> Peer -> STM () +notifyServicesOfPeer new peer@Peer { peerServer_ = Server {..} } = do writeTQueue serverIOActions $ do + paddr <- getPeerAddress peer forM_ serverServices $ \service@(SomeService _ attrs) -> - runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer + runPeerServiceOn (Just ( service, attrs )) [] paddr peer $ + if new then serviceNewPeer else serviceUpdatedPeer + +receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO () +receivedFromCustomAddress Server {..} addr msg = do + writeFlowIO serverRawPath ( CustomPeerAddress addr, msg ) mkPeer :: Server -> PeerAddress -> IO Peer mkPeer peerServer_ peerAddress = do @@ -808,14 +858,8 @@ serverPeer server paddr = do _ -> paddr serverPeer' server (DatagramAddress paddr') -#ifdef ENABLE_ICE_SUPPORT -serverPeerIce :: Server -> IceSession -> IO Peer -serverPeerIce server@Server {..} ice = do - let paddr = PeerIceSession ice - peer <- serverPeer' server paddr - iceSetChan ice $ mapFlow undefined (paddr,) serverRawPath - return peer -#endif +serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer +serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr) serverPeer' :: Server -> PeerAddress -> IO Peer serverPeer' server paddr = do @@ -829,6 +873,13 @@ serverPeer' server paddr = do writeFlow (serverControlFlow server) (RequestConnection paddr) return peer +findPeer :: Server -> (Peer -> IO Bool) -> IO (Maybe Peer) +findPeer server test = withMVar (serverPeers server) (helper . M.elems) + where + helper (p : ps) = test p >>= \case True -> return (Just p) + False -> helper ps + helper [] = return Nothing + dropPeer :: MonadIO m => Peer -> m () dropPeer peer = liftIO $ do modifyMVar_ (serverPeers $ peerServer peer) $ \pvalue -> do @@ -856,19 +907,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: []) sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) -sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () +sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m () sendToPeerList peer parts = do let st = peerStorage peer - srefs <- liftIO $ fmap catMaybes $ forM parts $ \case - ServiceReply (Left x) use -> Just . (,use) <$> store st x - ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) - ServiceFinally act -> act >> return Nothing - let dgsts = map (refDigest . fst) srefs - let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU - header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) - packet = TransportPacket header content - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] - liftIO $ atomically $ sendToPeerS peer ackedBy packet + res <- runExceptT $ do + srefs <- liftIO $ fmap catMaybes $ forM parts $ \case + ServiceReply (Left x) use -> Just . (,use) <$> store st x + ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) + _ -> return Nothing + + streamHeaders <- concat <$> do + (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do + forM parts $ \case + ServiceOpenStream cb -> do + conn <- lift (readTVar (peerState peer)) >>= \case + PeerConnected conn -> return conn + _ -> throwError "can't open stream without established connection" + (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn) + + lift $ writeTQueue (serverIOActions (peerServer peer)) $ do + liftIO $ forkServerThread (peerServer peer) handler + return [ ( hdr, cb writer ) ] + _ -> return [] + liftIO $ sequence_ $ map snd streamHeaders + + liftIO $ forM_ parts $ \case + ServiceFinally act -> act + _ -> return () + + let dgsts = map (refDigest . fst) srefs + let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU + header = TransportHeader $ concat + [ [ ServiceType (serviceID $ head parts) ] + , map ServiceRef dgsts + , map fst streamHeaders + ] + packet = TransportPacket header content + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] + liftIO $ atomically $ sendToPeerS peer ackedBy packet + + case res of + Right () -> return () + Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $ + "failed to send packet to " <> show (peerAddress peer) <> ": " <> err sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerS' secure Peer {..} ackedBy packet = do @@ -901,17 +982,19 @@ sendToPeerWith peer fobj = do Left err -> throwError $ fromErebosError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest lookupService _ [] = Nothing runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () -runPeerService = runPeerServiceOn Nothing +runPeerService peer handler = do + paddr <- getPeerAddress peer + runPeerServiceOn Nothing [] paddr peer handler -runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () -runPeerServiceOn mbservice peer handler = liftIO $ do +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> PeerAddress -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice newStreams paddr peer handler = liftIO $ do let server = peerServer peer proxy = Proxy @s svc = serviceID proxy @@ -933,9 +1016,11 @@ runPeerServiceOn mbservice peer handler = liftIO $ do let inp = ServiceInput { svcAttributes = attr , svcPeer = peer + , svcPeerAddress = paddr , svcPeerIdentity = peerId , svcServer = server , svcPrintOp = atomically . logd + , svcNewStreams = newStreams } reloadHead (serverOrigHead server) >>= \case Nothing -> atomically $ do @@ -951,35 +1036,37 @@ runPeerServiceOn mbservice peer handler = liftIO $ do putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global _ -> do - atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer) + atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show paddr _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +modifyServiceGlobalState + :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'" -foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) -foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) -foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) -foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO () - -data InetAddress = InetAddress { fromInetAddress :: IP.IP } - -instance F.Storable InetAddress where - sizeOf _ = sizeOf (undefined :: CInt) + 16 - alignment _ = 8 - - peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case - AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt)) - AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) - _ -> fail "InetAddress: unknown family" - poke ptr (InetAddress addr) = case addr of - IP.IPv4 ip -> do - pokeByteOff ptr 0 (packFamily AF_INET) - pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip) - IP.IPv6 ip -> do - pokeByteOff ptr 0 (packFamily AF_INET6) - pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip) +foreign import ccall unsafe "Network/ifaddrs.h erebos_join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) +foreign import ccall unsafe "Network/ifaddrs.h erebos_local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) +foreign import ccall unsafe "Network/ifaddrs.h erebos_broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) +foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO () joinMulticast :: Socket -> IO [ Word32 ] joinMulticast sock = @@ -1007,7 +1094,7 @@ getServerAddresses Server {..} = do count <- fromIntegral <$> peek pcount res <- peekArray count ptr cFree ptr - return $ map (IP.toSockAddr . (, serverPort serverOptions ) . fromInetAddress) res + return $ map (inetToSockAddr . (, serverPort serverOptions )) res getBroadcastAddresses :: PortNumber -> IO [SockAddr] getBroadcastAddresses port = do diff --git a/src/Erebos/Network.hs-boot b/src/Erebos/Network.hs-boot index af77581..17a5275 100644 --- a/src/Erebos/Network.hs-boot +++ b/src/Erebos/Network.hs-boot @@ -4,5 +4,6 @@ import Erebos.Object.Internal data Server data Peer +data PeerAddress peerStorage :: Peer -> Storage diff --git a/src/Erebos/Network/Address.hs b/src/Erebos/Network/Address.hs new file mode 100644 index 0000000..63f6af1 --- /dev/null +++ b/src/Erebos/Network/Address.hs @@ -0,0 +1,65 @@ +module Erebos.Network.Address ( + InetAddress(..), + inetFromSockAddr, + inetToSockAddr, + + SockAddr, PortNumber, +) where + +import Data.Bifunctor +import Data.IP qualified as IP +import Data.Word + +import Foreign.C.Types +import Foreign.Marshal.Array +import Foreign.Ptr +import Foreign.Storable as F + +import Network.Socket + +import Text.Read + + +newtype InetAddress = InetAddress { fromInetAddress :: IP.IP } + deriving (Eq, Ord) + +instance Show InetAddress where + show (InetAddress ipaddr) + | IP.IPv6 ipv6 <- ipaddr + , ( 0, 0, 0xffff, ipv4 ) <- IP.fromIPv6w ipv6 + = show (IP.toIPv4w ipv4) + + | otherwise + = show ipaddr + +instance Read InetAddress where + readPrec = do + readPrec >>= return . InetAddress . \case + IP.IPv4 ipv4 -> IP.IPv6 $ IP.toIPv6w ( 0, 0, 0xffff, IP.fromIPv4w ipv4 ) + ipaddr -> ipaddr + + readListPrec = readListPrecDefault + +instance F.Storable InetAddress where + sizeOf _ = sizeOf (undefined :: CInt) + 16 + alignment _ = 8 + + peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case + AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt)) + AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) + _ -> fail "InetAddress: unknown family" + + poke ptr (InetAddress addr) = case addr of + IP.IPv4 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET) + pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip) + IP.IPv6 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET6) + pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip) + + +inetFromSockAddr :: SockAddr -> Maybe ( InetAddress, PortNumber ) +inetFromSockAddr saddr = first InetAddress <$> IP.fromSockAddr saddr + +inetToSockAddr :: ( InetAddress, PortNumber ) -> SockAddr +inetToSockAddr = IP.toSockAddr . first fromInetAddress diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index c340503..f67e296 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -3,6 +3,7 @@ module Erebos.Network.Protocol ( transportToObject, TransportHeader(..), TransportHeaderItem(..), + ServiceID(..), SecurityRequirement(..), WaitingRef(..), @@ -22,7 +23,8 @@ module Erebos.Network.Protocol ( connSetChannel, connClose, - RawStreamReader, RawStreamWriter, + RawStreamReader(..), RawStreamWriter(..), + StreamPacket(..), connAddWriteStream, connAddReadStream, readStreamToList, @@ -36,6 +38,7 @@ import Control.Applicative import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM +import Control.Exception import Control.Monad import Control.Monad.Except import Control.Monad.Trans @@ -68,9 +71,9 @@ import Erebos.Flow import Erebos.Identity import Erebos.Network.Channel import Erebos.Object -import Erebos.Service import Erebos.Storable import Erebos.Storage +import Erebos.UUID (UUID) protocolVersion :: Text @@ -107,6 +110,9 @@ data TransportHeaderItem | StreamOpen Word8 deriving (Eq, Show) +newtype ServiceID = ServiceID UUID + deriving (Eq, Ord, Show, StorableUUID) + newtype Cookie = Cookie ByteString deriving (Eq, Show) @@ -207,6 +213,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr) , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () + , gTestLog :: String -> STM () , gStorage :: PartialStorage , gStartTime :: TimeSpec , gNowVar :: TVar TimeSpec @@ -243,6 +250,12 @@ instance Eq (Connection addr) where connAddress :: Connection addr -> addr connAddress = cAddress +showConnAddress :: forall addr. Connection addr -> String +showConnAddress Connection {..} = helper cGlobalState cAddress + where + helper :: GlobalState addr -> addr -> String + helper GlobalState {} = show + connData :: Connection addr -> Flow (Maybe (Bool, TransportPacket PartialObject)) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) @@ -267,6 +280,7 @@ connClose conn@Connection {..} = do connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) connAddWriteStream conn@Connection {..} = do + let GlobalState {..} = cGlobalState outStreams <- readTVar cOutStreams let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)]) doInsert n (s@(n', _) : rest) | n == n' = @@ -283,10 +297,16 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' - return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + lift $ gTestLog $ "net-ostream-open " <> showConnAddress conn <> " " <> show streamNumber <> " " <> show (length outStreams') + return + ( StreamOpen streamNumber + , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) + , go streamNumber stream + ) where - go gs@GlobalState {..} streamNumber stream = do + go streamNumber stream = do + let GlobalState {..} = cGlobalState (reserved, msg) <- atomically $ do readTVar (sState stream) >>= \case StreamRunning -> return () @@ -299,6 +319,8 @@ connAddWriteStream conn@Connection {..} = do return (stpData, True, return ()) StreamClosed {} -> do atomically $ do + gTestLog $ "net-ostream-close-send " <> showConnAddress conn <> " " <> show streamNumber + atomically $ do -- wait for ack on all sent stream data waits <- readTVar (sWaitingForAck stream) when (waits > 0) retry @@ -342,7 +364,7 @@ connAddWriteStream conn@Connection {..} = do sendBytes conn mbReserved' bs Nothing -> return () - when cont $ go gs streamNumber stream + when cont $ go streamNumber stream connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader connAddReadStream Connection {..} streamNumber = do @@ -356,14 +378,21 @@ connAddReadStream Connection {..} streamNumber = do sNextSequence <- newTVar 0 sWaitingForAck <- newTVar 0 let stream = Stream {..} - return (stream, (streamNumber, stream) : streams) - (stream, inStreams') <- doInsert inStreams + return ( streamNumber, stream, (streamNumber, stream) : streams ) + ( num, stream, inStreams' ) <- doInsert inStreams writeTVar cInStreams inStreams' - return $ sFlowOut stream + return $ RawStreamReader (fromIntegral num) (sFlowOut stream) -type RawStreamReader = Flow StreamPacket Void -type RawStreamWriter = Flow Void StreamPacket +data RawStreamReader = RawStreamReader + { rsrNum :: Int + , rsrFlow :: Flow StreamPacket Void + } + +data RawStreamWriter = RawStreamWriter + { rswNum :: Int + , rswFlow :: Flow Void StreamPacket + } data Stream = Stream { sState :: TVar StreamState @@ -394,11 +423,13 @@ streamAccepted Connection {..} snum = atomically $ do Nothing -> return () streamClosed :: Connection addr -> Word8 -> IO () -streamClosed Connection {..} snum = atomically $ do - modifyTVar' cOutStreams $ filter ((snum /=) . fst) +streamClosed conn@Connection {..} snum = atomically $ do + streams <- filter ((snum /=) . fst) <$> readTVar cOutStreams + writeTVar cOutStreams streams + gTestLog cGlobalState $ "net-ostream-close-ack " <> showConnAddress conn <> " " <> show snum <> " " <> show (length streams) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) -readStreamToList stream = readFlowIO stream >>= \case +readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) @@ -420,10 +451,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () writeByteStringToStream stream = go 0 where go seqNum bstr - | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum | otherwise = do let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU - writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur) go (seqNum + 1) rest @@ -477,10 +508,11 @@ data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest) erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) => UnifiedIdentity -> (String -> STM ()) + -> (String -> STM ()) -> SymFlow (addr, ByteString) -> Flow (ControlRequest addr) (ControlMessage addr) -> IO () -erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do +erebosNetworkProtocol initialIdentity gLog gTestLog gDataFlow gControlFlow = do gIdentity <- newTVarIO (initialIdentity, []) gConnections <- newTVarIO [] gNextUp <- newEmptyTMVarIO @@ -512,8 +544,10 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do race_ (waitTill next) waitForUpdate - race_ signalTimeouts $ forever $ join $ atomically $ - passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs + race_ signalTimeouts $ forever $ do + io <- atomically $ do + passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs + catch io $ \(e :: SomeException) -> atomically $ gLog $ "exception during network protocol handling: " <> show e getConnection :: GlobalState addr -> addr -> STM (Connection addr) @@ -542,6 +576,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cOutStreams <- newTVar [] let conn = Connection {..} + gTestLog $ "net-conn-new " <> show cAddress writeTVar gConnections (conn : conns) return conn @@ -898,7 +933,10 @@ processOutgoing gs@GlobalState {..} = do , rsOnAck = rsOnAck rs >> onAck }) <$> mbReserved sendBytes conn mbReserved' bs - Nothing -> return () + Nothing -> do + when (isJust mbReserved) $ do + atomically $ do + modifyTVar' cReservedPackets (subtract 1) let waitUntil :: TimeSpec -> TimeSpec -> STM () waitUntil now till = do diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c index ff4382a..8139b5e 100644 --- a/src/Erebos/Network/ifaddrs.c +++ b/src/Erebos/Network/ifaddrs.c @@ -22,7 +22,7 @@ #define DISCOVERY_MULTICAST_GROUP "ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1" -uint32_t * join_multicast(int fd, size_t * count) +uint32_t * erebos_join_multicast(int fd, size_t * count) { size_t capacity = 16; *count = 0; @@ -117,7 +117,7 @@ static bool copy_local_address( struct InetAddress * dst, const struct sockaddr #ifndef _WIN32 -struct InetAddress * local_addresses( size_t * count ) +struct InetAddress * erebos_local_addresses( size_t * count ) { struct ifaddrs * addrs; if( getifaddrs( &addrs ) < 0 ) @@ -153,7 +153,7 @@ struct InetAddress * local_addresses( size_t * count ) return ret; } -uint32_t * broadcast_addresses(void) +uint32_t * erebos_broadcast_addresses(void) { struct ifaddrs * addrs; if (getifaddrs(&addrs) < 0) @@ -196,7 +196,7 @@ uint32_t * broadcast_addresses(void) #pragma comment(lib, "ws2_32.lib") -struct InetAddress * local_addresses( size_t * count ) +struct InetAddress * erebos_local_addresses( size_t * count ) { * count = 0; struct InetAddress * ret = NULL; @@ -237,7 +237,7 @@ cleanup: return ret; } -uint32_t * broadcast_addresses(void) +uint32_t * erebos_broadcast_addresses(void) { uint32_t * ret = NULL; SOCKET wsock = INVALID_SOCKET; diff --git a/src/Erebos/Network/ifaddrs.h b/src/Erebos/Network/ifaddrs.h index 2ee45a7..2b3c014 100644 --- a/src/Erebos/Network/ifaddrs.h +++ b/src/Erebos/Network/ifaddrs.h @@ -13,6 +13,6 @@ struct InetAddress uint8_t addr[16]; } __attribute__((packed)); -uint32_t * join_multicast(int fd, size_t * count); -struct InetAddress * local_addresses( size_t * count ); -uint32_t * broadcast_addresses(void); +uint32_t * erebos_join_multicast(int fd, size_t * count); +struct InetAddress * erebos_local_addresses( size_t * count ); +uint32_t * erebos_broadcast_addresses(void); diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs index 26ca09f..f00b63d 100644 --- a/src/Erebos/Object.hs +++ b/src/Erebos/Object.hs @@ -13,8 +13,9 @@ module Erebos.Object ( RecItem, RecItem'(..), Ref, PartialRef, RefDigest, - refDigest, - readRef, showRef, showRefDigest, + refDigest, refFromDigest, + readRef, showRef, + readRefDigest, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, ) where diff --git a/src/Erebos/Object/Internal.hs b/src/Erebos/Object/Internal.hs index 6111d2a..fdb587a 100644 --- a/src/Erebos/Object/Internal.hs +++ b/src/Erebos/Object/Internal.hs @@ -2,8 +2,9 @@ module Erebos.Object.Internal ( Storage, PartialStorage, StorageCompleteness, Ref, PartialRef, RefDigest, - refDigest, - readRef, showRef, showRefDigest, + refDigest, refFromDigest, + readRef, showRef, + readRefDigest, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, @@ -74,13 +75,14 @@ import Data.Time.Calendar import Data.Time.Clock import Data.Time.Format import Data.Time.LocalTime -import Data.UUID (UUID) -import qualified Data.UUID as U import System.IO.Unsafe import Erebos.Error import Erebos.Storage.Internal +import Erebos.UUID (UUID) +import Erebos.UUID qualified as U +import Erebos.Util zeroRef :: Storage' c -> Ref' c @@ -701,8 +703,6 @@ loadRawWeaks name = mapMaybe p <$> loadRecItems -type Stored a = Stored' Complete a - instance Storable a => Storable (Stored a) where store st = copyRef st . storedRef store' (Stored _ x) = store' x @@ -712,10 +712,10 @@ instance ZeroStorable a => ZeroStorable (Stored a) where fromZero st = Stored (zeroRef st) $ fromZero st fromStored :: Stored a -> a -fromStored (Stored _ x) = x +fromStored = storedObject' storedRef :: Stored a -> Ref -storedRef (Stored ref _) = ref +storedRef = storedRef' wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a) wrappedStore st x = do ref <- liftIO $ store st x @@ -724,9 +724,8 @@ wrappedStore st x = do ref <- liftIO $ store st x wrappedLoad :: Storable a => Ref -> Stored a wrappedLoad ref = Stored ref (load ref) -copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => - Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a)) -copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref' +copyStored :: forall m a. MonadIO m => Storage -> Stored a -> m (Stored a) +copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (\r -> Stored r x) <$> copyRef' st ref' -- |Passed function needs to preserve the object representation to be safe unsafeMapStored :: (a -> b) -> Stored a -> Stored b diff --git a/src/Erebos/Pairing.hs b/src/Erebos/Pairing.hs index 703afcd..d1fdc79 100644 --- a/src/Erebos/Pairing.hs +++ b/src/Erebos/Pairing.hs @@ -17,9 +17,10 @@ import Control.Monad.Reader import Crypto.Random import Data.Bits -import Data.ByteArray (Bytes, convert) -import qualified Data.ByteArray as BA -import qualified Data.ByteString.Char8 as BC +import Data.ByteArray qualified as BA +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BC import Data.Kind import Data.Maybe import Data.Typeable @@ -34,16 +35,16 @@ import Erebos.State import Erebos.Storable data PairingService a = PairingRequest (Stored (Signed IdentityData)) (Stored (Signed IdentityData)) RefDigest - | PairingResponse Bytes - | PairingRequestNonce Bytes + | PairingResponse ByteString + | PairingRequestNonce ByteString | PairingAccept a | PairingReject data PairingState a = NoPairing - | OurRequest UnifiedIdentity UnifiedIdentity Bytes + | OurRequest UnifiedIdentity UnifiedIdentity ByteString | OurRequestConfirm (Maybe (PairingVerifiedResult a)) | OurRequestReady - | PeerRequest UnifiedIdentity UnifiedIdentity Bytes RefDigest + | PeerRequest UnifiedIdentity UnifiedIdentity ByteString RefDigest | PeerRequestConfirm | PairingDone @@ -88,7 +89,7 @@ instance Storable a => Storable (PairingService a) where load' = do res <- loadRec $ do - (req :: Maybe Bytes) <- loadMbBinary "request" + (req :: Maybe ByteString) <- loadMbBinary "request" idReq <- loadMbRef "id-req" idRsp <- loadMbRef "id-rsp" rsp <- loadMbBinary "response" @@ -171,7 +172,7 @@ instance PairingResult a => Service (PairingService a) where x@(OurRequestReady, _) -> reject $ uncurry PairingUnexpectedMessage x (PeerRequest peer self nonce dgst, PairingRequestNonce pnonce) -> do - if dgst == nonceDigest peer self pnonce BA.empty + if dgst == nonceDigest peer self pnonce BS.empty then do hook <- asks $ pairingHookRequestNonce . svcAttributes hook $ confirmationNumber $ nonceDigest peer self pnonce nonce svcSet PeerRequestConfirm @@ -188,12 +189,12 @@ reject reason = do replyPacket PairingReject -nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> Bytes -> Bytes -> RefDigest +nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> ByteString -> ByteString -> RefDigest nonceDigest idReq idRsp nonceReq nonceRsp = hashToRefDigest $ serializeObject $ Rec [ (BC.pack "id-req", RecRef $ storedRef $ idData idReq) , (BC.pack "id-rsp", RecRef $ storedRef $ idData idRsp) - , (BC.pack "nonce-req", RecBinary $ convert nonceReq) - , (BC.pack "nonce-rsp", RecBinary $ convert nonceRsp) + , (BC.pack "nonce-req", RecBinary nonceReq) + , (BC.pack "nonce-rsp", RecBinary nonceRsp) ] confirmationNumber :: RefDigest -> String @@ -208,11 +209,11 @@ pairingRequest :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e pairingRequest _ peer = do self <- liftIO $ serverIdentity $ peerServer peer nonce <- liftIO $ getRandomBytes 32 - pid <- peerIdentity peer >>= \case + pid <- getPeerIdentity peer >>= \case PeerIdentityFull pid -> return pid _ -> throwOtherError "incomplete peer identity" sendToPeerWith @(PairingService a) peer $ \case - NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BA.empty), OurRequest self pid nonce) + NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BS.empty), OurRequest self pid nonce) _ -> throwOtherError "already in progress" pairingAccept :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e m, FromErebosError e) => proxy a -> Peer -> m () diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index e95e700..303f9db 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -29,14 +29,14 @@ import Control.Monad.Writer import Data.Kind import Data.Typeable -import Data.UUID (UUID) -import qualified Data.UUID as U import Erebos.Identity import {-# SOURCE #-} Erebos.Network +import Erebos.Network.Protocol import Erebos.State import Erebos.Storable import Erebos.Storage.Head +import Erebos.UUID qualified as U class ( Typeable s, Storable s, @@ -51,6 +51,9 @@ class ( serviceNewPeer :: ServiceHandler s () serviceNewPeer = return () + serviceUpdatedPeer :: ServiceHandler s () + serviceUpdatedPeer = return () + type ServiceAttributes s = attr | attr -> s type ServiceAttributes s = Proxy s defaultServiceAttributes :: proxy s -> ServiceAttributes s @@ -72,6 +75,9 @@ class ( serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s] serviceStorageWatchers _ = [] + serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO () + serviceStopServer _ _ _ _ = return () + data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s) @@ -101,11 +107,10 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p) -data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) - +data SomeStorageWatcher s + = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) + | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT ErebosError IO ()) -newtype ServiceID = ServiceID UUID - deriving (Eq, Ord, Show, StorableUUID) mkServiceID :: String -> ServiceID mkServiceID = maybe (error "Invalid service ID") ServiceID . U.fromString @@ -113,13 +118,17 @@ mkServiceID = maybe (error "Invalid service ID") ServiceID . U.fromString data ServiceInput s = ServiceInput { svcAttributes :: ServiceAttributes s , svcPeer :: Peer + , svcPeerAddress :: PeerAddress , svcPeerIdentity :: UnifiedIdentity , svcServer :: Server , svcPrintOp :: String -> IO () + , svcNewStreams :: [ RawStreamReader ] } -data ServiceReply s = ServiceReply (Either s (Stored s)) Bool - | ServiceFinally (IO ()) +data ServiceReply s + = ServiceReply (Either s (Stored s)) Bool + | ServiceOpenStream (RawStreamWriter -> IO ()) + | ServiceFinally (IO ()) data ServiceHandlerState s = ServiceHandlerState { svcValue :: ServiceState s diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs new file mode 100644 index 0000000..67df4d7 --- /dev/null +++ b/src/Erebos/Service/Stream.hs @@ -0,0 +1,74 @@ +module Erebos.Service.Stream ( + StreamPacket(..), + StreamReader, getStreamReaderNumber, + StreamWriter, getStreamWriterNumber, + openStream, receivedStreams, + readStreamPacket, writeStreamPacket, + writeStream, + closeStream, +) where + +import Control.Concurrent.MVar +import Control.Monad.Reader +import Control.Monad.Writer + +import Data.ByteString (ByteString) +import Data.Word + +import Erebos.Flow +import Erebos.Network +import Erebos.Network.Protocol +import Erebos.Service + + +data StreamReader = StreamReader RawStreamReader + +getStreamReaderNumber :: StreamReader -> IO Int +getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream + +data StreamWriter = StreamWriter (MVar StreamWriterData) + +data StreamWriterData = StreamWriterData + { swdStream :: RawStreamWriter + , swdSequence :: Maybe Word64 + } + +getStreamWriterNumber :: StreamWriter -> IO Int +getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream + + +openStream :: Service s => ServiceHandler s StreamWriter +openStream = do + mvar <- liftIO newEmptyMVar + tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ] + return $ StreamWriter mvar + +receivedStreams :: Service s => ServiceHandler s [ StreamReader ] +receivedStreams = do + map StreamReader <$> asks svcNewStreams + +readStreamPacket :: StreamReader -> IO StreamPacket +readStreamPacket (StreamReader stream) = do + readFlowIO (rsrFlow stream) + +writeStreamPacket :: StreamWriter -> StreamPacket -> IO () +writeStreamPacket (StreamWriter mvar) packet = do + withMVar mvar $ \swd -> do + writeFlowIO (rswFlow $ swdStream swd) packet + +writeStream :: StreamWriter -> ByteString -> IO () +writeStream (StreamWriter mvar) bytes = do + modifyMVar_ mvar $ \swd -> do + case swdSequence swd of + Just seqNum -> do + writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes + return swd { swdSequence = Just (seqNum + 1) } + Nothing -> do + fail "writeStream: stream closed" + +closeStream :: StreamWriter -> IO () +closeStream (StreamWriter mvar) = do + withMVar mvar $ \swd -> do + case swdSequence swd of + Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum + Nothing -> fail "closeStream: stream already closed" diff --git a/src/Erebos/Set.hs b/src/Erebos/Set.hs index 270c0ba..7453be4 100644 --- a/src/Erebos/Set.hs +++ b/src/Erebos/Set.hs @@ -10,7 +10,6 @@ module Erebos.Set ( ) where import Control.Arrow -import Control.Monad.IO.Class import Data.Function import Data.List @@ -53,14 +52,14 @@ emptySet = Set [] loadSet :: Mergeable a => Ref -> Set a loadSet = mergeSorted . (:[]) . wrappedLoad -storeSetAdd :: (Mergeable a, MonadIO m) => Storage -> a -> Set a -> m (Set a) -storeSetAdd st x (Set prev) = Set . (:[]) <$> wrappedStore st SetItem +storeSetAdd :: (Mergeable a, MonadStorage m) => a -> Set a -> m (Set a) +storeSetAdd x (Set prev) = Set . (: []) <$> mstore SetItem { siPrev = prev , siItem = toComponents x } -storeSetAddComponent :: (Mergeable a, MonadStorage m, MonadIO m) => Stored (Component a) -> Set a -> m (Set a) -storeSetAddComponent component (Set prev) = Set . (:[]) <$> mstore SetItem +storeSetAddComponent :: (Mergeable a, MonadStorage m) => Stored (Component a) -> Set a -> m (Set a) +storeSetAddComponent component (Set prev) = Set . (: []) <$> mstore SetItem { siPrev = prev , siItem = [ component ] } diff --git a/src/Erebos/State.hs b/src/Erebos/State.hs index 5ce9952..06e5c54 100644 --- a/src/Erebos/State.hs +++ b/src/Erebos/State.hs @@ -6,6 +6,7 @@ module Erebos.State ( MonadStorage(..), MonadHead(..), updateLocalHead_, + LocalHeadT(..), updateLocalState, updateLocalState_, updateSharedState, updateSharedState_, @@ -17,14 +18,14 @@ module Erebos.State ( mergeSharedIdentity, ) where +import Control.Monad import Control.Monad.Except import Control.Monad.Reader +import Data.Bifunctor import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.Typeable -import Data.UUID (UUID) -import Data.UUID qualified as U import Erebos.Identity import Erebos.Object @@ -32,6 +33,8 @@ import Erebos.PubKey import Erebos.Storable import Erebos.Storage.Head import Erebos.Storage.Merge +import Erebos.UUID (UUID) +import Erebos.UUID qualified as U data LocalState = LocalState { lsPrev :: Maybe RefDigest @@ -66,7 +69,7 @@ instance Storable LocalState where lsPrev <- loadMbRawWeak "PREV" lsIdentity <- loadRef "id" lsShared <- loadRefs "shared" - lsOther <- filter ((`notElem` [ BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems + lsOther <- filter ((`notElem` [ BC.pack "PREV", BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems return LocalState {..} instance HeadType LocalState where @@ -101,6 +104,35 @@ instance (HeadType a, MonadIO m) => MonadHead a (ReaderT (Head a) m) where snd <$> updateHead h f +newtype LocalHeadT h m a = LocalHeadT { runLocalHeadT :: Storage -> Stored h -> m ( a, Stored h ) } + +instance Functor m => Functor (LocalHeadT h m) where + fmap f (LocalHeadT act) = LocalHeadT $ \st h -> first f <$> act st h + +instance Monad m => Applicative (LocalHeadT h m) where + pure x = LocalHeadT $ \_ h -> pure ( x, h ) + (<*>) = ap + +instance Monad m => Monad (LocalHeadT h m) where + return = pure + LocalHeadT act >>= f = LocalHeadT $ \st h -> do + ( x, h' ) <- act st h + let (LocalHeadT act') = f x + act' st h' + +instance MonadIO m => MonadIO (LocalHeadT h m) where + liftIO act = LocalHeadT $ \_ h -> ( , h ) <$> liftIO act + +instance MonadIO m => MonadStorage (LocalHeadT h m) where + getStorage = LocalHeadT $ \st h -> return ( st, h ) + +instance (HeadType h, MonadIO m) => MonadHead h (LocalHeadT h m) where + updateLocalHead f = LocalHeadT $ \st h -> do + let LocalHeadT act = f h + ( ( h', x ), _ ) <- act st h + return ( x, h' ) + + localIdentity :: LocalState -> UnifiedIdentity localIdentity ls = maybe (error "failed to verify local identity") (updateOwners $ maybe [] idExtDataF $ lookupSharedValue $ lsShared ls) @@ -128,12 +160,11 @@ updateSharedState :: forall a b m. (SharedType a, MonadHead LocalState m) => (a updateSharedState f = \ls -> do let shared = lsShared $ fromStored ls val = lookupSharedValue shared - st <- getStorage (val', x) <- f val (,x) <$> if toComponents val' == toComponents val then return ls - else do shared' <- makeSharedStateUpdate st val' shared - wrappedStore st (fromStored ls) { lsShared = [shared'] } + else do shared' <- makeSharedStateUpdate val' shared + mstore (fromStored ls) { lsShared = [shared'] } lookupSharedValue :: forall a. SharedType a => [Stored SharedState] -> a lookupSharedValue = mergeSorted . filterAncestors . map wrappedLoad . concatMap (ssValue . fromStored) . filterAncestors . helper @@ -141,8 +172,8 @@ lookupSharedValue = mergeSorted . filterAncestors . map wrappedLoad . concatMap | otherwise = helper $ ssPrev (fromStored x) ++ xs helper [] = [] -makeSharedStateUpdate :: forall a m. MonadIO m => SharedType a => Storage -> a -> [Stored SharedState] -> m (Stored SharedState) -makeSharedStateUpdate st val prev = liftIO $ wrappedStore st SharedState +makeSharedStateUpdate :: forall a m. (SharedType a, MonadStorage m) => a -> [ Stored SharedState ] -> m (Stored SharedState) +makeSharedStateUpdate val prev = mstore SharedState { ssPrev = prev , ssType = Just $ sharedTypeID @a Proxy , ssValue = storedRef <$> toComponents val diff --git a/src/Erebos/Storage/Disk.hs b/src/Erebos/Storage/Disk.hs index 370c584..8e35940 100644 --- a/src/Erebos/Storage/Disk.hs +++ b/src/Erebos/Storage/Disk.hs @@ -18,7 +18,6 @@ import Data.ByteString.Lazy.Char8 qualified as BLC import Data.Function import Data.List import Data.Maybe -import Data.UUID qualified as U import System.Directory import System.FSNotify @@ -31,6 +30,7 @@ import Erebos.Storage.Backend import Erebos.Storage.Head import Erebos.Storage.Internal import Erebos.Storage.Platform +import Erebos.UUID qualified as U data DiskStorage = StorageDir diff --git a/src/Erebos/Storage/Head.hs b/src/Erebos/Storage/Head.hs index 8f8e009..285902d 100644 --- a/src/Erebos/Storage/Head.hs +++ b/src/Erebos/Storage/Head.hs @@ -28,13 +28,12 @@ import Control.Monad.Reader import Data.Bifunctor import Data.Typeable -import Data.UUID qualified as U -import Data.UUID.V4 qualified as U import Erebos.Object import Erebos.Storable import Erebos.Storage.Backend import Erebos.Storage.Internal +import Erebos.UUID qualified as U -- | Represents loaded Erebos storage head, along with the object it pointed to @@ -114,7 +113,7 @@ loadHeadRaw st@Storage {..} tid hid = do -- | Reload the given head from storage, returning `Head' with updated object, -- or `Nothing' if there is no longer head with the particular ID in storage. reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a)) -reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid +reloadHead (Head hid val) = loadHead (storedStorage val) hid -- | Store a new `Head' of type 'a' in the storage. storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a) @@ -233,8 +232,8 @@ watchHeadWith -> (Head a -> b) -- ^ Selector function -> (b -> IO ()) -- ^ Callback -> IO WatchedHead -- ^ Watched head handle -watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do - watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb +watchHeadWith (Head hid val) sel cb = do + watchHeadRaw (storedStorage val) (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb -- | Watch the given head using raw IDs and a selector from `Ref'. watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs index 6df1410..db211bb 100644 --- a/src/Erebos/Storage/Internal.hs +++ b/src/Erebos/Storage/Internal.hs @@ -1,32 +1,55 @@ -module Erebos.Storage.Internal where +module Erebos.Storage.Internal ( + Storage'(..), Storage, PartialStorage, + Ref'(..), Ref, PartialRef, + RefDigest(..), + WatchID, startWatchID, nextWatchID, + WatchList(..), WatchListItem(..), watchListAdd, watchListDel, + + refStorage, + refDigest, refDigestFromByteString, + showRef, showRefDigest, showRefDigestParts, + readRefDigest, + hashToRefDigest, + + StorageCompleteness(..), + StorageBackend(..), + Complete, Partial, + + unsafeStoreRawBytes, + ioLoadBytesFromStorage, + + Generation(..), + HeadID(..), HeadTypeID(..), + Stored(..), storedStorage, +) where import Control.Arrow import Control.Concurrent import Control.DeepSeq import Control.Exception -import Control.Monad import Control.Monad.Identity import Crypto.Hash import Data.Bits -import Data.ByteArray (ByteArray, ByteArrayAccess, ScrubbedBytes) +import Data.ByteArray (ByteArrayAccess, ScrubbedBytes) import Data.ByteArray qualified as BA import Data.ByteString (ByteString) -import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL -import Data.Char +import Data.Function import Data.HashTable.IO qualified as HT import Data.Hashable import Data.Kind import Data.Typeable -import Data.UUID (UUID) import Foreign.Storable (peek) import System.IO.Unsafe (unsafePerformIO) +import Erebos.UUID (UUID) +import Erebos.Util + data Storage' c = forall bck. (StorageBackend bck, BackendCompleteness bck ~ c) => Storage { stBackend :: bck @@ -196,35 +219,15 @@ showRefDigest = showRefDigestParts >>> \(alg, hex) -> alg <> BC.pack "#" <> hex readRefDigest :: ByteString -> Maybe RefDigest readRefDigest x = case BC.split '#' x of [alg, dgst] | BA.convert alg == BC.pack "blake2" -> - refDigestFromByteString =<< readHex @ByteString dgst + refDigestFromByteString =<< readHex dgst _ -> Nothing -refDigestFromByteString :: ByteArrayAccess ba => ba -> Maybe RefDigest +refDigestFromByteString :: ByteString -> Maybe RefDigest refDigestFromByteString = fmap RefDigest . digestFromByteString hashToRefDigest :: BL.ByteString -> RefDigest hashToRefDigest = RefDigest . hashFinalize . hashUpdates hashInit . BL.toChunks -showHex :: ByteArrayAccess ba => ba -> ByteString -showHex = B.concat . map showHexByte . BA.unpack - where showHexChar x | x < 10 = x + o '0' - | otherwise = x + o 'a' - 10 - showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ] - o = fromIntegral . ord - -readHex :: ByteArray ba => ByteString -> Maybe ba -readHex = return . BA.concat <=< readHex' - where readHex' bs | B.null bs = Just [] - readHex' bs = do (bx, bs') <- B.uncons bs - (by, bs'') <- B.uncons bs' - x <- hexDigit bx - y <- hexDigit by - (B.singleton (x * 16 + y) :) <$> readHex' bs'' - hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0' - | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10 - | otherwise = Nothing - o = fromIntegral . ord - newtype Generation = Generation Int deriving (Eq, Show) @@ -237,17 +240,20 @@ newtype HeadID = HeadID UUID newtype HeadTypeID = HeadTypeID UUID deriving (Eq, Ord) -data Stored' c a = Stored (Ref' c) a +data Stored a = Stored + { storedRef' :: Ref + , storedObject' :: a + } deriving (Show) -instance Eq (Stored' c a) where - Stored r1 _ == Stored r2 _ = refDigest r1 == refDigest r2 +instance Eq (Stored a) where + (==) = (==) `on` (refDigest . storedRef') -instance Ord (Stored' c a) where - compare (Stored r1 _) (Stored r2 _) = compare (refDigest r1) (refDigest r2) +instance Ord (Stored a) where + compare = compare `on` (refDigest . storedRef') -storedStorage :: Stored' c a -> Storage' c -storedStorage (Stored (Ref st _) _) = st +storedStorage :: Stored a -> Storage +storedStorage = refStorage . storedRef' type Complete = Identity diff --git a/src/Erebos/Storage/Memory.hs b/src/Erebos/Storage/Memory.hs index 677e8c5..26bb181 100644 --- a/src/Erebos/Storage/Memory.hs +++ b/src/Erebos/Storage/Memory.hs @@ -4,7 +4,8 @@ module Erebos.Storage.Memory ( derivePartialStorage, ) where -import Control.Concurrent.MVar +import Control.Concurrent +import Control.Monad import Data.ByteArray (ScrubbedBytes) import Data.ByteString.Lazy qualified as BL @@ -62,14 +63,19 @@ instance (StorageCompleteness c, Typeable p) => StorageBackend (MemoryStorage p backendReplaceHead StorageMemory {..} tid hid expected new = do res <- modifyMVar memHeads $ \hs -> do - ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar memWatchers - return $ case partition ((==(tid, hid)) . fst) hs of - ( [] , _ ) -> ( hs, Left Nothing ) + case partition ((==(tid, hid)) . fst) hs of + ( [] , _ ) -> return ( hs, Left Nothing ) (( _, dgst ) : _, hs' ) - | dgst == expected -> ((( tid, hid ), new ) : hs', Right ( new, ws )) - | otherwise -> ( hs, Left $ Just dgst ) + | dgst == expected -> do + ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar memWatchers + return ((( tid, hid ), new ) : hs', Right ( new, ws )) + | otherwise -> do + return ( hs, Left $ Just dgst ) case res of - Right ( dgst, ws ) -> mapM_ ($ dgst) ws >> return (Right dgst) + Right ( dgst, ws ) -> do + void $ forkIO $ do + mapM_ ($ dgst) ws + return (Right dgst) Left x -> return $ Left x backendWatchHead StorageMemory {..} tid hid cb = modifyMVar memWatchers $ return . watchListAdd tid hid cb diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs index 41725af..8221e91 100644 --- a/src/Erebos/Storage/Merge.hs +++ b/src/Erebos/Storage/Merge.hs @@ -7,7 +7,7 @@ module Erebos.Storage.Merge ( compareGeneration, generationMax, storedGeneration, - generations, + generations, generationsBy, ancestors, precedes, precedesOrEquals, @@ -17,6 +17,8 @@ module Erebos.Storage.Merge ( findProperty, findPropertyFirst, + + storedDifference, ) where import Control.Concurrent.MVar @@ -25,6 +27,8 @@ import Data.ByteString.Char8 qualified as BC import Data.HashTable.IO qualified as HT import Data.Kind import Data.List +import Data.List.NonEmpty (NonEmpty) +import Data.List.NonEmpty qualified as NE import Data.Maybe import Data.Set (Set) import Data.Set qualified as S @@ -52,7 +56,7 @@ merge xs = mergeSorted $ filterAncestors xs storeMerge :: (Mergeable a, Storable a) => [Stored (Component a)] -> IO (Stored a) storeMerge [] = error "merge: empty list" -storeMerge xs@(Stored ref _ : _) = wrappedStore (refStorage ref) $ mergeSorted $ filterAncestors xs +storeMerge xs@(x : _) = wrappedStore (storedStorage x) $ mergeSorted $ filterAncestors xs previous :: Storable a => Stored a -> [Stored a] previous (Stored ref _) = case load ref of @@ -100,16 +104,24 @@ storedGeneration x = -- |Returns list of sets starting with the set of given objects and -- intcrementally adding parents. -generations :: Storable a => [Stored a] -> [Set (Stored a)] -generations = unfoldr gen . (,S.empty) - where gen (hs, cur) = case filter (`S.notMember` cur) hs of - [] -> Nothing - added -> let next = foldr S.insert cur added - in Just (next, (previous =<< added, next)) +generations :: Storable a => [ Stored a ] -> NonEmpty (Set (Stored a)) +generations = generationsBy previous + +-- |Returns list of sets starting with the set of given objects and +-- intcrementally adding parents, with the first parameter being +-- a function to get all the parents of given object. +generationsBy :: Ord a => (a -> [ a ]) -> [ a ] -> NonEmpty (Set a) +generationsBy parents xs = NE.unfoldr gen ( xs, S.fromList xs ) + where + gen ( hs, cur ) = ( cur, ) $ + case filter (`S.notMember` cur) (parents =<< hs) of + [] -> Nothing + added -> let next = foldr S.insert cur added + in Just ( added, next ) -- |Returns set containing all given objects and their ancestors ancestors :: Storable a => [Stored a] -> Set (Stored a) -ancestors = last . (S.empty:) . generations +ancestors = NE.last . generations precedes :: Storable a => Stored a -> Stored a -> Bool precedes x y = not $ x `elem` filterAncestors [x, y] @@ -162,3 +174,18 @@ findPropertyFirst sel = fmap (fromJust . sel . fromStored) . listToMaybe . filte findPropHeads :: forall a b. Storable a => (a -> Maybe b) -> Stored a -> [Stored a] findPropHeads sel sobj | Just _ <- sel $ fromStored sobj = [sobj] | otherwise = findPropHeads sel =<< previous sobj + + +-- | Compute symmetrict difference between two stored histories. In other +-- words, return all 'Stored a' objects reachable (via 'previous') from first +-- given set, but not from the second; and vice versa. +storedDifference :: Storable a => [ Stored a ] -> [ Stored a ] -> [ Stored a ] +storedDifference xs' ys' = + let xs = filterAncestors xs' + ys = filterAncestors ys' + + filteredPrevious blocked zs = filterAncestors (previous zs ++ blocked) `diffSorted` blocked + xg = S.toAscList $ NE.last $ generationsBy (filteredPrevious ys) $ filterAncestors (xs ++ ys) `diffSorted` ys + yg = S.toAscList $ NE.last $ generationsBy (filteredPrevious xs) $ filterAncestors (ys ++ xs) `diffSorted` xs + + in xg `mergeUniq` yg diff --git a/src/Erebos/Sync.hs b/src/Erebos/Sync.hs index d837a14..5f5fdec 100644 --- a/src/Erebos/Sync.hs +++ b/src/Erebos/Sync.hs @@ -31,6 +31,7 @@ instance Service SyncService where else return ls serviceNewPeer = notifyPeer . lsShared . fromStored =<< svcGetLocal + serviceUpdatedPeer = serviceNewPeer serviceStorageWatchers _ = (:[]) $ SomeStorageWatcher (lsShared . fromStored) notifyPeer instance Storable SyncService where diff --git a/src/Erebos/UUID.hs b/src/Erebos/UUID.hs new file mode 100644 index 0000000..128d450 --- /dev/null +++ b/src/Erebos/UUID.hs @@ -0,0 +1,24 @@ +module Erebos.UUID ( + UUID, + toString, fromString, + toText, fromText, + toASCIIBytes, fromASCIIBytes, + nextRandom, +) where + +import Crypto.Random.Entropy + +import Data.Bits +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as BSL +import Data.Maybe +import Data.UUID.Types + +nextRandom :: IO UUID +nextRandom = do + [ b0, b1, b2, b3, b4, b5, b6, b7, b8, b9, ba, bb, bc, bd, be, bf ] + <- BS.unpack <$> getEntropy 16 + let version = 4 + b6' = b6 .&. 0x0f .|. (version `shiftL` 4) + b8' = b8 .&. 0x3f .|. 0x80 + return $ fromJust $ fromByteString $ BSL.pack [ b0, b1, b2, b3, b4, b5, b6', b7, b8', b9, ba, bb, bc, bd, be, bf ] diff --git a/src/Erebos/Util.hs b/src/Erebos/Util.hs index ffca9c7..0d53e98 100644 --- a/src/Erebos/Util.hs +++ b/src/Erebos/Util.hs @@ -1,5 +1,14 @@ module Erebos.Util where +import Control.Monad + +import Data.ByteArray (ByteArray, ByteArrayAccess) +import Data.ByteArray qualified as BA +import Data.ByteString (ByteString) +import Data.ByteString qualified as B +import Data.Char + + uniq :: Eq a => [a] -> [a] uniq (x:y:xs) | x == y = uniq (x:xs) | otherwise = x : uniq (y:xs) @@ -13,15 +22,16 @@ mergeBy cmp (x : xs) (y : ys) = case cmp x y of mergeBy _ xs [] = xs mergeBy _ [] ys = ys -mergeUniqBy :: (a -> a -> Ordering) -> [a] -> [a] -> [a] -mergeUniqBy cmp (x : xs) (y : ys) = case cmp x y of - LT -> x : mergeBy cmp xs (y : ys) - EQ -> x : mergeBy cmp xs ys - GT -> y : mergeBy cmp (x : xs) ys +mergeUniqBy :: (a -> a -> Ordering) -> [ a ] -> [ a ] -> [ a ] +mergeUniqBy cmp (x : xs) (y : ys) = + case cmp x y of + LT -> x : mergeUniqBy cmp xs (y : ys) + EQ -> x : mergeUniqBy cmp xs ys + GT -> y : mergeUniqBy cmp (x : xs) ys mergeUniqBy _ xs [] = xs mergeUniqBy _ [] ys = ys -mergeUniq :: Ord a => [a] -> [a] -> [a] +mergeUniq :: Ord a => [ a ] -> [ a ] -> [ a ] mergeUniq = mergeUniqBy compare diffSorted :: Ord a => [a] -> [a] -> [a] @@ -35,3 +45,24 @@ intersectsSorted (x:xs) (y:ys) | x < y = intersectsSorted xs (y:ys) | x > y = intersectsSorted (x:xs) ys | otherwise = True intersectsSorted _ _ = False + + +showHex :: ByteArrayAccess ba => ba -> ByteString +showHex = B.concat . map showHexByte . BA.unpack + where showHexChar x | x < 10 = x + o '0' + | otherwise = x + o 'a' - 10 + showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ] + o = fromIntegral . ord + +readHex :: ByteArray ba => ByteString -> Maybe ba +readHex = return . BA.concat <=< readHex' + where readHex' bs | B.null bs = Just [] + readHex' bs = do (bx, bs') <- B.uncons bs + (by, bs'') <- B.uncons bs' + x <- hexDigit bx + y <- hexDigit by + (B.singleton (x * 16 + y) :) <$> readHex' bs'' + hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0' + | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10 + | otherwise = Nothing + o = fromIntegral . ord |