summaryrefslogtreecommitdiff
path: root/src/Job.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Job.hs')
-rw-r--r--src/Job.hs77
1 files changed, 61 insertions, 16 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 3d86359..310152d 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -19,6 +19,8 @@ import Control.Monad.Except
import Control.Monad.IO.Class
import Data.List
+import Data.Map (Map)
+import Data.Map qualified as M
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.IO qualified as T
@@ -48,6 +50,7 @@ data ArtifactOutput = ArtifactOutput
data JobStatus a = JobQueued
+ | JobDuplicate JobId (JobStatus a)
| JobWaiting [JobName]
| JobRunning
| JobSkipped
@@ -58,20 +61,23 @@ data JobStatus a = JobQueued
jobStatusFinished :: JobStatus a -> Bool
jobStatusFinished = \case
- JobQueued {} -> False
- JobWaiting {} -> False
- JobRunning {} -> False
- _ -> True
+ JobQueued {} -> False
+ JobDuplicate _ s -> jobStatusFinished s
+ JobWaiting {} -> False
+ JobRunning {} -> False
+ _ -> True
jobStatusFailed :: JobStatus a -> Bool
jobStatusFailed = \case
- JobError {} -> True
- JobFailed {} -> True
- _ -> False
+ JobDuplicate _ s -> jobStatusFailed s
+ JobError {} -> True
+ JobFailed {} -> True
+ _ -> False
textJobStatus :: JobStatus a -> Text
textJobStatus = \case
JobQueued -> "queued"
+ JobDuplicate {} -> "duplicate"
JobWaiting _ -> "waiting"
JobRunning -> "running"
JobSkipped -> "skipped"
@@ -82,11 +88,13 @@ textJobStatus = \case
data JobManager = JobManager
{ jmSemaphore :: TVar Int
+ , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput)))
}
newJobManager :: Int -> IO JobManager
newJobManager queueLen = do
jmSemaphore <- newTVarIO queueLen
+ jmJobs <- newTVarIO M.empty
return JobManager {..}
runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a
@@ -103,22 +111,59 @@ runManagedJob JobManager {..} job = bracket acquire release (\_ -> job)
runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
-runJobs mngr dir commit jobs = do
- results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued
+runJobs mngr@JobManager {..} dir commit jobs = do
+ tid <- readTreeId commit
+ results <- atomically $ do
+ forM jobs $ \job -> do
+ let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ]
+ managed <- readTVar jmJobs
+ ( job, ) <$> case M.lookup jid managed of
+ Just origVar -> do
+ newTVar . JobDuplicate jid =<< readTVar origVar
+
+ Nothing -> do
+ statusVar <- newTVar JobQueued
+ writeTVar jmJobs $ M.insert jid statusVar managed
+ return statusVar
+
forM_ results $ \(job, outVar) -> void $ forkIO $ do
res <- runExceptT $ do
- uses <- waitForUsedArtifacts job results outVar
- runManagedJob mngr $ do
- liftIO $ atomically $ writeTVar outVar JobRunning
- prepareJob dir commit job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- runJob job uses checkoutPath jdir
+ duplicate <- liftIO $ atomically $ do
+ readTVar outVar >>= \case
+ JobDuplicate jid _ -> do
+ fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
+ _ -> do
+ return Nothing
+
+ case duplicate of
+ Nothing -> do
+ uses <- waitForUsedArtifacts job results outVar
+ runManagedJob mngr $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ prepareJob dir commit job $ \checkoutPath jdir -> do
+ updateStatusFile (jdir </> "status") outVar
+ JobDone <$> runJob job uses checkoutPath jdir
+
+ Just ( jid, origVar ) -> do
+ let wait = do
+ status <- atomically $ do
+ status <- readTVar origVar
+ out <- readTVar outVar
+ if status == out
+ then retry
+ else do
+ writeTVar outVar $ JobDuplicate jid status
+ return status
+ if jobStatusFinished status
+ then return $ JobDuplicate jid status
+ else wait
+ liftIO wait
case res of
Left (JobError err) -> T.putStrLn err
_ -> return ()
- atomically $ writeTVar outVar $ either id JobDone res
+ atomically $ writeTVar outVar $ either id id res
return results
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>