summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-01-10 22:21:14 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2025-01-10 22:59:11 +0100
commit61a9e98239cf01e91ca079ef176602efe0077dde (patch)
treeafaf9ee23f295c5cf5f5cb60d1586547d50f313a
parentded067166901805bba63a35b37fe83ebfc4e6aa8 (diff)
Limit number of concurrently running jobs
Changelog: Configurable number of concurrently running jobs
-rw-r--r--src/Command.hs20
-rw-r--r--src/Command/Run.hs4
-rw-r--r--src/Job.hs40
-rw-r--r--src/Main.hs13
4 files changed, 63 insertions, 14 deletions
diff --git a/src/Command.hs b/src/Command.hs
index 0ca6710..2c2235f 100644
--- a/src/Command.hs
+++ b/src/Command.hs
@@ -1,8 +1,12 @@
module Command (
+ CommonOptions(..),
+ defaultCommonOptions,
+
Command(..),
CommandArgumentsType(..),
CommandExec(..),
+ getCommonOptions,
getConfig,
) where
@@ -17,6 +21,15 @@ import System.Console.GetOpt
import Config
+data CommonOptions = CommonOptions
+ { optJobs :: Int
+ }
+
+defaultCommonOptions :: CommonOptions
+defaultCommonOptions = CommonOptions
+ { optJobs = 2
+ }
+
class CommandArgumentsType (CommandArguments c) => Command c where
commandName :: proxy c -> String
commandDescription :: proxy c -> String
@@ -54,8 +67,11 @@ instance CommandArgumentsType (Maybe Text) where
argsFromStrings _ = throwError "expected at most one argument"
-newtype CommandExec a = CommandExec (ReaderT Config IO a)
+newtype CommandExec a = CommandExec (ReaderT ( CommonOptions, Config ) IO a)
deriving (Functor, Applicative, Monad, MonadIO)
+getCommonOptions :: CommandExec CommonOptions
+getCommonOptions = CommandExec (asks fst)
+
getConfig :: CommandExec Config
-getConfig = CommandExec ask
+getConfig = CommandExec (asks snd)
diff --git a/src/Command/Run.hs b/src/Command/Run.hs
index 73baee0..a2436c8 100644
--- a/src/Command/Run.hs
+++ b/src/Command/Run.hs
@@ -44,6 +44,7 @@ instance Command RunCommand where
cmdRun :: RunCommand -> CommandExec ()
cmdRun (RunCommand changeset) = do
+ CommonOptions {..} <- getCommonOptions
( base, tip ) <- case T.splitOn (T.pack "..") changeset of
base : tip : _ -> return ( T.unpack base, T.unpack tip )
[ param ] -> liftIO $ do
@@ -56,6 +57,7 @@ cmdRun (RunCommand changeset) = do
[] -> error "splitOn should not return empty list"
liftIO $ do
+ mngr <- newJobManager optJobs
Just repo <- openRepo "."
commits <- listCommits repo (base <> ".." <> tip)
jobssets <- mapM loadJobSetForCommit commits
@@ -72,7 +74,7 @@ cmdRun (RunCommand changeset) = do
shortDesc = fitToLength 50 (commitDescription commit)
case jobsetJobsEither jobset of
Right jobs -> do
- outs <- runJobs "./.minici" commit jobs
+ outs <- runJobs mngr "./.minici" commit jobs
let findJob name = snd <$> find ((name ==) . jobName . fst) outs
displayStatusLine shortCid (" " <> shortDesc) $ map findJob names
Left err -> do
diff --git a/src/Job.hs b/src/Job.hs
index 068a076..3d86359 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -6,6 +6,7 @@ module Job (
ArtifactName(..),
JobStatus(..),
jobStatusFinished, jobStatusFailed,
+ JobManager(..), newJobManager,
runJobs,
) where
@@ -79,16 +80,39 @@ textJobStatus = \case
JobDone _ -> "done"
-runJobs :: FilePath -> Commit -> [Job] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
-runJobs dir commit jobs = do
+data JobManager = JobManager
+ { jmSemaphore :: TVar Int
+ }
+
+newJobManager :: Int -> IO JobManager
+newJobManager queueLen = do
+ jmSemaphore <- newTVarIO queueLen
+ return JobManager {..}
+
+runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> m a -> m a
+runManagedJob JobManager {..} job = bracket acquire release (\_ -> job)
+ where
+ acquire = liftIO $ atomically $ do
+ free <- readTVar jmSemaphore
+ when (free <= 0) retry
+ writeTVar jmSemaphore $ free - 1
+
+ release _ = liftIO $ atomically $ do
+ free <- readTVar jmSemaphore
+ writeTVar jmSemaphore $ free + 1
+
+
+runJobs :: JobManager -> FilePath -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
+runJobs mngr dir commit jobs = do
results <- forM jobs $ \job -> (job,) <$> newTVarIO JobQueued
forM_ results $ \(job, outVar) -> void $ forkIO $ do
res <- runExceptT $ do
uses <- waitForUsedArtifacts job results outVar
- liftIO $ atomically $ writeTVar outVar JobRunning
- prepareJob dir commit job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- runJob job uses checkoutPath jdir
+ runManagedJob mngr $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ prepareJob dir commit job $ \checkoutPath jdir -> do
+ updateStatusFile (jdir </> "status") outVar
+ runJob job uses checkoutPath jdir
case res of
Left (JobError err) -> T.putStrLn err
@@ -100,6 +124,7 @@ runJobs dir commit jobs = do
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>
Job -> [(Job, TVar (JobStatus JobOutput))] -> TVar (JobStatus JobOutput) -> m [ArtifactOutput]
waitForUsedArtifacts job results outVar = do
+ origState <- liftIO $ atomically $ readTVar outVar
ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do
case find ((==ujobName) . jobName . fst) results of
Just (_, var) -> return (var, (ujobName, uartName))
@@ -110,7 +135,8 @@ waitForUsedArtifacts job results outVar = do
ustatuses <- forM ujobs $ \(uoutVar, uartName) -> do
(,uartName) <$> readTVar uoutVar
when (Just (map fst ustatuses) == prev) retry
- writeTVar outVar $ JobWaiting $ map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses
+ let remains = map (fst . snd) $ filter (not . jobStatusFinished . fst) ustatuses
+ writeTVar outVar $ if null remains then origState else JobWaiting remains
return ustatuses
if all (jobStatusFinished . fst) ustatuses
then return ustatuses
diff --git a/src/Main.hs b/src/Main.hs
index 971bffe..c693281 100644
--- a/src/Main.hs
+++ b/src/Main.hs
@@ -22,12 +22,14 @@ import Version
data CmdlineOptions = CmdlineOptions
{ optShowHelp :: Bool
, optShowVersion :: Bool
+ , optCommon :: CommonOptions
}
defaultCmdlineOptions :: CmdlineOptions
defaultCmdlineOptions = CmdlineOptions
{ optShowHelp = False
, optShowVersion = False
+ , optCommon = defaultCommonOptions
}
options :: [OptDescr (CmdlineOptions -> CmdlineOptions)]
@@ -38,6 +40,9 @@ options =
, Option ['V'] ["version"]
(NoArg $ \opts -> opts { optShowVersion = True })
"show version and exit"
+ , Option ['j'] ["jobs"]
+ (ReqArg (\num opts -> opts { optCommon = (optCommon opts) { optJobs = read num }}) "<num>")
+ ("number of jobs to run simultaneously (default " <> show (optJobs defaultCommonOptions) <> ")")
]
data SomeCommandType = forall c. Command c => SC (Proxy c)
@@ -92,7 +97,7 @@ main = do
]
exitFailure
- runSomeCommand ncmd cargs
+ runSomeCommand (optCommon opts) ncmd cargs
data FullCommandOptions c = FullCommandOptions
{ fcoSpecific :: CommandOptions c
@@ -114,8 +119,8 @@ fullCommandOptions proxy =
"show this help and exit"
]
-runSomeCommand :: SomeCommandType -> [ String ] -> IO ()
-runSomeCommand (SC tproxy) args = do
+runSomeCommand :: CommonOptions -> SomeCommandType -> [ String ] -> IO ()
+runSomeCommand copts (SC tproxy) args = do
let exitWithErrors errs = do
hPutStrLn stderr $ concat errs <> "Try `minici " <> commandName tproxy <> " --help' for more information."
exitFailure
@@ -140,4 +145,4 @@ runSomeCommand (SC tproxy) args = do
Right config -> do
let cmd = commandInit tproxy (fcoSpecific opts) cmdargs
let CommandExec exec = commandExec cmd
- flip runReaderT config exec
+ flip runReaderT ( copts, config ) exec