summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Erebos/Flow.hs37
1 files changed, 18 insertions, 19 deletions
diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs
index ba2607a..1e1a521 100644
--- a/src/Erebos/Flow.hs
+++ b/src/Erebos/Flow.hs
@@ -11,54 +11,53 @@ module Erebos.Flow (
import Control.Concurrent.STM
-data Flow r w = Flow (TMVar [r]) (TMVar [w])
- | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
+data Flow r w
+ = Flow (TBQueue r) (TBQueue w)
+ | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
type SymFlow a = Flow a a
newFlow :: STM (Flow a b, Flow b a)
newFlow = do
- x <- newEmptyTMVar
- y <- newEmptyTMVar
+ x <- newTBQueue 16
+ y <- newTBQueue 16
return (Flow x y, Flow y x)
newFlowIO :: IO (Flow a b, Flow b a)
newFlowIO = atomically newFlow
readFlow :: Flow r w -> STM r
-readFlow (Flow rvar _) = takeTMVar rvar >>= \case
- (x:[]) -> return x
- (x:xs) -> putTMVar rvar xs >> return x
- [] -> error "Flow: empty list"
+readFlow (Flow rvar _) = readTBQueue rvar
readFlow (MappedFlow f _ up) = f <$> readFlow up
tryReadFlow :: Flow r w -> STM (Maybe r)
-tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case
- Just (x:[]) -> return (Just x)
- Just (x:xs) -> putTMVar rvar xs >> return (Just x)
- Just [] -> error "Flow: empty list"
- Nothing -> return Nothing
+tryReadFlow (Flow rvar _) = tryReadTBQueue rvar
tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up
canReadFlow :: Flow r w -> STM Bool
-canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar
+canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar
canReadFlow (MappedFlow _ _ up) = canReadFlow up
writeFlow :: Flow r w -> w -> STM ()
-writeFlow (Flow _ wvar) = putTMVar wvar . (:[])
+writeFlow (Flow _ wvar) = writeTBQueue wvar
writeFlow (MappedFlow _ f up) = writeFlow up . f
writeFlowBulk :: Flow r w -> [w] -> STM ()
writeFlowBulk _ [] = return ()
-writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs
+writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs
writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs
tryWriteFlow :: Flow r w -> w -> STM Bool
-tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[])
-tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f
+tryWriteFlow (Flow _ wvar) x = do
+ isFullTBQueue wvar >>= \case
+ True -> return False
+ False -> do
+ writeTBQueue wvar x
+ return True
+tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x
canWriteFlow :: Flow r w -> STM Bool
-canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar
+canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar
canWriteFlow (MappedFlow _ _ up) = canWriteFlow up
readFlowIO :: Flow r w -> IO r