diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-10 22:21:14 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-10 22:59:11 +0100 |
commit | 61a9e98239cf01e91ca079ef176602efe0077dde (patch) | |
tree | afaf9ee23f295c5cf5f5cb60d1586547d50f313a | |
parent | ded067166901805bba63a35b37fe83ebfc4e6aa8 (diff) |
Limit number of concurrently running jobs
Changelog: Configurable number of concurrently running jobs
-rw-r--r-- | src/Command.hs | 20 | ||||
-rw-r--r-- | src/Command/Run.hs | 4 | ||||
-rw-r--r-- | src/Job.hs | 40 | ||||
-rw-r--r-- | src/Main.hs | 13 |
4 files changed, 63 insertions, 14 deletions
diff --git a/src/Command.hs b/src/Command.hs index 0ca6710..2c2235f 100644 --- a/src/Command.hs +++ b/src/Command.hs @@ -1,8 +1,12 @@ module Command ( + CommonOptions(..), + defaultCommonOptions, + Command(..), CommandArgumentsType(..), CommandExec(..), + getCommonOptions, getConfig, ) where @@ -17,6 +21,15 @@ import System.Console.GetOpt import Config +data CommonOptions = CommonOptions + { optJobs :: Int + } + +defaultCommonOptions :: CommonOptions +defaultCommonOptions = CommonOptions + { optJobs = 2 + } + class CommandArgumentsType (CommandArguments c) => Command c where commandName :: proxy c -> String commandDescription :: proxy c -> String @@ -54,8 +67,11 @@ instance CommandArgumentsType (Maybe Text) where argsFromStrings _ = throwError "expected at most one argument" -newtype CommandExec a = CommandExec (ReaderT Config IO a) +newtype CommandExec a = CommandExec (ReaderT ( CommonOptions, Config ) IO a) deriving (Functor, Applicative, Monad, MonadIO) +getCommonOptions :: CommandExec CommonOptions +getCommonOptions = CommandExec (asks fst) + getConfig :: CommandExec Config -getConfig = CommandExec ask +getConfig = CommandExec (asks snd) diff --git a/src/Command/Run.hs b/src/Command/Run.hs index 73baee0..a2436c8 100644 --- a/src/Command/Run.hs +++ b/src/Command/Run.hs @@ -44,6 +44,7 @@ instance Command RunCommand where cmdRun :: RunCommand -> CommandExec () cmdRun (RunCommand changeset) = do + CommonOptions {..} <- getCommonOptions ( base, tip ) <- case T.splitOn (T.pack "..") changeset of base : tip : _ -> return ( T.unpack base, T.unpack tip ) [ param ] -> liftIO $ do @@ -56,6 +57,7 @@ cmdRun (RunCommand changeset) = do [] -> error "splitOn should not return empty list" liftIO $ do + mngr <- newJobManager optJobs Just repo <- openRepo "." commits <- listCommits repo (base <> ".." <> tip) jobssets <- mapM loadJobSetForCommit commits @@ -72,7 +74,7 @@ cmdRun (RunCommand changeset) = do shortDesc = fitToLength 50 (commitDescription commit) case jobsetJobsEither jobset of Right jobs -> do - outs <- runJobs "./.minici" commit jobs + outs <- runJobs mngr "./.minici" commit jobs let findJob name = snd <$> find ((name ==) . jobName . fst) outs displayStatusLine shortCid (" " <> shortDesc) $ map findJob names Left err -> do @@ -6,6 +6,7 @@ module Job ( ArtifactName(..), JobStatus(..), jobStatusFinished, jobStatusFailed, + JobManager(..), newJobManager, runJobs, ) where @@ -79,16 +80,39 @@ textJobStatus = \case JobDone _ -> "done" -runJobs :: FilePath -> Commit -> [Job] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] -runJobs dir commit jobs = do +data JobManager = JobManager + { jmSemaphore :: TVar Int + } + +newJobManager :: Int -> IO JobManager +newJobManager queueLen = do + jmSemaphore <- newTVarIO queueLen + return JobManager {..} + +runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a +runManagedJob JobManager {..} job = bracket acquire release (\_ -> job) + where + acquire = liftIO $ atomically $ do + free <- readTVar jmSemaphore + when (free <= 0) retry + writeTVar jmSemaphore $ free - 1 + + release _ = liftIO $ atomically $ do + free <- readTVar jmSemaphore + writeTVar jmSemaphore $ free + 1 + + +runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] +runJobs mngr dir commit jobs = do results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued forM_ results $ \(job, outVar) -> void $ forkIO $ do res <- runExceptT $ do uses <- waitForUsedArtifacts job results outVar - liftIO $ atomically $ writeTVar outVar JobRunning - prepareJob dir commit job $ \checkoutPath jdir -> do - updateStatusFile (jdir </> "status") outVar - runJob job uses checkoutPath jdir + runManagedJob mngr $ do + liftIO $ atomically $ writeTVar outVar JobRunning + prepareJob dir commit job $ \checkoutPath jdir -> do + updateStatusFile (jdir </> "status") outVar + runJob job uses checkoutPath jdir case res of Left (JobError err) -> T.putStrLn err @@ -100,6 +124,7 @@ runJobs dir commit jobs = do waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => Job -> [(Job, TVar (JobStatus JobOutput))] -> TVar (JobStatus JobOutput) -> m [ArtifactOutput] waitForUsedArtifacts job results outVar = do + origState <- liftIO $ atomically $ readTVar outVar ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do case find ((==ujobName) . jobName . fst) results of Just (_, var) -> return (var, (ujobName, uartName)) @@ -110,7 +135,8 @@ waitForUsedArtifacts job results outVar = do ustatuses <- forM ujobs $ \(uoutVar, uartName) -> do (,uartName) <$> readTVar uoutVar when (Just (map fst ustatuses) == prev) retry - writeTVar outVar $ JobWaiting $ map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses + let remains = map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses + writeTVar outVar $ if null remains then origState else JobWaiting remains return ustatuses if all (jobStatusFinished . fst) ustatuses then return ustatuses diff --git a/src/Main.hs b/src/Main.hs index 971bffe..c693281 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -22,12 +22,14 @@ import Version data CmdlineOptions = CmdlineOptions { optShowHelp :: Bool , optShowVersion :: Bool + , optCommon :: CommonOptions } defaultCmdlineOptions :: CmdlineOptions defaultCmdlineOptions = CmdlineOptions { optShowHelp = False , optShowVersion = False + , optCommon = defaultCommonOptions } options :: [OptDescr (CmdlineOptions -> CmdlineOptions)] @@ -38,6 +40,9 @@ options = , Option ['V'] ["version"] (NoArg $ \opts -> opts { optShowVersion = True }) "show version and exit" + , Option ['j'] ["jobs"] + (ReqArg (\num opts -> opts { optCommon = (optCommon opts) { optJobs = read num }}) "<num>") + ("number of jobs to run simultaneously (default " <> show (optJobs defaultCommonOptions) <> ")") ] data SomeCommandType = forall c. Command c => SC (Proxy c) @@ -92,7 +97,7 @@ main = do ] exitFailure - runSomeCommand ncmd cargs + runSomeCommand (optCommon opts) ncmd cargs data FullCommandOptions c = FullCommandOptions { fcoSpecific :: CommandOptions c @@ -114,8 +119,8 @@ fullCommandOptions proxy = "show this help and exit" ] -runSomeCommand :: SomeCommandType -> [ String ] -> IO () -runSomeCommand (SC tproxy) args = do +runSomeCommand :: CommonOptions -> SomeCommandType -> [ String ] -> IO () +runSomeCommand copts (SC tproxy) args = do let exitWithErrors errs = do hPutStrLn stderr $ concat errs <> "Try `minici " <> commandName tproxy <> " --help' for more information." exitFailure @@ -140,4 +145,4 @@ runSomeCommand (SC tproxy) args = do Right config -> do let cmd = commandInit tproxy (fcoSpecific opts) cmdargs let CommandExec exec = commandExec cmd - flip runReaderT config exec + flip runReaderT ( copts, config ) exec |