summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Job.hs67
1 files changed, 52 insertions, 15 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 310152d..bcc7f08 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -21,6 +21,8 @@ import Control.Monad.IO.Class
import Data.List
import Data.Map (Map)
import Data.Map qualified as M
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.IO qualified as T
@@ -89,35 +91,70 @@ textJobStatus = \case
data JobManager = JobManager
{ jmSemaphore :: TVar Int
, jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput)))
+ , jmNextTaskId :: TVar TaskId
+ , jmNextTask :: TVar (Maybe TaskId)
+ , jmReadyTasks :: TVar (Set TaskId)
}
+newtype TaskId = TaskId Int
+ deriving (Eq, Ord)
+
newJobManager :: Int -> IO JobManager
newJobManager queueLen = do
jmSemaphore <- newTVarIO queueLen
jmJobs <- newTVarIO M.empty
+ jmNextTaskId <- newTVarIO (TaskId 0)
+ jmNextTask <- newTVarIO Nothing
+ jmReadyTasks <- newTVarIO S.empty
return JobManager {..}
-runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a
-runManagedJob JobManager {..} job = bracket acquire release (\_ -> job)
+reserveTaskId :: JobManager -> STM TaskId
+reserveTaskId JobManager {..} = do
+ tid@(TaskId n) <- readTVar jmNextTaskId
+ writeTVar jmNextTaskId (TaskId (n + 1))
+ return tid
+
+runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a
+runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job)
where
- acquire = liftIO $ atomically $ do
- free <- readTVar jmSemaphore
- when (free <= 0) retry
- writeTVar jmSemaphore $ free - 1
+ acquire = liftIO $ do
+ atomically $ do
+ writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks
+ trySelectNext
+ atomically $ do
+ readTVar jmNextTask >>= \case
+ Just tid' | tid' == tid -> do
+ writeTVar jmNextTask Nothing
+ _ -> retry
release _ = liftIO $ atomically $ do
free <- readTVar jmSemaphore
writeTVar jmSemaphore $ free + 1
+ trySelectNext
+
+ trySelectNext = do
+ readTVar jmNextTask >>= \case
+ Just _ -> return ()
+ Nothing -> do
+ readTVar jmSemaphore >>= \case
+ 0 -> return ()
+ sem -> (S.minView <$> readTVar jmReadyTasks) >>= \case
+ Nothing -> return ()
+ Just ( tid', ready ) -> do
+ writeTVar jmReadyTasks ready
+ writeTVar jmSemaphore (sem - 1)
+ writeTVar jmNextTask (Just tid')
runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
runJobs mngr@JobManager {..} dir commit jobs = do
- tid <- readTreeId commit
+ treeId <- readTreeId commit
results <- atomically $ do
forM jobs $ \job -> do
- let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ]
+ let jid = JobId [ JobIdTree treeId, JobIdName (jobName job) ]
+ tid <- reserveTaskId mngr
managed <- readTVar jmJobs
- ( job, ) <$> case M.lookup jid managed of
+ ( job, tid, ) <$> case M.lookup jid managed of
Just origVar -> do
newTVar . JobDuplicate jid =<< readTVar origVar
@@ -126,7 +163,7 @@ runJobs mngr@JobManager {..} dir commit jobs = do
writeTVar jmJobs $ M.insert jid statusVar managed
return statusVar
- forM_ results $ \(job, outVar) -> void $ forkIO $ do
+ forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do
res <- runExceptT $ do
duplicate <- liftIO $ atomically $ do
readTVar outVar >>= \case
@@ -138,7 +175,7 @@ runJobs mngr@JobManager {..} dir commit jobs = do
case duplicate of
Nothing -> do
uses <- waitForUsedArtifacts job results outVar
- runManagedJob mngr $ do
+ runManagedJob mngr tid $ do
liftIO $ atomically $ writeTVar outVar JobRunning
prepareJob dir commit job $ \checkoutPath jdir -> do
updateStatusFile (jdir </> "status") outVar
@@ -164,15 +201,15 @@ runJobs mngr@JobManager {..} dir commit jobs = do
_ -> return ()
atomically $ writeTVar outVar $ either id id res
- return results
+ return $ map (\( job, _, var ) -> ( job, var )) results
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>
- Job -> [(Job, TVar (JobStatus JobOutput))] -> TVar (JobStatus JobOutput) -> m [ArtifactOutput]
+ Job -> [ ( Job, TaskId, TVar (JobStatus JobOutput) ) ] -> TVar (JobStatus JobOutput) -> m [ ArtifactOutput ]
waitForUsedArtifacts job results outVar = do
origState <- liftIO $ atomically $ readTVar outVar
ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do
- case find ((==ujobName) . jobName . fst) results of
- Just (_, var) -> return (var, (ujobName, uartName))
+ case find (\( j, _, _ ) -> jobName j == ujobName) results of
+ Just ( _, _, var ) -> return ( var, ( ujobName, uartName ))
Nothing -> throwError $ JobError $ "Job '" <> tjobName <> "' not found"
let loop prev = do