summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Erebos/Network.hs5
-rw-r--r--src/Erebos/Network/Protocol.hs25
2 files changed, 18 insertions, 12 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 7c6a61e..744c476 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -433,7 +433,10 @@ openStream = do
conn <- readTVarP peerConnection >>= \case
Right conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM $ connAddWriteStream conn
+ (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
+ Right res -> return res
+ Left err -> throwError err
+
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
return writer
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index 9ac9574..b79b105 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -36,6 +36,7 @@ import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Except
+import Control.Monad.Trans
import Data.Bits
import Data.ByteString (ByteString)
@@ -189,23 +190,27 @@ connSetChannel :: Connection addr -> ChannelState -> STM ()
connSetChannel Connection {..} ch = do
writeTVar cChannel ch
-connAddWriteStream :: Connection addr -> STM (TransportHeaderItem, RawStreamWriter, IO ())
+connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
connAddWriteStream conn@Connection {..} = do
- let GlobalState {..} = cGlobalState
-
outStreams <- readTVar cOutStreams
- let doInsert n (s@(n', _) : rest) | n == n' =
+ let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)])
+ doInsert n (s@(n', _) : rest) | n == n' =
fmap (s:) <$> doInsert (n + 1) rest
- doInsert n streams = do
+ doInsert n streams | n < 63 = lift $ do
sState <- newTVar StreamOpening
(sFlowIn, sFlowOut) <- newFlow
sNextSequence <- newTVar 0
let info = (n, Stream {..})
return (info, info : streams)
- ((streamNumber, stream), outStreams') <- doInsert 1 outStreams
- writeTVar cOutStreams outStreams'
+ doInsert _ _ = throwError "all outbound streams in use"
+
+ runExceptT $ do
+ ((streamNumber, stream), outStreams') <- doInsert 1 outStreams
+ lift $ writeTVar cOutStreams outStreams'
+ return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
- let go = do
+ where
+ go gs@GlobalState {..} streamNumber stream = do
(reserved, msg) <- atomically $ do
readTVar (sState stream) >>= \case
StreamRunning -> return ()
@@ -247,9 +252,7 @@ connAddWriteStream conn@Connection {..} = do
sendBytes conn mbReserved' bs
Nothing -> return ()
- when cont go
-
- return (StreamOpen streamNumber, sFlowIn stream, go)
+ when cont $ go gs streamNumber stream
connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader
connAddReadStream Connection {..} streamNumber = do