summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--minici.cabal1
-rw-r--r--src/Command/Run.hs59
-rw-r--r--src/Job.hs135
3 files changed, 119 insertions, 76 deletions
diff --git a/minici.cabal b/minici.cabal
index 6b298f6..49a8337 100644
--- a/minici.cabal
+++ b/minici.cabal
@@ -97,6 +97,7 @@ executable minici
template-haskell ^>= { 2.17, 2.18, 2.19, 2.20, 2.21, 2.22 },
text ^>= { 1.2, 2.0, 2.1 },
th-compat ^>= { 0.1 },
+ unix ^>= { 2.7.3, 2.8.6 },
hs-source-dirs: src
default-language: Haskell2010
diff --git a/src/Command/Run.hs b/src/Command/Run.hs
index c762335..945a4fd 100644
--- a/src/Command/Run.hs
+++ b/src/Command/Run.hs
@@ -4,6 +4,7 @@ module Command.Run (
import Control.Concurrent
import Control.Concurrent.STM
+import Control.Exception
import Control.Monad
import Control.Monad.Reader
@@ -68,25 +69,32 @@ cmdRun (RunCommand changeset) = do
T.replicate (8 + 50) " " :
map ((" "<>) . fitToLength 7 . textJobName) names
- statuses <- forM jobssets $ \jobset -> do
- let commit = jobsetCommit jobset
- shortCid = T.pack $ take 7 $ showCommitId $ commitId commit
- shortDesc = fitToLength 50 (commitDescription commit)
- case jobsetJobsEither jobset of
- Right jobs -> do
- outs <- runJobs mngr commit jobs
- let findJob name = snd <$> find ((name ==) . jobName . fst) outs
- displayStatusLine tout shortCid (" " <> shortDesc) $ map findJob names
- return $ map snd outs
- Left err -> do
- void $ newLine tout $
- "\ESC[91m" <> shortCid <> "\ESC[0m" <> " " <> shortDesc <> " \ESC[91m" <> T.pack err <> "\ESC[0m"
- return []
-
- -- wait for all jobs to complete
- atomically $ forM_ (concat statuses) $ \tvar -> do
- status <- readTVar tvar
- when (not $ jobStatusFinished status) retry
+ threadCount <- newTVarIO (0 :: Int)
+ let changeCount f = atomically $ do
+ writeTVar threadCount . f =<< readTVar threadCount
+ let waitForJobs = atomically $ do
+ flip when retry . (0 <) =<< readTVar threadCount
+
+ handle @SomeException (\_ -> cancelAllJobs mngr) $ do
+ forM_ jobssets $ \jobset -> do
+ let commit = jobsetCommit jobset
+ shortCid = T.pack $ take 7 $ showCommitId $ commitId commit
+ shortDesc = fitToLength 50 (commitDescription commit)
+ case jobsetJobsEither jobset of
+ Right jobs -> do
+ outs <- runJobs mngr commit jobs
+ let findJob name = snd <$> find ((name ==) . jobName . fst) outs
+ line <- newLine tout ""
+ mask $ \restore -> do
+ changeCount (+ 1)
+ void $ forkIO $ (>> changeCount (subtract 1)) $
+ try @SomeException $ restore $ do
+ displayStatusLine tout line shortCid (" " <> shortDesc) $ map findJob names
+ Left err -> do
+ void $ newLine tout $
+ "\ESC[91m" <> shortCid <> "\ESC[0m" <> " " <> shortDesc <> " \ESC[91m" <> T.pack err <> "\ESC[0m"
+ waitForJobs
+ waitForJobs
fitToLength :: Int -> Text -> Text
@@ -102,6 +110,7 @@ showStatus blink = \case
JobRunning -> "\ESC[96m" <> (if blink then "*" else "•") <> "\ESC[0m "
JobError _ -> "\ESC[91m!!\ESC[0m "
JobFailed -> "\ESC[91m✗\ESC[0m "
+ JobCancelled -> "\ESC[0mC\ESC[0m "
JobDone _ -> "\ESC[92m✓\ESC[0m "
JobDuplicate _ s -> case s of
@@ -111,13 +120,11 @@ showStatus blink = \case
JobRunning -> "\ESC[96m" <> (if blink then "*" else "^") <> "\ESC[0m "
_ -> showStatus blink s
-displayStatusLine :: TerminalOutput -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO ()
-displayStatusLine tout prefix1 prefix2 statuses = do
- line <- newLine tout ""
- void $ forkIO $ do
- go line "\0"
+displayStatusLine :: TerminalOutput -> TerminalLine -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO ()
+displayStatusLine tout line prefix1 prefix2 statuses = do
+ go "\0"
where
- go line prev = do
+ go prev = do
(ss, cur) <- atomically $ do
ss <- mapM (sequence . fmap readTVar) statuses
blink <- terminalBlinkStatus tout
@@ -132,4 +139,4 @@ displayStatusLine tout prefix1 prefix2 statuses = do
if all (maybe True jobStatusFinished) ss
then return ()
- else go line cur
+ else go cur
diff --git a/src/Job.hs b/src/Job.hs
index 25b9a73..990ea3e 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -6,7 +6,7 @@ module Job (
ArtifactName(..),
JobStatus(..),
jobStatusFinished, jobStatusFailed,
- JobManager(..), newJobManager,
+ JobManager(..), newJobManager, cancelAllJobs,
runJobs,
) where
@@ -31,6 +31,7 @@ import System.Directory
import System.Exit
import System.FilePath
import System.IO
+import System.Posix.Signals
import System.Process
import Job.Types
@@ -58,6 +59,7 @@ data JobStatus a = JobQueued
| JobSkipped
| JobError Text
| JobFailed
+ | JobCancelled
| JobDone a
deriving (Eq)
@@ -85,6 +87,7 @@ textJobStatus = \case
JobSkipped -> "skipped"
JobError err -> "error\n" <> err
JobFailed -> "failed"
+ JobCancelled -> "cancelled"
JobDone _ -> "done"
@@ -95,11 +98,19 @@ data JobManager = JobManager
, jmNextTaskId :: TVar TaskId
, jmNextTask :: TVar (Maybe TaskId)
, jmReadyTasks :: TVar (Set TaskId)
+ , jmRunningTasks :: TVar (Map TaskId ThreadId)
+ , jmCancelled :: TVar Bool
}
newtype TaskId = TaskId Int
deriving (Eq, Ord)
+data JobCancelledException = JobCancelledException
+ deriving (Show)
+
+instance Exception JobCancelledException
+
+
newJobManager :: FilePath -> Int -> IO JobManager
newJobManager jmDataDir queueLen = do
jmSemaphore <- newTVarIO queueLen
@@ -107,31 +118,49 @@ newJobManager jmDataDir queueLen = do
jmNextTaskId <- newTVarIO (TaskId 0)
jmNextTask <- newTVarIO Nothing
jmReadyTasks <- newTVarIO S.empty
+ jmRunningTasks <- newTVarIO M.empty
+ jmCancelled <- newTVarIO False
return JobManager {..}
+cancelAllJobs :: JobManager -> IO ()
+cancelAllJobs JobManager {..} = do
+ threads <- atomically $ do
+ writeTVar jmCancelled True
+ M.elems <$> readTVar jmRunningTasks
+
+ mapM_ (`throwTo` JobCancelledException) threads
+
reserveTaskId :: JobManager -> STM TaskId
reserveTaskId JobManager {..} = do
tid@(TaskId n) <- readTVar jmNextTaskId
writeTVar jmNextTaskId (TaskId (n + 1))
return tid
-runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a
-runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job)
+runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -> m a
+runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case
+ True -> cancel
+ False -> job
where
acquire = liftIO $ do
atomically $ do
writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks
trySelectNext
+ threadId <- myThreadId
atomically $ do
- readTVar jmNextTask >>= \case
- Just tid' | tid' == tid -> do
- writeTVar jmNextTask Nothing
- _ -> retry
-
- release _ = liftIO $ atomically $ do
+ readTVar jmCancelled >>= \case
+ True -> return True
+ False -> readTVar jmNextTask >>= \case
+ Just tid' | tid' == tid -> do
+ writeTVar jmNextTask Nothing
+ writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks
+ return False
+ _ -> retry
+
+ release False = liftIO $ atomically $ do
free <- readTVar jmSemaphore
writeTVar jmSemaphore $ free + 1
trySelectNext
+ release True = return ()
trySelectNext = do
readTVar jmNextTask >>= \case
@@ -145,6 +174,7 @@ runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job)
writeTVar jmReadyTasks ready
writeTVar jmSemaphore (sem - 1)
writeTVar jmNextTask (Just tid')
+ writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks
runJobs :: JobManager -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
@@ -165,43 +195,47 @@ runJobs mngr@JobManager {..} commit jobs = do
return statusVar
forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do
- res <- runExceptT $ do
- duplicate <- liftIO $ atomically $ do
- readTVar outVar >>= \case
- JobDuplicate jid _ -> do
- fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
- _ -> do
- return Nothing
-
- case duplicate of
- Nothing -> do
- uses <- waitForUsedArtifacts job results outVar
- runManagedJob mngr tid $ do
- liftIO $ atomically $ writeTVar outVar JobRunning
- prepareJob jmDataDir commit job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- JobDone <$> runJob job uses checkoutPath jdir
-
- Just ( jid, origVar ) -> do
- let wait = do
- status <- atomically $ do
- status <- readTVar origVar
- out <- readTVar outVar
- if status == out
- then retry
- else do
- writeTVar outVar $ JobDuplicate jid status
- return status
- if jobStatusFinished status
- then return $ JobDuplicate jid status
- else wait
- liftIO wait
-
- case res of
- Left (JobError err) -> T.putStrLn err
- _ -> return ()
-
- atomically $ writeTVar outVar $ either id id res
+ let handler e = atomically $ writeTVar outVar $ if
+ | Just JobCancelledException <- fromException e -> JobCancelled
+ | otherwise -> JobError (T.pack $ displayException e)
+ handle handler $ do
+ res <- runExceptT $ do
+ duplicate <- liftIO $ atomically $ do
+ readTVar outVar >>= \case
+ JobDuplicate jid _ -> do
+ fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
+ _ -> do
+ return Nothing
+
+ case duplicate of
+ Nothing -> do
+ uses <- waitForUsedArtifacts job results outVar
+ runManagedJob mngr tid (return JobCancelled) $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ prepareJob jmDataDir commit job $ \checkoutPath jdir -> do
+ updateStatusFile (jdir </> "status") outVar
+ JobDone <$> runJob job uses checkoutPath jdir
+
+ Just ( jid, origVar ) -> do
+ let wait = do
+ status <- atomically $ do
+ status <- readTVar origVar
+ out <- readTVar outVar
+ if status == out
+ then retry
+ else do
+ writeTVar outVar $ JobDuplicate jid status
+ return status
+ if jobStatusFinished status
+ then return $ JobDuplicate jid status
+ else wait
+ liftIO wait
+
+ case res of
+ Left (JobError err) -> T.putStrLn err
+ _ -> return ()
+
+ atomically $ writeTVar outVar $ either id id res
return $ map (\( job, _, var ) -> ( job, var )) results
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>
@@ -274,10 +308,11 @@ runJob job uses checkoutPath jdir = do
, std_err = UseHandle logs
}
liftIO $ hClose hin
- exit <- liftIO $ waitForProcess hp
-
- when (exit /= ExitSuccess) $
- throwError JobFailed
+ liftIO (waitForProcess hp) >>= \case
+ ExitSuccess -> return ()
+ ExitFailure n
+ | fromIntegral n == -sigINT -> throwError JobCancelled
+ | otherwise -> throwError JobFailed
let adir = jdir </> "artifacts"
artifacts <- forM (jobArtifacts job) $ \(name@(ArtifactName tname), pathCmd) -> liftIO $ do