summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2026-03-28 16:15:20 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2026-03-28 16:15:20 +0100
commit8615433b8ac8e161c36df9b18d551ceb3949ff1f (patch)
treecd6cfaa47e8185717ce7ba9e3ebbb1e02e25cd1d /src
parentd442fcbbe5bc40d903b25bd21fd2a9f2b8dd4cc6 (diff)
Interface to cancel individual tasks
Diffstat (limited to 'src')
-rw-r--r--src/Job.hs106
1 files changed, 56 insertions, 50 deletions
diff --git a/src/Job.hs b/src/Job.hs
index aadea66..649833a 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -7,7 +7,7 @@ module Job (
JobStatus(..),
jobStatusFinished, jobStatusFailed,
JobManager(..), newJobManager, cancelAllJobs,
- Task(..), TaskId,
+ Task(..), TaskId, cancelTask,
runJobs, waitForRemainingTasks,
prepareJob,
@@ -149,6 +149,7 @@ data JobManager = JobManager
data Task = Task
{ taskId :: TaskId
, taskJob :: Job
+ , taskThread :: ThreadId
, taskStatus :: TVar (JobStatus JobOutput)
}
@@ -181,6 +182,9 @@ cancelAllJobs JobManager {..} = do
mapM_ (`throwTo` JobCancelledException) threads
+cancelTask :: Task -> IO ()
+cancelTask Task {..} = throwTo taskThread JobCancelledException
+
reserveTaskId :: JobManager -> STM TaskId
reserveTaskId JobManager {..} = do
tid@(TaskId n) <- readTVar jmNextTaskId
@@ -245,61 +249,63 @@ runJobs mngr@JobManager {..} tout jobs rerun = do
writeTVar jmJobs $ M.insert (jobId job) statusVar managed
return statusVar
- forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do
+ forM results $ \( taskJob, taskId, taskStatus ) -> do
let handler e = do
status <- if
| Just JobCancelledException <- fromException e -> do
return JobCancelled
| otherwise -> do
JobError <$> outputFootnote tout (T.pack $ displayException e)
- atomically $ writeTVar outVar status
- outputJobFinishedEvent tout job status
- handle handler $ do
- res <- runExceptT $ do
- duplicate <- liftIO $ atomically $ do
- readTVar outVar >>= \case
- JobDuplicate jid _ -> do
- fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
- _ -> do
- return Nothing
-
- case duplicate of
- Nothing -> do
- let jdir = jmDataDir </> jobStorageSubdir (jobId job)
- readStatusFile tout job jdir >>= \case
- Just status | status /= JobCancelled && not (rerun (jobId job) status) -> do
- let status' = JobPreviousStatus status
- liftIO $ atomically $ writeTVar outVar status'
- return status'
- mbStatus -> do
- when (isJust mbStatus) $ do
- liftIO $ removeDirectoryRecursive jdir
- uses <- waitForUsedArtifacts tout job results outVar
- runManagedJob mngr tid (return JobCancelled) $ do
- liftIO $ atomically $ writeTVar outVar JobRunning
- liftIO $ outputEvent tout $ JobStarted (jobId job)
- prepareJob jmDataDir job $ \checkoutPath -> do
- updateStatusFile mngr jdir outVar
- JobDone <$> runJob job uses checkoutPath jdir
-
- Just ( jid, origVar ) -> do
- let wait = do
- status <- atomically $ do
- status <- readTVar origVar
- out <- readTVar outVar
- if status == out
- then retry
- else do
- writeTVar outVar $ JobDuplicate jid status
- return status
- if jobStatusFinished status
- then return $ JobDuplicate jid status
- else wait
- liftIO wait
-
- atomically $ writeTVar outVar $ either id id res
- outputJobFinishedEvent tout job $ either id id res
- return $ map (\( job, tid, var ) -> Task tid job var ) results
+ atomically $ writeTVar taskStatus status
+ outputJobFinishedEvent tout taskJob status
+
+ taskThread <- forkIO $ do
+ handle handler $ do
+ res <- runExceptT $ do
+ duplicate <- liftIO $ atomically $ do
+ readTVar taskStatus >>= \case
+ JobDuplicate jid _ -> do
+ fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs
+ _ -> do
+ return Nothing
+
+ case duplicate of
+ Nothing -> do
+ let jdir = jmDataDir </> jobStorageSubdir (jobId taskJob)
+ readStatusFile tout taskJob jdir >>= \case
+ Just status | status /= JobCancelled && not (rerun (jobId taskJob) status) -> do
+ let status' = JobPreviousStatus status
+ liftIO $ atomically $ writeTVar taskStatus status'
+ return status'
+ mbStatus -> do
+ when (isJust mbStatus) $ do
+ liftIO $ removeDirectoryRecursive jdir
+ uses <- waitForUsedArtifacts tout taskJob results taskStatus
+ runManagedJob mngr taskId (return JobCancelled) $ do
+ liftIO $ atomically $ writeTVar taskStatus JobRunning
+ liftIO $ outputEvent tout $ JobStarted (jobId taskJob)
+ prepareJob jmDataDir taskJob $ \checkoutPath -> do
+ updateStatusFile mngr jdir taskStatus
+ JobDone <$> runJob taskJob uses checkoutPath jdir
+
+ Just ( jid, origVar ) -> do
+ let wait = do
+ status <- atomically $ do
+ status <- readTVar origVar
+ out <- readTVar taskStatus
+ if status == out
+ then retry
+ else do
+ writeTVar taskStatus $ JobDuplicate jid status
+ return status
+ if jobStatusFinished status
+ then return $ JobDuplicate jid status
+ else wait
+ liftIO wait
+
+ atomically $ writeTVar taskStatus $ either id id res
+ outputJobFinishedEvent tout taskJob $ either id id res
+ return Task {..}
waitForRemainingTasks :: JobManager -> IO ()
waitForRemainingTasks JobManager {..} = do