1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
|
module Command.Run (
RunCommand,
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Either
import Data.List
import Data.Maybe
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.IO qualified as T
import System.Console.GetOpt
import System.FilePath.Glob
import System.IO
import Command
import Config
import Eval
import Job
import Job.Types
import Output
import Repo
import Terminal
data RunCommand = RunCommand RunOptions [ Text ]
data RunOptions = RunOptions
{ roRanges :: [ Text ]
, roSinceUpstream :: [ Text ]
, roNewCommitsOn :: [ Text ]
, roNewTags :: [ Pattern ]
}
instance Command RunCommand where
commandName _ = "run"
commandDescription _ = "Execude jobs per minici.yaml for given commits"
type CommandArguments RunCommand = [ Text ]
commandUsage _ = T.pack $ unlines $
[ "Usage: minici run"
, " run jobs for commits on current branch not yet in upstream branch"
, " or: minici run <job>..."
, " run jobs specified on the command line"
, " or: minici run [--range=]<commit>..<commit>"
, " run jobs for commits in given range"
, " or: minici run <option>..."
, " run jobs based on given options (see below)"
]
type CommandOptions RunCommand = RunOptions
defaultCommandOptions _ = RunOptions
{ roRanges = []
, roSinceUpstream = []
, roNewCommitsOn = []
, roNewTags = []
}
commandOptions _ =
[ Option [] [ "range" ]
(ReqArg (\val opts -> opts { roRanges = T.pack val : roRanges opts }) "<range>")
"run jobs for commits in given range"
, Option [] [ "since-upstream" ]
(ReqArg (\val opts -> opts { roSinceUpstream = T.pack val : roSinceUpstream opts }) "<ref>")
"run jobs for commits on <ref> not yet in its upstream ref"
, Option [] [ "new-commits-on" ]
(ReqArg (\val opts -> opts { roNewCommitsOn = T.pack val : roNewCommitsOn opts }) "<branch>")
"run jobs for new commits on given branch"
, Option [] [ "new-tags" ]
(ReqArg (\val opts -> opts { roNewTags = compile val : roNewTags opts }) "<pattern>")
"run jobs for new annotated tags matching pattern"
]
commandInit _ = RunCommand
commandExec = cmdRun
data JobSource = JobSource (TMVar (Maybe ( [ JobSet ], JobSource )))
emptyJobSource :: MonadIO m => m JobSource
emptyJobSource = JobSource <$> liftIO (newTMVarIO Nothing)
oneshotJobSource :: MonadIO m => [ JobSet ] -> m JobSource
oneshotJobSource jobsets = do
next <- emptyJobSource
JobSource <$> liftIO (newTMVarIO (Just ( jobsets, next )))
takeJobSource :: JobSource -> STM (Maybe ( [ JobSet ], JobSource ))
takeJobSource (JobSource tmvar) = takeTMVar tmvar
mergeSources :: [ JobSource ] -> IO JobSource
mergeSources sources = do
let go tmvar [] = do
atomically (putTMVar tmvar Nothing)
go tmvar cur = do
( jobsets, next ) <- atomically (select cur)
if null next
then do
go tmvar next
else do
nextvar <- newEmptyTMVarIO
atomically $ putTMVar tmvar (Just ( jobsets, JobSource nextvar ))
go nextvar next
tmvar <- newEmptyTMVarIO
void $ forkIO $ go tmvar sources
return $ JobSource tmvar
where
select :: [ JobSource ] -> STM ( [ JobSet ], [ JobSource ] )
select [] = retry
select (x@(JobSource tmvar) : xs) = do
tryTakeTMVar tmvar >>= \case
Nothing -> fmap (x :) <$> select xs
Just Nothing -> return ( [], xs )
Just (Just ( jobsets, x' )) -> return ( jobsets, x' : xs )
argumentJobSource :: [ JobName ] -> CommandExec JobSource
argumentJobSource [] = emptyJobSource
argumentJobSource names = do
( config, jobsetCommit ) <- getJobRoot >>= \case
JobRootConfig config -> do
commit <- sequence . fmap createWipCommit =<< tryGetDefaultRepo
return ( config, commit )
JobRootRepo repo -> do
commit <- createWipCommit repo
config <- either fail return =<< loadConfigForCommit =<< getCommitTree commit
return ( config, Just commit )
cidPart <- case jobsetCommit of
Just commit -> (: []) . JobIdTree Nothing . treeId <$> getCommitTree commit
Nothing -> return []
jobsetJobsEither <- fmap Right $ forM names $ \name ->
case find ((name ==) . jobName) (configJobs config) of
Just job -> return job
Nothing -> tfail $ "job `" <> textJobName name <> "' not found"
oneshotJobSource . (: []) =<<
cmdEvalWith (\ei -> ei { eiCurrentIdRev = cidPart ++ eiCurrentIdRev ei })
(evalJobSet [] JobSet {..})
loadJobSetFromRoot :: (MonadIO m, MonadFail m) => JobRoot -> Commit -> m DeclaredJobSet
loadJobSetFromRoot root commit = case root of
JobRootRepo _ -> loadJobSetForCommit commit
JobRootConfig config -> return JobSet
{ jobsetCommit = Just commit
, jobsetJobsEither = Right $ configJobs config
}
rangeSource :: Text -> Text -> CommandExec JobSource
rangeSource base tip = do
root <- getJobRoot
repo <- getDefaultRepo
commits <- listCommits repo (base <> ".." <> tip)
jobsets <- forM commits $ \commit -> do
tree <- getCommitTree commit
cmdEvalWith (\ei -> ei
{ eiCurrentIdRev = JobIdTree Nothing (treeId tree) : eiCurrentIdRev ei
}) . evalJobSet [] =<< loadJobSetFromRoot root commit
oneshotJobSource jobsets
watchBranchSource :: Text -> CommandExec JobSource
watchBranchSource branch = do
root <- getJobRoot
repo <- getDefaultRepo
einputBase <- getEvalInput
getCurrentTip <- watchBranch repo branch
let go prev tmvar = do
cur <- atomically $ do
getCurrentTip >>= \case
Just cur -> do
when (cur == prev) retry
return cur
Nothing -> retry
commits <- listCommits repo (textCommitId (commitId prev) <> ".." <> textCommitId (commitId cur))
jobsets <- forM commits $ \commit -> do
tree <- getCommitTree commit
let einput = einputBase
{ eiCurrentIdRev = JobIdTree Nothing (treeId tree) : eiCurrentIdRev einputBase
}
either (fail . T.unpack . textEvalError) return =<<
flip runEval einput . evalJobSet [] =<< loadJobSetFromRoot root commit
nextvar <- newEmptyTMVarIO
atomically $ putTMVar tmvar $ Just ( jobsets, JobSource nextvar )
go cur nextvar
liftIO $ do
tmvar <- newEmptyTMVarIO
atomically getCurrentTip >>= \case
Just commit ->
void $ forkIO $ go commit tmvar
Nothing -> do
T.hPutStrLn stderr $ "Branch `" <> branch <> "' not found"
atomically $ putTMVar tmvar Nothing
return $ JobSource tmvar
watchTagSource :: Pattern -> CommandExec JobSource
watchTagSource pat = do
root <- getJobRoot
chan <- watchTags =<< getDefaultRepo
einputBase <- getEvalInput
let go tmvar = do
tag <- atomically $ readTChan chan
if match pat $ T.unpack $ tagTag tag
then do
tree <- getCommitTree $ tagObject tag
let einput = einputBase
{ eiCurrentIdRev = JobIdTree Nothing (treeId tree) : eiCurrentIdRev einputBase
}
jobset <- either (fail . T.unpack . textEvalError) return =<<
flip runEval einput . evalJobSet [] =<< loadJobSetFromRoot root (tagObject tag)
nextvar <- newEmptyTMVarIO
atomically $ putTMVar tmvar $ Just ( [ jobset ], JobSource nextvar )
go nextvar
else do
go tmvar
liftIO $ do
tmvar <- newEmptyTMVarIO
void $ forkIO $ go tmvar
return $ JobSource tmvar
cmdRun :: RunCommand -> CommandExec ()
cmdRun (RunCommand RunOptions {..} args) = do
CommonOptions {..} <- getCommonOptions
output <- getOutput
storageDir <- getStorageDir
( rangeOptions, jobOptions ) <- partitionEithers . concat <$> sequence
[ forM roRanges $ \range -> case T.splitOn ".." range of
[ base, tip ]
| not (T.null base) && not (T.null tip)
-> return $ Left ( Just base, tip )
_ -> tfail $ "invalid range: " <> range
, forM roSinceUpstream $ return . Left . ( Nothing, )
, forM args $ \arg -> case T.splitOn ".." arg of
[ base, tip ]
| not (T.null base) && not (T.null tip)
-> return $ Left ( Just base, tip )
[ _ ] -> return $ Right $ JobName arg
_ -> tfail $ "invalid argument: " <> arg
]
argumentJobs <- argumentJobSource jobOptions
defaultSource <- getJobRoot >>= \case
_ | not (null rangeOptions && null roNewCommitsOn && null roNewTags && null jobOptions) -> do
emptyJobSource
JobRootRepo repo -> do
Just base <- findUpstreamRef repo "HEAD"
rangeSource base "HEAD"
JobRootConfig config -> do
argumentJobSource (jobName <$> configJobs config)
ranges <- forM rangeOptions $ \( mbBase, paramTip ) -> do
( base, tip ) <- case mbBase of
Just base -> return ( base, paramTip )
Nothing -> do
Just base <- flip findUpstreamRef paramTip =<< getDefaultRepo
return ( base, paramTip )
rangeSource base tip
branches <- mapM watchBranchSource roNewCommitsOn
tags <- mapM watchTagSource roNewTags
liftIO $ do
mngr <- newJobManager storageDir optJobs
source <- mergeSources $ concat [ [ defaultSource, argumentJobs ], ranges, branches, tags ]
mbHeaderLine <- mapM (flip newLine "") (outputTerminal output)
threadCount <- newTVarIO (0 :: Int)
let changeCount f = atomically $ do
writeTVar threadCount . f =<< readTVar threadCount
let waitForJobs = atomically $ do
flip when retry . (0 <) =<< readTVar threadCount
let loop _ Nothing = return ()
loop names (Just ( [], next )) = do
loop names =<< atomically (takeJobSource next)
loop pnames (Just ( jobset : rest, next )) = do
let names = nub $ (pnames ++) $ map jobName $ jobsetJobs jobset
when (names /= pnames) $ do
forM_ mbHeaderLine $ \headerLine -> do
redrawLine headerLine $ T.concat $
T.replicate (8 + 50) " " :
map ((" " <>) . fitToLength 7 . textJobName) names
let commit = jobsetCommit jobset
shortCid = T.pack $ take 7 $ maybe (repeat ' ') (showCommitId . commitId) commit
shortDesc <- fitToLength 50 <$> maybe (return "") getCommitTitle commit
case jobsetJobsEither jobset of
Right jobs -> do
outs <- runJobs mngr output commit jobs
let findJob name = snd <$> find ((name ==) . jobName . fst) outs
statuses = map findJob names
forM_ (outputTerminal output) $ \tout -> do
line <- newLine tout ""
void $ forkIO $ do
displayStatusLine tout line shortCid (" " <> shortDesc) statuses
mask $ \restore -> do
changeCount (+ 1)
void $ forkIO $ do
void $ try @SomeException $ restore $ waitForJobStatuses statuses
changeCount (subtract 1)
Left err -> do
forM_ (outputTerminal output) $ flip newLine $
"\ESC[91m" <> shortCid <> "\ESC[0m" <> " " <> shortDesc <> " \ESC[91m" <> T.pack err <> "\ESC[0m"
loop names (Just ( rest, next ))
handle @SomeException (\_ -> cancelAllJobs mngr) $ do
loop [] =<< atomically (takeJobSource source)
waitForJobs
waitForJobs
fitToLength :: Int -> Text -> Text
fitToLength maxlen str | len <= maxlen = str <> T.replicate (maxlen - len) " "
| otherwise = T.take (maxlen - 1) str <> "…"
where len = T.length str
showStatus :: Bool -> JobStatus a -> Text
showStatus blink = \case
JobQueued -> "\ESC[94m…\ESC[0m "
JobWaiting uses -> "\ESC[94m~" <> fitToLength 6 (T.intercalate "," (map textJobName uses)) <> "\ESC[0m"
JobSkipped -> "\ESC[0m-\ESC[0m "
JobRunning -> "\ESC[96m" <> (if blink then "*" else "•") <> "\ESC[0m "
JobError fnote -> "\ESC[91m" <> fitToLength 7 ("!! [" <> T.pack (maybe "?" (show . tfNumber) (footnoteTerminal fnote)) <> "]") <> "\ESC[0m"
JobFailed -> "\ESC[91m✗\ESC[0m "
JobCancelled -> "\ESC[0mC\ESC[0m "
JobDone _ -> "\ESC[92m✓\ESC[0m "
JobDuplicate _ s -> case s of
JobQueued -> "\ESC[94m^\ESC[0m "
JobWaiting _ -> "\ESC[94m^\ESC[0m "
JobSkipped -> "\ESC[0m-\ESC[0m "
JobRunning -> "\ESC[96m" <> (if blink then "*" else "^") <> "\ESC[0m "
_ -> showStatus blink s
displayStatusLine :: TerminalOutput -> TerminalLine -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO ()
displayStatusLine tout line prefix1 prefix2 statuses = do
go "\0"
where
go prev = do
(ss, cur) <- atomically $ do
ss <- mapM (sequence . fmap readTVar) statuses
blink <- terminalBlinkStatus tout
let cur = T.concat $ map (maybe " " ((" " <>) . showStatus blink)) ss
when (cur == prev) retry
return (ss, cur)
let prefix1' = if any (maybe False jobStatusFailed) ss
then "\ESC[91m" <> prefix1 <> "\ESC[0m"
else prefix1
redrawLine line $ prefix1' <> prefix2 <> cur
if all (maybe True jobStatusFinished) ss
then return ()
else go cur
waitForJobStatuses :: [ Maybe (TVar (JobStatus a)) ] -> IO ()
waitForJobStatuses mbstatuses = do
let statuses = catMaybes mbstatuses
atomically $ do
ss <- mapM readTVar statuses
when (any (not . jobStatusFinished) ss) retry
|