summaryrefslogtreecommitdiff
path: root/src/Job.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Job.hs')
-rw-r--r--src/Job.hs233
1 files changed, 184 insertions, 49 deletions
diff --git a/src/Job.hs b/src/Job.hs
index 5435cbd..3fe75e6 100644
--- a/src/Job.hs
+++ b/src/Job.hs
@@ -7,8 +7,16 @@ module Job (
JobStatus(..),
jobStatusFinished, jobStatusFailed,
JobManager(..), newJobManager, cancelAllJobs,
- runJobs,
+ runJobs, waitForRemainingTasks,
+
+ prepareJob,
+ getArtifactWorkPath,
+ copyArtifact,
+
jobStorageSubdir,
+
+ copyRecursive,
+ copyRecursiveForce,
) where
import Control.Concurrent
@@ -30,6 +38,7 @@ import Data.Text qualified as T
import Data.Text.IO qualified as T
import System.Directory
+import System.Environment
import System.Exit
import System.FilePath
import System.FilePath.Glob
@@ -38,14 +47,14 @@ import System.IO.Temp
import System.Posix.Signals
import System.Process
+import Destination
import Job.Types
import Output
import Repo
data JobOutput = JobOutput
- { outName :: JobName
- , outArtifacts :: [ArtifactOutput]
+ { outArtifacts :: [ArtifactOutput]
}
deriving (Eq)
@@ -59,6 +68,7 @@ data ArtifactOutput = ArtifactOutput
data JobStatus a = JobQueued
| JobDuplicate JobId (JobStatus a)
+ | JobPreviousStatus (JobStatus a)
| JobWaiting [JobName]
| JobRunning
| JobSkipped
@@ -70,31 +80,58 @@ data JobStatus a = JobQueued
jobStatusFinished :: JobStatus a -> Bool
jobStatusFinished = \case
- JobQueued {} -> False
- JobDuplicate _ s -> jobStatusFinished s
- JobWaiting {} -> False
- JobRunning {} -> False
- _ -> True
+ JobQueued {} -> False
+ JobDuplicate _ s -> jobStatusFinished s
+ JobPreviousStatus s -> jobStatusFinished s
+ JobWaiting {} -> False
+ JobRunning {} -> False
+ _ -> True
jobStatusFailed :: JobStatus a -> Bool
jobStatusFailed = \case
- JobDuplicate _ s -> jobStatusFailed s
- JobError {} -> True
- JobFailed {} -> True
- _ -> False
+ JobDuplicate _ s -> jobStatusFailed s
+ JobPreviousStatus s -> jobStatusFailed s
+ JobError {} -> True
+ JobFailed {} -> True
+ _ -> False
+
+jobResult :: JobStatus a -> Maybe a
+jobResult = \case
+ JobDone x -> Just x
+ JobDuplicate _ s -> jobResult s
+ JobPreviousStatus s -> jobResult s
+ _ -> Nothing
textJobStatus :: JobStatus a -> Text
textJobStatus = \case
JobQueued -> "queued"
JobDuplicate {} -> "duplicate"
+ JobPreviousStatus s -> textJobStatus s
JobWaiting _ -> "waiting"
JobRunning -> "running"
JobSkipped -> "skipped"
- JobError err -> "error\n" <> footnoteText err
+ JobError _ -> "error"
JobFailed -> "failed"
JobCancelled -> "cancelled"
JobDone _ -> "done"
+readJobStatus :: (MonadIO m) => Output -> Text -> m a -> m (Maybe (JobStatus a))
+readJobStatus tout text readResult = case T.lines text of
+ "queued" : _ -> return (Just JobQueued)
+ "running" : _ -> return (Just JobRunning)
+ "skipped" : _ -> return (Just JobSkipped)
+ "error" : note : _ -> Just . JobError <$> liftIO (outputFootnote tout note)
+ "failed" : _ -> return (Just JobFailed)
+ "cancelled" : _ -> return (Just JobCancelled)
+ "done" : _ -> Just . JobDone <$> readResult
+ _ -> return Nothing
+
+textJobStatusDetails :: JobStatus a -> Text
+textJobStatusDetails = \case
+ JobError err -> footnoteText err <> "\n"
+ JobPreviousStatus s -> textJobStatusDetails s
+ _ -> ""
+
data JobManager = JobManager
{ jmSemaphore :: TVar Int
@@ -105,6 +142,7 @@ data JobManager = JobManager
, jmReadyTasks :: TVar (Set TaskId)
, jmRunningTasks :: TVar (Map TaskId ThreadId)
, jmCancelled :: TVar Bool
+ , jmOpenStatusUpdates :: TVar Int
}
newtype TaskId = TaskId Int
@@ -125,6 +163,7 @@ newJobManager jmDataDir queueLen = do
jmReadyTasks <- newTVarIO S.empty
jmRunningTasks <- newTVarIO M.empty
jmCancelled <- newTVarIO False
+ jmOpenStatusUpdates <- newTVarIO 0
return JobManager {..}
cancelAllJobs :: JobManager -> IO ()
@@ -182,8 +221,10 @@ runManagedJob JobManager {..} tid cancel job = bracket acquire release $ \case
writeTVar jmRunningTasks . M.delete tid =<< readTVar jmRunningTasks
-runJobs :: JobManager -> Output -> [ Job ] -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
-runJobs mngr@JobManager {..} tout jobs = do
+runJobs :: JobManager -> Output -> [ Job ]
+ -> (JobId -> JobStatus JobOutput -> Bool) -- ^ Rerun condition
+ -> IO [ ( Job, TVar (JobStatus JobOutput) ) ]
+runJobs mngr@JobManager {..} tout jobs rerun = do
results <- atomically $ do
forM jobs $ \job -> do
tid <- reserveTaskId mngr
@@ -205,7 +246,7 @@ runJobs mngr@JobManager {..} tout jobs = do
| otherwise -> do
JobError <$> outputFootnote tout (T.pack $ displayException e)
atomically $ writeTVar outVar status
- outputEvent tout $ JobFinished (jobId job) (textJobStatus status)
+ outputJobFinishedEvent tout job status
handle handler $ do
res <- runExceptT $ do
duplicate <- liftIO $ atomically $ do
@@ -217,13 +258,22 @@ runJobs mngr@JobManager {..} tout jobs = do
case duplicate of
Nothing -> do
- uses <- waitForUsedArtifacts tout job results outVar
- runManagedJob mngr tid (return JobCancelled) $ do
- liftIO $ atomically $ writeTVar outVar JobRunning
- liftIO $ outputEvent tout $ JobStarted (jobId job)
- prepareJob jmDataDir job $ \checkoutPath jdir -> do
- updateStatusFile (jdir </> "status") outVar
- JobDone <$> runJob job uses checkoutPath jdir
+ let jdir = jmDataDir </> jobStorageSubdir (jobId job)
+ readStatusFile tout job jdir >>= \case
+ Just status | status /= JobCancelled && not (rerun (jobId job) status) -> do
+ let status' = JobPreviousStatus status
+ liftIO $ atomically $ writeTVar outVar status'
+ return status'
+ mbStatus -> do
+ when (isJust mbStatus) $ do
+ liftIO $ removeDirectoryRecursive jdir
+ uses <- waitForUsedArtifacts tout job results outVar
+ runManagedJob mngr tid (return JobCancelled) $ do
+ liftIO $ atomically $ writeTVar outVar JobRunning
+ liftIO $ outputEvent tout $ JobStarted (jobId job)
+ prepareJob jmDataDir job $ \checkoutPath -> do
+ updateStatusFile mngr jdir outVar
+ JobDone <$> runJob job uses checkoutPath jdir
Just ( jid, origVar ) -> do
let wait = do
@@ -241,15 +291,30 @@ runJobs mngr@JobManager {..} tout jobs = do
liftIO wait
atomically $ writeTVar outVar $ either id id res
- outputEvent tout $ JobFinished (jobId job) (textJobStatus $ either id id res)
+ outputJobFinishedEvent tout job $ either id id res
return $ map (\( job, _, var ) -> ( job, var )) results
-waitForUsedArtifacts :: (MonadIO m, MonadError (JobStatus JobOutput) m) =>
- Output ->
- Job -> [ ( Job, TaskId, TVar (JobStatus JobOutput) ) ] -> TVar (JobStatus JobOutput) -> m [ ArtifactOutput ]
+waitForRemainingTasks :: JobManager -> IO ()
+waitForRemainingTasks JobManager {..} = do
+ atomically $ do
+ remainingStatusUpdates <- readTVar jmOpenStatusUpdates
+ when (remainingStatusUpdates > 0) retry
+
+waitForUsedArtifacts
+ :: (MonadIO m, MonadError (JobStatus JobOutput) m)
+ => Output -> Job
+ -> [ ( Job, TaskId, TVar (JobStatus JobOutput) ) ]
+ -> TVar (JobStatus JobOutput)
+ -> m [ ( ArtifactSpec, ArtifactOutput ) ]
waitForUsedArtifacts tout job results outVar = do
origState <- liftIO $ atomically $ readTVar outVar
- ujobs <- forM (jobUses job) $ \(ujobName@(JobName tjobName), uartName) -> do
+ let ( selfSpecs, artSpecs ) = partition ((jobName job ==) . fst) $ jobRequiredArtifacts job
+
+ forM_ selfSpecs $ \( _, artName@(ArtifactName tname) ) -> do
+ when (not (artName `elem` map fst (jobArtifacts job))) $ do
+ throwError . JobError =<< liftIO (outputFootnote tout $ "Artifact ‘" <> tname <> "’ not produced by the job")
+
+ ujobs <- forM artSpecs $ \(ujobName@(JobName tjobName), uartName) -> do
case find (\( j, _, _ ) -> jobName j == ujobName) results of
Just ( _, _, var ) -> return ( var, ( ujobName, uartName ))
Nothing -> throwError . JobError =<< liftIO (outputFootnote tout $ "Job '" <> tjobName <> "' not found")
@@ -267,28 +332,55 @@ waitForUsedArtifacts tout job results outVar = do
else loop $ Just $ map fst ustatuses
ustatuses <- liftIO $ loop Nothing
- forM ustatuses $ \(ustatus, (JobName tjobName, uartName@(ArtifactName tartName))) -> do
- case ustatus of
- JobDone out -> case find ((==uartName) . aoutName) $ outArtifacts out of
- Just art -> return art
+ forM ustatuses $ \(ustatus, spec@( JobName tjobName, uartName@(ArtifactName tartName)) ) -> do
+ case jobResult ustatus of
+ Just out -> case find ((==uartName) . aoutName) $ outArtifacts out of
+ Just art -> return ( spec, art )
Nothing -> throwError . JobError =<< liftIO (outputFootnote tout $ "Artifact '" <> tjobName <> "." <> tartName <> "' not found")
_ -> throwError JobSkipped
-updateStatusFile :: MonadIO m => FilePath -> TVar (JobStatus JobOutput) -> m ()
-updateStatusFile path outVar = void $ liftIO $ forkIO $ loop Nothing
+outputJobFinishedEvent :: Output -> Job -> JobStatus a -> IO ()
+outputJobFinishedEvent tout job = \case
+ JobDuplicate _ s -> outputEvent tout $ JobIsDuplicate (jobId job) (textJobStatus s)
+ JobPreviousStatus s -> outputEvent tout $ JobPreviouslyFinished (jobId job) (textJobStatus s)
+ JobSkipped -> outputEvent tout $ JobWasSkipped (jobId job)
+ s -> outputEvent tout $ JobFinished (jobId job) (textJobStatus s)
+
+readStatusFile :: (MonadIO m, MonadCatch m) => Output -> Job -> FilePath -> m (Maybe (JobStatus JobOutput))
+readStatusFile tout job jdir = do
+ handleIOError (\_ -> return Nothing) $ do
+ text <- liftIO $ T.readFile (jdir </> "status")
+ readJobStatus tout text $ do
+ artifacts <- forM (jobArtifacts job) $ \( aoutName@(ArtifactName tname), _ ) -> do
+ let adir = jdir </> "artifacts" </> T.unpack tname
+ aoutStorePath = adir </> "data"
+ aoutWorkPath <- fmap T.unpack $ liftIO $ T.readFile (adir </> "path")
+ return ArtifactOutput {..}
+
+ return JobOutput
+ { outArtifacts = artifacts
+ }
+
+updateStatusFile :: MonadIO m => JobManager -> FilePath -> TVar (JobStatus JobOutput) -> m ()
+updateStatusFile JobManager {..} jdir outVar = liftIO $ do
+ atomically $ writeTVar jmOpenStatusUpdates . (+ 1) =<< readTVar jmOpenStatusUpdates
+ void $ forkIO $ loop Nothing
where
loop prev = do
status <- atomically $ do
status <- readTVar outVar
when (Just status == prev) retry
return status
- T.writeFile path $ textJobStatus status <> "\n"
- when (not (jobStatusFinished status)) $ loop $ Just status
+ T.writeFile (jdir </> "status") $ textJobStatus status <> "\n" <> textJobStatusDetails status
+ if (not (jobStatusFinished status))
+ then loop $ Just status
+ else atomically $ writeTVar jmOpenStatusUpdates . (subtract 1) =<< readTVar jmOpenStatusUpdates
jobStorageSubdir :: JobId -> FilePath
jobStorageSubdir (JobId jidParts) = "jobs" </> joinPath (map (T.unpack . textJobIdPart) (jidParts))
-prepareJob :: (MonadIO m, MonadMask m, MonadFail m) => FilePath -> Job -> (FilePath -> FilePath -> m a) -> m a
+
+prepareJob :: (MonadIO m, MonadMask m, MonadFail m) => FilePath -> Job -> (FilePath -> m a) -> m a
prepareJob dir job inner = do
withSystemTempDirectory "minici" $ \checkoutPath -> do
forM_ (jobCheckout job) $ \(JobCheckout tree mbsub dest) -> do
@@ -297,32 +389,65 @@ prepareJob dir job inner = do
let jdir = dir </> jobStorageSubdir (jobId job)
liftIO $ createDirectoryIfMissing True jdir
- inner checkoutPath jdir
+ inner checkoutPath
+
+getArtifactStoredPath :: (MonadIO m, MonadError Text m) => FilePath -> JobId -> ArtifactName -> m FilePath
+getArtifactStoredPath storageDir jid@(JobId ids) (ArtifactName aname) = do
+ let jdir = joinPath $ (storageDir :) $ ("jobs" :) $ map (T.unpack . textJobIdPart) ids
+ adir = jdir </> "artifacts" </> T.unpack aname
+
+ liftIO (doesDirectoryExist jdir) >>= \case
+ True -> return ()
+ False -> throwError $ "job ‘" <> textJobId jid <> "’ not yet executed"
+
+ liftIO (doesDirectoryExist adir) >>= \case
+ True -> return ()
+ False -> throwError $ "artifact ‘" <> aname <> "’ of job ‘" <> textJobId jid <> "’ not found"
+
+ return adir
-runJob :: Job -> [ArtifactOutput] -> FilePath -> FilePath -> ExceptT (JobStatus JobOutput) IO JobOutput
+getArtifactWorkPath :: (MonadIO m, MonadError Text m) => FilePath -> JobId -> ArtifactName -> m FilePath
+getArtifactWorkPath storageDir jid aname = do
+ adir <- getArtifactStoredPath storageDir jid aname
+ liftIO $ readFile (adir </> "path")
+
+copyArtifact :: (MonadIO m, MonadError Text m) => FilePath -> JobId -> ArtifactName -> FilePath -> m ()
+copyArtifact storageDir jid aname tpath = do
+ adir <- getArtifactStoredPath storageDir jid aname
+ liftIO $ copyRecursive (adir </> "data") tpath
+
+
+runJob :: Job -> [ ( ArtifactSpec, ArtifactOutput) ] -> FilePath -> FilePath -> ExceptT (JobStatus JobOutput) IO JobOutput
runJob job uses checkoutPath jdir = do
- liftIO $ forM_ uses $ \aout -> do
+ liftIO $ forM_ (filter ((`elem` jobUses job) . fst) uses) $ \( _, aout ) -> do
let target = checkoutPath </> aoutWorkPath aout
createDirectoryIfMissing True $ takeDirectory target
- copyFile (aoutStorePath aout) target
+ copyRecursive (aoutStorePath aout) target
bracket (liftIO $ openFile (jdir </> "log") WriteMode) (liftIO . hClose) $ \logs -> do
- forM_ (jobRecipe job) $ \p -> do
+ forM_ (fromMaybe [] $ jobRecipe job) $ \ep -> do
+ ( p, input ) <- case ep of
+ Left p -> return ( p, "" )
+ Right script -> do
+ sh <- fromMaybe "/bin/sh" <$> liftIO (lookupEnv "SHELL")
+ return ( proc sh [], script )
(Just hin, _, _, hp) <- liftIO $ createProcess_ "" p
{ cwd = Just checkoutPath
, std_in = CreatePipe
, std_out = UseHandle logs
, std_err = UseHandle logs
}
- liftIO $ hClose hin
+ liftIO $ void $ forkIO $ do
+ T.hPutStr hin input
+ hClose hin
liftIO (waitForProcess hp) >>= \case
ExitSuccess -> return ()
ExitFailure n
| fromIntegral n == -sigINT -> throwError JobCancelled
| otherwise -> throwError JobFailed
- let adir = jdir </> "artifacts"
artifacts <- forM (jobArtifacts job) $ \( name@(ArtifactName tname), pathPattern ) -> do
+ let adir = jdir </> "artifacts" </> T.unpack tname
path <- liftIO (globDir1 pathPattern checkoutPath) >>= \case
[ path ] -> return path
found -> do
@@ -330,17 +455,27 @@ runJob job uses checkoutPath jdir = do
(if null found then "no file" else "multiple files") <> " found matching pattern ‘" <>
decompile pathPattern <> "’ for artifact ‘" <> T.unpack tname <> "’"
throwError JobFailed
- let target = adir </> T.unpack tname </> takeFileName path
+ let target = adir </> "data"
+ workPath = makeRelative checkoutPath path
liftIO $ do
createDirectoryIfMissing True $ takeDirectory target
- copyFile path target
+ copyRecursiveForce path target
+ T.writeFile (adir </> "path") $ T.pack workPath
return $ ArtifactOutput
{ aoutName = name
- , aoutWorkPath = makeRelative checkoutPath path
+ , aoutWorkPath = workPath
, aoutStorePath = target
}
+ forM_ (jobPublish job) $ \pub -> do
+ Just aout <- return $ lookup (jpArtifact pub) $ map (\aout -> ( ( jobName job, aoutName aout ), aout )) artifacts ++ uses
+ let ppath = case jpPath pub of
+ Just path
+ | hasTrailingPathSeparator path -> path </> takeFileName (aoutWorkPath aout)
+ | otherwise -> path
+ Nothing -> aoutWorkPath aout
+ copyToDestination (aoutStorePath aout) (jpDestination pub) ppath
+
return JobOutput
- { outName = jobName job
- , outArtifacts = artifacts
+ { outArtifacts = artifacts
}