diff options
Diffstat (limited to 'src/Job.hs')
-rw-r--r-- | src/Job.hs | 135 |
1 files changed, 85 insertions, 50 deletions
@@ -6,7 +6,7 @@ module Job ( ArtifactName(..), JobStatus(..), jobStatusFinished, jobStatusFailed, - JobManager(..), newJobManager, + JobManager(..), newJobManager, cancelAllJobs, runJobs, ) where @@ -31,6 +31,7 @@ import System.Directory import System.Exit import System.FilePath import System.IO +import System.Posix.Signals import System.Process import Job.Types @@ -58,6 +59,7 @@ data JobStatus a = JobQueued | JobSkipped | JobError Text | JobFailed + | JobCancelled | JobDone a deriving (Eq) @@ -85,6 +87,7 @@ textJobStatus = \case JobSkipped -> "skipped" JobError err -> "error\n" <> err JobFailed -> "failed" + JobCancelled -> "cancelled" JobDone _ -> "done" @@ -95,11 +98,19 @@ data JobManager = JobManager , jmNextTaskId :: TVar TaskId , jmNextTask :: TVar (Maybe TaskId) , jmReadyTasks :: TVar (Set TaskId) + , jmRunningTasks :: TVar (Map TaskId ThreadId) + , jmCancelled :: TVar Bool } newtype TaskId = TaskId Int deriving (Eq, Ord) +data JobCancelledException = JobCancelledException + deriving (Show) + +instance Exception JobCancelledException + + newJobManager :: FilePath -> Int -> IO JobManager newJobManager jmDataDir queueLen = do jmSemaphore <- newTVarIO queueLen @@ -107,31 +118,49 @@ newJobManager jmDataDir queueLen = do jmNextTaskId <- newTVarIO (TaskId 0) jmNextTask <- newTVarIO Nothing jmReadyTasks <- newTVarIO S.empty + jmRunningTasks <- newTVarIO M.empty + jmCancelled <- newTVarIO False return JobManager {..} +cancelAllJobs :: JobManager -> IO () +cancelAllJobs JobManager {..} = do + threads <- atomically $ do + writeTVar jmCancelled True + M.elems <$> readTVar jmRunningTasks + + mapM_ (`throwTo` JobCancelledException) threads + reserveTaskId :: JobManager -> STM TaskId reserveTaskId JobManager {..} = do tid@(TaskId n) <- readTVar jmNextTaskId writeTVar jmNextTaskId (TaskId (n + 1)) return tid -runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job) +runManagedJob :: (MonadIO m, MonadMask m) => JobManager -> TaskId -> m a -> m a -> m a +runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case + True -> cancel + False -> job where acquire = liftIO $ do atomically $ do writeTVar jmReadyTasks . S.insert tid =<< readTVar jmReadyTasks trySelectNext + threadId <- myThreadId atomically $ do - readTVar jmNextTask >>= \case - Just tid' | tid' == tid -> do - writeTVar jmNextTask Nothing - _ -> retry - - release _ = liftIO $ atomically $ do + readTVar jmCancelled >>= \case + True -> return True + False -> readTVar jmNextTask >>= \case + Just tid' | tid' == tid -> do + writeTVar jmNextTask Nothing + writeTVar jmRunningTasks . M.insert tid threadId =<< readTVar jmRunningTasks + return False + _ -> retry + + release False = liftIO $ atomically $ do free <- readTVar jmSemaphore writeTVar jmSemaphore $ free + 1 trySelectNext + release True = return () trySelectNext = do readTVar jmNextTask >>= \case @@ -145,6 +174,7 @@ runManagedJob JobManager {..} tid job = bracket acquire release (\_ -> job) writeTVar jmReadyTasks ready writeTVar jmSemaphore (sem - 1) writeTVar jmNextTask (Just tid') + writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks runJobs :: JobManager -> Commit -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] @@ -165,43 +195,47 @@ runJobs mngr@JobManager {..} commit jobs = do return statusVar forM_ results $ \( job, tid, outVar ) -> void $ forkIO $ do - res <- runExceptT $ do - duplicate <- liftIO $ atomically $ do - readTVar outVar >>= \case - JobDuplicate jid _ -> do - fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs - _ -> do - return Nothing - - case duplicate of - Nothing -> do - uses <- waitForUsedArtifacts job results outVar - runManagedJob mngr tid $ do - liftIO $ atomically $ writeTVar outVar JobRunning - prepareJob jmDataDir commit job $ \checkoutPath jdir -> do - updateStatusFile (jdir </> "status") outVar - JobDone <$> runJob job uses checkoutPath jdir - - Just ( jid, origVar ) -> do - let wait = do - status <- atomically $ do - status <- readTVar origVar - out <- readTVar outVar - if status == out - then retry - else do - writeTVar outVar $ JobDuplicate jid status - return status - if jobStatusFinished status - then return $ JobDuplicate jid status - else wait - liftIO wait - - case res of - Left (JobError err) -> T.putStrLn err - _ -> return () - - atomically $ writeTVar outVar $ either id id res + let handler e = atomically $ writeTVar outVar $ if + | Just JobCancelledException <- fromException e -> JobCancelled + | otherwise -> JobError (T.pack $ displayException e) + handle handler $ do + res <- runExceptT $ do + duplicate <- liftIO $ atomically $ do + readTVar outVar >>= \case + JobDuplicate jid _ -> do + fmap ( jid, ) . M.lookup jid <$> readTVar jmJobs + _ -> do + return Nothing + + case duplicate of + Nothing -> do + uses <- waitForUsedArtifacts job results outVar + runManagedJob mngr tid (return JobCancelled) $ do + liftIO $ atomically $ writeTVar outVar JobRunning + prepareJob jmDataDir commit job $ \checkoutPath jdir -> do + updateStatusFile (jdir </> "status") outVar + JobDone <$> runJob job uses checkoutPath jdir + + Just ( jid, origVar ) -> do + let wait = do + status <- atomically $ do + status <- readTVar origVar + out <- readTVar outVar + if status == out + then retry + else do + writeTVar outVar $ JobDuplicate jid status + return status + if jobStatusFinished status + then return $ JobDuplicate jid status + else wait + liftIO wait + + case res of + Left (JobError err) -> T.putStrLn err + _ -> return () + + atomically $ writeTVar outVar $ either id id res return $ map (\( job, _, var ) -> ( job, var )) results waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => @@ -274,10 +308,11 @@ runJob job uses checkoutPath jdir = do , std_err = UseHandle logs } liftIO $ hClose hin - exit <- liftIO $ waitForProcess hp - - when (exit /= ExitSuccess) $ - throwError JobFailed + liftIO (waitForProcess hp) >>= \case + ExitSuccess -> return () + ExitFailure n + | fromIntegral n == -sigINT -> throwError JobCancelled + | otherwise -> throwError JobFailed let adir = jdir </> "artifacts" artifacts <- forM (jobArtifacts job) $ \(name@(ArtifactName tname), pathCmd) -> liftIO $ do |