diff options
Diffstat (limited to 'src/Job.hs')
-rw-r--r-- | src/Job.hs | 77 |
1 files changed, 61 insertions, 16 deletions
@@ -19,6 +19,8 @@ import Control.Monad.Except import Control.Monad.IO.Class import Data.List +import Data.Map (Map) +import Data.Map qualified as M import Data.Text (Text) import Data.Text qualified as T import Data.Text.IO qualified as T @@ -48,6 +50,7 @@ data ArtifactOutput = ArtifactOutput data JobStatus a = JobQueued + | JobDuplicate JobId (JobStatus a) | JobWaiting [JobName] | JobRunning | JobSkipped @@ -58,20 +61,23 @@ data JobStatus a = JobQueued jobStatusFinished :: JobStatus a -> Bool jobStatusFinished = \case - JobQueued {} -> False - JobWaiting {} -> False - JobRunning {} -> False - _ -> True + JobQueued {} -> False + JobDuplicate _ s -> jobStatusFinished s + JobWaiting {} -> False + JobRunning {} -> False + _ -> True jobStatusFailed :: JobStatus a -> Bool jobStatusFailed = \case - JobError {} -> True - JobFailed {} -> True - _ -> False + JobDuplicate _ s -> jobStatusFailed s + JobError {} -> True + JobFailed {} -> True + _ -> False textJobStatus :: JobStatus a -> Text textJobStatus = \case JobQueued -> "queued" + JobDuplicate {} -> "duplicate" JobWaiting _ -> "waiting" JobRunning -> "running" JobSkipped -> "skipped" @@ -82,11 +88,13 @@ textJobStatus = \case data JobManager = JobManager { jmSemaphore :: TVar Int + , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput))) } newJobManager :: Int -> IO JobManager newJobManager queueLen = do jmSemaphore <- newTVarIO queueLen + jmJobs <- newTVarIO M.empty return JobManager {..} runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a @@ -103,22 +111,59 @@ runManagedJob JobManager {..} job = bracket acquire release (\_ -> job) runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] -runJobs mngr dir commit jobs = do - results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued +runJobs mngr@JobManager {..} dir commit jobs = do + tid <- readTreeId commit + results <- atomically $ do + forM jobs $ \job -> do + let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ] + managed <- readTVar jmJobs + ( job, ) <$> case M.lookup jid managed of + Just origVar -> do + newTVar . JobDuplicate jid =<< readTVar origVar + + Nothing -> do + statusVar <- newTVar JobQueued + writeTVar jmJobs $ M.insert jid statusVar managed + return statusVar + forM_ results $ \(job, outVar) -> void $ forkIO $ do res <- runExceptT $ do - uses <- waitForUsedArtifacts job results outVar - runManagedJob mngr $ do - liftIO $ atomically $ writeTVar outVar JobRunning - prepareJob dir commit job $ \checkoutPath jdir -> do - updateStatusFile (jdir </> "status") outVar - runJob job uses checkoutPath jdir + duplicate <- liftIO $ atomically $ do + readTVar outVar >>= \case + JobDuplicate jid _ -> do + fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs + _ -> do + return Nothing + + case duplicate of + Nothing -> do + uses <- waitForUsedArtifacts job results outVar + runManagedJob mngr $ do + liftIO $ atomically $ writeTVar outVar JobRunning + prepareJob dir commit job $ \checkoutPath jdir -> do + updateStatusFile (jdir </> "status") outVar + JobDone <$> runJob job uses checkoutPath jdir + + Just ( jid, origVar ) -> do + let wait = do + status <- atomically $ do + status <- readTVar origVar + out <- readTVar outVar + if status == out + then retry + else do + writeTVar outVar $ JobDuplicate jid status + return status + if jobStatusFinished status + then return $ JobDuplicate jid status + else wait + liftIO wait case res of Left (JobError err) -> T.putStrLn err _ -> return () - atomically $ writeTVar outVar $ either id JobDone res + atomically $ writeTVar outVar $ either id id res return results waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => |