diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-10 22:21:14 +0100 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-10 22:59:11 +0100 | 
| commit | 61a9e98239cf01e91ca079ef176602efe0077dde (patch) | |
| tree | afaf9ee23f295c5cf5f5cb60d1586547d50f313a | |
| parent | ded067166901805bba63a35b37fe83ebfc4e6aa8 (diff) | |
Limit number of concurrently running jobs
Changelog: Configurable number of concurrently running jobs
| -rw-r--r-- | src/Command.hs | 20 | ||||
| -rw-r--r-- | src/Command/Run.hs | 4 | ||||
| -rw-r--r-- | src/Job.hs | 40 | ||||
| -rw-r--r-- | src/Main.hs | 13 | 
4 files changed, 63 insertions, 14 deletions
| diff --git a/src/Command.hs b/src/Command.hs index 0ca6710..2c2235f 100644 --- a/src/Command.hs +++ b/src/Command.hs @@ -1,8 +1,12 @@  module Command ( +    CommonOptions(..), +    defaultCommonOptions, +      Command(..),      CommandArgumentsType(..),      CommandExec(..), +    getCommonOptions,      getConfig,  ) where @@ -17,6 +21,15 @@ import System.Console.GetOpt  import Config +data CommonOptions = CommonOptions +    { optJobs :: Int +    } + +defaultCommonOptions :: CommonOptions +defaultCommonOptions = CommonOptions +    { optJobs = 2 +    } +  class CommandArgumentsType (CommandArguments c) => Command c where      commandName :: proxy c -> String      commandDescription :: proxy c -> String @@ -54,8 +67,11 @@ instance CommandArgumentsType (Maybe Text) where      argsFromStrings _ = throwError "expected at most one argument" -newtype CommandExec a = CommandExec (ReaderT Config IO a) +newtype CommandExec a = CommandExec (ReaderT ( CommonOptions, Config ) IO a)      deriving (Functor, Applicative, Monad, MonadIO) +getCommonOptions :: CommandExec CommonOptions +getCommonOptions = CommandExec (asks fst) +  getConfig :: CommandExec Config -getConfig = CommandExec ask +getConfig = CommandExec (asks snd) diff --git a/src/Command/Run.hs b/src/Command/Run.hs index 73baee0..a2436c8 100644 --- a/src/Command/Run.hs +++ b/src/Command/Run.hs @@ -44,6 +44,7 @@ instance Command RunCommand where  cmdRun :: RunCommand -> CommandExec ()  cmdRun (RunCommand changeset) = do +    CommonOptions {..} <- getCommonOptions      ( base, tip ) <- case T.splitOn (T.pack "..") changeset of          base : tip : _ -> return ( T.unpack base, T.unpack tip )          [ param ] -> liftIO $ do @@ -56,6 +57,7 @@ cmdRun (RunCommand changeset) = do          [] -> error "splitOn should not return empty list"      liftIO $ do +        mngr <- newJobManager optJobs          Just repo <- openRepo "."          commits <- listCommits repo (base <> ".." <> tip)          jobssets <- mapM loadJobSetForCommit commits @@ -72,7 +74,7 @@ cmdRun (RunCommand changeset) = do                  shortDesc = fitToLength 50 (commitDescription commit)              case jobsetJobsEither jobset of                  Right jobs -> do -                    outs <- runJobs "./.minici" commit jobs +                    outs <- runJobs mngr "./.minici" commit jobs                      let findJob name = snd <$> find ((name ==) . jobName . fst) outs                      displayStatusLine shortCid (" " <> shortDesc) $ map findJob names                  Left err -> do @@ -6,6 +6,7 @@ module Job (      ArtifactName(..),      JobStatus(..),      jobStatusFinished, jobStatusFailed, +    JobManager(..), newJobManager,      runJobs,  ) where @@ -79,16 +80,39 @@ textJobStatus = \case      JobDone _ -> "done" -runJobs :: FilePath -> Commit -> [Job] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] -runJobs dir commit jobs = do +data JobManager = JobManager +    { jmSemaphore :: TVar Int +    } + +newJobManager :: Int -> IO JobManager +newJobManager queueLen = do +    jmSemaphore <- newTVarIO queueLen +    return JobManager {..} + +runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a +runManagedJob JobManager {..} job = bracket acquire release (\_ -> job) +  where +    acquire = liftIO $ atomically $ do +        free <- readTVar jmSemaphore +        when (free <= 0) retry +        writeTVar jmSemaphore $ free - 1 + +    release _ = liftIO $ atomically $ do +        free <- readTVar jmSemaphore +        writeTVar jmSemaphore $ free + 1 + + +runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] +runJobs mngr dir commit jobs = do      results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued      forM_ results $ \(job, outVar) -> void $ forkIO $ do          res <- runExceptT $ do              uses <- waitForUsedArtifacts job results outVar -            liftIO $ atomically $ writeTVar outVar JobRunning -            prepareJob dir commit job $ \checkoutPath jdir -> do -                updateStatusFile (jdir </> "status") outVar -                runJob job uses checkoutPath jdir +            runManagedJob mngr $ do +                liftIO $ atomically $ writeTVar outVar JobRunning +                prepareJob dir commit job $ \checkoutPath jdir -> do +                    updateStatusFile (jdir </> "status") outVar +                    runJob job uses checkoutPath jdir          case res of              Left (JobError err) -> T.putStrLn err @@ -100,6 +124,7 @@ runJobs dir commit jobs = do  waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>      Job -> [(Job, TVar (JobStatus JobOutput))] -> TVar (JobStatus JobOutput) -> m [ArtifactOutput]  waitForUsedArtifacts job results outVar = do +    origState <- liftIO $ atomically $ readTVar outVar      ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do          case find ((==ujobName) . jobName . fst) results of              Just (_, var) -> return (var, (ujobName, uartName)) @@ -110,7 +135,8 @@ waitForUsedArtifacts job results outVar = do                  ustatuses <- forM ujobs $ \(uoutVar, uartName) -> do                      (,uartName) <$> readTVar uoutVar                  when (Just (map fst ustatuses) == prev) retry -                writeTVar outVar $ JobWaiting $ map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses +                let remains = map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses +                writeTVar outVar $ if null remains then origState else JobWaiting remains                  return ustatuses              if all (jobStatusFinished . fst) ustatuses                 then return ustatuses diff --git a/src/Main.hs b/src/Main.hs index 971bffe..c693281 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -22,12 +22,14 @@ import Version  data CmdlineOptions = CmdlineOptions      { optShowHelp :: Bool      , optShowVersion :: Bool +    , optCommon :: CommonOptions      }  defaultCmdlineOptions :: CmdlineOptions  defaultCmdlineOptions = CmdlineOptions      { optShowHelp = False      , optShowVersion = False +    , optCommon = defaultCommonOptions      }  options :: [OptDescr (CmdlineOptions -> CmdlineOptions)] @@ -38,6 +40,9 @@ options =      , Option ['V'] ["version"]          (NoArg $ \opts -> opts { optShowVersion = True })          "show version and exit" +    , Option ['j'] ["jobs"] +        (ReqArg (\num opts -> opts { optCommon = (optCommon opts) { optJobs = read num }}) "<num>") +        ("number of jobs to run simultaneously (default " <> show (optJobs defaultCommonOptions) <> ")")      ]  data SomeCommandType = forall c. Command c => SC (Proxy c) @@ -92,7 +97,7 @@ main = do                      ]                  exitFailure -    runSomeCommand ncmd cargs +    runSomeCommand (optCommon opts) ncmd cargs  data FullCommandOptions c = FullCommandOptions      { fcoSpecific :: CommandOptions c @@ -114,8 +119,8 @@ fullCommandOptions proxy =          "show this help and exit"      ] -runSomeCommand :: SomeCommandType -> [ String ] -> IO () -runSomeCommand (SC tproxy) args = do +runSomeCommand :: CommonOptions -> SomeCommandType -> [ String ] -> IO () +runSomeCommand copts (SC tproxy) args = do      let exitWithErrors errs = do              hPutStrLn stderr $ concat errs <> "Try `minici " <> commandName tproxy <> " --help' for more information."              exitFailure @@ -140,4 +145,4 @@ runSomeCommand (SC tproxy) args = do          Right config -> do              let cmd = commandInit tproxy (fcoSpecific opts) cmdargs              let CommandExec exec = commandExec cmd -            flip runReaderT config exec +            flip runReaderT ( copts, config ) exec |