diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-12 11:03:37 +0100 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-13 21:22:52 +0100 | 
| commit | 9b14b0c159d64eae18e2732f1662fd27a72f8db1 (patch) | |
| tree | ff5f61b29c3298537fbaaec2a2fcc1a2be34c2e1 | |
| parent | 17998a5e8d386b58d30d138ea8dbc565955cccc6 (diff) | |
Handle duplicate tasks by matching tree id
| -rw-r--r-- | src/Command/Run.hs | 7 | ||||
| -rw-r--r-- | src/Job.hs | 77 | ||||
| -rw-r--r-- | src/Job/Types.hs | 10 | ||||
| -rw-r--r-- | src/Repo.hs | 2 | 
4 files changed, 80 insertions, 16 deletions
| diff --git a/src/Command/Run.hs b/src/Command/Run.hs index 7c169b2..bd2aba9 100644 --- a/src/Command/Run.hs +++ b/src/Command/Run.hs @@ -104,6 +104,13 @@ showStatus blink = \case      JobFailed       -> "\ESC[91m✗\ESC[0m      "      JobDone _       -> "\ESC[92m✓\ESC[0m      " +    JobDuplicate _ s -> case s of +        JobQueued    -> "\ESC[94m^\ESC[0m      " +        JobWaiting _ -> "\ESC[94m^\ESC[0m      " +        JobSkipped   ->  "\ESC[0m-\ESC[0m      " +        JobRunning   -> "\ESC[96m" <> (if blink then "*" else "^") <> "\ESC[0m      " +        _            -> showStatus blink s +  displayStatusLine :: TerminalOutput -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO ()  displayStatusLine tout prefix1 prefix2 statuses = do      blinkVar <- newTVarIO False @@ -19,6 +19,8 @@ import Control.Monad.Except  import Control.Monad.IO.Class  import Data.List +import Data.Map (Map) +import Data.Map qualified as M  import Data.Text (Text)  import Data.Text qualified as T  import Data.Text.IO qualified as T @@ -48,6 +50,7 @@ data ArtifactOutput = ArtifactOutput  data JobStatus a = JobQueued +                 | JobDuplicate JobId (JobStatus a)                   | JobWaiting [JobName]                   | JobRunning                   | JobSkipped @@ -58,20 +61,23 @@ data JobStatus a = JobQueued  jobStatusFinished :: JobStatus a -> Bool  jobStatusFinished = \case -    JobQueued  {} -> False -    JobWaiting {} -> False -    JobRunning {} -> False -    _             -> True +    JobQueued      {} -> False +    JobDuplicate _ s  -> jobStatusFinished s +    JobWaiting     {} -> False +    JobRunning     {} -> False +    _                 -> True  jobStatusFailed :: JobStatus a -> Bool  jobStatusFailed = \case -    JobError  {} -> True -    JobFailed {} -> True -    _            -> False +    JobDuplicate _ s  -> jobStatusFailed s +    JobError       {} -> True +    JobFailed      {} -> True +    _                 -> False  textJobStatus :: JobStatus a -> Text  textJobStatus = \case      JobQueued -> "queued" +    JobDuplicate {} -> "duplicate"      JobWaiting _ -> "waiting"      JobRunning -> "running"      JobSkipped -> "skipped" @@ -82,11 +88,13 @@ textJobStatus = \case  data JobManager = JobManager      { jmSemaphore :: TVar Int +    , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput)))      }  newJobManager :: Int -> IO JobManager  newJobManager queueLen = do      jmSemaphore <- newTVarIO queueLen +    jmJobs <- newTVarIO M.empty      return JobManager {..}  runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a @@ -103,22 +111,59 @@ runManagedJob JobManager {..} job = bracket acquire release (\_ -> job)  runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] -runJobs mngr dir commit jobs = do -    results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued +runJobs mngr@JobManager {..} dir commit jobs = do +    tid <- readTreeId commit +    results <- atomically $ do +        forM jobs $ \job -> do +            let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ] +            managed <- readTVar jmJobs +            ( job, ) <$> case M.lookup jid managed of +                Just origVar -> do +                    newTVar . JobDuplicate jid =<< readTVar origVar + +                Nothing -> do +                    statusVar <- newTVar JobQueued +                    writeTVar jmJobs $ M.insert jid statusVar managed +                    return statusVar +      forM_ results $ \(job, outVar) -> void $ forkIO $ do          res <- runExceptT $ do -            uses <- waitForUsedArtifacts job results outVar -            runManagedJob mngr $ do -                liftIO $ atomically $ writeTVar outVar JobRunning -                prepareJob dir commit job $ \checkoutPath jdir -> do -                    updateStatusFile (jdir </> "status") outVar -                    runJob job uses checkoutPath jdir +            duplicate <- liftIO $ atomically $ do +                readTVar outVar >>= \case +                    JobDuplicate jid _ -> do +                        fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs +                    _ -> do +                        return Nothing + +            case duplicate of +                Nothing -> do +                    uses <- waitForUsedArtifacts job results outVar +                    runManagedJob mngr $ do +                        liftIO $ atomically $ writeTVar outVar JobRunning +                        prepareJob dir commit job $ \checkoutPath jdir -> do +                            updateStatusFile (jdir </> "status") outVar +                            JobDone <$> runJob job uses checkoutPath jdir + +                Just ( jid, origVar ) -> do +                    let wait = do +                            status <- atomically $ do +                                status <- readTVar origVar +                                out <- readTVar outVar +                                if status == out +                                  then retry +                                  else do +                                    writeTVar outVar $ JobDuplicate jid status +                                    return status +                            if jobStatusFinished status +                              then return $ JobDuplicate jid status +                              else wait +                    liftIO wait          case res of              Left (JobError err) -> T.putStrLn err              _ -> return () -        atomically $ writeTVar outVar $ either id JobDone res +        atomically $ writeTVar outVar $ either id id res      return results  waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => diff --git a/src/Job/Types.hs b/src/Job/Types.hs index 6918738..50a6b43 100644 --- a/src/Job/Types.hs +++ b/src/Job/Types.hs @@ -36,3 +36,13 @@ data JobSet = JobSet  jobsetJobs :: JobSet -> [ Job ]  jobsetJobs = either (const []) id . jobsetJobsEither + + +newtype JobId = JobId [ JobIdPart ] +    deriving (Eq, Ord) + +data JobIdPart +    = JobIdName JobName +    | JobIdCommit CommitId +    | JobIdTree TreeId +    deriving (Eq, Ord) diff --git a/src/Repo.hs b/src/Repo.hs index c0500f3..1148972 100644 --- a/src/Repo.hs +++ b/src/Repo.hs @@ -40,11 +40,13 @@ data Commit = Commit  newtype CommitId = CommitId ByteString +    deriving (Eq, Ord)  showCommitId :: CommitId -> String  showCommitId (CommitId cid) = BC.unpack cid  newtype TreeId = TreeId ByteString +    deriving (Eq, Ord)  showTreeId :: TreeId -> String  showTreeId (TreeId tid) = BC.unpack tid |