summary | refs | log | tree | commit | diff
path: root/src/Job.hs
diff options
context:
space:
mode:
author: Roman Smrž <roman.smrz@seznam.cz> 2025-01-18 15:48:10 +0100
committer: Roman Smrž <roman.smrz@seznam.cz> 2025-01-18 19:19:48 +0100
commit: 1ecc43458cd5c4f41fb23948c48e429e376704a5 (patch)
tree: 8b9d54da2f9dcfaccb3024749e4254fae80097cc /src/Job.hs
parent: cbf936f3479172260261ba07a4ff0ca30ae1fe98 (diff)
Cancel jobs on user interrupt
Changelog: Properly cancel and clean up jobs on user interrupt
Diffstat (limited to 'src/Job.hs')
-rw-r--r-- src/Job.hs 135
1 files changed, 85 insertions, 50 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 25b9a73..990ea3e 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -6,7 +6,7 @@ module Job (
ArtifactName(..),
JobStatus(..),
jobStatusFinished, jobStatusFailed,
- JobManager(..), newJobManager,
+ JobManager(..), newJobManager, cancelAllJobs,
runJobs,
) where
@@ -31,6 +31,7 @@ import System.Directory
import System.Exit
import System.FilePath
import System.IO
+import System.Posix.Signals
import System.Process
import Job.Types
@@ -58,6 +59,7 @@ data JobStatus a = JobQueued
| JobSkipped
| JobError Text
| JobFailed
+ | JobCancelled
| JobDone a
deriving (Eq)
@@ -85,6 +87,7 @@ textJobStatus = \case
JobSkipped -> "skipped"
JobError err -> "error\n" <> err
JobFailed -> "failed"
+ JobCancelled -> "cancelled"
JobDone _ -> "done"
@@ -95,11 +98,19 @@ data JobManager = JobManager
, jmNextTaskId :: TVar TaskId
, jmNextTask :: TVar (Maybe TaskId)
, jmReadyTasks :: TVar (Set TaskId)
+ , jmRunningTasks :: TVar (Map TaskId ThreadId)
+ , jmCancelled :: TVar Bool
}
newtype TaskId = TaskId Int
deriving (Eq, Ord)
+data JobCancelledException = JobCancelledException
+ deriving (Show)
+
+instance Exception JobCancelledException
+
+
newJobManager :: FilePath -> Int -> IO JobManager
newJobManager jmDataDir queueLen = do
jmSemaphore <- newTVarIO queueLen
@@ -107,31 +118,49 @@ newJobManager jmDataDir queueLen = do
jmNextTaskId <- newTVarIO (TaskId 0)
jmNextTask <- newTVarIO Nothing
jmReadyTasks <- newTVarIO S.empty
+ jmRunningTasks <- newTVarIO M.empty
+ jmCancelled <- newTVarIO False
return JobManager {..}
+cancelAllJobs :: JobManager -> IO ()
+cancelAllJobs JobManager {..} = do
+ threads <- atomically $ do
+ writeTVar jmCancelled True
+ M.elems <$> readTVar jmRunningTasks
+
+ mapM_ (`throwTo` JobCancelledException) threads
+
reserveTaskId :: JobManager -> STM TaskId
reserveTaskId JobManager {..} = do
tid@(TaskId n) <- readTVar jmNextTaskId
writeTVar jmNextTaskId (TaskId (n + 1))
return tid
-runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a
-runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job)
+runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -> m a
+runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case
+ True -> cancel
+ False -> job
where
acquire = liftIO $ do
atomically $ do
writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks
trySelectNext
+ threadId <- myThreadId
atomically $ do
- readTVar jmNextTask >>= \case
- Just tid' | tid' == tid -> do
- writeTVar jmNextTask Nothing
- _ -> retry
-
- release _ = liftIO $ atomically $ do
+ readTVar jmCancelled >>= \case
+ True -> return True
+ False -> readTVar jmNextTask >>= \case
+ Just tid' | tid' == tid -> do
+ writeTVar jmNextTask Nothing
+ writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks
+ return False
+ _ -> retry
+
+ release False = liftIO $ atomically $ do
free <- readTVar jmSemaphore
writeTVar jmSemaphore $ free + 1
trySelectNext
+ release True = return ()
trySelectNext = do
readTVar jmNextTask >>= \case
@@ -145,6 +174,7 @@ runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job)
writeTVar jmReadyTasks ready
writeTVar jmSemaphore (sem - 1)
writeTVar jmNextTask (Just tid')
+ writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks
runJobs :: JobManager -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
@@ -165,43 +195,47 @@ runJobs mngr@JobManager {..} commit jobs = do
return statusVar
forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do
- res <- runExceptT $ do
- duplicate <- liftIO $ atomically $ do
- readTVar outVar >>= \case
- JobDuplicate jid _ -> do
- fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
- _ -> do
- return Nothing
-
- case duplicate of
- Nothing -> do
- uses <- waitForUsedArtifacts job results outVar
- runManagedJob mngr tid $ do
- liftIO $ atomically $ writeTVar outVar JobRunning
- prepareJob jmDataDir commit job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- JobDone <$> runJob job uses checkoutPath jdir
-
- Just ( jid, origVar ) -> do
- let wait = do
- status <- atomically $ do
- status <- readTVar origVar
- out <- readTVar outVar
- if status == out
- then retry
- else do
- writeTVar outVar $ JobDuplicate jid status
- return status
- if jobStatusFinished status
- then return $ JobDuplicate jid status
- else wait
- liftIO wait
-
- case res of
- Left (JobError err) -> T.putStrLn err
- _ -> return ()
-
- atomically $ writeTVar outVar $ either id id res
+ let handler e = atomically $ writeTVar outVar $ if
+ | Just JobCancelledException <- fromException e -> JobCancelled
+ | otherwise -> JobError (T.pack $ displayException e)
+ handle handler $ do
+ res <- runExceptT $ do
+ duplicate <- liftIO $ atomically $ do
+ readTVar outVar >>= \case
+ JobDuplicate jid _ -> do
+ fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
+ _ -> do
+ return Nothing
+
+ case duplicate of
+ Nothing -> do
+ uses <- waitForUsedArtifacts job results outVar
+ runManagedJob mngr tid (return JobCancelled) $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ prepareJob jmDataDir commit job $ \checkoutPath jdir -> do
+ updateStatusFile (jdir </> "status") outVar
+ JobDone <$> runJob job uses checkoutPath jdir
+
+ Just ( jid, origVar ) -> do
+ let wait = do
+ status <- atomically $ do
+ status <- readTVar origVar
+ out <- readTVar outVar
+ if status == out
+ then retry
+ else do
+ writeTVar outVar $ JobDuplicate jid status
+ return status
+ if jobStatusFinished status
+ then return $ JobDuplicate jid status
+ else wait
+ liftIO wait
+
+ case res of
+ Left (JobError err) -> T.putStrLn err
+ _ -> return ()
+
+ atomically $ writeTVar outVar $ either id id res
return $ map (\( job, _, var ) -> ( job, var )) results
waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>
@@ -274,10 +308,11 @@ runJob job uses checkoutPath jdir = do
, std_err = UseHandle logs
}
liftIO $ hClose hin
- exit <- liftIO $ waitForProcess hp
-
- when (exit /= ExitSuccess) $
- throwError JobFailed
+ liftIO (waitForProcess hp) >>= \case
+ ExitSuccess -> return ()
+ ExitFailure n
+ | fromIntegral n == -sigINT -> throwError JobCancelled
+ | otherwise -> throwError JobFailed
let adir = jdir </> "artifacts"
artifacts <- forM (jobArtifacts job) $ \(name@(ArtifactName tname), pathCmd) -> liftIO $ do