summaryrefslogtreecommitdiff
path: root/src/Job.hs
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-01-10 22:21:14 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2025-01-10 22:21:14 +0100
commit654edd780dfd28b3611e42df9fd7fd68b8f88191 (patch)
tree01747437b15ad3740637fbdef986546e07fa8d1a /src/Job.hs
parentded067166901805bba63a35b37fe83ebfc4e6aa8 (diff)
Limit number of concurrently running jobsdevel
Changelog: Configurable number of concurrently running jobs
Diffstat (limited to 'src/Job.hs')
-rw-r--r--src/Job.hs40
1 files changed, 33 insertions, 7 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 068a076..9ffa14a 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -6,6 +6,7 @@ module Job (
ArtifactName(..),
JobStatus(..),
jobStatusFinished, jobStatusFailed,
+ JobManager(..), newJobManager,
runJobs,
) where
@@ -79,16 +80,39 @@ textJobStatus = \case
JobDone _ -> "done"
-runJobs :: FilePath -> Commit -> [Job] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
-runJobs dir commit jobs = do
+data JobManager = JobManager
+ { jmSemaphore :: TVar Int
+ }
+
+newJobManager :: Int -> IO JobManager
+newJobManager queueLen = do
+ jmSemaphore <- newTVarIO queueLen
+ return JobManager {..}
+
+runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a
+runManagedJob JobManager {..} job = bracket acquire release (\_ -> job)
+ where
+ acquire = liftIO $ atomically $ do
+ free <- readTVar jmSemaphore
+ when (free <= 0) retry
+ writeTVar jmSemaphore $ free - 1
+
+ release _ = liftIO $ atomically $ do
+ free <- readTVar jmSemaphore
+ writeTVar jmSemaphore $ free + 1
+
+
+runJobs :: JobManager -> FilePath -> Commit -> [Job] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
+runJobs mngr dir commit jobs = do
results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued
forM_ results $ \(job, outVar) -> void $ forkIO $ do
res <- runExceptT $ do
uses <- waitForUsedArtifacts job results outVar
- liftIO $ atomically $ writeTVar outVar JobRunning
- prepareJob dir commit job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- runJob job uses checkoutPath jdir
+ runManagedJob mngr $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ prepareJob dir commit job $ \checkoutPath jdir -> do
+ updateStatusFile (jdir </> "status") outVar
+ runJob job uses checkoutPath jdir
case res of
Left (JobError err) -> T.putStrLn err
@@ -100,6 +124,7 @@ runJobs dir commit jobs = do
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>
Job -> [(Job, TVar (JobStatus JobOutput))] -> TVar (JobStatus JobOutput) -> m [ArtifactOutput]
waitForUsedArtifacts job results outVar = do
+ origState <- liftIO $ atomically $ readTVar outVar
ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do
case find ((==ujobName) . jobName . fst) results of
Just (_, var) -> return (var, (ujobName, uartName))
@@ -110,7 +135,8 @@ waitForUsedArtifacts job results outVar = do
ustatuses <- forM ujobs $ \(uoutVar, uartName) -> do
(,uartName) <$> readTVar uoutVar
when (Just (map fst ustatuses) == prev) retry
- writeTVar outVar $ JobWaiting $ map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses
+ let remains = map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses
+ writeTVar outVar $ if null remains then origState else JobWaiting remains
return ustatuses
if all (jobStatusFinished . fst) ustatuses
then return ustatuses