diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-18 15:48:10 +0100 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-01-18 19:19:48 +0100 | 
| commit | 1ecc43458cd5c4f41fb23948c48e429e376704a5 (patch) | |
| tree | 8b9d54da2f9dcfaccb3024749e4254fae80097cc /src | |
| parent | cbf936f3479172260261ba07a4ff0ca30ae1fe98 (diff) | |
Cancel jobs on user interrupt
Changelog: Properly cancel and clean up jobs on user interrupt
Diffstat (limited to 'src')
| -rw-r--r-- | src/Command/Run.hs | 59 | ||||
| -rw-r--r-- | src/Job.hs | 135 | 
2 files changed, 118 insertions, 76 deletions
| diff --git a/src/Command/Run.hs b/src/Command/Run.hs index c762335..945a4fd 100644 --- a/src/Command/Run.hs +++ b/src/Command/Run.hs @@ -4,6 +4,7 @@ module Command.Run (  import Control.Concurrent  import Control.Concurrent.STM +import Control.Exception  import Control.Monad  import Control.Monad.Reader @@ -68,25 +69,32 @@ cmdRun (RunCommand changeset) = do              T.replicate (8 + 50) " " :              map ((" "<>) . fitToLength 7 . textJobName) names -        statuses <- forM jobssets $ \jobset -> do -            let commit = jobsetCommit jobset -                shortCid = T.pack $ take 7 $ showCommitId $ commitId commit -                shortDesc = fitToLength 50 (commitDescription commit) -            case jobsetJobsEither jobset of -                Right jobs -> do -                    outs <- runJobs mngr commit jobs -                    let findJob name = snd <$> find ((name ==) . jobName . fst) outs -                    displayStatusLine tout shortCid (" " <> shortDesc) $ map findJob names -                    return $ map snd outs -                Left err -> do -                    void $ newLine tout $ -                        "\ESC[91m" <> shortCid <> "\ESC[0m" <> " " <> shortDesc <> " \ESC[91m" <> T.pack err <> "\ESC[0m" -                    return [] - -        -- wait for all jobs to complete -        atomically $ forM_ (concat statuses) $ \tvar -> do -            status <- readTVar tvar -            when (not $ jobStatusFinished status) retry +        threadCount <- newTVarIO (0 :: Int) +        let changeCount f = atomically $ do +                writeTVar threadCount . f =<< readTVar threadCount +        let waitForJobs = atomically $ do +                flip when retry . (0 <) =<< readTVar threadCount + +        handle @SomeException (\_ -> cancelAllJobs mngr) $ do +            forM_ jobssets $ \jobset -> do +                let commit = jobsetCommit jobset +                    shortCid = T.pack $ take 7 $ showCommitId $ commitId commit +                    shortDesc = fitToLength 50 (commitDescription commit) +                case jobsetJobsEither jobset of +                    Right jobs -> do +                        outs <- runJobs mngr commit jobs +                        let findJob name = snd <$> find ((name ==) . jobName . fst) outs +                        line <- newLine tout "" +                        mask $ \restore -> do +                            changeCount (+ 1) +                            void $ forkIO $ (>> changeCount (subtract 1)) $ +                                try @SomeException $ restore $ do +                                    displayStatusLine tout line shortCid (" " <> shortDesc) $ map findJob names +                    Left err -> do +                        void $ newLine tout $ +                            "\ESC[91m" <> shortCid <> "\ESC[0m" <> " " <> shortDesc <> " \ESC[91m" <> T.pack err <> "\ESC[0m" +            waitForJobs +        waitForJobs  fitToLength :: Int -> Text -> Text @@ -102,6 +110,7 @@ showStatus blink = \case      JobRunning      -> "\ESC[96m" <> (if blink then "*" else "•") <> "\ESC[0m      "      JobError _      -> "\ESC[91m!!\ESC[0m     "      JobFailed       -> "\ESC[91m✗\ESC[0m      " +    JobCancelled    ->  "\ESC[0mC\ESC[0m      "      JobDone _       -> "\ESC[92m✓\ESC[0m      "      JobDuplicate _ s -> case s of @@ -111,13 +120,11 @@ showStatus blink = \case          JobRunning   -> "\ESC[96m" <> (if blink then "*" else "^") <> "\ESC[0m      "          _            -> showStatus blink s -displayStatusLine :: TerminalOutput -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO () -displayStatusLine tout prefix1 prefix2 statuses = do -    line <- newLine tout "" -    void $ forkIO $ do -        go line "\0" +displayStatusLine :: TerminalOutput -> TerminalLine -> Text -> Text -> [ Maybe (TVar (JobStatus JobOutput)) ] -> IO () +displayStatusLine tout line prefix1 prefix2 statuses = do +    go "\0"    where -    go line prev = do +    go prev = do          (ss, cur) <- atomically $ do              ss <- mapM (sequence . fmap readTVar) statuses              blink <- terminalBlinkStatus tout @@ -132,4 +139,4 @@ displayStatusLine tout prefix1 prefix2 statuses = do          if all (maybe True jobStatusFinished) ss             then return () -           else go line cur +           else go cur @@ -6,7 +6,7 @@ module Job (      ArtifactName(..),      JobStatus(..),      jobStatusFinished, jobStatusFailed, -    JobManager(..), newJobManager, +    JobManager(..), newJobManager, cancelAllJobs,      runJobs,  ) where @@ -31,6 +31,7 @@ import System.Directory  import System.Exit  import System.FilePath  import System.IO +import System.Posix.Signals  import System.Process  import Job.Types @@ -58,6 +59,7 @@ data JobStatus a = JobQueued                   | JobSkipped                   | JobError Text                   | JobFailed +                 | JobCancelled                   | JobDone a      deriving (Eq) @@ -85,6 +87,7 @@ textJobStatus = \case      JobSkipped -> "skipped"      JobError err -> "error\n" <> err      JobFailed -> "failed" +    JobCancelled -> "cancelled"      JobDone _ -> "done" @@ -95,11 +98,19 @@ data JobManager = JobManager      , jmNextTaskId :: TVar TaskId      , jmNextTask :: TVar (Maybe TaskId)      , jmReadyTasks :: TVar (Set TaskId) +    , jmRunningTasks :: TVar (Map TaskId ThreadId) +    , jmCancelled :: TVar Bool      }  newtype TaskId = TaskId Int      deriving (Eq, Ord) +data JobCancelledException = JobCancelledException +    deriving (Show) + +instance Exception JobCancelledException + +  newJobManager :: FilePath -> Int -> IO JobManager  newJobManager jmDataDir queueLen = do      jmSemaphore <- newTVarIO queueLen @@ -107,31 +118,49 @@ newJobManager jmDataDir queueLen = do      jmNextTaskId <- newTVarIO (TaskId 0)      jmNextTask <- newTVarIO Nothing      jmReadyTasks <- newTVarIO S.empty +    jmRunningTasks <- newTVarIO M.empty +    jmCancelled <- newTVarIO False      return JobManager {..} +cancelAllJobs :: JobManager -> IO () +cancelAllJobs JobManager {..} = do +    threads <- atomically $ do +        writeTVar jmCancelled True +        M.elems <$> readTVar jmRunningTasks + +    mapM_ (`throwTo` JobCancelledException) threads +  reserveTaskId :: JobManager -> STM TaskId  reserveTaskId JobManager {..} = do      tid@(TaskId n) <- readTVar jmNextTaskId      writeTVar jmNextTaskId (TaskId (n + 1))      return tid -runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job) +runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -> m a +runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case +    True -> cancel +    False -> job    where      acquire = liftIO $ do          atomically $ do              writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks              trySelectNext +        threadId <- myThreadId          atomically $ do -            readTVar jmNextTask >>= \case -                Just tid' | tid' == tid -> do -                    writeTVar jmNextTask Nothing -                _ -> retry - -    release _ = liftIO $ atomically $ do +            readTVar jmCancelled >>= \case +                True -> return True +                False -> readTVar jmNextTask >>= \case +                    Just tid' | tid' == tid -> do +                        writeTVar jmNextTask Nothing +                        writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks +                        return False +                    _ -> retry + +    release False = liftIO $ atomically $ do          free <- readTVar jmSemaphore          writeTVar jmSemaphore $ free + 1          trySelectNext +    release True = return ()      trySelectNext = do          readTVar jmNextTask >>= \case @@ -145,6 +174,7 @@ runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job)                              writeTVar jmReadyTasks ready                              writeTVar jmSemaphore (sem - 1)                              writeTVar jmNextTask (Just tid') +                            writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks  runJobs :: JobManager -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] @@ -165,43 +195,47 @@ runJobs mngr@JobManager {..} commit jobs = do                      return statusVar      forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do -        res <- runExceptT $ do -            duplicate <- liftIO $ atomically $ do -                readTVar outVar >>= \case -                    JobDuplicate jid _ -> do -                        fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs -                    _ -> do -                        return Nothing - -            case duplicate of -                Nothing -> do -                    uses <- waitForUsedArtifacts job results outVar -                    runManagedJob mngr tid $ do -                        liftIO $ atomically $ writeTVar outVar JobRunning -                        prepareJob jmDataDir commit job $ \checkoutPath jdir -> do -                            updateStatusFile (jdir </> "status") outVar -                            JobDone <$> runJob job uses checkoutPath jdir - -                Just ( jid, origVar ) -> do -                    let wait = do -                            status <- atomically $ do -                                status <- readTVar origVar -                                out <- readTVar outVar -                                if status == out -                                  then retry -                                  else do -                                    writeTVar outVar $ JobDuplicate jid status -                                    return status -                            if jobStatusFinished status -                              then return $ JobDuplicate jid status -                              else wait -                    liftIO wait - -        case res of -            Left (JobError err) -> T.putStrLn err -            _ -> return () - -        atomically $ writeTVar outVar $ either id id res +        let handler e = atomically $ writeTVar outVar $ if +                | Just JobCancelledException <- fromException e -> JobCancelled +                | otherwise -> JobError (T.pack $ displayException e) +        handle handler $ do +            res <- runExceptT $ do +                duplicate <- liftIO $ atomically $ do +                    readTVar outVar >>= \case +                        JobDuplicate jid _ -> do +                            fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs +                        _ -> do +                            return Nothing + +                case duplicate of +                    Nothing -> do +                        uses <- waitForUsedArtifacts job results outVar +                        runManagedJob mngr tid (return JobCancelled) $ do +                            liftIO $ atomically $ writeTVar outVar JobRunning +                            prepareJob jmDataDir commit job $ \checkoutPath jdir -> do +                                updateStatusFile (jdir </> "status") outVar +                                JobDone <$> runJob job uses checkoutPath jdir + +                    Just ( jid, origVar ) -> do +                        let wait = do +                                status <- atomically $ do +                                    status <- readTVar origVar +                                    out <- readTVar outVar +                                    if status == out +                                      then retry +                                      else do +                                        writeTVar outVar $ JobDuplicate jid status +                                        return status +                                if jobStatusFinished status +                                  then return $ JobDuplicate jid status +                                  else wait +                        liftIO wait + +            case res of +                Left (JobError err) -> T.putStrLn err +                _ -> return () + +            atomically $ writeTVar outVar $ either id id res      return $ map (\( job, _, var ) -> ( job, var )) results  waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => @@ -274,10 +308,11 @@ runJob job uses checkoutPath jdir = do                  , std_err = UseHandle logs                  }              liftIO $ hClose hin -            exit <- liftIO $ waitForProcess hp - -            when (exit /= ExitSuccess) $ -                throwError JobFailed +            liftIO (waitForProcess hp) >>= \case +                ExitSuccess -> return () +                ExitFailure n +                    | fromIntegral n == -sigINT -> throwError JobCancelled +                    | otherwise -> throwError JobFailed      let adir = jdir </> "artifacts"      artifacts <- forM (jobArtifacts job) $ \(name@(ArtifactName tname), pathCmd) -> liftIO $ do |