summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Erebos/Network.hs36
-rw-r--r--src/Erebos/Network/Protocol.hs88
-rw-r--r--test/network.test49
3 files changed, 111 insertions, 62 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index cc111e2..a01bdd1 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -473,9 +473,9 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
[ [ storedRef sidentity ]
, map storedRef $ idUpdates identity
, case ochannel of
- ChannelOurRequest _ req -> [ storedRef req ]
- ChannelOurAccept _ acc _ -> [ storedRef acc ]
- _ -> []
+ ChannelOurRequest req -> [ storedRef req ]
+ ChannelOurAccept acc _ -> [ storedRef acc ]
+ _ -> []
]
runPacketHandler secure peer $ do
@@ -483,7 +483,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
forM_ headers $ \case
Acknowledged dgst -> do
liftSTM (getPeerChannel peer) >>= \case
- ChannelOurAccept _ acc ch | refDigest (storedRef acc) == dgst -> do
+ ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do
liftSTM $ finalizedChannel peer ch identity
_ -> return ()
@@ -552,22 +552,22 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
_ -> return ()
TrChannelRequest dgst -> do
- let process cookie = do
+ let process = do
addHeader $ Acknowledged dgst
wref <- newWaitingRef dgst $ handleChannelRequest peer identity
- liftSTM $ setPeerChannel peer $ ChannelPeerRequest cookie wref
+ liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref
reject = addHeader $ Rejected dgst
liftSTM (getPeerChannel peer) >>= \case
ChannelNone {} -> return ()
ChannelCookieWait {} -> return ()
- ChannelCookieReceived cookie -> process $ Just cookie
- ChannelCookieConfirmed cookie -> process $ Just cookie
- ChannelOurRequest mbcookie our | dgst < refDigest (storedRef our) -> process mbcookie
- | otherwise -> reject
- ChannelPeerRequest mbcookie _ -> process mbcookie
+ ChannelCookieReceived {} -> process
+ ChannelCookieConfirmed {} -> process
+ ChannelOurRequest our | dgst < refDigest (storedRef our) -> process
+ | otherwise -> reject
+ ChannelPeerRequest {} -> process
ChannelOurAccept {} -> reject
- ChannelEstablished {} -> process Nothing
+ ChannelEstablished {} -> process
TrChannelAccept dgst -> do
let process = do
@@ -580,8 +580,8 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
ChannelCookieConfirmed {} -> reject
ChannelOurRequest {} -> process
ChannelPeerRequest {} -> process
- ChannelOurAccept _ our _ | dgst < refDigest (storedRef our) -> process
- | otherwise -> addHeader $ Rejected dgst
+ ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process
+ | otherwise -> addHeader $ Rejected dgst
ChannelEstablished {} -> process
ServiceType _ -> return ()
@@ -617,10 +617,10 @@ setupChannel identity peer upid = do
]
liftIO $ atomically $ do
getPeerChannel peer >>= \case
- ChannelCookieConfirmed cookie -> do
+ ChannelCookieConfirmed -> do
sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
TransportPacket (TransportHeader hitems) [storedRef req]
- setPeerChannel peer $ ChannelOurRequest (Just cookie) req
+ setPeerChannel peer $ ChannelOurRequest req
_ -> return ()
handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
@@ -629,8 +629,8 @@ handleChannelRequest peer identity req = do
(acc, ch) <- flip runReaderT (peerStorage peer) $ acceptChannelRequest identity upid (wrappedLoad req)
liftIO $ atomically $ do
getPeerChannel peer >>= \case
- ChannelPeerRequest mbcookie wr | wrDigest wr == refDigest req -> do
- setPeerChannel peer $ ChannelOurAccept mbcookie acc ch
+ ChannelPeerRequest wr | wrDigest wr == refDigest req -> do
+ setPeerChannel peer $ ChannelOurAccept acc ch
let accref = refDigest $ storedRef acc
header = TrChannelAccept accref
ackedBy = [ Acknowledged accref, Rejected accref ]
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 27e05ba..a669988 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -174,6 +174,7 @@ data Connection addr = Connection
, cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
, cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)
, cChannel :: TVar ChannelState
+ , cCookie :: TVar (Maybe Cookie)
, cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
, cMaxInFlightPackets :: TVar Int
, cReservedPackets :: TVar Int
@@ -240,9 +241,9 @@ connAddWriteStream conn@Connection {..} = do
mbReserved = Just reserved
mbch <- atomically (readTVar cChannel) >>= return . \case
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ _ ch -> Just ch
- _ -> Nothing
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
mbs <- case mbch of
Just ch -> do
@@ -373,11 +374,11 @@ wrDigest = refDigest . wrefPartial
data ChannelState = ChannelNone
| ChannelCookieWait -- sent initiation, waiting for response
- | ChannelCookieReceived Cookie -- received cookie, but no cookie echo yet
- | ChannelCookieConfirmed Cookie -- received cookie echo, no need to send from our side
- | ChannelOurRequest (Maybe Cookie) (Stored ChannelRequest)
- | ChannelPeerRequest (Maybe Cookie) WaitingRef
- | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel
+ | ChannelCookieReceived -- received cookie, but no cookie echo yet
+ | ChannelCookieConfirmed -- received cookie echo, no need to send from our side
+ | ChannelOurRequest (Stored ChannelRequest)
+ | ChannelPeerRequest WaitingRef
+ | ChannelOurAccept (Stored ChannelAccept) Channel
| ChannelEstablished Channel
data ReservedToSend = ReservedToSend
@@ -456,6 +457,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
let cAddress = addr
(cDataUp, cDataInternal) <- newFlow
cChannel <- newTVar ChannelNone
+ cCookie <- newTVar Nothing
cSecureOutQueue <- newTQueue
cMaxInFlightPackets <- newTVar 4
cReservedPackets <- newTVar 0
@@ -485,9 +487,9 @@ processIncoming gs@GlobalState {..} = do
mbch <- case mbconn of
Nothing -> return Nothing
Just conn -> readTVar (cChannel conn) >>= return . \case
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ _ ch -> Just ch
- _ -> Nothing
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
return $ do
let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict
@@ -582,11 +584,12 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor
True -> do
atomically $ do
conn@Connection {..} <- getConnection gs addr
- channel <- readTVar cChannel
+ oldCookie <- readTVar cCookie
let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header
- case received `mplus` channelCurrentCookie channel of
+ case received `mplus` oldCookie of
Just current -> do
- cookieEchoReceived gs conn mbpid current
+ writeTVar cCookie (Just current)
+ cookieEchoReceived gs conn mbpid
return $ Just (conn, Just packet)
Nothing -> do
gLog $ show addr <> ": missing cookie set, dropping " <> show header
@@ -603,7 +606,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor
-> do
atomically $ readTVar cChannel >>= \case
ChannelCookieWait -> do
- writeTVar cChannel $ ChannelCookieReceived cookie
+ writeTVar cChannel $ ChannelCookieReceived
+ writeTVar cCookie $ Just cookie
writeFlow gControlFlow (NewConnection conn mbpid)
return $ Just (conn, Nothing)
_ -> return Nothing
@@ -642,17 +646,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor
mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header
version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions
-channelCurrentCookie :: ChannelState -> Maybe Cookie
-channelCurrentCookie = \case
- ChannelCookieReceived cookie -> Just cookie
- ChannelCookieConfirmed cookie -> Just cookie
- ChannelOurRequest mbcookie _ -> mbcookie
- ChannelPeerRequest mbcookie _ -> mbcookie
- ChannelOurAccept mbcookie _ _ -> mbcookie
- _ -> Nothing
-
-cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> Cookie -> STM ()
-cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do
+cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> STM ()
+cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid = do
readTVar cChannel >>= \case
ChannelNone -> newConn
ChannelCookieWait -> newConn
@@ -660,18 +655,18 @@ cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do
_ -> return ()
where
update = do
- writeTVar cChannel $ ChannelCookieConfirmed cookieSet
+ writeTVar cChannel ChannelCookieConfirmed
newConn = do
update
writeFlow gControlFlow (NewConnection conn mbpid)
-generateCookieHeaders :: GlobalState addr -> addr -> ChannelState -> IO [TransportHeaderItem]
-generateCookieHeaders gs addr ch = catMaybes <$> sequence [ echoHeader, setHeader ]
+generateCookieHeaders :: Connection addr -> ChannelState -> IO [TransportHeaderItem]
+generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, setHeader ]
where
- echoHeader = return $ CookieEcho <$> channelCurrentCookie ch
+ echoHeader = fmap CookieEcho <$> atomically (readTVar cCookie)
setHeader = case ch of
- ChannelCookieWait {} -> Just . CookieSet <$> createCookie gs addr
- ChannelCookieReceived {} -> Just . CookieSet <$> createCookie gs addr
+ ChannelCookieWait {} -> Just . CookieSet <$> createCookie cGlobalState cAddress
+ ChannelCookieReceived {} -> Just . CookieSet <$> createCookie cGlobalState cAddress
_ -> return Nothing
createCookie :: GlobalState addr -> addr -> IO Cookie
@@ -754,27 +749,32 @@ processOutgoing gs@GlobalState {..} = do
Just _ -> swapTVar cToAcknowledge []
return $ do
- cookieHeaders <- generateCookieHeaders gs cAddress channel
- let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems
+ let onAck = sequence_ $ map (streamAccepted conn) $
+ catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems)
- let plain = BL.concat $
- (serializeObject $ transportToObject gStorage header)
- : map lazyLoadBytes content
+ let mkPlain extraHeaders =
+ let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems
+ in BL.concat $
+ (serializeObject $ transportToObject gStorage header)
+ : map lazyLoadBytes content
- let onAck = case catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) of
- [] -> return ()
- xs -> sequence_ $ map (streamAccepted conn) xs
+ let usePlaintext = do
+ plain <- mkPlain <$> generateCookieHeaders conn channel
+ return $ Just (BL.toStrict plain, plainAckedBy)
- mbs <- case (secure, mbch) of
- (PlaintextOnly, _) -> return $ Just (BL.toStrict plain, plainAckedBy)
- (PlaintextAllowed, Nothing) -> return $ Just (BL.toStrict plain, plainAckedBy)
- (_, Just ch) -> do
+ let useEncryption ch = do
+ plain <- mkPlain <$> return []
runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case
Right (ctext, counter) -> do
let isAcked = any isHeaderItemAcknowledged hitems
return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
return Nothing
+
+ mbs <- case (secure, mbch) of
+ (PlaintextOnly, _) -> usePlaintext
+ (PlaintextAllowed, Nothing) -> usePlaintext
+ (_, Just ch) -> useEncryption ch
(EncryptedOnly, Nothing) -> return Nothing
case mbs of
diff --git a/test/network.test b/test/network.test
index 93f9743..ea57a77 100644
--- a/test/network.test
+++ b/test/network.test
@@ -176,3 +176,52 @@ test ManyStreams:
send "test-message-send 1 $ref"
expect /test-message-send done/
expect /test-message-received blob 100[2-4] $ref/ from p2
+
+
+test Reconnection:
+ spawn as p1
+ with p1:
+ send "create-identity Device1"
+ send "start-server"
+
+ node n
+ local:
+ spawn as p2 on n
+ send "create-identity Device2" to p2
+ send "start-server" to p2
+
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ with p1:
+ send "store blob"
+ send "message1"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message
+
+ send "test-message-send 1 $message"
+ expect /test-message-send done/
+ expect /test-message-received blob [0-9]+ $message/ from p2
+
+ local:
+ spawn as p2 on n
+ send "start-server" to p2
+ send "peer-add ${p1.node.ip}" to p2
+
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ with p1:
+ send "store blob"
+ send "message2"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message
+
+ send "test-message-send 1 $message"
+ expect /test-message-send done/
+ expect /test-message-received blob [0-9]+ $message/ from p2