diff options
| -rw-r--r-- | src/Job.hs | 106 |
1 files changed, 56 insertions, 50 deletions
@@ -7,7 +7,7 @@ module Job ( JobStatus(..), jobStatusFinished, jobStatusFailed, JobManager(..), newJobManager, cancelAllJobs, - Task(..), TaskId, + Task(..), TaskId, cancelTask, runJobs, waitForRemainingTasks, prepareJob, @@ -149,6 +149,7 @@ data JobManager = JobManager data Task = Task { taskId :: TaskId , taskJob :: Job + , taskThread :: ThreadId , taskStatus :: TVar (JobStatus JobOutput) } @@ -181,6 +182,9 @@ cancelAllJobs JobManager {..} = do mapM_ (`throwTo` JobCancelledException) threads +cancelTask :: Task -> IO () +cancelTask Task {..} = throwTo taskThread JobCancelledException + reserveTaskId :: JobManager -> STM TaskId reserveTaskId JobManager {..} = do tid@(TaskId n) <- readTVar jmNextTaskId @@ -245,61 +249,63 @@ runJobs mngr@JobManager {..} tout jobs rerun = do writeTVar jmJobs $ M.insert (jobId job) statusVar managed return statusVar - forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do + forM results $ \( taskJob, taskId, taskStatus ) -> do let handler e = do status <- if | Just JobCancelledException <- fromException e -> do return JobCancelled | otherwise -> do JobError <$> outputFootnote tout (T.pack $ displayException e) - atomically $ writeTVar outVar status - outputJobFinishedEvent tout job status - handle handler $ do - res <- runExceptT $ do - duplicate <- liftIO $ atomically $ do - readTVar outVar >>= \case - JobDuplicate jid _ -> do - fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs - _ -> do - return Nothing - - case duplicate of - Nothing -> do - let jdir = jmDataDir </> jobStorageSubdir (jobId job) - readStatusFile tout job jdir >>= \case - Just status | status /= JobCancelled && not (rerun (jobId job) status) -> do - let status' = JobPreviousStatus status - liftIO $ atomically $ writeTVar outVar status' - return status' - mbStatus -> do - when (isJust mbStatus) $ do - liftIO $ removeDirectoryRecursive jdir - uses <- waitForUsedArtifacts tout job results outVar - runManagedJob mngr tid (return JobCancelled) $ do - liftIO $ atomically $ writeTVar outVar JobRunning - liftIO $ outputEvent tout $ JobStarted (jobId job) - prepareJob jmDataDir job $ \checkoutPath -> do - updateStatusFile mngr jdir outVar - JobDone <$> runJob job uses checkoutPath jdir - - Just ( jid, origVar ) -> do - let wait = do - status <- atomically $ do - status <- readTVar origVar - out <- readTVar outVar - if status == out - then retry - else do - writeTVar outVar $ JobDuplicate jid status - return status - if jobStatusFinished status - then return $ JobDuplicate jid status - else wait - liftIO wait - - atomically $ writeTVar outVar $ either id id res - outputJobFinishedEvent tout job $ either id id res - return $ map (\( job, tid, var ) -> Task tid job var ) results + atomically $ writeTVar taskStatus status + outputJobFinishedEvent tout taskJob status + + taskThread <- forkIO $ do + handle handler $ do + res <- runExceptT $ do + duplicate <- liftIO $ atomically $ do + readTVar taskStatus >>= \case + JobDuplicate jid _ -> do + fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs + _ -> do + return Nothing + + case duplicate of + Nothing -> do + let jdir = jmDataDir </> jobStorageSubdir (jobId taskJob) + readStatusFile tout taskJob jdir >>= \case + Just status | status /= JobCancelled && not (rerun (jobId taskJob) status) -> do + let status' = JobPreviousStatus status + liftIO $ atomically $ writeTVar taskStatus status' + return status' + mbStatus -> do + when (isJust mbStatus) $ do + liftIO $ removeDirectoryRecursive jdir + uses <- waitForUsedArtifacts tout taskJob results taskStatus + runManagedJob mngr taskId (return JobCancelled) $ do + liftIO $ atomically $ writeTVar taskStatus JobRunning + liftIO $ outputEvent tout $ JobStarted (jobId taskJob) + prepareJob jmDataDir taskJob $ \checkoutPath -> do + updateStatusFile mngr jdir taskStatus + JobDone <$> runJob taskJob uses checkoutPath jdir + + Just ( jid, origVar ) -> do + let wait = do + status <- atomically $ do + status <- readTVar origVar + out <- readTVar taskStatus + if status == out + then retry + else do + writeTVar taskStatus $ JobDuplicate jid status + return status + if jobStatusFinished status + then return $ JobDuplicate jid status + else wait + liftIO wait + + atomically $ writeTVar taskStatus $ either id id res + outputJobFinishedEvent tout taskJob $ either id id res + return Task {..} waitForRemainingTasks :: JobManager -> IO () waitForRemainingTasks JobManager {..} = do |