diff options
Diffstat (limited to 'src/Erebos')
| -rw-r--r-- | src/Erebos/Network.hs | 5 | ||||
| -rw-r--r-- | src/Erebos/Network/Protocol.hs | 25 | 
2 files changed, 18 insertions, 12 deletions
| diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 7c6a61e..744c476 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -433,7 +433,10 @@ openStream = do      conn <- readTVarP peerConnection >>= \case          Right conn -> return conn          _          -> throwError "can't open stream without established connection" -    (hdr, writer, handler) <- liftSTM $ connAddWriteStream conn +    (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case +        Right res -> return res +        Left err -> throwError err +      liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)      addHeader hdr      return writer diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 9ac9574..b79b105 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -36,6 +36,7 @@ import Control.Concurrent.Async  import Control.Concurrent.STM  import Control.Monad  import Control.Monad.Except +import Control.Monad.Trans  import Data.Bits  import Data.ByteString (ByteString) @@ -189,23 +190,27 @@ connSetChannel :: Connection addr -> ChannelState -> STM ()  connSetChannel Connection {..} ch = do      writeTVar cChannel ch -connAddWriteStream :: Connection addr -> STM (TransportHeaderItem, RawStreamWriter, IO ()) +connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))  connAddWriteStream conn@Connection {..} = do -    let GlobalState {..} = cGlobalState -      outStreams <- readTVar cOutStreams -    let doInsert n (s@(n', _) : rest) | n == n' = +    let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)]) +        doInsert n (s@(n', _) : rest) | n == n' =              fmap (s:) <$> doInsert (n + 1) rest -        doInsert n streams = do +        doInsert n streams | n < 63 = lift $ do              sState <- newTVar StreamOpening              (sFlowIn, sFlowOut) <- newFlow              sNextSequence <- newTVar 0              let info = (n, Stream {..})              return (info, info : streams) -    ((streamNumber, stream), outStreams') <- doInsert 1 outStreams -    writeTVar cOutStreams outStreams' +        doInsert _ _ = throwError "all outbound streams in use" + +    runExceptT $ do +        ((streamNumber, stream), outStreams') <- doInsert 1 outStreams +        lift $ writeTVar cOutStreams outStreams' +        return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) -    let go = do +  where +    go gs@GlobalState {..} streamNumber stream = do              (reserved, msg) <- atomically $ do                  readTVar (sState stream) >>= \case                      StreamRunning -> return () @@ -247,9 +252,7 @@ connAddWriteStream conn@Connection {..} = do                      sendBytes conn mbReserved' bs                  Nothing -> return () -            when cont go - -    return (StreamOpen streamNumber, sFlowIn stream, go) +            when cont $ go gs streamNumber stream  connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader  connAddReadStream Connection {..} streamNumber = do |