summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-11-22 22:04:05 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2025-11-22 22:04:05 +0100
commit5b19d70e6bb70f951e6a7c1670c54db640eaa1d0 (patch)
tree22069238b511c06ae8b58efa1cfeae0b2f65e9a5 /src
parent90a4709f4b6cafcd6be9461046a26ad6fb641869 (diff)
Make sure to write the final status update before terminating
Diffstat (limited to 'src')
-rw-r--r--src/Command/Run.hs6
-rw-r--r--src/Job.hs22
2 files changed, 21 insertions, 7 deletions
diff --git a/src/Command/Run.hs b/src/Command/Run.hs
index ddc166a..bd60bae 100644
--- a/src/Command/Run.hs
+++ b/src/Command/Run.hs
@@ -333,8 +333,10 @@ cmdRun (RunCommand RunOptions {..} args) = do
threadCount <- newTVarIO (0 :: Int)
let changeCount f = atomically $ do
writeTVar threadCount . f =<< readTVar threadCount
- let waitForJobs = atomically $ do
- flip when retry . (0 <) =<< readTVar threadCount
+ let waitForJobs = do
+ atomically $ do
+ flip when retry . (0 <) =<< readTVar threadCount
+ waitForRemainingTasks mngr
let loop _ Nothing = return ()
loop names (Just ( [], next )) = do
diff --git a/src/Job.hs b/src/Job.hs
index 6445f5f..5a28a50 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -7,7 +7,7 @@ module Job (
JobStatus(..),
jobStatusFinished, jobStatusFailed,
JobManager(..), newJobManager, cancelAllJobs,
- runJobs,
+ runJobs, waitForRemainingTasks,
prepareJob,
jobStorageSubdir,
@@ -138,6 +138,7 @@ data JobManager = JobManager
, jmReadyTasks :: TVar (Set TaskId)
, jmRunningTasks :: TVar (Map TaskId ThreadId)
, jmCancelled :: TVar Bool
+ , jmOpenStatusUpdates :: TVar Int
}
newtype TaskId = TaskId Int
@@ -158,6 +159,7 @@ newJobManager jmDataDir queueLen = do
jmReadyTasks <- newTVarIO S.empty
jmRunningTasks <- newTVarIO M.empty
jmCancelled <- newTVarIO False
+ jmOpenStatusUpdates <- newTVarIO 0
return JobManager {..}
cancelAllJobs :: JobManager -> IO ()
@@ -266,7 +268,7 @@ runJobs mngr@JobManager {..} tout jobs rerun = do
liftIO $ atomically $ writeTVar outVar JobRunning
liftIO $ outputEvent tout $ JobStarted (jobId job)
prepareJob jmDataDir job $ \checkoutPath -> do
- updateStatusFile jdir outVar
+ updateStatusFile mngr jdir outVar
JobDone <$> runJob job uses checkoutPath jdir
Just ( jid, origVar ) -> do
@@ -288,6 +290,12 @@ runJobs mngr@JobManager {..} tout jobs rerun = do
outputJobFinishedEvent tout job $ either id id res
return $ map (\( job, _, var ) -> ( job, var )) results
+waitForRemainingTasks :: JobManager -> IO ()
+waitForRemainingTasks JobManager {..} = do
+ atomically $ do
+ remainingStatusUpdates <- readTVar jmOpenStatusUpdates
+ when (remainingStatusUpdates > 0) retry
+
waitForUsedArtifacts
:: (MonadIO m, MonadError (JobStatus JobOutput) m)
=> Output -> Job
@@ -349,8 +357,10 @@ readStatusFile tout job jdir = do
{ outArtifacts = artifacts
}
-updateStatusFile :: MonadIO m => FilePath -> TVar (JobStatus JobOutput) -> m ()
-updateStatusFile jdir outVar = void $ liftIO $ forkIO $ loop Nothing
+updateStatusFile :: MonadIO m => JobManager -> FilePath -> TVar (JobStatus JobOutput) -> m ()
+updateStatusFile JobManager {..} jdir outVar = liftIO $ do
+ atomically $ writeTVar jmOpenStatusUpdates . (+ 1) =<< readTVar jmOpenStatusUpdates
+ void $ forkIO $ loop Nothing
where
loop prev = do
status <- atomically $ do
@@ -358,7 +368,9 @@ updateStatusFile jdir outVar = void $ liftIO $ forkIO $ loop Nothing
when (Just status == prev) retry
return status
T.writeFile (jdir </> "status") $ textJobStatus status <> "\n" <> textJobStatusDetails status
- when (not (jobStatusFinished status)) $ loop $ Just status
+ if (not (jobStatusFinished status))
+ then loop $ Just status
+ else atomically $ writeTVar jmOpenStatusUpdates . (subtract 1) =<< readTVar jmOpenStatusUpdates
jobStorageSubdir :: JobId -> FilePath
jobStorageSubdir (JobId jidParts) = "jobs" </> joinPath (map (T.unpack . textJobIdPart) (jidParts))