blob: 349178ff7df1e18c34bbf699d047445b40ff5ae0 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
-- | Two-way channel endpoints built on STM.
--
-- A 'Flow' has a read side and a write side. 'newFlow' creates two
-- endpoints that are cross-connected, so values written to one are read
-- from the other. The constructors of 'Flow' are not exported; endpoints
-- are obtained from 'newFlow' / 'newFlowIO' and adapted with 'mapPath'.
module Flow (
    Flow, SymFlow,
    newFlow, newFlowIO,
    readFlow, writeFlow, writeFlowBulk,
    readFlowIO, writeFlowIO,
    mapPath,
) where
import Control.Concurrent.STM
-- | One endpoint of a two-way channel: values of type @r@ are read from it
-- and values of type @w@ are written to it.
--
-- Primitive endpoint ('Flow'): the first 'TMVar' holds the batch of values
-- still to be read, the second receives the batches that are written.
data Flow r w = Flow (TMVar [r]) (TMVar [w])
    -- Adapted endpoint ('MappedFlow'): wraps an underlying flow whose element
    -- types are hidden. The first function is applied to every value read
    -- from the underlying flow; the second is applied to every value before
    -- it is written to the underlying flow.
    | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
-- | A 'Flow' whose read and write sides carry the same element type.
type SymFlow a = Flow a a
-- | Create a pair of cross-connected endpoints.
--
-- Both endpoints share the same two fresh, empty 'TMVar's with the roles
-- swapped: the variable the first endpoint reads from is the one the second
-- endpoint writes to, and vice versa.
newFlow :: STM (Flow a b, Flow b a)
newFlow = link <$> newEmptyTMVar <*> newEmptyTMVar
  where
    -- Build the two views over the same pair of variables.
    link forward backward = (Flow forward backward, Flow backward forward)
-- | 'IO' variant of 'newFlow': create a pair of cross-connected endpoints.
--
-- The two backing variables are allocated directly in 'IO' with
-- 'newEmptyTMVarIO' instead of running a transaction. The result is the same
-- as @atomically newFlow@ (two fresh, empty, swapped variables), but this
-- form does not need 'atomically' and can therefore also be used where
-- starting a transaction is not permitted, e.g. when creating a top-level
-- flow under @unsafePerformIO@.
newFlowIO :: IO (Flow a b, Flow b a)
newFlowIO = do
    forward <- newEmptyTMVarIO
    backward <- newEmptyTMVarIO
    return (Flow forward backward, Flow backward forward)
-- | Take the next value from the read side of a flow.
--
-- For an adapted flow the value is read from the underlying flow and passed
-- through the read-side mapping function.
--
-- For a primitive flow the pending batch is taken out of the read variable
-- (the transaction waits, via 'takeTMVar', while the variable is empty). The
-- head of the batch is returned; any remaining values are put back. When the
-- batch had exactly one value the variable is left empty.
--
-- A stored empty batch is treated as a broken invariant and raises
-- @error "Flow: empty list"@.
readFlow :: Flow r w -> STM r
readFlow (MappedFlow f _ up) = fmap f (readFlow up)
readFlow (Flow rvar _) = do
    pending <- takeTMVar rvar
    case pending of
        [] -> error "Flow: empty list"
        [only] -> return only
        first : rest -> do
            -- More values remain in this batch: keep them for later reads.
            putTMVar rvar rest
            return first
-- | Write a single value to the write side of a flow.
--
-- For a primitive flow the value is stored as a one-element batch with
-- 'putTMVar', so the transaction waits while the write variable is still
-- full. For an adapted flow the value is first passed through the write-side
-- mapping function and then written to the underlying flow.
writeFlow :: Flow r w -> w -> STM ()
writeFlow (Flow _ wvar) item = putTMVar wvar [item]
writeFlow (MappedFlow _ f up) item = writeFlow up (f item)
-- | Write a whole batch of values to the write side of a flow in one step.
--
-- An empty batch is a no-op, so an empty list is never stored in the write
-- variable. A non-empty batch is stored as-is with 'putTMVar' for a
-- primitive flow; for an adapted flow every element is passed through the
-- write-side mapping function before the batch is handed to the underlying
-- flow.
writeFlowBulk :: Flow r w -> [w] -> STM ()
writeFlowBulk flow items
    | null items = return ()
    | otherwise = case flow of
        Flow _ wvar -> putTMVar wvar items
        MappedFlow _ f up -> writeFlowBulk up (map f items)
-- | 'IO' variant of 'readFlow': run the read as its own transaction.
readFlowIO :: Flow r w -> IO r
readFlowIO = atomically . readFlow
-- | 'IO' variant of 'writeFlow': run the write as its own transaction.
writeFlowIO :: Flow r w -> w -> IO ()
writeFlowIO path item = atomically (writeFlow path item)
-- | Adapt both directions of a flow.
--
-- The first function is applied to every value read from the resulting
-- flow; the second is applied to every value written to it before the value
-- reaches the original flow.
--
-- When the argument is already an adapted flow, the new functions are
-- composed with the existing ones so the result is still a single
-- 'MappedFlow' layer over the same underlying flow.
mapPath :: (r -> r') -> (w' -> w) -> Flow r w -> Flow r' w'
mapPath rf wf flow = case flow of
    MappedFlow innerRead innerWrite base ->
        -- Reads: existing mapping first, then the new one.
        -- Writes: new mapping first, then the existing one.
        MappedFlow (rf . innerRead) (innerWrite . wf) base
    _ -> MappedFlow rf wf flow
|