From 7823daf2f81d822a70c057e9fdeb2b99c75c6094 Mon Sep 17 00:00:00 2001 From: Nathan Clevenger Date: Mon, 19 May 2025 06:14:11 -0500 Subject: [PATCH] feat: return optional job data from runJobs --- packages/payload/src/queues/localAPI.ts | 67 +++++++++++++++---- .../src/queues/operations/runJobs/index.ts | 32 ++++++++- 2 files changed, 85 insertions(+), 14 deletions(-) diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index b23e68936ce..a5940230f3f 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -13,8 +13,8 @@ import { jobAfterRead, jobsCollectionSlug } from './config/index.js' import { runJobs } from './operations/runJobs/index.js' import { updateJob, updateJobs } from './utilities/updateJob.js' -export const getJobsLocalAPI = (payload: Payload) => ({ - queue: async < +export const getJobsLocalAPI = (payload: Payload) => { + const queue = async < // eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'], >( @@ -95,9 +95,9 @@ export const getJobsLocalAPI = (payload: Payload) => ({ }), }) as unknown as ReturnType } - }, + } - run: async (args?: { + const run = async (args?: { limit?: number overrideAccess?: boolean /** @@ -126,12 +126,13 @@ export const getJobsLocalAPI = (payload: Payload) => ({ sequential: args?.sequential, where: args?.where, }) - }, + } - runByID: async (args: { + const runByID = async (args: { id: number | string overrideAccess?: boolean req?: PayloadRequest + returnJob?: boolean }): Promise> => { const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload)) @@ -139,10 +140,11 @@ export const getJobsLocalAPI = (payload: Payload) => ({ id: args.id, overrideAccess: args.overrideAccess !== false, req: newReq, + returnJobs: args.returnJob, }) - }, + } - cancel: async (args: { + const cancel = async (args: { overrideAccess?: boolean queue?: string req?: PayloadRequest @@ -191,9 +193,9 @@ export const getJobsLocalAPI = (payload: Payload) => ({ returning: false, where: { and }, }) - }, + } - cancelByID: async (args: { + const cancelByID = async (args: { id: number | string overrideAccess?: boolean req?: PayloadRequest @@ -219,5 +221,46 @@ export const getJobsLocalAPI = (payload: Payload) => ({ req: newReq, returning: false, }) - }, -}) + } + + const queueAndRun = async < + // eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents + TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'], + >( + args: + | { + input: TypedJobs['tasks'][TTaskOrWorkflowSlug]['input'] + queue?: string + req?: PayloadRequest + task: TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] ? TTaskOrWorkflowSlug : never + waitUntil?: Date + workflow?: never + } + | { + input: TypedJobs['workflows'][TTaskOrWorkflowSlug]['input'] + queue?: string + req?: PayloadRequest + task?: never + waitUntil?: Date + workflow: TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] + ? TTaskOrWorkflowSlug + : never + }, + ): Promise< + TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] + ? RunningJob + : RunningJobFromTask + > => { + const queued = await queue(args as Parameters[0]) + + const result = await runByID({ id: queued.id, req: args.req, returnJob: true }) + + type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] + ? RunningJob + : RunningJobFromTask + + return result.jobs?.[0] as ReturnType + } + + return { cancel, cancelByID, queue, queueAndRun, run, runByID } +} diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index c8ba056a04c..b30dd234547 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -35,6 +35,11 @@ export type RunJobsArgs = { processingOrder?: Sort queue?: string req: PayloadRequest + /** + * Return the processed job documents. Defaults to `false` to maintain + * backward compatibility. + */ + returnJobs?: boolean /** * By default, jobs are run in parallel. * If you want to run them in sequence, set this to true. @@ -44,6 +49,10 @@ export type RunJobsArgs = { } export type RunJobsResult = { + /** + * The jobs that were processed. Only returned when `returnJobs` is `true`. + */ + jobs?: BaseJob[] jobStatus?: Record /** * If this is false, there for sure are no jobs remaining, regardless of the limit @@ -63,6 +72,7 @@ export const runJobs = async (args: RunJobsArgs): Promise => { processingOrder, queue, req, + returnJobs, sequential, where: whereFromProps, } = args @@ -125,6 +135,8 @@ export const runJobs = async (args: RunJobsArgs): Promise => { docs: BaseJob[] } = { docs: [] } + const processedJobs: BaseJob[] = [] + if (id) { // Only one job to run jobsQuery.docs = [ @@ -190,10 +202,16 @@ export const runJobs = async (args: RunJobsArgs): Promise => { ) if (!jobsQuery.docs.length) { - return { + const result: RunJobsResult = { noJobsRemaining: true, remainingJobsFromQueried: 0, } + + if (returnJobs) { + result.jobs = [] + } + + return result } if (jobsQuery?.docs?.length) { @@ -273,6 +291,8 @@ export const runJobs = async (args: RunJobsArgs): Promise => { jobsToDelete.push(job.id) } + processedJobs.push(job) + return { id: job.id, result } } else { const result = await runJSONJob({ @@ -287,6 +307,8 @@ export const runJobs = async (args: RunJobsArgs): Promise => { jobsToDelete.push(job.id) } + processedJobs.push(job) + return { id: job.id, result } } } @@ -343,8 +365,14 @@ export const runJobs = async (args: RunJobsArgs): Promise => { } } - return { + const returnValue: RunJobsResult = { jobStatus: resultsObject, remainingJobsFromQueried, } + + if (returnJobs) { + returnValue.jobs = processedJobs + } + + return returnValue }