summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2026-04-04 12:31:43 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2026-04-04 12:49:35 +0200
commit35ffbac5897293bad66bcdae5818da55958950f7 (patch)
treec867b2d4787cae9dacb4b422facbab4df1cd961a /src
parent007261536b8b5daf1e3cac24eeeb160c6d572c85 (diff)
Simplify task scheduling logic with fewer variables
Fixes a race happening when exception is receved during the "acquire" phase, which would leave a stale value for "next task".
Diffstat (limited to 'src')
-rw-r--r--src/Job.hs42
1 files changed, 13 insertions, 29 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 740a9f8..9286839 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -135,11 +135,10 @@ textJobStatusDetails = \case
data JobManager = JobManager
- { jmSemaphore :: TVar Int
+ { jmMaxRunningTasks :: Int
, jmDataDir :: FilePath
, jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput)))
, jmNextTaskId :: TVar TaskId
- , jmNextTask :: TVar (Maybe TaskId)
, jmReadyTasks :: TVar (Set TaskId)
, jmRunningTasks :: TVar (Map TaskId ThreadId)
, jmCancelled :: TVar Bool
@@ -163,11 +162,9 @@ instance Exception JobCancelledException
newJobManager :: FilePath -> Int -> IO JobManager
-newJobManager jmDataDir queueLen = do
- jmSemaphore <- newTVarIO queueLen
+newJobManager jmDataDir jmMaxRunningTasks = do
jmJobs <- newTVarIO M.empty
jmNextTaskId <- newTVarIO (TaskId 0)
- jmNextTask <- newTVarIO Nothing
jmReadyTasks <- newTVarIO S.empty
jmRunningTasks <- newTVarIO M.empty
jmCancelled <- newTVarIO False
@@ -196,41 +193,28 @@ runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case
True -> cancel
False -> job
where
- acquire = liftIO $ do
+ acquire = (`onException` release False) $ liftIO $ do
atomically $ do
writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks
- trySelectNext
threadId <- myThreadId
atomically $ do
readTVar jmCancelled >>= \case
True -> return True
- False -> readTVar jmNextTask >>= \case
- Just tid' | tid' == tid -> do
- writeTVar jmNextTask Nothing
- writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks
- return False
+ False -> readTVar jmRunningTasks >>= \case
+ running | M.size running < jmMaxRunningTasks -> do
+ (S.minView <$> readTVar jmReadyTasks) >>= \case
+ Just ( tid', ready ) | tid' == tid -> do
+ writeTVar jmReadyTasks ready
+ writeTVar jmRunningTasks $ M.insert tid threadId running
+ return False
+ _ -> retry
_ -> retry
release False = liftIO $ atomically $ do
- free <- readTVar jmSemaphore
- writeTVar jmSemaphore $ free + 1
- trySelectNext
+ writeTVar jmReadyTasks . S.delete tid =<< readTVar jmReadyTasks
+ writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks
release True = return ()
- trySelectNext = do
- readTVar jmNextTask >>= \case
- Just _ -> return ()
- Nothing -> do
- readTVar jmSemaphore >>= \case
- 0 -> return ()
- sem -> (S.minView <$> readTVar jmReadyTasks) >>= \case
- Nothing -> return ()
- Just ( tid', ready ) -> do
- writeTVar jmReadyTasks ready
- writeTVar jmSemaphore (sem - 1)
- writeTVar jmNextTask (Just tid')
- writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks
-
runJobs :: JobManager -> Output -> [ Job ]
-> (JobId -> JobStatus JobOutput -> Bool) -- ^ Rerun condition