diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2026-04-04 12:31:43 +0200 |
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2026-04-04 12:49:35 +0200 |
| commit | 35ffbac5897293bad66bcdae5818da55958950f7 (patch) | |
| tree | c867b2d4787cae9dacb4b422facbab4df1cd961a | |
| parent | 007261536b8b5daf1e3cac24eeeb160c6d572c85 (diff) | |
Simplify task scheduling logic with fewer variables
Fixes a race happening when exception is receved during the "acquire"
phase, which would leave a stale value for "next task".
| -rw-r--r-- | src/Job.hs | 42 |
1 files changed, 13 insertions, 29 deletions
@@ -135,11 +135,10 @@ textJobStatusDetails = \case data JobManager = JobManager - { jmSemaphore :: TVar Int + { jmMaxRunningTasks :: Int , jmDataDir :: FilePath , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput))) , jmNextTaskId :: TVar TaskId - , jmNextTask :: TVar (Maybe TaskId) , jmReadyTasks :: TVar (Set TaskId) , jmRunningTasks :: TVar (Map TaskId ThreadId) , jmCancelled :: TVar Bool @@ -163,11 +162,9 @@ instance Exception JobCancelledException newJobManager :: FilePath -> Int -> IO JobManager -newJobManager jmDataDir queueLen = do - jmSemaphore <- newTVarIO queueLen +newJobManager jmDataDir jmMaxRunningTasks = do jmJobs <- newTVarIO M.empty jmNextTaskId <- newTVarIO (TaskId 0) - jmNextTask <- newTVarIO Nothing jmReadyTasks <- newTVarIO S.empty jmRunningTasks <- newTVarIO M.empty jmCancelled <- newTVarIO False @@ -196,41 +193,28 @@ runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case True -> cancel False -> job where - acquire = liftIO $ do + acquire = (`onException` release False) $ liftIO $ do atomically $ do writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks - trySelectNext threadId <- myThreadId atomically $ do readTVar jmCancelled >>= \case True -> return True - False -> readTVar jmNextTask >>= \case - Just tid' | tid' == tid -> do - writeTVar jmNextTask Nothing - writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks - return False + False -> readTVar jmRunningTasks >>= \case + running | M.size running < jmMaxRunningTasks -> do + (S.minView <$> readTVar jmReadyTasks) >>= \case + Just ( tid', ready ) | tid' == tid -> do + writeTVar jmReadyTasks ready + writeTVar jmRunningTasks $ M.insert tid threadId running + return False + _ -> retry _ -> retry release False = liftIO $ atomically $ do - free <- readTVar jmSemaphore - writeTVar jmSemaphore $ free + 1 - trySelectNext + writeTVar jmReadyTasks . S.delete tid =<< readTVar jmReadyTasks + writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks release True = return () - trySelectNext = do - readTVar jmNextTask >>= \case - Just _ -> return () - Nothing -> do - readTVar jmSemaphore >>= \case - 0 -> return () - sem -> (S.minView <$> readTVar jmReadyTasks) >>= \case - Nothing -> return () - Just ( tid', ready ) -> do - writeTVar jmReadyTasks ready - writeTVar jmSemaphore (sem - 1) - writeTVar jmNextTask (Just tid') - writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks - runJobs :: JobManager -> Output -> [ Job ] -> (JobId -> JobStatus JobOutput -> Bool) -- ^ Rerun condition |