From d0f1ce6171ccb59fce7534a19e827352b35686a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 26 May 2024 14:04:09 +0200 Subject: Manual peer drop --- README.md | 4 ++ main/Main.hs | 10 +++- main/Test.hs | 25 ++++++++++ src/Erebos/Network.hs | 90 ++++++++++++++++++++++++++--------- src/Erebos/Network/Protocol.hs | 12 +++++ test/network.test | 103 ++++++++++++++++++++++++++++++++++++++++- 6 files changed, 219 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 4ee75f7..85b2f88 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,10 @@ Reject device attachment request or verification code of selected peer. `/peer-add []` Manually add network peer with given hostname or IP address. +`/peer-drop` +Drop the currently selected peer. Afterwards, the connection can be +re-established by either side. + `/update-identity` Interactively update current identity information diff --git a/main/Main.hs b/main/Main.hs index 0857191..44e2f7b 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -281,7 +281,9 @@ interactiveLoop st opts = runInputT inputSettings $ do { ciServer = server , ciLine = line , ciPrint = extPrintLn - , ciPeers = liftIO $ readMVar peers + , ciPeers = liftIO $ modifyMVar peers $ \ps -> do + ps' <- filterM (fmap not . isPeerDropped . fst) ps + return (ps', ps') , ciContextOptions = liftIO $ readMVar contextOptions , ciSetContextOptions = \ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ctxs } @@ -372,6 +374,7 @@ commands = [ ("history", cmdHistory) , ("peers", cmdPeers) , ("peer-add", cmdPeerAdd) + , ("peer-drop", cmdPeerDrop) , ("send", cmdSend) , ("update-identity", cmdUpdateIdentity) , ("attach", cmdAttach) @@ -423,6 +426,11 @@ cmdPeerAdd = void $ do addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just hostname) (Just port) liftIO $ serverPeer server (addrAddress addr) +cmdPeerDrop :: Command +cmdPeerDrop = do + dropPeer =<< getSelectedPeer + modify $ \s -> s { csContext = NoContext } + showPeer :: PeerIdentity -> PeerAddress -> String showPeer pidentity paddr = let name = case pidentity of diff --git a/main/Test.hs b/main/Test.hs index d16e141..cdc337e 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -246,6 +246,8 @@ commands = map (T.pack *** id) , ("start-server", cmdStartServer) , ("stop-server", cmdStopServer) , ("peer-add", cmdPeerAdd) + , ("peer-drop", cmdPeerDrop) + , ("peer-list", cmdPeerList) , ("test-message-send", cmdTestMessageSend) , ("shared-state-get", cmdSharedStateGet) , ("shared-state-wait", cmdSharedStateWait) @@ -410,6 +412,29 @@ cmdPeerAdd = do addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just host) (Just port) void $ liftIO $ serverPeer rsServer (addrAddress addr) +cmdPeerDrop :: Command +cmdPeerDrop = do + [spidx] <- asks tiParams + peer <- getPeer spidx + liftIO $ dropPeer peer + +cmdPeerList :: Command +cmdPeerList = do + Just RunningServer {..} <- gets tsServer + peers <- liftIO $ getCurrentPeerList rsServer + tpeers <- liftIO $ readMVar rsPeers + forM_ peers $ \peer -> do + Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers + mbpid <- peerIdentity peer + cmdOut $ unwords $ concat + [ [ "peer-list-item", show n ] + , [ "addr", show (peerAddress peer) ] + , case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "" T.unpack . idName) (unfoldOwners pid) + _ -> [] + ] + cmdOut "peer-list-done" + + cmdTestMessageSend :: Command cmdTestMessageSend = do [spidx, tref] <- asks tiParams diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index a01bdd1..f234971 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -4,6 +4,7 @@ module Erebos.Network ( Server, startServer, stopServer, + getCurrentPeerList, getNextPeerChange, ServerOptions(..), serverIdentity, defaultServerOptions, @@ -16,6 +17,8 @@ module Erebos.Network ( #ifdef ENABLE_ICE_SUPPORT serverPeerIce, #endif + dropPeer, + isPeerDropped, sendToPeer, sendToPeerStored, sendToPeerWith, runPeerService, @@ -90,6 +93,9 @@ data Server = Server serverIdentity :: Server -> IO UnifiedIdentity serverIdentity = readMVar . serverIdentity_ +getCurrentPeerList :: Server -> IO [Peer] +getCurrentPeerList = fmap M.elems . readMVar . serverPeers + getNextPeerChange :: Server -> IO Peer getNextPeerChange = atomically . readTChan . serverChanPeer @@ -108,7 +114,7 @@ defaultServerOptions = ServerOptions data Peer = Peer { peerAddress :: PeerAddress , peerServer_ :: Server - , peerConnection :: TVar (Either [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress)) + , peerState :: TVar PeerState , peerIdentityVar :: TVar PeerIdentity , peerStorage_ :: Storage , peerInStorage :: PartialStorage @@ -123,13 +129,18 @@ peerStorage :: Peer -> Storage peerStorage = peerStorage_ getPeerChannel :: Peer -> STM ChannelState -getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection +getPeerChannel Peer {..} = + readTVar peerState >>= \case + PeerInit _ -> return ChannelNone + PeerConnected conn -> connGetChannel conn + PeerDropped -> return ChannelClosed setPeerChannel :: Peer -> ChannelState -> STM () setPeerChannel Peer {..} ch = do - readTVar peerConnection >>= \case - Left _ -> retry - Right conn -> connSetChannel conn ch + readTVar peerState >>= \case + PeerInit _ -> retry + PeerConnected conn -> connSetChannel conn ch + PeerDropped -> return () instance Eq Peer where (==) = (==) `on` peerIdentityVar @@ -175,6 +186,11 @@ peerIdentity :: MonadIO m => Peer -> m PeerIdentity peerIdentity = liftIO . atomically . readTVar . peerIdentityVar +data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] + | PeerConnected (Connection PeerAddress) + | PeerDropped + + lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID lookupServiceType (ServiceType stype : _) = Just stype lookupServiceType (_ : hs) = lookupServiceType hs @@ -196,8 +212,13 @@ newWaitingRef dgst act = do forkServerThread :: Server -> IO () -> IO () -forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do - (:ts) <$> forkIO act +forkServerThread server act = do + modifyMVar_ (serverThreads server) $ \ts -> do + t <- forkIO $ do + t <- myThreadId + act + modifyMVar_ (serverThreads server) $ return . filter (/=t) + return (t:ts) startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server startServer opt serverOrigHead logd' serverServices = do @@ -299,10 +320,14 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ do atomically $ do - readTVar (peerConnection peer) >>= \case - Left packets -> writeFlowBulk (connData conn) $ reverse packets - Right _ -> return () - writeTVar (peerConnection peer) (Right conn) + readTVar (peerState peer) >>= \case + PeerInit packets -> do + writeFlowBulk (connData conn) $ reverse packets + writeTVar (peerState peer) (PeerConnected conn) + PeerConnected _ -> do + writeTVar (peerState peer) (PeerConnected conn) + PeerDropped -> do + connClose conn case mbpid of Just dgst -> do @@ -438,9 +463,9 @@ keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True } openStream :: PacketHandler RawStreamWriter openStream = do Peer {..} <- gets phPeer - conn <- readTVarP peerConnection >>= \case - Right conn -> return conn - _ -> throwError "can't open stream without established connection" + conn <- readTVarP peerState >>= \case + PeerConnected conn -> return conn + _ -> throwError "can't open stream without established connection" (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case Right res -> return res Left err -> throwError err @@ -452,9 +477,9 @@ openStream = do acceptStream :: Word8 -> PacketHandler RawStreamReader acceptStream streamNumber = do Peer {..} <- gets phPeer - conn <- readTVarP peerConnection >>= \case - Right conn -> return conn - _ -> throwError "can't accept stream without established connection" + conn <- readTVarP peerState >>= \case + PeerConnected conn -> return conn + _ -> throwError "can't accept stream without established connection" liftSTM $ connAddReadStream conn streamNumber appendDistinct :: Eq a => a -> [a] -> [a] @@ -568,6 +593,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelPeerRequest {} -> process ChannelOurAccept {} -> reject ChannelEstablished {} -> process + ChannelClosed {} -> return () TrChannelAccept dgst -> do let process = do @@ -583,6 +609,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process | otherwise -> addHeader $ Rejected dgst ChannelEstablished {} -> process + ChannelClosed {} -> return () ServiceType _ -> return () ServiceRef dgst @@ -721,7 +748,7 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do mkPeer :: Server -> PeerAddress -> IO Peer mkPeer peerServer_ peerAddress = do - peerConnection <- newTVarIO (Left []) + peerState <- newTVarIO (PeerInit []) peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO [] peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_ peerInStorage <- derivePartialStorage peerStorage_ @@ -731,7 +758,11 @@ mkPeer peerServer_ peerAddress = do serverPeer :: Server -> SockAddr -> IO Peer serverPeer server paddr = do - serverPeer' server (DatagramAddress paddr) + let paddr' = case IP.fromSockAddr paddr of + Just (IP.IPv4 ipv4, port) + -> IP.toSockAddr (IP.IPv6 $ IP.toIPv6w (0, 0, 0xffff, IP.fromIPv4w ipv4), port) + _ -> paddr + serverPeer' server (DatagramAddress paddr') #ifdef ENABLE_ICE_SUPPORT serverPeerIce :: Server -> IceSession -> IO Peer @@ -754,6 +785,20 @@ serverPeer' server paddr = do writeFlow (serverControlFlow server) (RequestConnection paddr) return peer +dropPeer :: MonadIO m => Peer -> m () +dropPeer peer = liftIO $ do + modifyMVar_ (serverPeers $ peerServer peer) $ \pvalue -> do + atomically $ do + readTVar (peerState peer) >>= \case + PeerConnected conn -> connClose conn + _ -> return() + writeTVar (peerState peer) PeerDropped + return $ M.delete (peerAddress peer) pvalue + +isPeerDropped :: MonadIO m => Peer -> m Bool +isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case + PeerDropped -> return True + _ -> return False sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m () sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True] @@ -777,9 +822,10 @@ sendToPeerList peer parts = do sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerS' secure Peer {..} ackedBy packet = do - readTVar peerConnection >>= \case - Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs - Right conn -> writeFlow (connData conn) (secure, packet, ackedBy) + readTVar peerState >>= \case + PeerInit xs -> writeTVar peerState $ PeerInit $ (secure, packet, ackedBy) : xs + PeerConnected conn -> writeFlow (connData conn) (secure, packet, ackedBy) + PeerDropped -> return () sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerS = sendToPeerS' EncryptedOnly diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index a669988..26bd615 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -20,6 +20,7 @@ module Erebos.Network.Protocol ( connData, connGetChannel, connSetChannel, + connClose, RawStreamReader, RawStreamWriter, connAddWriteStream, @@ -44,6 +45,7 @@ import Data.ByteString (ByteString) import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL +import Data.Function import Data.List import Data.Maybe import Data.Text (Text) @@ -184,6 +186,9 @@ data Connection addr = Connection , cOutStreams :: TVar [(Word8, Stream)] } +instance Eq (Connection addr) where + (==) = (==) `on` cChannel + connAddress :: Connection addr -> addr connAddress = cAddress @@ -197,6 +202,12 @@ connSetChannel :: Connection addr -> ChannelState -> STM () connSetChannel Connection {..} ch = do writeTVar cChannel ch +connClose :: Connection addr -> STM () +connClose conn@Connection {..} = do + let GlobalState {..} = cGlobalState + writeTVar cChannel ChannelClosed + writeTVar gConnections . filter (/=conn) =<< readTVar gConnections + connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) connAddWriteStream conn@Connection {..} = do outStreams <- readTVar cOutStreams @@ -380,6 +391,7 @@ data ChannelState = ChannelNone | ChannelPeerRequest WaitingRef | ChannelOurAccept (Stored ChannelAccept) Channel | ChannelEstablished Channel + | ChannelClosed data ReservedToSend = ReservedToSend { rsAckedBy :: Maybe (TransportHeaderItem -> Bool) diff --git a/test/network.test b/test/network.test index ea57a77..9540bf6 100644 --- a/test/network.test +++ b/test/network.test @@ -201,12 +201,19 @@ test Reconnection: send "store blob" send "message1" send "" - expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message + expect /store-done (blake2#[0-9a-f]*)/ capture message send "test-message-send 1 $message" expect /test-message-send done/ expect /test-message-received blob [0-9]+ $message/ from p2 + send "peer-list" + expect /peer-list-item 1 addr ${p2.node.ip} 29665 id Device2/ + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + # Restart process on node 'n' local: spawn as p2 on n send "start-server" to p2 @@ -220,8 +227,100 @@ test Reconnection: send "store blob" send "message2" send "" - expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message + expect /store-done (blake2#[0-9a-f]*)/ capture message send "test-message-send 1 $message" expect /test-message-send done/ expect /test-message-received blob [0-9]+ $message/ from p2 + + # Drop and re-add peer p2: + with p1: + send "peer-list" + expect /peer-list-item 1 addr ${p2.node.ip} 29665 id Device2/ + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + send "peer-drop 1" + send "peer-list" + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + send "peer-add ${p2.node.ip}" + expect /peer 2 addr ${p2.node.ip} 29665/ + expect /peer 2 id Device2/ + + send "peer-list" + expect /peer-list-item 2 addr ${p2.node.ip} 29665 id Device2/ + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + send "store blob" + send "message3" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture message + + send "test-message-send 2 $message" + expect /test-message-send done/ + expect /test-message-received blob [0-9]+ $message/ from p2 + + # Drop on both sides and re-add: + with p2: + send "peer-list" + expect /peer-list-item 1 addr ${p1.node.ip} 29665 id Device1/ + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + send "peer-drop 1" + send "peer-list" + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + with p1: + send "peer-list" + expect /peer-list-item 2 addr ${p2.node.ip} 29665 id Device2/ + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + send "peer-drop 2" + send "peer-list" + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + send "peer-add ${p1.node.ip}" to p2 + + with p2: + expect /peer 2 addr ${p1.node.ip} 29665/ from p2 + expect /peer 2 id Device1/ from p2 + + send "peer-list" + expect /peer-list-item 2 addr ${p1.node.ip} 29665 id Device1/ + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + send "store blob" + send "message4" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture message + + send "test-message-send 2 $message" + expect /test-message-send done/ + + with p1: + expect /peer 3 addr ${p2.node.ip} 29665/ + expect /peer 3 id Device2/ + + send "peer-list" + expect /peer-list-item 3 addr ${p2.node.ip} 29665 id Device2/ + local: + expect /peer-list-(.*)/ capture done + guard (done == "done") + + expect /test-message-received blob [0-9]+ $message/ -- cgit v1.2.3