diff options
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
| -rw-r--r-- | src/Erebos/Network/Protocol.hs | 97 |
1 files changed, 67 insertions, 30 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index efafd31..f67e296 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -3,6 +3,7 @@ module Erebos.Network.Protocol ( transportToObject, TransportHeader(..), TransportHeaderItem(..), + ServiceID(..), SecurityRequirement(..), WaitingRef(..), @@ -22,7 +23,8 @@ module Erebos.Network.Protocol ( connSetChannel, connClose, - RawStreamReader, RawStreamWriter, + RawStreamReader(..), RawStreamWriter(..), + StreamPacket(..), connAddWriteStream, connAddReadStream, readStreamToList, @@ -65,11 +67,13 @@ import Data.Void import System.Clock -import Erebos.Channel import Erebos.Flow import Erebos.Identity -import Erebos.Service +import Erebos.Network.Channel +import Erebos.Object +import Erebos.Storable import Erebos.Storage +import Erebos.UUID (UUID) protocolVersion :: Text @@ -106,6 +110,9 @@ data TransportHeaderItem | StreamOpen Word8 deriving (Eq, Show) +newtype ServiceID = ServiceID UUID + deriving (Eq, Ord, Show, StorableUUID) + newtype Cookie = Cookie ByteString deriving (Eq, Show) @@ -206,6 +213,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr) , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () + , gTestLog :: String -> STM () , gStorage :: PartialStorage , gStartTime :: TimeSpec , gNowVar :: TVar TimeSpec @@ -242,6 +250,12 @@ instance Eq (Connection addr) where connAddress :: Connection addr -> addr connAddress = cAddress +showConnAddress :: forall addr. Connection addr -> String +showConnAddress Connection {..} = helper cGlobalState cAddress + where + helper :: GlobalState addr -> addr -> String + helper GlobalState {} = show + connData :: Connection addr -> Flow (Maybe (Bool, TransportPacket PartialObject)) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) @@ -266,6 +280,7 @@ connClose conn@Connection {..} = do connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) connAddWriteStream conn@Connection {..} = do + let GlobalState {..} = cGlobalState outStreams <- readTVar cOutStreams let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)]) doInsert n (s@(n', _) : rest) | n == n' = @@ -282,10 +297,16 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' - return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + lift $ gTestLog $ "net-ostream-open " <> showConnAddress conn <> " " <> show streamNumber <> " " <> show (length outStreams') + return + ( StreamOpen streamNumber + , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) + , go streamNumber stream + ) where - go gs@GlobalState {..} streamNumber stream = do + go streamNumber stream = do + let GlobalState {..} = cGlobalState (reserved, msg) <- atomically $ do readTVar (sState stream) >>= \case StreamRunning -> return () @@ -298,6 +319,8 @@ connAddWriteStream conn@Connection {..} = do return (stpData, True, return ()) StreamClosed {} -> do atomically $ do + gTestLog $ "net-ostream-close-send " <> showConnAddress conn <> " " <> show streamNumber + atomically $ do -- wait for ack on all sent stream data waits <- readTVar (sWaitingForAck stream) when (waits > 0) retry @@ -322,7 +345,7 @@ connAddWriteStream conn@Connection {..} = do Right (ctext, counter) -> do let isAcked = True return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing Nothing | secure -> return Nothing | otherwise -> return $ Just (plain, plainAckedBy) @@ -341,7 +364,7 @@ connAddWriteStream conn@Connection {..} = do sendBytes conn mbReserved' bs Nothing -> return () - when cont $ go gs streamNumber stream + when cont $ go streamNumber stream connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader connAddReadStream Connection {..} streamNumber = do @@ -355,14 +378,21 @@ connAddReadStream Connection {..} streamNumber = do sNextSequence <- newTVar 0 sWaitingForAck <- newTVar 0 let stream = Stream {..} - return (stream, (streamNumber, stream) : streams) - (stream, inStreams') <- doInsert inStreams + return ( streamNumber, stream, (streamNumber, stream) : streams ) + ( num, stream, inStreams' ) <- doInsert inStreams writeTVar cInStreams inStreams' - return $ sFlowOut stream + return $ RawStreamReader (fromIntegral num) (sFlowOut stream) -type RawStreamReader = Flow StreamPacket Void -type RawStreamWriter = Flow Void StreamPacket +data RawStreamReader = RawStreamReader + { rsrNum :: Int + , rsrFlow :: Flow StreamPacket Void + } + +data RawStreamWriter = RawStreamWriter + { rswNum :: Int + , rswFlow :: Flow Void StreamPacket + } data Stream = Stream { sState :: TVar StreamState @@ -393,24 +423,26 @@ streamAccepted Connection {..} snum = atomically $ do Nothing -> return () streamClosed :: Connection addr -> Word8 -> IO () -streamClosed Connection {..} snum = atomically $ do - modifyTVar' cOutStreams $ filter ((snum /=) . fst) +streamClosed conn@Connection {..} snum = atomically $ do + streams <- filter ((snum /=) . fst) <$> readTVar cOutStreams + writeTVar cOutStreams streams + gTestLog cGlobalState $ "net-ostream-close-ack " <> showConnAddress conn <> " " <> show snum <> " " <> show (length streams) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) -readStreamToList stream = readFlowIO stream >>= \case +readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) -readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject]) +readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except ErebosError [PartialObject]) readObjectsFromStream st stream = do (seqEnd, list) <- readStreamToList stream let validate s ((s', bytes) : rest) | s == s' = (bytes : ) <$> validate (s + 1) rest | s > s' = validate s rest - | otherwise = throwError "missing object chunk" + | otherwise = throwOtherError "missing object chunk" validate s [] | s == seqEnd = return [] - | otherwise = throwError "content length mismatch" + | otherwise = throwOtherError "content length mismatch" return $ do content <- BL.fromChunks <$> validate 0 list deserializeObjects st content @@ -419,10 +451,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () writeByteStringToStream stream = go 0 where go seqNum bstr - | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum | otherwise = do let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU - writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur) go (seqNum + 1) rest @@ -433,7 +465,7 @@ data WaitingRef = WaitingRef , wrefStatus :: TVar (Either [RefDigest] Ref) } -type WaitingRefCallback = ExceptT String IO () +type WaitingRefCallback = ExceptT ErebosError IO () wrDigest :: WaitingRef -> RefDigest wrDigest = refDigest . wrefPartial @@ -476,10 +508,11 @@ data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest) erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) => UnifiedIdentity -> (String -> STM ()) + -> (String -> STM ()) -> SymFlow (addr, ByteString) -> Flow (ControlRequest addr) (ControlMessage addr) -> IO () -erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do +erebosNetworkProtocol initialIdentity gLog gTestLog gDataFlow gControlFlow = do gIdentity <- newTVarIO (initialIdentity, []) gConnections <- newTVarIO [] gNextUp <- newEmptyTMVarIO @@ -543,6 +576,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cOutStreams <- newTVar [] let conn = Connection {..} + gTestLog $ "net-conn-new " <> show cAddress writeTVar gConnections (conn : conns) return conn @@ -572,7 +606,7 @@ processIncoming gs@GlobalState {..} = do let parse = case B.uncons msg of Just (b, enc) | b .&. 0xE0 == 0x80 -> do - ch <- maybe (throwError "unexpected encrypted packet") return mbch + ch <- maybe (throwOtherError "unexpected encrypted packet") return mbch (dec, counter) <- channelDecrypt ch enc case B.uncons dec of @@ -587,18 +621,18 @@ processIncoming gs@GlobalState {..} = do return $ Right (snum, seq8, content, counter) Just (_, _) -> do - throwError "unexpected stream header" + throwOtherError "unexpected stream header" Nothing -> do - throwError "empty decrypted content" + throwOtherError "empty decrypted content" | b .&. 0xE0 == 0x60 -> do objs <- deserialize msg return $ Left (False, objs, Nothing) - | otherwise -> throwError "invalid packet" + | otherwise -> throwOtherError "invalid packet" - Nothing -> throwError "empty packet" + Nothing -> throwOtherError "empty packet" now <- getTime Monotonic runExceptT parse >>= \case @@ -649,7 +683,7 @@ processIncoming gs@GlobalState {..} = do atomically $ gLog $ show addr <> ": stream packet without connection" Left err -> do - atomically $ gLog $ show addr <> ": failed to parse packet: " <> err + atomically $ gLog $ show addr <> ": failed to parse packet: " <> showErebosError err processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a))) processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if @@ -883,7 +917,7 @@ processOutgoing gs@GlobalState {..} = do Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing mbs <- case (secure, mbch) of @@ -899,7 +933,10 @@ processOutgoing gs@GlobalState {..} = do , rsOnAck = rsOnAck rs >> onAck }) <$> mbReserved sendBytes conn mbReserved' bs - Nothing -> return () + Nothing -> do + when (isJust mbReserved) $ do + atomically $ do + modifyTVar' cReservedPackets (subtract 1) let waitUntil :: TimeSpec -> TimeSpec -> STM () waitUntil now till = do |