From bda62efef1ad38779f23b38b4e1436f06fb9c7c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Tue, 1 Aug 2023 23:01:30 +0200 Subject: Network protocol refactoring with explicit data flows --- src/Flow.hs | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 src/Flow.hs (limited to 'src/Flow.hs') diff --git a/src/Flow.hs b/src/Flow.hs new file mode 100644 index 0000000..349178f --- /dev/null +++ b/src/Flow.hs @@ -0,0 +1,52 @@ +module Flow ( + Flow, SymFlow, + newFlow, newFlowIO, + readFlow, writeFlow, writeFlowBulk, + readFlowIO, writeFlowIO, + + mapPath, +) where + +import Control.Concurrent.STM + + +data Flow r w = Flow (TMVar [r]) (TMVar [w]) + | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') + +type SymFlow a = Flow a a + +newFlow :: STM (Flow a b, Flow b a) +newFlow = do + x <- newEmptyTMVar + y <- newEmptyTMVar + return (Flow x y, Flow y x) + +newFlowIO :: IO (Flow a b, Flow b a) +newFlowIO = atomically newFlow + +readFlow :: Flow r w -> STM r +readFlow (Flow rvar _) = takeTMVar rvar >>= \case + (x:[]) -> return x + (x:xs) -> putTMVar rvar xs >> return x + [] -> error "Flow: empty list" +readFlow (MappedFlow f _ up) = f <$> readFlow up + +writeFlow :: Flow r w -> w -> STM () +writeFlow (Flow _ wvar) = putTMVar wvar . (:[]) +writeFlow (MappedFlow _ f up) = writeFlow up . f + +writeFlowBulk :: Flow r w -> [w] -> STM () +writeFlowBulk _ [] = return () +writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs +writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs + +readFlowIO :: Flow r w -> IO r +readFlowIO path = atomically $ readFlow path + +writeFlowIO :: Flow r w -> w -> IO () +writeFlowIO path = atomically . writeFlow path + + +mapPath :: (r -> r') -> (w' -> w) -> Flow r w -> Flow r' w' +mapPath rf wf (MappedFlow rf' wf' up) = MappedFlow (rf . rf') (wf' . wf) up +mapPath rf wf up = MappedFlow rf wf up -- cgit v1.2.3