From 35ffbac5897293bad66bcdae5818da55958950f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 4 Apr 2026 12:31:43 +0200 Subject: Simplify task scheduling logic with fewer variables Fixes a race happening when exception is receved during the "acquire" phase, which would leave a stale value for "next task". --- src/Job.hs | 42 +++++++++++++----------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) (limited to 'src') diff --git a/src/Job.hs b/src/Job.hs index 740a9f8..9286839 100644 --- a/src/Job.hs +++ b/src/Job.hs @@ -135,11 +135,10 @@ textJobStatusDetails = \case data JobManager = JobManager - { jmSemaphore :: TVar Int + { jmMaxRunningTasks :: Int , jmDataDir :: FilePath , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput))) , jmNextTaskId :: TVar TaskId - , jmNextTask :: TVar (Maybe TaskId) , jmReadyTasks :: TVar (Set TaskId) , jmRunningTasks :: TVar (Map TaskId ThreadId) , jmCancelled :: TVar Bool @@ -163,11 +162,9 @@ instance Exception JobCancelledException newJobManager :: FilePath -> Int -> IO JobManager -newJobManager jmDataDir queueLen = do - jmSemaphore <- newTVarIO queueLen +newJobManager jmDataDir jmMaxRunningTasks = do jmJobs <- newTVarIO M.empty jmNextTaskId <- newTVarIO (TaskId 0) - jmNextTask <- newTVarIO Nothing jmReadyTasks <- newTVarIO S.empty jmRunningTasks <- newTVarIO M.empty jmCancelled <- newTVarIO False @@ -196,41 +193,28 @@ runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case True -> cancel False -> job where - acquire = liftIO $ do + acquire = (`onException` release False) $ liftIO $ do atomically $ do writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks - trySelectNext threadId <- myThreadId atomically $ do readTVar jmCancelled >>= \case True -> return True - False -> readTVar jmNextTask >>= \case - Just tid' | tid' == tid -> do - writeTVar jmNextTask Nothing - writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks - return False + False -> readTVar jmRunningTasks >>= \case + running | M.size running < jmMaxRunningTasks -> do + (S.minView <$> readTVar jmReadyTasks) >>= \case + Just ( tid', ready ) | tid' == tid -> do + writeTVar jmReadyTasks ready + writeTVar jmRunningTasks $ M.insert tid threadId running + return False + _ -> retry _ -> retry release False = liftIO $ atomically $ do - free <- readTVar jmSemaphore - writeTVar jmSemaphore $ free + 1 - trySelectNext + writeTVar jmReadyTasks . S.delete tid =<< readTVar jmReadyTasks + writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks release True = return () - trySelectNext = do - readTVar jmNextTask >>= \case - Just _ -> return () - Nothing -> do - readTVar jmSemaphore >>= \case - 0 -> return () - sem -> (S.minView <$> readTVar jmReadyTasks) >>= \case - Nothing -> return () - Just ( tid', ready ) -> do - writeTVar jmReadyTasks ready - writeTVar jmSemaphore (sem - 1) - writeTVar jmNextTask (Just tid') - writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks - runJobs :: JobManager -> Output -> [ Job ] -> (JobId -> JobStatus JobOutput -> Bool) -- ^ Rerun condition -- cgit v1.2.3