summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2024-05-25 10:40:03 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2024-05-25 22:40:42 +0200
commitdb575cad9b8a3c3c2ab9f1a71ac2ea442c761df2 (patch)
treec5b4b080a7f0767beefe06a216d720a90eb79850
parente40317a3b43594c0629c8a0d1d569b4c8d55e2ae (diff)
Handle peer reconnection after its restart
Allow the handshake and channel request to proceed correcty even with already established channel to handle the case where peer attempts to reconnect after restart. Changelog: Handle peer reconnection after its restart
-rw-r--r--src/Erebos/Network.hs36
-rw-r--r--src/Erebos/Network/Protocol.hs88
-rw-r--r--test/network.test49
3 files changed, 111 insertions, 62 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index cc111e2..a01bdd1 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -473,9 +473,9 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
[ [ storedRef sidentity ]
, map storedRef $ idUpdates identity
, case ochannel of
- ChannelOurRequest _ req -> [ storedRef req ]
- ChannelOurAccept _ acc _ -> [ storedRef acc ]
- _ -> []
+ ChannelOurRequest req -> [ storedRef req ]
+ ChannelOurAccept acc _ -> [ storedRef acc ]
+ _ -> []
]
runPacketHandler secure peer $ do
@@ -483,7 +483,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
forM_ headers $ \case
Acknowledged dgst -> do
liftSTM (getPeerChannel peer) >>= \case
- ChannelOurAccept _ acc ch | refDigest (storedRef acc) == dgst -> do
+ ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do
liftSTM $ finalizedChannel peer ch identity
_ -> return ()
@@ -552,22 +552,22 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
_ -> return ()
TrChannelRequest dgst -> do
- let process cookie = do
+ let process = do
addHeader $ Acknowledged dgst
wref <- newWaitingRef dgst $ handleChannelRequest peer identity
- liftSTM $ setPeerChannel peer $ ChannelPeerRequest cookie wref
+ liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref
reject = addHeader $ Rejected dgst
liftSTM (getPeerChannel peer) >>= \case
ChannelNone {} -> return ()
ChannelCookieWait {} -> return ()
- ChannelCookieReceived cookie -> process $ Just cookie
- ChannelCookieConfirmed cookie -> process $ Just cookie
- ChannelOurRequest mbcookie our | dgst < refDigest (storedRef our) -> process mbcookie
- | otherwise -> reject
- ChannelPeerRequest mbcookie _ -> process mbcookie
+ ChannelCookieReceived {} -> process
+ ChannelCookieConfirmed {} -> process
+ ChannelOurRequest our | dgst < refDigest (storedRef our) -> process
+ | otherwise -> reject
+ ChannelPeerRequest {} -> process
ChannelOurAccept {} -> reject
- ChannelEstablished {} -> process Nothing
+ ChannelEstablished {} -> process
TrChannelAccept dgst -> do
let process = do
@@ -580,8 +580,8 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
ChannelCookieConfirmed {} -> reject
ChannelOurRequest {} -> process
ChannelPeerRequest {} -> process
- ChannelOurAccept _ our _ | dgst < refDigest (storedRef our) -> process
- | otherwise -> addHeader $ Rejected dgst
+ ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process
+ | otherwise -> addHeader $ Rejected dgst
ChannelEstablished {} -> process
ServiceType _ -> return ()
@@ -617,10 +617,10 @@ setupChannel identity peer upid = do
]
liftIO $ atomically $ do
getPeerChannel peer >>= \case
- ChannelCookieConfirmed cookie -> do
+ ChannelCookieConfirmed -> do
sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
TransportPacket (TransportHeader hitems) [storedRef req]
- setPeerChannel peer $ ChannelOurRequest (Just cookie) req
+ setPeerChannel peer $ ChannelOurRequest req
_ -> return ()
handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
@@ -629,8 +629,8 @@ handleChannelRequest peer identity req = do
(acc, ch) <- flip runReaderT (peerStorage peer) $ acceptChannelRequest identity upid (wrappedLoad req)
liftIO $ atomically $ do
getPeerChannel peer >>= \case
- ChannelPeerRequest mbcookie wr | wrDigest wr == refDigest req -> do
- setPeerChannel peer $ ChannelOurAccept mbcookie acc ch
+ ChannelPeerRequest wr | wrDigest wr == refDigest req -> do
+ setPeerChannel peer $ ChannelOurAccept acc ch
let accref = refDigest $ storedRef acc
header = TrChannelAccept accref
ackedBy = [ Acknowledged accref, Rejected accref ]
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 27e05ba..a669988 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -174,6 +174,7 @@ data Connection addr = Connection
, cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
, cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)
, cChannel :: TVar ChannelState
+ , cCookie :: TVar (Maybe Cookie)
, cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
, cMaxInFlightPackets :: TVar Int
, cReservedPackets :: TVar Int
@@ -240,9 +241,9 @@ connAddWriteStream conn@Connection {..} = do
mbReserved = Just reserved
mbch <- atomically (readTVar cChannel) >>= return . \case
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ _ ch -> Just ch
- _ -> Nothing
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
mbs <- case mbch of
Just ch -> do
@@ -373,11 +374,11 @@ wrDigest = refDigest . wrefPartial
data ChannelState = ChannelNone
| ChannelCookieWait -- sent initiation, waiting for response
- | ChannelCookieReceived Cookie -- received cookie, but no cookie echo yet
- | ChannelCookieConfirmed Cookie -- received cookie echo, no need to send from our side
- | ChannelOurRequest (Maybe Cookie) (Stored ChannelRequest)
- | ChannelPeerRequest (Maybe Cookie) WaitingRef
- | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel
+ | ChannelCookieReceived -- received cookie, but no cookie echo yet
+ | ChannelCookieConfirmed -- received cookie echo, no need to send from our side
+ | ChannelOurRequest (Stored ChannelRequest)
+ | ChannelPeerRequest WaitingRef
+ | ChannelOurAccept (Stored ChannelAccept) Channel
| ChannelEstablished Channel
data ReservedToSend = ReservedToSend
@@ -456,6 +457,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
let cAddress = addr
(cDataUp, cDataInternal) <- newFlow
cChannel <- newTVar ChannelNone
+ cCookie <- newTVar Nothing
cSecureOutQueue <- newTQueue
cMaxInFlightPackets <- newTVar 4
cReservedPackets <- newTVar 0
@@ -485,9 +487,9 @@ processIncoming gs@GlobalState {..} = do
mbch <- case mbconn of
Nothing -> return Nothing
Just conn -> readTVar (cChannel conn) >>= return . \case
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ _ ch -> Just ch
- _ -> Nothing
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
return $ do
let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict
@@ -582,11 +584,12 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor
True -> do
atomically $ do
conn@Connection {..} <- getConnection gs addr
- channel <- readTVar cChannel
+ oldCookie <- readTVar cCookie
let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header
- case received `mplus` channelCurrentCookie channel of
+ case received `mplus` oldCookie of
Just current -> do
- cookieEchoReceived gs conn mbpid current
+ writeTVar cCookie (Just current)
+ cookieEchoReceived gs conn mbpid
return $ Just (conn, Just packet)
Nothing -> do
gLog $ show addr <> ": missing cookie set, dropping " <> show header
@@ -603,7 +606,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor
-> do
atomically $ readTVar cChannel >>= \case
ChannelCookieWait -> do
- writeTVar cChannel $ ChannelCookieReceived cookie
+ writeTVar cChannel $ ChannelCookieReceived
+ writeTVar cCookie $ Just cookie
writeFlow gControlFlow (NewConnection conn mbpid)
return $ Just (conn, Nothing)
_ -> return Nothing
@@ -642,17 +646,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor
mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header
version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions
-channelCurrentCookie :: ChannelState -> Maybe Cookie
-channelCurrentCookie = \case
- ChannelCookieReceived cookie -> Just cookie
- ChannelCookieConfirmed cookie -> Just cookie
- ChannelOurRequest mbcookie _ -> mbcookie
- ChannelPeerRequest mbcookie _ -> mbcookie
- ChannelOurAccept mbcookie _ _ -> mbcookie
- _ -> Nothing
-
-cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> Cookie -> STM ()
-cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do
+cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> STM ()
+cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid = do
readTVar cChannel >>= \case
ChannelNone -> newConn
ChannelCookieWait -> newConn
@@ -660,18 +655,18 @@ cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do
_ -> return ()
where
update = do
- writeTVar cChannel $ ChannelCookieConfirmed cookieSet
+ writeTVar cChannel ChannelCookieConfirmed
newConn = do
update
writeFlow gControlFlow (NewConnection conn mbpid)
-generateCookieHeaders :: GlobalState addr -> addr -> ChannelState -> IO [TransportHeaderItem]
-generateCookieHeaders gs addr ch = catMaybes <$> sequence [ echoHeader, setHeader ]
+generateCookieHeaders :: Connection addr -> ChannelState -> IO [TransportHeaderItem]
+generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, setHeader ]
where
- echoHeader = return $ CookieEcho <$> channelCurrentCookie ch
+ echoHeader = fmap CookieEcho <$> atomically (readTVar cCookie)
setHeader = case ch of
- ChannelCookieWait {} -> Just . CookieSet <$> createCookie gs addr
- ChannelCookieReceived {} -> Just . CookieSet <$> createCookie gs addr
+ ChannelCookieWait {} -> Just . CookieSet <$> createCookie cGlobalState cAddress
+ ChannelCookieReceived {} -> Just . CookieSet <$> createCookie cGlobalState cAddress
_ -> return Nothing
createCookie :: GlobalState addr -> addr -> IO Cookie
@@ -754,27 +749,32 @@ processOutgoing gs@GlobalState {..} = do
Just _ -> swapTVar cToAcknowledge []
return $ do
- cookieHeaders <- generateCookieHeaders gs cAddress channel
- let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems
+ let onAck = sequence_ $ map (streamAccepted conn) $
+ catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems)
- let plain = BL.concat $
- (serializeObject $ transportToObject gStorage header)
- : map lazyLoadBytes content
+ let mkPlain extraHeaders =
+ let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems
+ in BL.concat $
+ (serializeObject $ transportToObject gStorage header)
+ : map lazyLoadBytes content
- let onAck = case catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) of
- [] -> return ()
- xs -> sequence_ $ map (streamAccepted conn) xs
+ let usePlaintext = do
+ plain <- mkPlain <$> generateCookieHeaders conn channel
+ return $ Just (BL.toStrict plain, plainAckedBy)
- mbs <- case (secure, mbch) of
- (PlaintextOnly, _) -> return $ Just (BL.toStrict plain, plainAckedBy)
- (PlaintextAllowed, Nothing) -> return $ Just (BL.toStrict plain, plainAckedBy)
- (_, Just ch) -> do
+ let useEncryption ch = do
+ plain <- mkPlain <$> return []
runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case
Right (ctext, counter) -> do
let isAcked = any isHeaderItemAcknowledged hitems
return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
return Nothing
+
+ mbs <- case (secure, mbch) of
+ (PlaintextOnly, _) -> usePlaintext
+ (PlaintextAllowed, Nothing) -> usePlaintext
+ (_, Just ch) -> useEncryption ch
(EncryptedOnly, Nothing) -> return Nothing
case mbs of
diff --git a/test/network.test b/test/network.test
index 93f9743..ea57a77 100644
--- a/test/network.test
+++ b/test/network.test
@@ -176,3 +176,52 @@ test ManyStreams:
send "test-message-send 1 $ref"
expect /test-message-send done/
expect /test-message-received blob 100[2-4] $ref/ from p2
+
+
+test Reconnection:
+ spawn as p1
+ with p1:
+ send "create-identity Device1"
+ send "start-server"
+
+ node n
+ local:
+ spawn as p2 on n
+ send "create-identity Device2" to p2
+ send "start-server" to p2
+
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ with p1:
+ send "store blob"
+ send "message1"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message
+
+ send "test-message-send 1 $message"
+ expect /test-message-send done/
+ expect /test-message-received blob [0-9]+ $message/ from p2
+
+ local:
+ spawn as p2 on n
+ send "start-server" to p2
+ send "peer-add ${p1.node.ip}" to p2
+
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ with p1:
+ send "store blob"
+ send "message2"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message
+
+ send "test-message-send 1 $message"
+ expect /test-message-send done/
+ expect /test-message-received blob [0-9]+ $message/ from p2