summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-05-26 14:04:09 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-06-01 13:19:06 +0200
commitd0f1ce6171ccb59fce7534a19e827352b35686a0 (patch)
tree0205dc6792598173502fdef78a1cf5af6152f947
parent2f409a3ab30ff846bf0d6bf81084295ed0221075 (diff)
Manual peer drop
-rw-r--r--README.md4
-rw-r--r--main/Main.hs10
-rw-r--r--main/Test.hs25
-rw-r--r--src/Erebos/Network.hs90
-rw-r--r--src/Erebos/Network/Protocol.hs12
-rw-r--r--test/network.test103
6 files changed, 219 insertions, 25 deletions
diff --git a/README.md b/README.md
index 4ee75f7..85b2f88 100644
--- a/README.md
+++ b/README.md
@@ -125,6 +125,10 @@ Reject device attachment request or verification code of selected peer.
`/peer-add <host> [<port>]`
Manually add network peer with given hostname or IP address.
+`/peer-drop`
+Drop the currently selected peer. Afterwards, the connection can be
+re-established by either side.
+
`/update-identity`
Interactively update current identity information
diff --git a/main/Main.hs b/main/Main.hs
index 0857191..44e2f7b 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -281,7 +281,9 @@ interactiveLoop st opts = runInputT inputSettings $ do
{ ciServer = server
, ciLine = line
, ciPrint = extPrintLn
- , ciPeers = liftIO $ readMVar peers
+ , ciPeers = liftIO $ modifyMVar peers $ \ps -> do
+ ps' <- filterM (fmap not . isPeerDropped . fst) ps
+ return (ps', ps')
, ciContextOptions = liftIO $ readMVar contextOptions
, ciSetContextOptions = \ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ctxs
}
@@ -372,6 +374,7 @@ commands =
[ ("history", cmdHistory)
, ("peers", cmdPeers)
, ("peer-add", cmdPeerAdd)
+ , ("peer-drop", cmdPeerDrop)
, ("send", cmdSend)
, ("update-identity", cmdUpdateIdentity)
, ("attach", cmdAttach)
@@ -423,6 +426,11 @@ cmdPeerAdd = void $ do
addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just hostname) (Just port)
liftIO $ serverPeer server (addrAddress addr)
+cmdPeerDrop :: Command
+cmdPeerDrop = do
+ dropPeer =<< getSelectedPeer
+ modify $ \s -> s { csContext = NoContext }
+
showPeer :: PeerIdentity -> PeerAddress -> String
showPeer pidentity paddr =
let name = case pidentity of
diff --git a/main/Test.hs b/main/Test.hs
index d16e141..cdc337e 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -246,6 +246,8 @@ commands = map (T.pack *** id)
, ("start-server", cmdStartServer)
, ("stop-server", cmdStopServer)
, ("peer-add", cmdPeerAdd)
+ , ("peer-drop", cmdPeerDrop)
+ , ("peer-list", cmdPeerList)
, ("test-message-send", cmdTestMessageSend)
, ("shared-state-get", cmdSharedStateGet)
, ("shared-state-wait", cmdSharedStateWait)
@@ -410,6 +412,29 @@ cmdPeerAdd = do
addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just host) (Just port)
void $ liftIO $ serverPeer rsServer (addrAddress addr)
+cmdPeerDrop :: Command
+cmdPeerDrop = do
+ [spidx] <- asks tiParams
+ peer <- getPeer spidx
+ liftIO $ dropPeer peer
+
+cmdPeerList :: Command
+cmdPeerList = do
+ Just RunningServer {..} <- gets tsServer
+ peers <- liftIO $ getCurrentPeerList rsServer
+ tpeers <- liftIO $ readMVar rsPeers
+ forM_ peers $ \peer -> do
+ Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers
+ mbpid <- peerIdentity peer
+ cmdOut $ unwords $ concat
+ [ [ "peer-list-item", show n ]
+ , [ "addr", show (peerAddress peer) ]
+ , case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
+ _ -> []
+ ]
+ cmdOut "peer-list-done"
+
+
cmdTestMessageSend :: Command
cmdTestMessageSend = do
[spidx, tref] <- asks tiParams
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index a01bdd1..f234971 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -4,6 +4,7 @@ module Erebos.Network (
Server,
startServer,
stopServer,
+ getCurrentPeerList,
getNextPeerChange,
ServerOptions(..), serverIdentity, defaultServerOptions,
@@ -16,6 +17,8 @@ module Erebos.Network (
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce,
#endif
+ dropPeer,
+ isPeerDropped,
sendToPeer, sendToPeerStored, sendToPeerWith,
runPeerService,
@@ -90,6 +93,9 @@ data Server = Server
serverIdentity :: Server -> IO UnifiedIdentity
serverIdentity = readMVar . serverIdentity_
+getCurrentPeerList :: Server -> IO [Peer]
+getCurrentPeerList = fmap M.elems . readMVar . serverPeers
+
getNextPeerChange :: Server -> IO Peer
getNextPeerChange = atomically . readTChan . serverChanPeer
@@ -108,7 +114,7 @@ defaultServerOptions = ServerOptions
data Peer = Peer
{ peerAddress :: PeerAddress
, peerServer_ :: Server
- , peerConnection :: TVar (Either [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress))
+ , peerState :: TVar PeerState
, peerIdentityVar :: TVar PeerIdentity
, peerStorage_ :: Storage
, peerInStorage :: PartialStorage
@@ -123,13 +129,18 @@ peerStorage :: Peer -> Storage
peerStorage = peerStorage_
getPeerChannel :: Peer -> STM ChannelState
-getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection
+getPeerChannel Peer {..} =
+ readTVar peerState >>= \case
+ PeerInit _ -> return ChannelNone
+ PeerConnected conn -> connGetChannel conn
+ PeerDropped -> return ChannelClosed
setPeerChannel :: Peer -> ChannelState -> STM ()
setPeerChannel Peer {..} ch = do
- readTVar peerConnection >>= \case
- Left _ -> retry
- Right conn -> connSetChannel conn ch
+ readTVar peerState >>= \case
+ PeerInit _ -> retry
+ PeerConnected conn -> connSetChannel conn ch
+ PeerDropped -> return ()
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
@@ -175,6 +186,11 @@ peerIdentity :: MonadIO m => Peer -> m PeerIdentity
peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
+data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])]
+ | PeerConnected (Connection PeerAddress)
+ | PeerDropped
+
+
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
lookupServiceType (ServiceType stype : _) = Just stype
lookupServiceType (_ : hs) = lookupServiceType hs
@@ -196,8 +212,13 @@ newWaitingRef dgst act = do
forkServerThread :: Server -> IO () -> IO ()
-forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do
- (:ts) <$> forkIO act
+forkServerThread server act = do
+ modifyMVar_ (serverThreads server) $ \ts -> do
+ t <- forkIO $ do
+ t <- myThreadId
+ act
+ modifyMVar_ (serverThreads server) $ return . filter (/=t)
+ return (t:ts)
startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server
startServer opt serverOrigHead logd' serverServices = do
@@ -299,10 +320,14 @@ startServer opt serverOrigHead logd' serverServices = do
forkServerThread server $ do
atomically $ do
- readTVar (peerConnection peer) >>= \case
- Left packets -> writeFlowBulk (connData conn) $ reverse packets
- Right _ -> return ()
- writeTVar (peerConnection peer) (Right conn)
+ readTVar (peerState peer) >>= \case
+ PeerInit packets -> do
+ writeFlowBulk (connData conn) $ reverse packets
+ writeTVar (peerState peer) (PeerConnected conn)
+ PeerConnected _ -> do
+ writeTVar (peerState peer) (PeerConnected conn)
+ PeerDropped -> do
+ connClose conn
case mbpid of
Just dgst -> do
@@ -438,9 +463,9 @@ keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True }
openStream :: PacketHandler RawStreamWriter
openStream = do
Peer {..} <- gets phPeer
- conn <- readTVarP peerConnection >>= \case
- Right conn -> return conn
- _ -> throwError "can't open stream without established connection"
+ conn <- readTVarP peerState >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
(hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
Right res -> return res
Left err -> throwError err
@@ -452,9 +477,9 @@ openStream = do
acceptStream :: Word8 -> PacketHandler RawStreamReader
acceptStream streamNumber = do
Peer {..} <- gets phPeer
- conn <- readTVarP peerConnection >>= \case
- Right conn -> return conn
- _ -> throwError "can't accept stream without established connection"
+ conn <- readTVarP peerState >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't accept stream without established connection"
liftSTM $ connAddReadStream conn streamNumber
appendDistinct :: Eq a => a -> [a] -> [a]
@@ -568,6 +593,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
ChannelPeerRequest {} -> process
ChannelOurAccept {} -> reject
ChannelEstablished {} -> process
+ ChannelClosed {} -> return ()
TrChannelAccept dgst -> do
let process = do
@@ -583,6 +609,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process
| otherwise -> addHeader $ Rejected dgst
ChannelEstablished {} -> process
+ ChannelClosed {} -> return ()
ServiceType _ -> return ()
ServiceRef dgst
@@ -721,7 +748,7 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer peerServer_ peerAddress = do
- peerConnection <- newTVarIO (Left [])
+ peerState <- newTVarIO (PeerInit [])
peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO []
peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_
peerInStorage <- derivePartialStorage peerStorage_
@@ -731,7 +758,11 @@ mkPeer peerServer_ peerAddress = do
serverPeer :: Server -> SockAddr -> IO Peer
serverPeer server paddr = do
- serverPeer' server (DatagramAddress paddr)
+ let paddr' = case IP.fromSockAddr paddr of
+ Just (IP.IPv4 ipv4, port)
+ -> IP.toSockAddr (IP.IPv6 $ IP.toIPv6w (0, 0, 0xffff, IP.fromIPv4w ipv4), port)
+ _ -> paddr
+ serverPeer' server (DatagramAddress paddr')
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce :: Server -> IceSession -> IO Peer
@@ -754,6 +785,20 @@ serverPeer' server paddr = do
writeFlow (serverControlFlow server) (RequestConnection paddr)
return peer
+dropPeer :: MonadIO m => Peer -> m ()
+dropPeer peer = liftIO $ do
+ modifyMVar_ (serverPeers $ peerServer peer) $ \pvalue -> do
+ atomically $ do
+ readTVar (peerState peer) >>= \case
+ PeerConnected conn -> connClose conn
+ _ -> return()
+ writeTVar (peerState peer) PeerDropped
+ return $ M.delete (peerAddress peer) pvalue
+
+isPeerDropped :: MonadIO m => Peer -> m Bool
+isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case
+ PeerDropped -> return True
+ _ -> return False
sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m ()
sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True]
@@ -777,9 +822,10 @@ sendToPeerList peer parts = do
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
- readTVar peerConnection >>= \case
- Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs
- Right conn -> writeFlow (connData conn) (secure, packet, ackedBy)
+ readTVar peerState >>= \case
+ PeerInit xs -> writeTVar peerState $ PeerInit $ (secure, packet, ackedBy) : xs
+ PeerConnected conn -> writeFlow (connData conn) (secure, packet, ackedBy)
+ PeerDropped -> return ()
sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS = sendToPeerS' EncryptedOnly
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index a669988..26bd615 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -20,6 +20,7 @@ module Erebos.Network.Protocol (
connData,
connGetChannel,
connSetChannel,
+ connClose,
RawStreamReader, RawStreamWriter,
connAddWriteStream,
@@ -44,6 +45,7 @@ import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
+import Data.Function
import Data.List
import Data.Maybe
import Data.Text (Text)
@@ -184,6 +186,9 @@ data Connection addr = Connection
, cOutStreams :: TVar [(Word8, Stream)]
}
+instance Eq (Connection addr) where
+ (==) = (==) `on` cChannel
+
connAddress :: Connection addr -> addr
connAddress = cAddress
@@ -197,6 +202,12 @@ connSetChannel :: Connection addr -> ChannelState -> STM ()
connSetChannel Connection {..} ch = do
writeTVar cChannel ch
+connClose :: Connection addr -> STM ()
+connClose conn@Connection {..} = do
+ let GlobalState {..} = cGlobalState
+ writeTVar cChannel ChannelClosed
+ writeTVar gConnections . filter (/=conn) =<< readTVar gConnections
+
connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
connAddWriteStream conn@Connection {..} = do
outStreams <- readTVar cOutStreams
@@ -380,6 +391,7 @@ data ChannelState = ChannelNone
| ChannelPeerRequest WaitingRef
| ChannelOurAccept (Stored ChannelAccept) Channel
| ChannelEstablished Channel
+ | ChannelClosed
data ReservedToSend = ReservedToSend
{ rsAckedBy :: Maybe (TransportHeaderItem -> Bool)
diff --git a/test/network.test b/test/network.test
index ea57a77..9540bf6 100644
--- a/test/network.test
+++ b/test/network.test
@@ -201,12 +201,19 @@ test Reconnection:
send "store blob"
send "message1"
send ""
- expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message
+ expect /store-done (blake2#[0-9a-f]*)/ capture message
send "test-message-send 1 $message"
expect /test-message-send done/
expect /test-message-received blob [0-9]+ $message/ from p2
+ send "peer-list"
+ expect /peer-list-item 1 addr ${p2.node.ip} 29665 id Device2/
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ # Restart process on node 'n'
local:
spawn as p2 on n
send "start-server" to p2
@@ -220,8 +227,100 @@ test Reconnection:
send "store blob"
send "message2"
send ""
- expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message
+ expect /store-done (blake2#[0-9a-f]*)/ capture message
send "test-message-send 1 $message"
expect /test-message-send done/
expect /test-message-received blob [0-9]+ $message/ from p2
+
+ # Drop and re-add peer p2:
+ with p1:
+ send "peer-list"
+ expect /peer-list-item 1 addr ${p2.node.ip} 29665 id Device2/
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "peer-drop 1"
+ send "peer-list"
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "peer-add ${p2.node.ip}"
+ expect /peer 2 addr ${p2.node.ip} 29665/
+ expect /peer 2 id Device2/
+
+ send "peer-list"
+ expect /peer-list-item 2 addr ${p2.node.ip} 29665 id Device2/
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "store blob"
+ send "message3"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture message
+
+ send "test-message-send 2 $message"
+ expect /test-message-send done/
+ expect /test-message-received blob [0-9]+ $message/ from p2
+
+ # Drop on both sides and re-add:
+ with p2:
+ send "peer-list"
+ expect /peer-list-item 1 addr ${p1.node.ip} 29665 id Device1/
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "peer-drop 1"
+ send "peer-list"
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ with p1:
+ send "peer-list"
+ expect /peer-list-item 2 addr ${p2.node.ip} 29665 id Device2/
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "peer-drop 2"
+ send "peer-list"
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "peer-add ${p1.node.ip}" to p2
+
+ with p2:
+ expect /peer 2 addr ${p1.node.ip} 29665/ from p2
+ expect /peer 2 id Device1/ from p2
+
+ send "peer-list"
+ expect /peer-list-item 2 addr ${p1.node.ip} 29665 id Device1/
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ send "store blob"
+ send "message4"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture message
+
+ send "test-message-send 2 $message"
+ expect /test-message-send done/
+
+ with p1:
+ expect /peer 3 addr ${p2.node.ip} 29665/
+ expect /peer 3 id Device2/
+
+ send "peer-list"
+ expect /peer-list-item 3 addr ${p2.node.ip} 29665 id Device2/
+ local:
+ expect /peer-list-(.*)/ capture done
+ guard (done == "done")
+
+ expect /test-message-received blob [0-9]+ $message/