diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2025-11-22 22:04:05 +0100 |
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-11-22 22:04:05 +0100 |
| commit | 5b19d70e6bb70f951e6a7c1670c54db640eaa1d0 (patch) | |
| tree | 22069238b511c06ae8b58efa1cfeae0b2f65e9a5 /src | |
| parent | 90a4709f4b6cafcd6be9461046a26ad6fb641869 (diff) | |
Make sure to write the final status update before terminating
Diffstat (limited to 'src')
| -rw-r--r-- | src/Command/Run.hs | 6 | ||||
| -rw-r--r-- | src/Job.hs | 22 |
2 files changed, 21 insertions, 7 deletions
diff --git a/src/Command/Run.hs b/src/Command/Run.hs index ddc166a..bd60bae 100644 --- a/src/Command/Run.hs +++ b/src/Command/Run.hs @@ -333,8 +333,10 @@ cmdRun (RunCommand RunOptions {..} args) = do threadCount <- newTVarIO (0 :: Int) let changeCount f = atomically $ do writeTVar threadCount . f =<< readTVar threadCount - let waitForJobs = atomically $ do - flip when retry . (0 <) =<< readTVar threadCount + let waitForJobs = do + atomically $ do + flip when retry . (0 <) =<< readTVar threadCount + waitForRemainingTasks mngr let loop _ Nothing = return () loop names (Just ( [], next )) = do @@ -7,7 +7,7 @@ module Job ( JobStatus(..), jobStatusFinished, jobStatusFailed, JobManager(..), newJobManager, cancelAllJobs, - runJobs, + runJobs, waitForRemainingTasks, prepareJob, jobStorageSubdir, @@ -138,6 +138,7 @@ data JobManager = JobManager , jmReadyTasks :: TVar (Set TaskId) , jmRunningTasks :: TVar (Map TaskId ThreadId) , jmCancelled :: TVar Bool + , jmOpenStatusUpdates :: TVar Int } newtype TaskId = TaskId Int @@ -158,6 +159,7 @@ newJobManager jmDataDir queueLen = do jmReadyTasks <- newTVarIO S.empty jmRunningTasks <- newTVarIO M.empty jmCancelled <- newTVarIO False + jmOpenStatusUpdates <- newTVarIO 0 return JobManager {..} cancelAllJobs :: JobManager -> IO () @@ -266,7 +268,7 @@ runJobs mngr@JobManager {..} tout jobs rerun = do liftIO $ atomically $ writeTVar outVar JobRunning liftIO $ outputEvent tout $ JobStarted (jobId job) prepareJob jmDataDir job $ \checkoutPath -> do - updateStatusFile jdir outVar + updateStatusFile mngr jdir outVar JobDone <$> runJob job uses checkoutPath jdir Just ( jid, origVar ) -> do @@ -288,6 +290,12 @@ runJobs mngr@JobManager {..} tout jobs rerun = do outputJobFinishedEvent tout job $ either id id res return $ map (\( job, _, var ) -> ( job, var )) results +waitForRemainingTasks :: JobManager -> IO () +waitForRemainingTasks JobManager {..} = do + atomically $ do + remainingStatusUpdates <- readTVar jmOpenStatusUpdates + when (remainingStatusUpdates > 0) retry + waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => Output -> Job @@ -349,8 +357,10 @@ readStatusFile tout job jdir = do { outArtifacts = artifacts } -updateStatusFile :: MonadIO m => FilePath -> TVar (JobStatus JobOutput) -> m () -updateStatusFile jdir outVar = void $ liftIO $ forkIO $ loop Nothing +updateStatusFile :: MonadIO m => JobManager -> FilePath -> TVar (JobStatus JobOutput) -> m () +updateStatusFile JobManager {..} jdir outVar = liftIO $ do + atomically $ writeTVar jmOpenStatusUpdates . (+ 1) =<< readTVar jmOpenStatusUpdates + void $ forkIO $ loop Nothing where loop prev = do status <- atomically $ do @@ -358,7 +368,9 @@ updateStatusFile jdir outVar = void $ liftIO $ forkIO $ loop Nothing when (Just status == prev) retry return status T.writeFile (jdir </> "status") $ textJobStatus status <> "\n" <> textJobStatusDetails status - when (not (jobStatusFinished status)) $ loop $ Just status + if (not (jobStatusFinished status)) + then loop $ Just status + else atomically $ writeTVar jmOpenStatusUpdates . (subtract 1) =<< readTVar jmOpenStatusUpdates jobStorageSubdir :: JobId -> FilePath jobStorageSubdir (JobId jidParts) = "jobs" </> joinPath (map (T.unpack . textJobIdPart) (jidParts)) |