summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-01-12 11:03:37 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2025-01-13 21:22:52 +0100
commit9b14b0c159d64eae18e2732f1662fd27a72f8db1 (patch)
treeff5f61b29c3298537fbaaec2a2fcc1a2be34c2e1
parent17998a5e8d386b58d30d138ea8dbc565955cccc6 (diff)
Handle duplicate tasks by matching tree id
-rw-r--r--src/Command/Run.hs7
-rw-r--r--src/Job.hs77
-rw-r--r--src/Job/Types.hs10
-rw-r--r--src/Repo.hs2
4 files changed, 80 insertions, 16 deletions
diff --git a/src/Command/Run.hs b/src/Command/Run.hs
index 7c169b2..bd2aba9 100644
--- a/src/Command/Run.hs
+++ b/src/Command/Run.hs
@@ -104,6 +104,13 @@ showStatus blink = \case
JobFailed -> "\ESC[91m✗\ESC[0m "
JobDone _ -> "\ESC[92m✓\ESC[0m "
+ JobDuplicate _ s -> case s of
+ JobQueued -> "\ESC[94m^\ESC[0m "
+ JobWaiting _ -> "\ESC[94m^\ESC[0m "
+ JobSkipped -> "\ESC[0m-\ESC[0m "
+ JobRunning -> "\ESC[96m" <> (if blink then "*" else "^") <> "\ESC[0m "
+ _ -> showStatus blink s
+
displayStatusLine :: TerminalOutput -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO ()
displayStatusLine tout prefix1 prefix2 statuses = do
blinkVar <- newTVarIO False
diff --git a/src/Job.hs b/src/Job.hs
index 3d86359..310152d 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -19,6 +19,8 @@ import Control.Monad.Except
import Control.Monad.IO.Class
import Data.List
+import Data.Map (Map)
+import Data.Map qualified as M
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.IO qualified as T
@@ -48,6 +50,7 @@ data ArtifactOutput = ArtifactOutput
data JobStatus a = JobQueued
+ | JobDuplicate JobId (JobStatus a)
| JobWaiting [JobName]
| JobRunning
| JobSkipped
@@ -58,20 +61,23 @@ data JobStatus a = JobQueued
jobStatusFinished :: JobStatus a -> Bool
jobStatusFinished = \case
- JobQueued {} -> False
- JobWaiting {} -> False
- JobRunning {} -> False
- _ -> True
+ JobQueued {} -> False
+ JobDuplicate _ s -> jobStatusFinished s
+ JobWaiting {} -> False
+ JobRunning {} -> False
+ _ -> True
jobStatusFailed :: JobStatus a -> Bool
jobStatusFailed = \case
- JobError {} -> True
- JobFailed {} -> True
- _ -> False
+ JobDuplicate _ s -> jobStatusFailed s
+ JobError {} -> True
+ JobFailed {} -> True
+ _ -> False
textJobStatus :: JobStatus a -> Text
textJobStatus = \case
JobQueued -> "queued"
+ JobDuplicate {} -> "duplicate"
JobWaiting _ -> "waiting"
JobRunning -> "running"
JobSkipped -> "skipped"
@@ -82,11 +88,13 @@ textJobStatus = \case
data JobManager = JobManager
{ jmSemaphore :: TVar Int
+ , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput)))
}
newJobManager :: Int -> IO JobManager
newJobManager queueLen = do
jmSemaphore <- newTVarIO queueLen
+ jmJobs <- newTVarIO M.empty
return JobManager {..}
runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a
@@ -103,22 +111,59 @@ runManagedJob JobManager {..} job = bracket acquire release (\_ -> job)
runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
-runJobs mngr dir commit jobs = do
- results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued
+runJobs mngr@JobManager {..} dir commit jobs = do
+ tid <- readTreeId commit
+ results <- atomically $ do
+ forM jobs $ \job -> do
+ let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ]
+ managed <- readTVar jmJobs
+ ( job, ) <$> case M.lookup jid managed of
+ Just origVar -> do
+ newTVar . JobDuplicate jid =<< readTVar origVar
+
+ Nothing -> do
+ statusVar <- newTVar JobQueued
+ writeTVar jmJobs $ M.insert jid statusVar managed
+ return statusVar
+
forM_ results $ \(job, outVar) -> void $ forkIO $ do
res <- runExceptT $ do
- uses <- waitForUsedArtifacts job results outVar
- runManagedJob mngr $ do
- liftIO $ atomically $ writeTVar outVar JobRunning
- prepareJob dir commit job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- runJob job uses checkoutPath jdir
+ duplicate <- liftIO $ atomically $ do
+ readTVar outVar >>= \case
+ JobDuplicate jid _ -> do
+ fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
+ _ -> do
+ return Nothing
+
+ case duplicate of
+ Nothing -> do
+ uses <- waitForUsedArtifacts job results outVar
+ runManagedJob mngr $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ prepareJob dir commit job $ \checkoutPath jdir -> do
+ updateStatusFile (jdir </> "status") outVar
+ JobDone <$> runJob job uses checkoutPath jdir
+
+ Just ( jid, origVar ) -> do
+ let wait = do
+ status <- atomically $ do
+ status <- readTVar origVar
+ out <- readTVar outVar
+ if status == out
+ then retry
+ else do
+ writeTVar outVar $ JobDuplicate jid status
+ return status
+ if jobStatusFinished status
+ then return $ JobDuplicate jid status
+ else wait
+ liftIO wait
case res of
Left (JobError err) -> T.putStrLn err
_ -> return ()
- atomically $ writeTVar outVar $ either id JobDone res
+ atomically $ writeTVar outVar $ either id id res
return results
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>
diff --git a/src/Job/Types.hs b/src/Job/Types.hs
index 6918738..50a6b43 100644
--- a/src/Job/Types.hs
+++ b/src/Job/Types.hs
@@ -36,3 +36,13 @@ data JobSet = JobSet
jobsetJobs :: JobSet -> [ Job ]
jobsetJobs = either (const []) id . jobsetJobsEither
+
+
+newtype JobId = JobId [ JobIdPart ]
+ deriving (Eq, Ord)
+
+data JobIdPart
+ = JobIdName JobName
+ | JobIdCommit CommitId
+ | JobIdTree TreeId
+ deriving (Eq, Ord)
diff --git a/src/Repo.hs b/src/Repo.hs
index c0500f3..1148972 100644
--- a/src/Repo.hs
+++ b/src/Repo.hs
@@ -40,11 +40,13 @@ data Commit = Commit
newtype CommitId = CommitId ByteString
+ deriving (Eq, Ord)
showCommitId :: CommitId -> String
showCommitId (CommitId cid) = BC.unpack cid
newtype TreeId = TreeId ByteString
+ deriving (Eq, Ord)
showTreeId :: TreeId -> String
showTreeId (TreeId tid) = BC.unpack tid