diff options
Diffstat (limited to 'src/Job.hs')
| -rw-r--r-- | src/Job.hs | 22 |
1 files changed, 17 insertions, 5 deletions
@@ -7,7 +7,7 @@ module Job ( JobStatus(..), jobStatusFinished, jobStatusFailed, JobManager(..), newJobManager, cancelAllJobs, - runJobs, + runJobs, waitForRemainingTasks, prepareJob, jobStorageSubdir, @@ -138,6 +138,7 @@ data JobManager = JobManager , jmReadyTasks :: TVar (Set TaskId) , jmRunningTasks :: TVar (Map TaskId ThreadId) , jmCancelled :: TVar Bool + , jmOpenStatusUpdates :: TVar Int } newtype TaskId = TaskId Int @@ -158,6 +159,7 @@ newJobManager jmDataDir queueLen = do jmReadyTasks <- newTVarIO S.empty jmRunningTasks <- newTVarIO M.empty jmCancelled <- newTVarIO False + jmOpenStatusUpdates <- newTVarIO 0 return JobManager {..} cancelAllJobs :: JobManager -> IO () @@ -266,7 +268,7 @@ runJobs mngr@JobManager {..} tout jobs rerun = do liftIO $ atomically $ writeTVar outVar JobRunning liftIO $ outputEvent tout $ JobStarted (jobId job) prepareJob jmDataDir job $ \checkoutPath -> do - updateStatusFile jdir outVar + updateStatusFile mngr jdir outVar JobDone <$> runJob job uses checkoutPath jdir Just ( jid, origVar ) -> do @@ -288,6 +290,12 @@ runJobs mngr@JobManager {..} tout jobs rerun = do outputJobFinishedEvent tout job $ either id id res return $ map (\( job, _, var ) -> ( job, var )) results +waitForRemainingTasks :: JobManager -> IO () +waitForRemainingTasks JobManager {..} = do + atomically $ do + remainingStatusUpdates <- readTVar jmOpenStatusUpdates + when (remainingStatusUpdates > 0) retry + waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => Output -> Job @@ -349,8 +357,10 @@ readStatusFile tout job jdir = do { outArtifacts = artifacts } -updateStatusFile :: MonadIO m => FilePath -> TVar (JobStatus JobOutput) -> m () -updateStatusFile jdir outVar = void $ liftIO $ forkIO $ loop Nothing +updateStatusFile :: MonadIO m => JobManager -> FilePath -> TVar (JobStatus JobOutput) -> m () +updateStatusFile JobManager {..} jdir outVar = liftIO $ do + atomically $ writeTVar jmOpenStatusUpdates . (+ 1) =<< readTVar jmOpenStatusUpdates + void $ forkIO $ loop Nothing where loop prev = do status <- atomically $ do @@ -358,7 +368,9 @@ updateStatusFile jdir outVar = void $ liftIO $ forkIO $ loop Nothing when (Just status == prev) retry return status T.writeFile (jdir </> "status") $ textJobStatus status <> "\n" <> textJobStatusDetails status - when (not (jobStatusFinished status)) $ loop $ Just status + if (not (jobStatusFinished status)) + then loop $ Just status + else atomically $ writeTVar jmOpenStatusUpdates . (subtract 1) =<< readTVar jmOpenStatusUpdates jobStorageSubdir :: JobId -> FilePath jobStorageSubdir (JobId jidParts) = "jobs" </> joinPath (map (T.unpack . textJobIdPart) (jidParts)) |