diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-12 11:03:37 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-13 21:22:52 +0100 |
commit | 9b14b0c159d64eae18e2732f1662fd27a72f8db1 (patch) | |
tree | ff5f61b29c3298537fbaaec2a2fcc1a2be34c2e1 | |
parent | 17998a5e8d386b58d30d138ea8dbc565955cccc6 (diff) |
Handle duplicate tasks by matching tree id
-rw-r--r-- | src/Command/Run.hs | 7 | ||||
-rw-r--r-- | src/Job.hs | 77 | ||||
-rw-r--r-- | src/Job/Types.hs | 10 | ||||
-rw-r--r-- | src/Repo.hs | 2 |
4 files changed, 80 insertions, 16 deletions
diff --git a/src/Command/Run.hs b/src/Command/Run.hs index 7c169b2..bd2aba9 100644 --- a/src/Command/Run.hs +++ b/src/Command/Run.hs @@ -104,6 +104,13 @@ showStatus blink = \case JobFailed -> "\ESC[91m✗\ESC[0m " JobDone _ -> "\ESC[92m✓\ESC[0m " + JobDuplicate _ s -> case s of + JobQueued -> "\ESC[94m^\ESC[0m " + JobWaiting _ -> "\ESC[94m^\ESC[0m " + JobSkipped -> "\ESC[0m-\ESC[0m " + JobRunning -> "\ESC[96m" <> (if blink then "*" else "^") <> "\ESC[0m " + _ -> showStatus blink s + displayStatusLine :: TerminalOutput -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO () displayStatusLine tout prefix1 prefix2 statuses = do blinkVar <- newTVarIO False @@ -19,6 +19,8 @@ import Control.Monad.Except import Control.Monad.IO.Class import Data.List +import Data.Map (Map) +import Data.Map qualified as M import Data.Text (Text) import Data.Text qualified as T import Data.Text.IO qualified as T @@ -48,6 +50,7 @@ data ArtifactOutput = ArtifactOutput data JobStatus a = JobQueued + | JobDuplicate JobId (JobStatus a) | JobWaiting [JobName] | JobRunning | JobSkipped @@ -58,20 +61,23 @@ data JobStatus a = JobQueued jobStatusFinished :: JobStatus a -> Bool jobStatusFinished = \case - JobQueued {} -> False - JobWaiting {} -> False - JobRunning {} -> False - _ -> True + JobQueued {} -> False + JobDuplicate _ s -> jobStatusFinished s + JobWaiting {} -> False + JobRunning {} -> False + _ -> True jobStatusFailed :: JobStatus a -> Bool jobStatusFailed = \case - JobError {} -> True - JobFailed {} -> True - _ -> False + JobDuplicate _ s -> jobStatusFailed s + JobError {} -> True + JobFailed {} -> True + _ -> False textJobStatus :: JobStatus a -> Text textJobStatus = \case JobQueued -> "queued" + JobDuplicate {} -> "duplicate" JobWaiting _ -> "waiting" JobRunning -> "running" JobSkipped -> "skipped" @@ -82,11 +88,13 @@ textJobStatus = \case data JobManager = JobManager { jmSemaphore :: TVar Int + , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput))) } newJobManager :: Int -> IO JobManager newJobManager queueLen = do jmSemaphore <- newTVarIO queueLen + jmJobs <- newTVarIO M.empty return JobManager {..} runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a @@ -103,22 +111,59 @@ runManagedJob JobManager {..} job = bracket acquire release (\_ -> job) runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] -runJobs mngr dir commit jobs = do - results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued +runJobs mngr@JobManager {..} dir commit jobs = do + tid <- readTreeId commit + results <- atomically $ do + forM jobs $ \job -> do + let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ] + managed <- readTVar jmJobs + ( job, ) <$> case M.lookup jid managed of + Just origVar -> do + newTVar . JobDuplicate jid =<< readTVar origVar + + Nothing -> do + statusVar <- newTVar JobQueued + writeTVar jmJobs $ M.insert jid statusVar managed + return statusVar + forM_ results $ \(job, outVar) -> void $ forkIO $ do res <- runExceptT $ do - uses <- waitForUsedArtifacts job results outVar - runManagedJob mngr $ do - liftIO $ atomically $ writeTVar outVar JobRunning - prepareJob dir commit job $ \checkoutPath jdir -> do - updateStatusFile (jdir </> "status") outVar - runJob job uses checkoutPath jdir + duplicate <- liftIO $ atomically $ do + readTVar outVar >>= \case + JobDuplicate jid _ -> do + fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs + _ -> do + return Nothing + + case duplicate of + Nothing -> do + uses <- waitForUsedArtifacts job results outVar + runManagedJob mngr $ do + liftIO $ atomically $ writeTVar outVar JobRunning + prepareJob dir commit job $ \checkoutPath jdir -> do + updateStatusFile (jdir </> "status") outVar + JobDone <$> runJob job uses checkoutPath jdir + + Just ( jid, origVar ) -> do + let wait = do + status <- atomically $ do + status <- readTVar origVar + out <- readTVar outVar + if status == out + then retry + else do + writeTVar outVar $ JobDuplicate jid status + return status + if jobStatusFinished status + then return $ JobDuplicate jid status + else wait + liftIO wait case res of Left (JobError err) -> T.putStrLn err _ -> return () - atomically $ writeTVar outVar $ either id JobDone res + atomically $ writeTVar outVar $ either id id res return results waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => diff --git a/src/Job/Types.hs b/src/Job/Types.hs index 6918738..50a6b43 100644 --- a/src/Job/Types.hs +++ b/src/Job/Types.hs @@ -36,3 +36,13 @@ data JobSet = JobSet jobsetJobs :: JobSet -> [ Job ] jobsetJobs = either (const []) id . jobsetJobsEither + + +newtype JobId = JobId [ JobIdPart ] + deriving (Eq, Ord) + +data JobIdPart + = JobIdName JobName + | JobIdCommit CommitId + | JobIdTree TreeId + deriving (Eq, Ord) diff --git a/src/Repo.hs b/src/Repo.hs index c0500f3..1148972 100644 --- a/src/Repo.hs +++ b/src/Repo.hs @@ -40,11 +40,13 @@ data Commit = Commit newtype CommitId = CommitId ByteString + deriving (Eq, Ord) showCommitId :: CommitId -> String showCommitId (CommitId cid) = BC.unpack cid newtype TreeId = TreeId ByteString + deriving (Eq, Ord) showTreeId :: TreeId -> String showTreeId (TreeId tid) = BC.unpack tid |