diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-18 15:48:10 +0100 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-18 19:19:48 +0100 |
commit | 1ecc43458cd5c4f41fb23948c48e429e376704a5 (patch) | |
tree | 8b9d54da2f9dcfaccb3024749e4254fae80097cc /src | |
parent | cbf936f3479172260261ba07a4ff0ca30ae1fe98 (diff) |
Cancel jobs on user interrupt
Changelog: Properly cancel and clean up jobs on user interrupt
Diffstat (limited to 'src')
-rw-r--r-- | src/Command/Run.hs | 59 | ||||
-rw-r--r-- | src/Job.hs | 135 |
2 files changed, 118 insertions, 76 deletions
diff --git a/src/Command/Run.hs b/src/Command/Run.hs index c762335..945a4fd 100644 --- a/src/Command/Run.hs +++ b/src/Command/Run.hs @@ -4,6 +4,7 @@ module Command.Run ( import Control.Concurrent import Control.Concurrent.STM +import Control.Exception import Control.Monad import Control.Monad.Reader @@ -68,25 +69,32 @@ cmdRun (RunCommand changeset) = do T.replicate (8 + 50) " " : map ((" "<>) . fitToLength 7 . textJobName) names - statuses <- forM jobssets $ \jobset -> do - let commit = jobsetCommit jobset - shortCid = T.pack $ take 7 $ showCommitId $ commitId commit - shortDesc = fitToLength 50 (commitDescription commit) - case jobsetJobsEither jobset of - Right jobs -> do - outs <- runJobs mngr commit jobs - let findJob name = snd <$> find ((name ==) . jobName . fst) outs - displayStatusLine tout shortCid (" " <> shortDesc) $ map findJob names - return $ map snd outs - Left err -> do - void $ newLine tout $ - "\ESC[91m" <> shortCid <> "\ESC[0m" <> " " <> shortDesc <> " \ESC[91m" <> T.pack err <> "\ESC[0m" - return [] - - -- wait for all jobs to complete - atomically $ forM_ (concat statuses) $ \tvar -> do - status <- readTVar tvar - when (not $ jobStatusFinished status) retry + threadCount <- newTVarIO (0 :: Int) + let changeCount f = atomically $ do + writeTVar threadCount . f =<< readTVar threadCount + let waitForJobs = atomically $ do + flip when retry . (0 <) =<< readTVar threadCount + + handle @SomeException (\_ -> cancelAllJobs mngr) $ do + forM_ jobssets $ \jobset -> do + let commit = jobsetCommit jobset + shortCid = T.pack $ take 7 $ showCommitId $ commitId commit + shortDesc = fitToLength 50 (commitDescription commit) + case jobsetJobsEither jobset of + Right jobs -> do + outs <- runJobs mngr commit jobs + let findJob name = snd <$> find ((name ==) . jobName . fst) outs + line <- newLine tout "" + mask $ \restore -> do + changeCount (+ 1) + void $ forkIO $ (>> changeCount (subtract 1)) $ + try @SomeException $ restore $ do + displayStatusLine tout line shortCid (" " <> shortDesc) $ map findJob names + Left err -> do + void $ newLine tout $ + "\ESC[91m" <> shortCid <> "\ESC[0m" <> " " <> shortDesc <> " \ESC[91m" <> T.pack err <> "\ESC[0m" + waitForJobs + waitForJobs fitToLength :: Int -> Text -> Text @@ -102,6 +110,7 @@ showStatus blink = \case JobRunning -> "\ESC[96m" <> (if blink then "*" else "•") <> "\ESC[0m " JobError _ -> "\ESC[91m!!\ESC[0m " JobFailed -> "\ESC[91m✗\ESC[0m " + JobCancelled -> "\ESC[0mC\ESC[0m " JobDone _ -> "\ESC[92m✓\ESC[0m " JobDuplicate _ s -> case s of @@ -111,13 +120,11 @@ showStatus blink = \case JobRunning -> "\ESC[96m" <> (if blink then "*" else "^") <> "\ESC[0m " _ -> showStatus blink s -displayStatusLine :: TerminalOutput -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO () -displayStatusLine tout prefix1 prefix2 statuses = do - line <- newLine tout "" - void $ forkIO $ do - go line "\0" +displayStatusLine :: TerminalOutput -> TerminalLine -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO () +displayStatusLine tout line prefix1 prefix2 statuses = do + go "\0" where - go line prev = do + go prev = do (ss, cur) <- atomically $ do ss <- mapM (sequence . fmap readTVar) statuses blink <- terminalBlinkStatus tout @@ -132,4 +139,4 @@ displayStatusLine tout prefix1 prefix2 statuses = do if all (maybe True jobStatusFinished) ss then return () - else go line cur + else go cur @@ -6,7 +6,7 @@ module Job ( ArtifactName(..), JobStatus(..), jobStatusFinished, jobStatusFailed, - JobManager(..), newJobManager, + JobManager(..), newJobManager, cancelAllJobs, runJobs, ) where @@ -31,6 +31,7 @@ import System.Directory import System.Exit import System.FilePath import System.IO +import System.Posix.Signals import System.Process import Job.Types @@ -58,6 +59,7 @@ data JobStatus a = JobQueued | JobSkipped | JobError Text | JobFailed + | JobCancelled | JobDone a deriving (Eq) @@ -85,6 +87,7 @@ textJobStatus = \case JobSkipped -> "skipped" JobError err -> "error\n" <> err JobFailed -> "failed" + JobCancelled -> "cancelled" JobDone _ -> "done" @@ -95,11 +98,19 @@ data JobManager = JobManager , jmNextTaskId :: TVar TaskId , jmNextTask :: TVar (Maybe TaskId) , jmReadyTasks :: TVar (Set TaskId) + , jmRunningTasks :: TVar (Map TaskId ThreadId) + , jmCancelled :: TVar Bool } newtype TaskId = TaskId Int deriving (Eq, Ord) +data JobCancelledException = JobCancelledException + deriving (Show) + +instance Exception JobCancelledException + + newJobManager :: FilePath -> Int -> IO JobManager newJobManager jmDataDir queueLen = do jmSemaphore <- newTVarIO queueLen @@ -107,31 +118,49 @@ newJobManager jmDataDir queueLen = do jmNextTaskId <- newTVarIO (TaskId 0) jmNextTask <- newTVarIO Nothing jmReadyTasks <- newTVarIO S.empty + jmRunningTasks <- newTVarIO M.empty + jmCancelled <- newTVarIO False return JobManager {..} +cancelAllJobs :: JobManager -> IO () +cancelAllJobs JobManager {..} = do + threads <- atomically $ do + writeTVar jmCancelled True + M.elems <$> readTVar jmRunningTasks + + mapM_ (`throwTo` JobCancelledException) threads + reserveTaskId :: JobManager -> STM TaskId reserveTaskId JobManager {..} = do tid@(TaskId n) <- readTVar jmNextTaskId writeTVar jmNextTaskId (TaskId (n + 1)) return tid -runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job) +runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -> m a +runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case + True -> cancel + False -> job where acquire = liftIO $ do atomically $ do writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks trySelectNext + threadId <- myThreadId atomically $ do - readTVar jmNextTask >>= \case - Just tid' | tid' == tid -> do - writeTVar jmNextTask Nothing - _ -> retry - - release _ = liftIO $ atomically $ do + readTVar jmCancelled >>= \case + True -> return True + False -> readTVar jmNextTask >>= \case + Just tid' | tid' == tid -> do + writeTVar jmNextTask Nothing + writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks + return False + _ -> retry + + release False = liftIO $ atomically $ do free <- readTVar jmSemaphore writeTVar jmSemaphore $ free + 1 trySelectNext + release True = return () trySelectNext = do readTVar jmNextTask >>= \case @@ -145,6 +174,7 @@ runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job) writeTVar jmReadyTasks ready writeTVar jmSemaphore (sem - 1) writeTVar jmNextTask (Just tid') + writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks runJobs :: JobManager -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] @@ -165,43 +195,47 @@ runJobs mngr@JobManager {..} commit jobs = do return statusVar forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do - res <- runExceptT $ do - duplicate <- liftIO $ atomically $ do - readTVar outVar >>= \case - JobDuplicate jid _ -> do - fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs - _ -> do - return Nothing - - case duplicate of - Nothing -> do - uses <- waitForUsedArtifacts job results outVar - runManagedJob mngr tid $ do - liftIO $ atomically $ writeTVar outVar JobRunning - prepareJob jmDataDir commit job $ \checkoutPath jdir -> do - updateStatusFile (jdir </> "status") outVar - JobDone <$> runJob job uses checkoutPath jdir - - Just ( jid, origVar ) -> do - let wait = do - status <- atomically $ do - status <- readTVar origVar - out <- readTVar outVar - if status == out - then retry - else do - writeTVar outVar $ JobDuplicate jid status - return status - if jobStatusFinished status - then return $ JobDuplicate jid status - else wait - liftIO wait - - case res of - Left (JobError err) -> T.putStrLn err - _ -> return () - - atomically $ writeTVar outVar $ either id id res + let handler e = atomically $ writeTVar outVar $ if + | Just JobCancelledException <- fromException e -> JobCancelled + | otherwise -> JobError (T.pack $ displayException e) + handle handler $ do + res <- runExceptT $ do + duplicate <- liftIO $ atomically $ do + readTVar outVar >>= \case + JobDuplicate jid _ -> do + fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs + _ -> do + return Nothing + + case duplicate of + Nothing -> do + uses <- waitForUsedArtifacts job results outVar + runManagedJob mngr tid (return JobCancelled) $ do + liftIO $ atomically $ writeTVar outVar JobRunning + prepareJob jmDataDir commit job $ \checkoutPath jdir -> do + updateStatusFile (jdir </> "status") outVar + JobDone <$> runJob job uses checkoutPath jdir + + Just ( jid, origVar ) -> do + let wait = do + status <- atomically $ do + status <- readTVar origVar + out <- readTVar outVar + if status == out + then retry + else do + writeTVar outVar $ JobDuplicate jid status + return status + if jobStatusFinished status + then return $ JobDuplicate jid status + else wait + liftIO wait + + case res of + Left (JobError err) -> T.putStrLn err + _ -> return () + + atomically $ writeTVar outVar $ either id id res return $ map (\( job, _, var ) -> ( job, var )) results waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => @@ -274,10 +308,11 @@ runJob job uses checkoutPath jdir = do , std_err = UseHandle logs } liftIO $ hClose hin - exit <- liftIO $ waitForProcess hp - - when (exit /= ExitSuccess) $ - throwError JobFailed + liftIO (waitForProcess hp) >>= \case + ExitSuccess -> return () + ExitFailure n + | fromIntegral n == -sigINT -> throwError JobCancelled + | otherwise -> throwError JobFailed let adir = jdir </> "artifacts" artifacts <- forM (jobArtifacts job) $ \(name@(ArtifactName tname), pathCmd) -> liftIO $ do |