diff options
-rw-r--r-- | src/Erebos/Network.hs | 36 | ||||
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 88 | ||||
-rw-r--r-- | test/network.test | 49 |
3 files changed, 111 insertions, 62 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index cc111e2..a01bdd1 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -473,9 +473,9 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = [ [ storedRef sidentity ] , map storedRef $ idUpdates identity , case ochannel of - ChannelOurRequest _ req -> [ storedRef req ] - ChannelOurAccept _ acc _ -> [ storedRef acc ] - _ -> [] + ChannelOurRequest req -> [ storedRef req ] + ChannelOurAccept acc _ -> [ storedRef acc ] + _ -> [] ] runPacketHandler secure peer $ do @@ -483,7 +483,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = forM_ headers $ \case Acknowledged dgst -> do liftSTM (getPeerChannel peer) >>= \case - ChannelOurAccept _ acc ch | refDigest (storedRef acc) == dgst -> do + ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do liftSTM $ finalizedChannel peer ch identity _ -> return () @@ -552,22 +552,22 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = _ -> return () TrChannelRequest dgst -> do - let process cookie = do + let process = do addHeader $ Acknowledged dgst wref <- newWaitingRef dgst $ handleChannelRequest peer identity - liftSTM $ setPeerChannel peer $ ChannelPeerRequest cookie wref + liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref reject = addHeader $ Rejected dgst liftSTM (getPeerChannel peer) >>= \case ChannelNone {} -> return () ChannelCookieWait {} -> return () - ChannelCookieReceived cookie -> process $ Just cookie - ChannelCookieConfirmed cookie -> process $ Just cookie - ChannelOurRequest mbcookie our | dgst < refDigest (storedRef our) -> process mbcookie - | otherwise -> reject - ChannelPeerRequest mbcookie _ -> process mbcookie + ChannelCookieReceived {} -> process + ChannelCookieConfirmed {} -> process + ChannelOurRequest our | dgst < refDigest (storedRef our) -> process + | otherwise -> reject + ChannelPeerRequest {} -> process ChannelOurAccept {} -> reject - ChannelEstablished {} -> process Nothing + ChannelEstablished {} -> process TrChannelAccept dgst -> do let process = do @@ -580,8 +580,8 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelCookieConfirmed {} -> reject ChannelOurRequest {} -> process ChannelPeerRequest {} -> process - ChannelOurAccept _ our _ | dgst < refDigest (storedRef our) -> process - | otherwise -> addHeader $ Rejected dgst + ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process + | otherwise -> addHeader $ Rejected dgst ChannelEstablished {} -> process ServiceType _ -> return () @@ -617,10 +617,10 @@ setupChannel identity peer upid = do ] liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelCookieConfirmed cookie -> do + ChannelCookieConfirmed -> do sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ TransportPacket (TransportHeader hitems) [storedRef req] - setPeerChannel peer $ ChannelOurRequest (Just cookie) req + setPeerChannel peer $ ChannelOurRequest req _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -629,8 +629,8 @@ handleChannelRequest peer identity req = do (acc, ch) <- flip runReaderT (peerStorage peer) $ acceptChannelRequest identity upid (wrappedLoad req) liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelPeerRequest mbcookie wr | wrDigest wr == refDigest req -> do - setPeerChannel peer $ ChannelOurAccept mbcookie acc ch + ChannelPeerRequest wr | wrDigest wr == refDigest req -> do + setPeerChannel peer $ ChannelOurAccept acc ch let accref = refDigest $ storedRef acc header = TrChannelAccept accref ackedBy = [ Acknowledged accref, Rejected accref ] diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 27e05ba..a669988 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -174,6 +174,7 @@ data Connection addr = Connection , cDataUp :: Flow (Bool, TransportPacket PartialObject) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) , cDataInternal :: Flow (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) , cChannel :: TVar ChannelState + , cCookie :: TVar (Maybe Cookie) , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) , cMaxInFlightPackets :: TVar Int , cReservedPackets :: TVar Int @@ -240,9 +241,9 @@ connAddWriteStream conn@Connection {..} = do mbReserved = Just reserved mbch <- atomically (readTVar cChannel) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ _ ch -> Just ch - _ -> Nothing + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing mbs <- case mbch of Just ch -> do @@ -373,11 +374,11 @@ wrDigest = refDigest . wrefPartial data ChannelState = ChannelNone | ChannelCookieWait -- sent initiation, waiting for response - | ChannelCookieReceived Cookie -- received cookie, but no cookie echo yet - | ChannelCookieConfirmed Cookie -- received cookie echo, no need to send from our side - | ChannelOurRequest (Maybe Cookie) (Stored ChannelRequest) - | ChannelPeerRequest (Maybe Cookie) WaitingRef - | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel + | ChannelCookieReceived -- received cookie, but no cookie echo yet + | ChannelCookieConfirmed -- received cookie echo, no need to send from our side + | ChannelOurRequest (Stored ChannelRequest) + | ChannelPeerRequest WaitingRef + | ChannelOurAccept (Stored ChannelAccept) Channel | ChannelEstablished Channel data ReservedToSend = ReservedToSend @@ -456,6 +457,7 @@ newConnection cGlobalState@GlobalState {..} addr = do let cAddress = addr (cDataUp, cDataInternal) <- newFlow cChannel <- newTVar ChannelNone + cCookie <- newTVar Nothing cSecureOutQueue <- newTQueue cMaxInFlightPackets <- newTVar 4 cReservedPackets <- newTVar 0 @@ -485,9 +487,9 @@ processIncoming gs@GlobalState {..} = do mbch <- case mbconn of Nothing -> return Nothing Just conn -> readTVar (cChannel conn) >>= return . \case - ChannelEstablished ch -> Just ch - ChannelOurAccept _ _ ch -> Just ch - _ -> Nothing + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing return $ do let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict @@ -582,11 +584,12 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor True -> do atomically $ do conn@Connection {..} <- getConnection gs addr - channel <- readTVar cChannel + oldCookie <- readTVar cCookie let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header - case received `mplus` channelCurrentCookie channel of + case received `mplus` oldCookie of Just current -> do - cookieEchoReceived gs conn mbpid current + writeTVar cCookie (Just current) + cookieEchoReceived gs conn mbpid return $ Just (conn, Just packet) Nothing -> do gLog $ show addr <> ": missing cookie set, dropping " <> show header @@ -603,7 +606,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor -> do atomically $ readTVar cChannel >>= \case ChannelCookieWait -> do - writeTVar cChannel $ ChannelCookieReceived cookie + writeTVar cChannel $ ChannelCookieReceived + writeTVar cCookie $ Just cookie writeFlow gControlFlow (NewConnection conn mbpid) return $ Just (conn, Nothing) _ -> return Nothing @@ -642,17 +646,8 @@ processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (Transpor mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions -channelCurrentCookie :: ChannelState -> Maybe Cookie -channelCurrentCookie = \case - ChannelCookieReceived cookie -> Just cookie - ChannelCookieConfirmed cookie -> Just cookie - ChannelOurRequest mbcookie _ -> mbcookie - ChannelPeerRequest mbcookie _ -> mbcookie - ChannelOurAccept mbcookie _ _ -> mbcookie - _ -> Nothing - -cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> Cookie -> STM () -cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do +cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> STM () +cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid = do readTVar cChannel >>= \case ChannelNone -> newConn ChannelCookieWait -> newConn @@ -660,18 +655,18 @@ cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do _ -> return () where update = do - writeTVar cChannel $ ChannelCookieConfirmed cookieSet + writeTVar cChannel ChannelCookieConfirmed newConn = do update writeFlow gControlFlow (NewConnection conn mbpid) -generateCookieHeaders :: GlobalState addr -> addr -> ChannelState -> IO [TransportHeaderItem] -generateCookieHeaders gs addr ch = catMaybes <$> sequence [ echoHeader, setHeader ] +generateCookieHeaders :: Connection addr -> ChannelState -> IO [TransportHeaderItem] +generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, setHeader ] where - echoHeader = return $ CookieEcho <$> channelCurrentCookie ch + echoHeader = fmap CookieEcho <$> atomically (readTVar cCookie) setHeader = case ch of - ChannelCookieWait {} -> Just . CookieSet <$> createCookie gs addr - ChannelCookieReceived {} -> Just . CookieSet <$> createCookie gs addr + ChannelCookieWait {} -> Just . CookieSet <$> createCookie cGlobalState cAddress + ChannelCookieReceived {} -> Just . CookieSet <$> createCookie cGlobalState cAddress _ -> return Nothing createCookie :: GlobalState addr -> addr -> IO Cookie @@ -754,27 +749,32 @@ processOutgoing gs@GlobalState {..} = do Just _ -> swapTVar cToAcknowledge [] return $ do - cookieHeaders <- generateCookieHeaders gs cAddress channel - let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems + let onAck = sequence_ $ map (streamAccepted conn) $ + catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) - let plain = BL.concat $ - (serializeObject $ transportToObject gStorage header) - : map lazyLoadBytes content + let mkPlain extraHeaders = + let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems + in BL.concat $ + (serializeObject $ transportToObject gStorage header) + : map lazyLoadBytes content - let onAck = case catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) of - [] -> return () - xs -> sequence_ $ map (streamAccepted conn) xs + let usePlaintext = do + plain <- mkPlain <$> generateCookieHeaders conn channel + return $ Just (BL.toStrict plain, plainAckedBy) - mbs <- case (secure, mbch) of - (PlaintextOnly, _) -> return $ Just (BL.toStrict plain, plainAckedBy) - (PlaintextAllowed, Nothing) -> return $ Just (BL.toStrict plain, plainAckedBy) - (_, Just ch) -> do + let useEncryption ch = do + plain <- mkPlain <$> return [] runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err return Nothing + + mbs <- case (secure, mbch) of + (PlaintextOnly, _) -> usePlaintext + (PlaintextAllowed, Nothing) -> usePlaintext + (_, Just ch) -> useEncryption ch (EncryptedOnly, Nothing) -> return Nothing case mbs of diff --git a/test/network.test b/test/network.test index 93f9743..ea57a77 100644 --- a/test/network.test +++ b/test/network.test @@ -176,3 +176,52 @@ test ManyStreams: send "test-message-send 1 $ref" expect /test-message-send done/ expect /test-message-received blob 100[2-4] $ref/ from p2 + + +test Reconnection: + spawn as p1 + with p1: + send "create-identity Device1" + send "start-server" + + node n + local: + spawn as p2 on n + send "create-identity Device2" to p2 + send "start-server" to p2 + + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + with p1: + send "store blob" + send "message1" + send "" + expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message + + send "test-message-send 1 $message" + expect /test-message-send done/ + expect /test-message-received blob [0-9]+ $message/ from p2 + + local: + spawn as p2 on n + send "start-server" to p2 + send "peer-add ${p1.node.ip}" to p2 + + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + with p1: + send "store blob" + send "message2" + send "" + expect /store-done (blake2#[0-9a-f]*)/ from p1 capture message + + send "test-message-send 1 $message" + expect /test-message-send done/ + expect /test-message-received blob [0-9]+ $message/ from p2 |