summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Job.hs42
1 files changed, 13 insertions, 29 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 740a9f8..9286839 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -135,11 +135,10 @@ textJobStatusDetails = \case
data JobManager = JobManager
- { jmSemaphore :: TVar Int
+ { jmMaxRunningTasks :: Int
, jmDataDir :: FilePath
, jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput)))
, jmNextTaskId :: TVar TaskId
- , jmNextTask :: TVar (Maybe TaskId)
, jmReadyTasks :: TVar (Set TaskId)
, jmRunningTasks :: TVar (Map TaskId ThreadId)
, jmCancelled :: TVar Bool
@@ -163,11 +162,9 @@ instance Exception JobCancelledException
newJobManager :: FilePath -> Int -> IO JobManager
-newJobManager jmDataDir queueLen = do
- jmSemaphore <- newTVarIO queueLen
+newJobManager jmDataDir jmMaxRunningTasks = do
jmJobs <- newTVarIO M.empty
jmNextTaskId <- newTVarIO (TaskId 0)
- jmNextTask <- newTVarIO Nothing
jmReadyTasks <- newTVarIO S.empty
jmRunningTasks <- newTVarIO M.empty
jmCancelled <- newTVarIO False
@@ -196,41 +193,28 @@ runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case
True -> cancel
False -> job
where
- acquire = liftIO $ do
+ acquire = (`onException` release False) $ liftIO $ do
atomically $ do
writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks
- trySelectNext
threadId <- myThreadId
atomically $ do
readTVar jmCancelled >>= \case
True -> return True
- False -> readTVar jmNextTask >>= \case
- Just tid' | tid' == tid -> do
- writeTVar jmNextTask Nothing
- writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks
- return False
+ False -> readTVar jmRunningTasks >>= \case
+ running | M.size running < jmMaxRunningTasks -> do
+ (S.minView <$> readTVar jmReadyTasks) >>= \case
+ Just ( tid', ready ) | tid' == tid -> do
+ writeTVar jmReadyTasks ready
+ writeTVar jmRunningTasks $ M.insert tid threadId running
+ return False
+ _ -> retry
_ -> retry
release False = liftIO $ atomically $ do
- free <- readTVar jmSemaphore
- writeTVar jmSemaphore $ free + 1
- trySelectNext
+ writeTVar jmReadyTasks . S.delete tid =<< readTVar jmReadyTasks
+ writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks
release True = return ()
- trySelectNext = do
- readTVar jmNextTask >>= \case
- Just _ -> return ()
- Nothing -> do
- readTVar jmSemaphore >>= \case
- 0 -> return ()
- sem -> (S.minView <$> readTVar jmReadyTasks) >>= \case
- Nothing -> return ()
- Just ( tid', ready ) -> do
- writeTVar jmReadyTasks ready
- writeTVar jmSemaphore (sem - 1)
- writeTVar jmNextTask (Just tid')
- writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks
-
runJobs :: JobManager -> Output -> [ Job ]
-> (JobId -> JobStatus JobOutput -> Bool) -- ^ Rerun condition