diff options
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 66 |
1 files changed, 42 insertions, 24 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index efafd31..025f52c 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -3,6 +3,7 @@ module Erebos.Network.Protocol ( transportToObject, TransportHeader(..), TransportHeaderItem(..), + ServiceID(..), SecurityRequirement(..), WaitingRef(..), @@ -22,7 +23,8 @@ module Erebos.Network.Protocol ( connSetChannel, connClose, - RawStreamReader, RawStreamWriter, + RawStreamReader(..), RawStreamWriter(..), + StreamPacket(..), connAddWriteStream, connAddReadStream, readStreamToList, @@ -65,11 +67,13 @@ import Data.Void import System.Clock -import Erebos.Channel import Erebos.Flow import Erebos.Identity -import Erebos.Service +import Erebos.Network.Channel +import Erebos.Object +import Erebos.Storable import Erebos.Storage +import Erebos.UUID (UUID) protocolVersion :: Text @@ -106,6 +110,9 @@ data TransportHeaderItem | StreamOpen Word8 deriving (Eq, Show) +newtype ServiceID = ServiceID UUID + deriving (Eq, Ord, Show, StorableUUID) + newtype Cookie = Cookie ByteString deriving (Eq, Show) @@ -282,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' - return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + return + ( StreamOpen streamNumber + , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) + , go cGlobalState streamNumber stream + ) where go gs@GlobalState {..} streamNumber stream = do @@ -322,7 +333,7 @@ connAddWriteStream conn@Connection {..} = do Right (ctext, counter) -> do let isAcked = True return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing Nothing | secure -> return Nothing | otherwise -> return $ Just (plain, plainAckedBy) @@ -355,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do sNextSequence <- newTVar 0 sWaitingForAck <- newTVar 0 let stream = Stream {..} - return (stream, (streamNumber, stream) : streams) - (stream, inStreams') <- doInsert inStreams + return ( streamNumber, stream, (streamNumber, stream) : streams ) + ( num, stream, inStreams' ) <- doInsert inStreams writeTVar cInStreams inStreams' - return $ sFlowOut stream + return $ RawStreamReader (fromIntegral num) (sFlowOut stream) + +data RawStreamReader = RawStreamReader + { rsrNum :: Int + , rsrFlow :: Flow StreamPacket Void + } -type RawStreamReader = Flow StreamPacket Void -type RawStreamWriter = Flow Void StreamPacket +data RawStreamWriter = RawStreamWriter + { rswNum :: Int + , rswFlow :: Flow Void StreamPacket + } data Stream = Stream { sState :: TVar StreamState @@ -397,20 +415,20 @@ streamClosed Connection {..} snum = atomically $ do modifyTVar' cOutStreams $ filter ((snum /=) . fst) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) -readStreamToList stream = readFlowIO stream >>= \case +readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) -readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject]) +readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except ErebosError [PartialObject]) readObjectsFromStream st stream = do (seqEnd, list) <- readStreamToList stream let validate s ((s', bytes) : rest) | s == s' = (bytes : ) <$> validate (s + 1) rest | s > s' = validate s rest - | otherwise = throwError "missing object chunk" + | otherwise = throwOtherError "missing object chunk" validate s [] | s == seqEnd = return [] - | otherwise = throwError "content length mismatch" + | otherwise = throwOtherError "content length mismatch" return $ do content <- BL.fromChunks <$> validate 0 list deserializeObjects st content @@ -419,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () writeByteStringToStream stream = go 0 where go seqNum bstr - | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum | otherwise = do let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU - writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur) go (seqNum + 1) rest @@ -433,7 +451,7 @@ data WaitingRef = WaitingRef , wrefStatus :: TVar (Either [RefDigest] Ref) } -type WaitingRefCallback = ExceptT String IO () +type WaitingRefCallback = ExceptT ErebosError IO () wrDigest :: WaitingRef -> RefDigest wrDigest = refDigest . wrefPartial @@ -572,7 +590,7 @@ processIncoming gs@GlobalState {..} = do let parse = case B.uncons msg of Just (b, enc) | b .&. 0xE0 == 0x80 -> do - ch <- maybe (throwError "unexpected encrypted packet") return mbch + ch <- maybe (throwOtherError "unexpected encrypted packet") return mbch (dec, counter) <- channelDecrypt ch enc case B.uncons dec of @@ -587,18 +605,18 @@ processIncoming gs@GlobalState {..} = do return $ Right (snum, seq8, content, counter) Just (_, _) -> do - throwError "unexpected stream header" + throwOtherError "unexpected stream header" Nothing -> do - throwError "empty decrypted content" + throwOtherError "empty decrypted content" | b .&. 0xE0 == 0x60 -> do objs <- deserialize msg return $ Left (False, objs, Nothing) - | otherwise -> throwError "invalid packet" + | otherwise -> throwOtherError "invalid packet" - Nothing -> throwError "empty packet" + Nothing -> throwOtherError "empty packet" now <- getTime Monotonic runExceptT parse >>= \case @@ -649,7 +667,7 @@ processIncoming gs@GlobalState {..} = do atomically $ gLog $ show addr <> ": stream packet without connection" Left err -> do - atomically $ gLog $ show addr <> ": failed to parse packet: " <> err + atomically $ gLog $ show addr <> ": failed to parse packet: " <> showErebosError err processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a))) processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if @@ -883,7 +901,7 @@ processOutgoing gs@GlobalState {..} = do Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing mbs <- case (secure, mbch) of |