diff options
| -rw-r--r-- | src/Job.hs | 67 | 
1 files changed, 52 insertions, 15 deletions
| @@ -21,6 +21,8 @@ import Control.Monad.IO.Class  import Data.List  import Data.Map (Map)  import Data.Map qualified as M +import Data.Set (Set) +import Data.Set qualified as S  import Data.Text (Text)  import Data.Text qualified as T  import Data.Text.IO qualified as T @@ -89,35 +91,70 @@ textJobStatus = \case  data JobManager = JobManager      { jmSemaphore :: TVar Int      , jmJobs :: TVar (Map JobId (TVar (JobStatus JobOutput))) +    , jmNextTaskId :: TVar TaskId +    , jmNextTask :: TVar (Maybe TaskId) +    , jmReadyTasks :: TVar (Set TaskId)      } +newtype TaskId = TaskId Int +    deriving (Eq, Ord) +  newJobManager :: Int -> IO JobManager  newJobManager queueLen = do      jmSemaphore <- newTVarIO queueLen      jmJobs <- newTVarIO M.empty +    jmNextTaskId <- newTVarIO (TaskId 0) +    jmNextTask <- newTVarIO Nothing +    jmReadyTasks <- newTVarIO S.empty      return JobManager {..} -runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a -runManagedJob JobManager {..} job = bracket acquire release (\_ -> job) +reserveTaskId :: JobManager -> STM TaskId +reserveTaskId JobManager {..} = do +    tid@(TaskId n) <- readTVar jmNextTaskId +    writeTVar jmNextTaskId (TaskId (n + 1)) +    return tid + +runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a +runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job)    where -    acquire = liftIO $ atomically $ do -        free <- readTVar jmSemaphore -        when (free <= 0) retry -        writeTVar jmSemaphore $ free - 1 +    acquire = liftIO $ do +        atomically $ do +            writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks +            trySelectNext +        atomically $ do +            readTVar jmNextTask >>= \case +                Just tid' | tid' == tid -> do +                    writeTVar jmNextTask Nothing +                _ -> retry      release _ = liftIO $ atomically $ do          free <- readTVar jmSemaphore          writeTVar jmSemaphore $ free + 1 +        trySelectNext + +    trySelectNext = do +        readTVar jmNextTask >>= \case +            Just _ -> return () +            Nothing -> do +                readTVar jmSemaphore >>= \case +                    0   -> return () +                    sem -> (S.minView <$> readTVar jmReadyTasks) >>= \case +                        Nothing -> return () +                        Just ( tid', ready ) -> do +                            writeTVar jmReadyTasks ready +                            writeTVar jmSemaphore (sem - 1) +                            writeTVar jmNextTask (Just tid')  runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]  runJobs mngr@JobManager {..} dir commit jobs = do -    tid <- readTreeId commit +    treeId <- readTreeId commit      results <- atomically $ do          forM jobs $ \job -> do -            let jid = JobId [ JobIdTree tid, JobIdName (jobName job) ] +            let jid = JobId [ JobIdTree treeId, JobIdName (jobName job) ] +            tid <- reserveTaskId mngr              managed <- readTVar jmJobs -            ( job, ) <$> case M.lookup jid managed of +            ( job, tid, ) <$> case M.lookup jid managed of                  Just origVar -> do                      newTVar . JobDuplicate jid =<< readTVar origVar @@ -126,7 +163,7 @@ runJobs mngr@JobManager {..} dir commit jobs = do                      writeTVar jmJobs $ M.insert jid statusVar managed                      return statusVar -    forM_ results $ \(job, outVar) -> void $ forkIO $ do +    forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do          res <- runExceptT $ do              duplicate <- liftIO $ atomically $ do                  readTVar outVar >>= \case @@ -138,7 +175,7 @@ runJobs mngr@JobManager {..} dir commit jobs = do              case duplicate of                  Nothing -> do                      uses <- waitForUsedArtifacts job results outVar -                    runManagedJob mngr $ do +                    runManagedJob mngr tid $ do                          liftIO $ atomically $ writeTVar outVar JobRunning                          prepareJob dir commit job $ \checkoutPath jdir -> do                              updateStatusFile (jdir </> "status") outVar @@ -164,15 +201,15 @@ runJobs mngr@JobManager {..} dir commit jobs = do              _ -> return ()          atomically $ writeTVar outVar $ either id id res -    return results +    return $ map (\( job, _, var ) -> ( job, var )) results  waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => -    Job -> [(Job, TVar (JobStatus JobOutput))] -> TVar (JobStatus JobOutput) -> m [ArtifactOutput] +    Job -> [ ( Job, TaskId, TVar (JobStatus JobOutput) ) ] -> TVar (JobStatus JobOutput) -> m [ ArtifactOutput ]  waitForUsedArtifacts job results outVar = do      origState <- liftIO $ atomically $ readTVar outVar      ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do -        case find ((==ujobName) . jobName . fst) results of -            Just (_, var) -> return (var, (ujobName, uartName)) +        case find (\( j, _, _ ) -> jobName j == ujobName) results of +            Just ( _, _, var ) -> return ( var, ( ujobName, uartName ))              Nothing -> throwError $ JobError $ "Job '" <> tjobName <> "' not found"      let loop prev = do |