diff options
Diffstat (limited to 'main/Test.hs')
-rw-r--r-- | main/Test.hs | 144 |
1 files changed, 116 insertions, 28 deletions
diff --git a/main/Test.hs b/main/Test.hs index 08ad880..b07bd87 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -26,7 +26,7 @@ import Data.Text qualified as T import Data.Text.Encoding import Data.Text.IO qualified as T import Data.Typeable -import Data.UUID qualified as U +import Data.UUID.Types qualified as U import Network.Socket @@ -44,6 +44,7 @@ import Erebos.Object import Erebos.Pairing import Erebos.PubKey import Erebos.Service +import Erebos.Service.Stream import Erebos.Set import Erebos.State import Erebos.Storable @@ -66,10 +67,17 @@ data TestState = TestState data RunningServer = RunningServer { rsServer :: Server - , rsPeers :: MVar (Int, [(Int, Peer)]) + , rsPeers :: MVar ( Int, [ TestPeer ] ) , rsPeerThread :: ThreadId } +data TestPeer = TestPeer + { tpIndex :: Int + , tpPeer :: Peer + , tpStreamReaders :: MVar [ (Int, StreamReader ) ] + , tpStreamWriters :: MVar [ (Int, StreamWriter ) ] + } + initTestState :: TestState initTestState = TestState { tsHead = Nothing @@ -137,17 +145,20 @@ cmdOut line = do getPeer :: Text -> CommandM Peer -getPeer spidx = do +getPeer spidx = tpPeer <$> getTestPeer spidx + +getTestPeer :: Text -> CommandM TestPeer +getTestPeer spidx = do Just RunningServer {..} <- gets tsServer - Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers) + Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers) return peer -getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int +getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int getPeerIndex pmvar = do peer <- asks svcPeer - maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar) + maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar) -pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a +pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a pairingAttributes _ out peers prefix = PairingAttributes { pairingHookRequest = return () @@ -216,6 +227,11 @@ directMessageAttributes out = DirectMessageAttributes { dmOwnerMismatch = afterCommit $ outLine out "dm-owner-mismatch" } +discoveryAttributes :: DiscoveryAttributes +discoveryAttributes = (defaultServiceAttributes Proxy) + { discoveryProvideTunnel = const True + } + dmReceivedWatcher :: Output -> Stored DirectMessage -> IO () dmReceivedWatcher out smsg = do let msg = fromStored smsg @@ -267,6 +283,9 @@ commands = map (T.pack *** id) , ("peer-drop", cmdPeerDrop) , ("peer-list", cmdPeerList) , ("test-message-send", cmdTestMessageSend) + , ("test-stream-open", cmdTestStreamOpen) + , ("test-stream-close", cmdTestStreamClose) + , ("test-stream-send", cmdTestStreamSend) , ("local-state-get", cmdLocalStateGet) , ("local-state-replace", cmdLocalStateReplace) , ("local-state-wait", cmdLocalStateWait) @@ -286,6 +305,7 @@ commands = map (T.pack *** id) , ("contact-set-name", cmdContactSetName) , ("dm-send-peer", cmdDmSendPeer) , ("dm-send-contact", cmdDmSendContact) + , ("dm-send-identity", cmdDmSendIdentity) , ("dm-list-peer", cmdDmListPeer) , ("dm-list-contact", cmdDmListContact) , ("chatroom-create", cmdChatroomCreate) @@ -301,6 +321,7 @@ commands = map (T.pack *** id) , ("chatroom-leave", cmdChatroomLeave) , ("chatroom-message-send", cmdChatroomMessageSend) , ("discovery-connect", cmdDiscoveryConnect) + , ("discovery-tunnel", cmdDiscoveryTunnel) ] cmdStore :: Command @@ -454,7 +475,8 @@ cmdCreateIdentity = do _ -> return [] storeHead st $ LocalState - { lsIdentity = idExtData identity + { lsPrev = Nothing + , lsIdentity = idExtData identity , lsShared = shared , lsOther = [] } @@ -493,14 +515,27 @@ cmdStartServer = do "attach" -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" "chatroom" -> return $ someService @ChatroomService Proxy "contact" -> return $ someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact" - "discovery" -> return $ someService @DiscoveryService Proxy + "discovery" -> return $ someServiceAttr $ discoveryAttributes "dm" -> return $ someServiceAttr $ directMessageAttributes out "sync" -> return $ someService @SyncService Proxy "test" -> return $ someServiceAttr $ (defaultServiceAttributes Proxy) { testMessageReceived = \obj otype len sref -> do liftIO $ do void $ store (headStorage h) obj - outLine out $ unwords ["test-message-received", otype, len, sref] + outLine out $ unwords [ "test-message-received", otype, len, sref ] + , testStreamsReceived = \streams -> do + pidx <- getPeerIndex rsPeers + liftIO $ do + nums <- mapM getStreamReaderNumber streams + outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums + forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do + let go = readStreamPacket stream >>= \case + StreamData seqNum bytes -> do + outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ] + go + StreamClosed seqNum -> do + outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ] + go } sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'" @@ -509,15 +544,23 @@ cmdStartServer = do rsPeerThread <- liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange rsServer - let printPeer (idx, p) = do - params <- peerIdentity p >>= return . \case + let printPeer TestPeer {..} = do + params <- peerIdentity tpPeer >>= return . \case PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) - _ -> [ "addr", show (peerAddress p) ] - outLine out $ unwords $ [ "peer", show idx ] ++ params + _ -> [ "addr", show (peerAddress tpPeer) ] + outLine out $ unwords $ [ "peer", show tpIndex ] ++ params + + update ( tpIndex, [] ) = do + tpPeer <- return peer + tpStreamReaders <- newMVar [] + tpStreamWriters <- newMVar [] + let tp = TestPeer {..} + printPeer tp + return ( tpIndex + 1, [ tp ] ) - update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)]) - update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur - | otherwise = fmap (p:) <$> update (nid, ps) + update cur@( nid, p : ps ) + | tpPeer p == peer = printPeer p >> return cur + | otherwise = fmap (p :) <$> update ( nid, ps ) modifyMVar_ rsPeers update @@ -554,10 +597,10 @@ cmdPeerList = do peers <- liftIO $ getCurrentPeerList rsServer tpeers <- liftIO $ readMVar rsPeers forM_ peers $ \peer -> do - Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers + Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers mbpid <- peerIdentity peer cmdOut $ unwords $ concat - [ [ "peer-list-item", show n ] + [ [ "peer-list-item", show (tpIndex tp) ] , [ "addr", show (peerAddress peer) ] , case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) _ -> [] @@ -574,6 +617,40 @@ cmdTestMessageSend = do sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs cmdOut "test-message-send done" +cmdTestStreamOpen :: Command +cmdTestStreamOpen = do + spidx : rest <- asks tiParams + tp <- getTestPeer spidx + count <- case rest of + [] -> return 1 + tcount : _ -> return $ read $ T.unpack tcount + + out <- asks tiOutput + runPeerService (tpPeer tp) $ do + streams <- openTestStreams count + afterCommit $ do + nums <- mapM getStreamWriterNumber streams + modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams) + outLine out $ unwords $ "test-stream-open-done" + : T.unpack spidx + : map show nums + +cmdTestStreamClose :: Command +cmdTestStreamClose = do + [ spidx, sid ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ closeStream stream + cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ] + +cmdTestStreamSend :: Command +cmdTestStreamSend = do + [ spidx, sid, content ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ writeStream stream $ encodeUtf8 content + cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ] + cmdLocalStateGet :: Command cmdLocalStateGet = do h <- getHead @@ -646,7 +723,7 @@ cmdWatchSharedIdentity = do cmdUpdateLocalIdentity :: Command cmdUpdateLocalIdentity = do [name] <- asks tiParams - updateLocalHead_ $ \ls -> do + updateLocalState_ $ \ls -> do Just identity <- return $ validateExtendedIdentity $ lsIdentity $ fromStored ls let public = idKeyIdentity identity @@ -661,7 +738,7 @@ cmdUpdateLocalIdentity = do cmdUpdateSharedIdentity :: Command cmdUpdateSharedIdentity = do [name] <- asks tiParams - updateLocalHead_ $ updateSharedState_ $ \case + updateLocalState_ $ updateSharedState_ $ \case Nothing -> throwOtherError "no existing shared identity" Just identity -> do let public = idKeyIdentity identity @@ -731,7 +808,7 @@ cmdContactSetName :: Command cmdContactSetName = do [cid, name] <- asks tiParams contact <- getContact cid - updateLocalHead_ $ updateSharedState_ $ contactSetName contact name + updateLocalState_ $ updateSharedState_ $ contactSetName contact name cmdOut "contact-set-name-done" cmdDmSendPeer :: Command @@ -746,6 +823,14 @@ cmdDmSendContact = do Just to <- contactIdentity <$> getContact cid void $ sendDirectMessage to msg +cmdDmSendIdentity :: Command +cmdDmSendIdentity = do + st <- asks tiStorage + [ tid, msg ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tid + Just to <- return $ validateExtendedIdentity $ wrappedLoad ref + void $ sendDirectMessage to msg + dmList :: Foldable f => Identity f -> Command dmList peer = do threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead @@ -887,11 +972,14 @@ cmdChatroomMessageSend = do cmdDiscoveryConnect :: Command cmdDiscoveryConnect = do - st <- asks tiStorage [ tref ] <- asks tiParams - Just ref <- liftIO $ readRef st $ encodeUtf8 tref - + Just dgst <- return $ readRefDigest $ encodeUtf8 tref Just RunningServer {..} <- gets tsServer - peers <- liftIO $ getCurrentPeerList rsServer - forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch ref + discoverySearch rsServer dgst + +cmdDiscoveryTunnel :: Command +cmdDiscoveryTunnel = do + [ tvia, ttarget ] <- asks tiParams + via <- getPeer tvia + Just target <- return $ readRefDigest $ encodeUtf8 ttarget + liftIO $ discoverySetupTunnel via target |