summaryrefslogtreecommitdiff
path: root/src/Erebos/Network/Protocol.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r--src/Erebos/Network/Protocol.hs66
1 files changed, 42 insertions, 24 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index efafd31..025f52c 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -3,6 +3,7 @@ module Erebos.Network.Protocol (
transportToObject,
TransportHeader(..),
TransportHeaderItem(..),
+ ServiceID(..),
SecurityRequirement(..),
WaitingRef(..),
@@ -22,7 +23,8 @@ module Erebos.Network.Protocol (
connSetChannel,
connClose,
- RawStreamReader, RawStreamWriter,
+ RawStreamReader(..), RawStreamWriter(..),
+ StreamPacket(..),
connAddWriteStream,
connAddReadStream,
readStreamToList,
@@ -65,11 +67,13 @@ import Data.Void
import System.Clock
-import Erebos.Channel
import Erebos.Flow
import Erebos.Identity
-import Erebos.Service
+import Erebos.Network.Channel
+import Erebos.Object
+import Erebos.Storable
import Erebos.Storage
+import Erebos.UUID (UUID)
protocolVersion :: Text
@@ -106,6 +110,9 @@ data TransportHeaderItem
| StreamOpen Word8
deriving (Eq, Show)
+newtype ServiceID = ServiceID UUID
+ deriving (Eq, Ord, Show, StorableUUID)
+
newtype Cookie = Cookie ByteString
deriving (Eq, Show)
@@ -282,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do
runExceptT $ do
((streamNumber, stream), outStreams') <- doInsert 1 outStreams
lift $ writeTVar cOutStreams outStreams'
- return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+ return
+ ( StreamOpen streamNumber
+ , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream)
+ , go cGlobalState streamNumber stream
+ )
where
go gs@GlobalState {..} streamNumber stream = do
@@ -322,7 +333,7 @@ connAddWriteStream conn@Connection {..} = do
Right (ctext, counter) -> do
let isAcked = True
return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
- Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
+ Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err
return Nothing
Nothing | secure -> return Nothing
| otherwise -> return $ Just (plain, plainAckedBy)
@@ -355,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do
sNextSequence <- newTVar 0
sWaitingForAck <- newTVar 0
let stream = Stream {..}
- return (stream, (streamNumber, stream) : streams)
- (stream, inStreams') <- doInsert inStreams
+ return ( streamNumber, stream, (streamNumber, stream) : streams )
+ ( num, stream, inStreams' ) <- doInsert inStreams
writeTVar cInStreams inStreams'
- return $ sFlowOut stream
+ return $ RawStreamReader (fromIntegral num) (sFlowOut stream)
+
+data RawStreamReader = RawStreamReader
+ { rsrNum :: Int
+ , rsrFlow :: Flow StreamPacket Void
+ }
-type RawStreamReader = Flow StreamPacket Void
-type RawStreamWriter = Flow Void StreamPacket
+data RawStreamWriter = RawStreamWriter
+ { rswNum :: Int
+ , rswFlow :: Flow Void StreamPacket
+ }
data Stream = Stream
{ sState :: TVar StreamState
@@ -397,20 +415,20 @@ streamClosed Connection {..} snum = atomically $ do
modifyTVar' cOutStreams $ filter ((snum /=) . fst)
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
-readStreamToList stream = readFlowIO stream >>= \case
+readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
StreamClosed sqEnd -> return (sqEnd, [])
-readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject])
+readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except ErebosError [PartialObject])
readObjectsFromStream st stream = do
(seqEnd, list) <- readStreamToList stream
let validate s ((s', bytes) : rest)
| s == s' = (bytes : ) <$> validate (s + 1) rest
| s > s' = validate s rest
- | otherwise = throwError "missing object chunk"
+ | otherwise = throwOtherError "missing object chunk"
validate s []
| s == seqEnd = return []
- | otherwise = throwError "content length mismatch"
+ | otherwise = throwOtherError "content length mismatch"
return $ do
content <- BL.fromChunks <$> validate 0 list
deserializeObjects st content
@@ -419,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
writeByteStringToStream stream = go 0
where
go seqNum bstr
- | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum
| otherwise = do
let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
- writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur)
go (seqNum + 1) rest
@@ -433,7 +451,7 @@ data WaitingRef = WaitingRef
, wrefStatus :: TVar (Either [RefDigest] Ref)
}
-type WaitingRefCallback = ExceptT String IO ()
+type WaitingRefCallback = ExceptT ErebosError IO ()
wrDigest :: WaitingRef -> RefDigest
wrDigest = refDigest . wrefPartial
@@ -572,7 +590,7 @@ processIncoming gs@GlobalState {..} = do
let parse = case B.uncons msg of
Just (b, enc)
| b .&. 0xE0 == 0x80 -> do
- ch <- maybe (throwError "unexpected encrypted packet") return mbch
+ ch <- maybe (throwOtherError "unexpected encrypted packet") return mbch
(dec, counter) <- channelDecrypt ch enc
case B.uncons dec of
@@ -587,18 +605,18 @@ processIncoming gs@GlobalState {..} = do
return $ Right (snum, seq8, content, counter)
Just (_, _) -> do
- throwError "unexpected stream header"
+ throwOtherError "unexpected stream header"
Nothing -> do
- throwError "empty decrypted content"
+ throwOtherError "empty decrypted content"
| b .&. 0xE0 == 0x60 -> do
objs <- deserialize msg
return $ Left (False, objs, Nothing)
- | otherwise -> throwError "invalid packet"
+ | otherwise -> throwOtherError "invalid packet"
- Nothing -> throwError "empty packet"
+ Nothing -> throwOtherError "empty packet"
now <- getTime Monotonic
runExceptT parse >>= \case
@@ -649,7 +667,7 @@ processIncoming gs@GlobalState {..} = do
atomically $ gLog $ show addr <> ": stream packet without connection"
Left err -> do
- atomically $ gLog $ show addr <> ": failed to parse packet: " <> err
+ atomically $ gLog $ show addr <> ": failed to parse packet: " <> showErebosError err
processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a)))
processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if
@@ -883,7 +901,7 @@ processOutgoing gs@GlobalState {..} = do
Right (ctext, counter) -> do
let isAcked = any isHeaderItemAcknowledged hitems
return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
- Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
+ Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err
return Nothing
mbs <- case (secure, mbch) of