summaryrefslogtreecommitdiff
path: root/src/Job.hs
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-01-12 11:03:37 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2025-01-13 21:22:52 +0100
commit9b14b0c159d64eae18e2732f1662fd27a72f8db1 (patch)
treeff5f61b29c3298537fbaaec2a2fcc1a2be34c2e1 /src/Job.hs
parent17998a5e8d386b58d30d138ea8dbc565955cccc6 (diff)
Handle duplicate tasks by matching tree id
Diffstat (limited to 'src/Job.hs')
-rw-r--r--src/Job.hs77
1 files changed, 61 insertions, 16 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 3d86359..310152d 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -19,6 +19,8 @@ import Control.Monad.Except
import Control.Monad.IO.Class
import Data.List
+import Data.Map (Map)
+import Data.Map qualified as M
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.IO qualified as T
@@ -48,6 +50,7 @@ data ArtifactOutput = ArtifactOutput
data JobStatus a = JobQueued
+ | JobDuplicate JobId (JobStatus a)
| JobWaiting [JobName]
| JobRunning
| JobSkipped
@@ -58,20 +61,23 @@ data JobStatus a = JobQueued
jobStatusFinished :: JobStatus a -> Bool
jobStatusFinished = \case
- JobQueued {} -> False
- JobWaiting {} -> False
- JobRunning {} -> False
- _ -> True
+ JobQueued {} -> False
+ JobDuplicate _ s -> jobStatusFinished s
+ JobWaiting {} -> False
+ JobRunning {} -> False
+ _ -> True
jobStatusFailed :: JobStatus a -> Bool
jobStatusFailed = \case
- JobError {} -> True
- JobFailed {} -> True
- _ -> False
+ JobDuplicate _ s -> jobStatusFailed s
+ JobError {} -> True
+ JobFailed {} -> True
+ _ -> False
textJobStatus :: JobStatus a -> Text
textJobStatus = \case
JobQueued -> "queued"
+ JobDuplicate {} -> "duplicate"
JobWaiting _ -> "waiting"
JobRunning -> "running"
JobSkipped -> "skipped"
@@ -82,11 +88,13 @@ textJobStatus = \case
data JobManager = JobManager
{ jmSemaphore :: TVar Int
+ , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput)))
}
newJobManager :: Int -> IO JobManager
newJobManager queueLen = do
jmSemaphore <- newTVarIO queueLen
+ jmJobs <- newTVarIO M.empty
return JobManager {..}
runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a
@@ -103,22 +111,59 @@ runManagedJob JobManager {..} job = bracket acquire release (\_ -> job)
runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
-runJobs mngr dir commit jobs = do
- results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued
+runJobs mngr@JobManager {..} dir commit jobs = do
+ tid <- readTreeId commit
+ results <- atomically $ do
+ forM jobs $ \job -> do
+ let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ]
+ managed <- readTVar jmJobs
+ ( job, ) <$> case M.lookup jid managed of
+ Just origVar -> do
+ newTVar . JobDuplicate jid =<< readTVar origVar
+
+ Nothing -> do
+ statusVar <- newTVar JobQueued
+ writeTVar jmJobs $ M.insert jid statusVar managed
+ return statusVar
+
forM_ results $ \(job, outVar) -> void $ forkIO $ do
res <- runExceptT $ do
- uses <- waitForUsedArtifacts job results outVar
- runManagedJob mngr $ do
- liftIO $ atomically $ writeTVar outVar JobRunning
- prepareJob dir commit job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- runJob job uses checkoutPath jdir
+ duplicate <- liftIO $ atomically $ do
+ readTVar outVar >>= \case
+ JobDuplicate jid _ -> do
+ fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
+ _ -> do
+ return Nothing
+
+ case duplicate of
+ Nothing -> do
+ uses <- waitForUsedArtifacts job results outVar
+ runManagedJob mngr $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ prepareJob dir commit job $ \checkoutPath jdir -> do
+ updateStatusFile (jdir </> "status") outVar
+ JobDone <$> runJob job uses checkoutPath jdir
+
+ Just ( jid, origVar ) -> do
+ let wait = do
+ status <- atomically $ do
+ status <- readTVar origVar
+ out <- readTVar outVar
+ if status == out
+ then retry
+ else do
+ writeTVar outVar $ JobDuplicate jid status
+ return status
+ if jobStatusFinished status
+ then return $ JobDuplicate jid status
+ else wait
+ liftIO wait
case res of
Left (JobError err) -> T.putStrLn err
_ -> return ()
- atomically $ writeTVar outVar $ either id JobDone res
+ atomically $ writeTVar outVar $ either id id res
return results
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>