diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-12 20:52:50 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-14 20:34:19 +0100 |
commit | 30e48ddd5d7b6c94b5cb22e645b1fcc4b994cabd (patch) | |
tree | 901e27898c081584f69fd5d3c4f88e05f6ac3ddf /src/Job.hs | |
parent | 9b14b0c159d64eae18e2732f1662fd27a72f8db1 (diff) |
Explicit task priority by job creation order
Diffstat (limited to 'src/Job.hs')
-rw-r--r-- | src/Job.hs | 67 |
1 files changed, 52 insertions, 15 deletions
@@ -21,6 +21,8 @@ import Control.Monad.IO.Class import Data.List import Data.Map (Map) import Data.Map qualified as M +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Text.IO qualified as T @@ -89,35 +91,70 @@ textJobStatus = \case data JobManager = JobManager { jmSemaphore :: TVar Int , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput))) + , jmNextTaskId :: TVar TaskId + , jmNextTask :: TVar (Maybe TaskId) + , jmReadyTasks :: TVar (Set TaskId) } +newtype TaskId = TaskId Int + deriving (Eq, Ord) + newJobManager :: Int -> IO JobManager newJobManager queueLen = do jmSemaphore <- newTVarIO queueLen jmJobs <- newTVarIO M.empty + jmNextTaskId <- newTVarIO (TaskId 0) + jmNextTask <- newTVarIO Nothing + jmReadyTasks <- newTVarIO S.empty return JobManager {..} -runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a -runManagedJob JobManager {..} job = bracket acquire release (\_ -> job) +reserveTaskId :: JobManager -> STM TaskId +reserveTaskId JobManager {..} = do + tid@(TaskId n) <- readTVar jmNextTaskId + writeTVar jmNextTaskId (TaskId (n + 1)) + return tid + +runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a +runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job) where - acquire = liftIO $ atomically $ do - free <- readTVar jmSemaphore - when (free <= 0) retry - writeTVar jmSemaphore $ free - 1 + acquire = liftIO $ do + atomically $ do + writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks + trySelectNext + atomically $ do + readTVar jmNextTask >>= \case + Just tid' | tid' == tid -> do + writeTVar jmNextTask Nothing + _ -> retry release _ = liftIO $ atomically $ do free <- readTVar jmSemaphore writeTVar jmSemaphore $ free + 1 + trySelectNext + + trySelectNext = do + readTVar jmNextTask >>= \case + Just _ -> return () + Nothing -> do + readTVar jmSemaphore >>= \case + 0 -> return () + sem -> (S.minView <$> readTVar jmReadyTasks) >>= \case + Nothing -> return () + Just ( tid', ready ) -> do + writeTVar jmReadyTasks ready + writeTVar jmSemaphore (sem - 1) + writeTVar jmNextTask (Just tid') runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] runJobs mngr@JobManager {..} dir commit jobs = do - tid <- readTreeId commit + treeId <- readTreeId commit results <- atomically $ do forM jobs $ \job -> do - let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ] + let jid = JobId [ JobIdTree treeId, JobIdName (jobName job) ] + tid <- reserveTaskId mngr managed <- readTVar jmJobs - ( job, ) <$> case M.lookup jid managed of + ( job, tid, ) <$> case M.lookup jid managed of Just origVar -> do newTVar . JobDuplicate jid =<< readTVar origVar @@ -126,7 +163,7 @@ runJobs mngr@JobManager {..} dir commit jobs = do writeTVar jmJobs $ M.insert jid statusVar managed return statusVar - forM_ results $ \(job, outVar) -> void $ forkIO $ do + forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do res <- runExceptT $ do duplicate <- liftIO $ atomically $ do readTVar outVar >>= \case @@ -138,7 +175,7 @@ runJobs mngr@JobManager {..} dir commit jobs = do case duplicate of Nothing -> do uses <- waitForUsedArtifacts job results outVar - runManagedJob mngr $ do + runManagedJob mngr tid $ do liftIO $ atomically $ writeTVar outVar JobRunning prepareJob dir commit job $ \checkoutPath jdir -> do updateStatusFile (jdir </> "status") outVar @@ -164,15 +201,15 @@ runJobs mngr@JobManager {..} dir commit jobs = do _ -> return () atomically $ writeTVar outVar $ either id id res - return results + return $ map (\( job, _, var ) -> ( job, var )) results waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => - Job -> [(Job, TVar (JobStatus JobOutput))] -> TVar (JobStatus JobOutput) -> m [ArtifactOutput] + Job -> [ ( Job, TaskId, TVar (JobStatus JobOutput) ) ] -> TVar (JobStatus JobOutput) -> m [ ArtifactOutput ] waitForUsedArtifacts job results outVar = do origState <- liftIO $ atomically $ readTVar outVar ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do - case find ((==ujobName) . jobName . fst) results of - Just (_, var) -> return (var, (ujobName, uartName)) + case find (\( j, _, _ ) -> jobName j == ujobName) results of + Just ( _, _, var ) -> return ( var, ( ujobName, uartName )) Nothing -> throwError $ JobError $ "Job '" <> tjobName <> "' not found" let loop prev = do |