diff options
Diffstat (limited to 'src/Job.hs')
| -rw-r--r-- | src/Job.hs | 233 |
1 files changed, 184 insertions, 49 deletions
@@ -7,8 +7,16 @@ module Job ( JobStatus(..), jobStatusFinished, jobStatusFailed, JobManager(..), newJobManager, cancelAllJobs, - runJobs, + runJobs, waitForRemainingTasks, + + prepareJob, + getArtifactWorkPath, + copyArtifact, + jobStorageSubdir, + + copyRecursive, + copyRecursiveForce, ) where import Control.Concurrent @@ -30,6 +38,7 @@ import Data.Text qualified as T import Data.Text.IO qualified as T import System.Directory +import System.Environment import System.Exit import System.FilePath import System.FilePath.Glob @@ -38,14 +47,14 @@ import System.IO.Temp import System.Posix.Signals import System.Process +import Destination import Job.Types import Output import Repo data JobOutput = JobOutput - { outName :: JobName - , outArtifacts :: [ArtifactOutput] + { outArtifacts :: [ArtifactOutput] } deriving (Eq) @@ -59,6 +68,7 @@ data ArtifactOutput = ArtifactOutput data JobStatus a = JobQueued | JobDuplicate JobId (JobStatus a) + | JobPreviousStatus (JobStatus a) | JobWaiting [JobName] | JobRunning | JobSkipped @@ -70,31 +80,58 @@ data JobStatus a = JobQueued jobStatusFinished :: JobStatus a -> Bool jobStatusFinished = \case - JobQueued {} -> False - JobDuplicate _ s -> jobStatusFinished s - JobWaiting {} -> False - JobRunning {} -> False - _ -> True + JobQueued {} -> False + JobDuplicate _ s -> jobStatusFinished s + JobPreviousStatus s -> jobStatusFinished s + JobWaiting {} -> False + JobRunning {} -> False + _ -> True jobStatusFailed :: JobStatus a -> Bool jobStatusFailed = \case - JobDuplicate _ s -> jobStatusFailed s - JobError {} -> True - JobFailed {} -> True - _ -> False + JobDuplicate _ s -> jobStatusFailed s + JobPreviousStatus s -> jobStatusFailed s + JobError {} -> True + JobFailed {} -> True + _ -> False + +jobResult :: JobStatus a -> Maybe a +jobResult = \case + JobDone x -> Just x + JobDuplicate _ s -> jobResult s + JobPreviousStatus s -> jobResult s + _ -> Nothing textJobStatus :: JobStatus a -> Text textJobStatus = \case JobQueued -> "queued" JobDuplicate {} -> "duplicate" + JobPreviousStatus s -> textJobStatus s JobWaiting _ -> "waiting" JobRunning -> "running" JobSkipped -> "skipped" - JobError err -> "error\n" <> footnoteText err + JobError _ -> "error" JobFailed -> "failed" JobCancelled -> "cancelled" JobDone _ -> "done" +readJobStatus :: (MonadIO m) => Output -> Text -> m a -> m (Maybe (JobStatus a)) +readJobStatus tout text readResult = case T.lines text of + "queued" : _ -> return (Just JobQueued) + "running" : _ -> return (Just JobRunning) + "skipped" : _ -> return (Just JobSkipped) + "error" : note : _ -> Just . JobError <$> liftIO (outputFootnote tout note) + "failed" : _ -> return (Just JobFailed) + "cancelled" : _ -> return (Just JobCancelled) + "done" : _ -> Just . JobDone <$> readResult + _ -> return Nothing + +textJobStatusDetails :: JobStatus a -> Text +textJobStatusDetails = \case + JobError err -> footnoteText err <> "\n" + JobPreviousStatus s -> textJobStatusDetails s + _ -> "" + data JobManager = JobManager { jmSemaphore :: TVar Int @@ -105,6 +142,7 @@ data JobManager = JobManager , jmReadyTasks :: TVar (Set TaskId) , jmRunningTasks :: TVar (Map TaskId ThreadId) , jmCancelled :: TVar Bool + , jmOpenStatusUpdates :: TVar Int } newtype TaskId = TaskId Int @@ -125,6 +163,7 @@ newJobManager jmDataDir queueLen = do jmReadyTasks <- newTVarIO S.empty jmRunningTasks <- newTVarIO M.empty jmCancelled <- newTVarIO False + jmOpenStatusUpdates <- newTVarIO 0 return JobManager {..} cancelAllJobs :: JobManager -> IO () @@ -182,8 +221,10 @@ runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks -runJobs :: JobManager -> Output -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] -runJobs mngr@JobManager {..} tout jobs = do +runJobs :: JobManager -> Output -> [ Job ] + -> (JobId -> JobStatus JobOutput -> Bool) -- ^ Rerun condition + -> IO [ ( Job, TVar (JobStatus JobOutput) ) ] +runJobs mngr@JobManager {..} tout jobs rerun = do results <- atomically $ do forM jobs $ \job -> do tid <- reserveTaskId mngr @@ -205,7 +246,7 @@ runJobs mngr@JobManager {..} tout jobs = do | otherwise -> do JobError <$> outputFootnote tout (T.pack $ displayException e) atomically $ writeTVar outVar status - outputEvent tout $ JobFinished (jobId job) (textJobStatus status) + outputJobFinishedEvent tout job status handle handler $ do res <- runExceptT $ do duplicate <- liftIO $ atomically $ do @@ -217,13 +258,22 @@ runJobs mngr@JobManager {..} tout jobs = do case duplicate of Nothing -> do - uses <- waitForUsedArtifacts tout job results outVar - runManagedJob mngr tid (return JobCancelled) $ do - liftIO $ atomically $ writeTVar outVar JobRunning - liftIO $ outputEvent tout $ JobStarted (jobId job) - prepareJob jmDataDir job $ \checkoutPath jdir -> do - updateStatusFile (jdir </> "status") outVar - JobDone <$> runJob job uses checkoutPath jdir + let jdir = jmDataDir </> jobStorageSubdir (jobId job) + readStatusFile tout job jdir >>= \case + Just status | status /= JobCancelled && not (rerun (jobId job) status) -> do + let status' = JobPreviousStatus status + liftIO $ atomically $ writeTVar outVar status' + return status' + mbStatus -> do + when (isJust mbStatus) $ do + liftIO $ removeDirectoryRecursive jdir + uses <- waitForUsedArtifacts tout job results outVar + runManagedJob mngr tid (return JobCancelled) $ do + liftIO $ atomically $ writeTVar outVar JobRunning + liftIO $ outputEvent tout $ JobStarted (jobId job) + prepareJob jmDataDir job $ \checkoutPath -> do + updateStatusFile mngr jdir outVar + JobDone <$> runJob job uses checkoutPath jdir Just ( jid, origVar ) -> do let wait = do @@ -241,15 +291,30 @@ runJobs mngr@JobManager {..} tout jobs = do liftIO wait atomically $ writeTVar outVar $ either id id res - outputEvent tout $ JobFinished (jobId job) (textJobStatus $ either id id res) + outputJobFinishedEvent tout job $ either id id res return $ map (\( job, _, var ) -> ( job, var )) results -waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) => - Output -> - Job -> [ ( Job, TaskId, TVar (JobStatus JobOutput) ) ] -> TVar (JobStatus JobOutput) -> m [ ArtifactOutput ] +waitForRemainingTasks :: JobManager -> IO () +waitForRemainingTasks JobManager {..} = do + atomically $ do + remainingStatusUpdates <- readTVar jmOpenStatusUpdates + when (remainingStatusUpdates > 0) retry + +waitForUsedArtifacts + :: (MonadIO m, MonadError (JobStatus JobOutput) m) + => Output -> Job + -> [ ( Job, TaskId, TVar (JobStatus JobOutput) ) ] + -> TVar (JobStatus JobOutput) + -> m [ ( ArtifactSpec, ArtifactOutput ) ] waitForUsedArtifacts tout job results outVar = do origState <- liftIO $ atomically $ readTVar outVar - ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do + let ( selfSpecs, artSpecs ) = partition ((jobName job ==) . fst) $ jobRequiredArtifacts job + + forM_ selfSpecs $ \( _, artName@(ArtifactName tname) ) -> do + when (not (artName `elem` map fst (jobArtifacts job))) $ do + throwError . JobError =<< liftIO (outputFootnote tout $ "Artifact ‘" <> tname <> "’ not produced by the job") + + ujobs <- forM artSpecs $ \(ujobName@(JobName tjobName), uartName) -> do case find (\( j, _, _ ) -> jobName j == ujobName) results of Just ( _, _, var ) -> return ( var, ( ujobName, uartName )) Nothing -> throwError . JobError =<< liftIO (outputFootnote tout $ "Job '" <> tjobName <> "' not found") @@ -267,28 +332,55 @@ waitForUsedArtifacts tout job results outVar = do else loop $ Just $ map fst ustatuses ustatuses <- liftIO $ loop Nothing - forM ustatuses $ \(ustatus, (JobName tjobName, uartName@(ArtifactName tartName))) -> do - case ustatus of - JobDone out -> case find ((==uartName) . aoutName) $ outArtifacts out of - Just art -> return art + forM ustatuses $ \(ustatus, spec@( JobName tjobName, uartName@(ArtifactName tartName)) ) -> do + case jobResult ustatus of + Just out -> case find ((==uartName) . aoutName) $ outArtifacts out of + Just art -> return ( spec, art ) Nothing -> throwError . JobError =<< liftIO (outputFootnote tout $ "Artifact '" <> tjobName <> "." <> tartName <> "' not found") _ -> throwError JobSkipped -updateStatusFile :: MonadIO m => FilePath -> TVar (JobStatus JobOutput) -> m () -updateStatusFile path outVar = void $ liftIO $ forkIO $ loop Nothing +outputJobFinishedEvent :: Output -> Job -> JobStatus a -> IO () +outputJobFinishedEvent tout job = \case + JobDuplicate _ s -> outputEvent tout $ JobIsDuplicate (jobId job) (textJobStatus s) + JobPreviousStatus s -> outputEvent tout $ JobPreviouslyFinished (jobId job) (textJobStatus s) + JobSkipped -> outputEvent tout $ JobWasSkipped (jobId job) + s -> outputEvent tout $ JobFinished (jobId job) (textJobStatus s) + +readStatusFile :: (MonadIO m, MonadCatch m) => Output -> Job -> FilePath -> m (Maybe (JobStatus JobOutput)) +readStatusFile tout job jdir = do + handleIOError (\_ -> return Nothing) $ do + text <- liftIO $ T.readFile (jdir </> "status") + readJobStatus tout text $ do + artifacts <- forM (jobArtifacts job) $ \( aoutName@(ArtifactName tname), _ ) -> do + let adir = jdir </> "artifacts" </> T.unpack tname + aoutStorePath = adir </> "data" + aoutWorkPath <- fmap T.unpack $ liftIO $ T.readFile (adir </> "path") + return ArtifactOutput {..} + + return JobOutput + { outArtifacts = artifacts + } + +updateStatusFile :: MonadIO m => JobManager -> FilePath -> TVar (JobStatus JobOutput) -> m () +updateStatusFile JobManager {..} jdir outVar = liftIO $ do + atomically $ writeTVar jmOpenStatusUpdates . (+ 1) =<< readTVar jmOpenStatusUpdates + void $ forkIO $ loop Nothing where loop prev = do status <- atomically $ do status <- readTVar outVar when (Just status == prev) retry return status - T.writeFile path $ textJobStatus status <> "\n" - when (not (jobStatusFinished status)) $ loop $ Just status + T.writeFile (jdir </> "status") $ textJobStatus status <> "\n" <> textJobStatusDetails status + if (not (jobStatusFinished status)) + then loop $ Just status + else atomically $ writeTVar jmOpenStatusUpdates . (subtract 1) =<< readTVar jmOpenStatusUpdates jobStorageSubdir :: JobId -> FilePath jobStorageSubdir (JobId jidParts) = "jobs" </> joinPath (map (T.unpack . textJobIdPart) (jidParts)) -prepareJob :: (MonadIO m, MonadMask m, MonadFail m) => FilePath -> Job -> (FilePath -> FilePath -> m a) -> m a + +prepareJob :: (MonadIO m, MonadMask m, MonadFail m) => FilePath -> Job -> (FilePath -> m a) -> m a prepareJob dir job inner = do withSystemTempDirectory "minici" $ \checkoutPath -> do forM_ (jobCheckout job) $ \(JobCheckout tree mbsub dest) -> do @@ -297,32 +389,65 @@ prepareJob dir job inner = do let jdir = dir </> jobStorageSubdir (jobId job) liftIO $ createDirectoryIfMissing True jdir - inner checkoutPath jdir + inner checkoutPath + +getArtifactStoredPath :: (MonadIO m, MonadError Text m) => FilePath -> JobId -> ArtifactName -> m FilePath +getArtifactStoredPath storageDir jid@(JobId ids) (ArtifactName aname) = do + let jdir = joinPath $ (storageDir :) $ ("jobs" :) $ map (T.unpack . textJobIdPart) ids + adir = jdir </> "artifacts" </> T.unpack aname + + liftIO (doesDirectoryExist jdir) >>= \case + True -> return () + False -> throwError $ "job ‘" <> textJobId jid <> "’ not yet executed" + + liftIO (doesDirectoryExist adir) >>= \case + True -> return () + False -> throwError $ "artifact ‘" <> aname <> "’ of job ‘" <> textJobId jid <> "’ not found" + + return adir -runJob :: Job -> [ArtifactOutput] -> FilePath -> FilePath -> ExceptT (JobStatus JobOutput) IO JobOutput +getArtifactWorkPath :: (MonadIO m, MonadError Text m) => FilePath -> JobId -> ArtifactName -> m FilePath +getArtifactWorkPath storageDir jid aname = do + adir <- getArtifactStoredPath storageDir jid aname + liftIO $ readFile (adir </> "path") + +copyArtifact :: (MonadIO m, MonadError Text m) => FilePath -> JobId -> ArtifactName -> FilePath -> m () +copyArtifact storageDir jid aname tpath = do + adir <- getArtifactStoredPath storageDir jid aname + liftIO $ copyRecursive (adir </> "data") tpath + + +runJob :: Job -> [ ( ArtifactSpec, ArtifactOutput) ] -> FilePath -> FilePath -> ExceptT (JobStatus JobOutput) IO JobOutput runJob job uses checkoutPath jdir = do - liftIO $ forM_ uses $ \aout -> do + liftIO $ forM_ (filter ((`elem` jobUses job) . fst) uses) $ \( _, aout ) -> do let target = checkoutPath </> aoutWorkPath aout createDirectoryIfMissing True $ takeDirectory target - copyFile (aoutStorePath aout) target + copyRecursive (aoutStorePath aout) target bracket (liftIO $ openFile (jdir </> "log") WriteMode) (liftIO . hClose) $ \logs -> do - forM_ (jobRecipe job) $ \p -> do + forM_ (fromMaybe [] $ jobRecipe job) $ \ep -> do + ( p, input ) <- case ep of + Left p -> return ( p, "" ) + Right script -> do + sh <- fromMaybe "/bin/sh" <$> liftIO (lookupEnv "SHELL") + return ( proc sh [], script ) (Just hin, _, _, hp) <- liftIO $ createProcess_ "" p { cwd = Just checkoutPath , std_in = CreatePipe , std_out = UseHandle logs , std_err = UseHandle logs } - liftIO $ hClose hin + liftIO $ void $ forkIO $ do + T.hPutStr hin input + hClose hin liftIO (waitForProcess hp) >>= \case ExitSuccess -> return () ExitFailure n | fromIntegral n == -sigINT -> throwError JobCancelled | otherwise -> throwError JobFailed - let adir = jdir </> "artifacts" artifacts <- forM (jobArtifacts job) $ \( name@(ArtifactName tname), pathPattern ) -> do + let adir = jdir </> "artifacts" </> T.unpack tname path <- liftIO (globDir1 pathPattern checkoutPath) >>= \case [ path ] -> return path found -> do @@ -330,17 +455,27 @@ runJob job uses checkoutPath jdir = do (if null found then "no file" else "multiple files") <> " found matching pattern ‘" <> decompile pathPattern <> "’ for artifact ‘" <> T.unpack tname <> "’" throwError JobFailed - let target = adir </> T.unpack tname </> takeFileName path + let target = adir </> "data" + workPath = makeRelative checkoutPath path liftIO $ do createDirectoryIfMissing True $ takeDirectory target - copyFile path target + copyRecursiveForce path target + T.writeFile (adir </> "path") $ T.pack workPath return $ ArtifactOutput { aoutName = name - , aoutWorkPath = makeRelative checkoutPath path + , aoutWorkPath = workPath , aoutStorePath = target } + forM_ (jobPublish job) $ \pub -> do + Just aout <- return $ lookup (jpArtifact pub) $ map (\aout -> ( ( jobName job, aoutName aout ), aout )) artifacts ++ uses + let ppath = case jpPath pub of + Just path + | hasTrailingPathSeparator path -> path </> takeFileName (aoutWorkPath aout) + | otherwise -> path + Nothing -> aoutWorkPath aout + copyToDestination (aoutStorePath aout) (jpDestination pub) ppath + return JobOutput - { outName = jobName job - , outArtifacts = artifacts + { outArtifacts = artifacts } |