Skip to content

Commit bf9cb3d

Browse files
feat: add configurable step limits for Workflows (#12622)
Co-authored-by: emily-shen <69125074+emily-shen@users.noreply.github.com>
1 parent b379b43 commit bf9cb3d

File tree

12 files changed

+708
-5
lines changed

12 files changed

+708
-5
lines changed

‎.changeset/workflow-step-limits.md‎

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
---
2+
"wrangler": minor
3+
"miniflare": minor
4+
"@cloudflare/workflows-shared": minor
5+
---
6+
7+
Add configurable step limits for Workflows
8+
9+
You can now set a maximum number of steps for a Workflow instance via the `limits.steps` configuration in your Wrangler config. When a Workflow instance exceeds this limit, it will fail with an error indicating the limit was reached.
10+
11+
```jsonc
12+
// wrangler.jsonc
13+
{
14+
"workflows": [
15+
{
16+
"binding": "MY_WORKFLOW",
17+
"name": "my-workflow",
18+
"class_name": "MyWorkflow",
19+
"limits": {
20+
"steps": 5000,
21+
},
22+
},
23+
],
24+
}
25+
```
26+
27+
The `steps` value must be an integer between 1 and 25,000. If not specified, the default limit of 10,000 steps is used. Step limits are also enforced in local development via `wrangler dev`.

‎packages/miniflare/src/plugins/workflows/index.ts‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export const WorkflowsOptionsSchema = z.object({
2323
remoteProxyConnectionString: z
2424
.custom<RemoteProxyConnectionString>()
2525
.optional(),
26+
stepLimit: z.number().int().min(1).optional(),
2627
})
2728
)
2829
.optional(),
@@ -151,6 +152,14 @@ export const WORKFLOWS_PLUGIN: Plugin<
151152
name: "BINDING_NAME",
152153
json: JSON.stringify(bindingName),
153154
},
155+
...(workflow.stepLimit !== undefined
156+
? [
157+
{
158+
name: "STEP_LIMIT",
159+
json: JSON.stringify(workflow.stepLimit),
160+
},
161+
]
162+
: []),
154163
],
155164
},
156165
};

‎packages/workers-utils/src/config/environment.ts‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,11 @@ export type WorkflowBinding = {
675675
script_name?: string;
676676
/** Whether the Workflow should be remote or not in local development */
677677
remote?: boolean;
678+
/** Optional limits for the Workflow */
679+
limits?: {
680+
/** Maximum number of steps a Workflow instance can execute */
681+
steps?: number;
682+
};
678683
};
679684

680685
/**

‎packages/workers-utils/src/config/validation.ts‎

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2584,12 +2584,54 @@ const validateWorkflowBinding: ValidatorFn = (diagnostics, field, value) => {
25842584
isValid = false;
25852585
}
25862586

2587+
if (hasProperty(value, "limits") && value.limits !== undefined) {
2588+
if (
2589+
typeof value.limits !== "object" ||
2590+
value.limits === null ||
2591+
Array.isArray(value.limits)
2592+
) {
2593+
diagnostics.errors.push(
2594+
`"${field}" bindings should, optionally, have an object "limits" field but got ${JSON.stringify(
2595+
value
2596+
)}.`
2597+
);
2598+
isValid = false;
2599+
} else {
2600+
const limits = value.limits as Record<string, unknown>;
2601+
if (limits.steps !== undefined) {
2602+
if (
2603+
typeof limits.steps !== "number" ||
2604+
!Number.isInteger(limits.steps) ||
2605+
limits.steps < 1
2606+
) {
2607+
diagnostics.errors.push(
2608+
`"${field}" bindings "limits.steps" field must be a positive integer but got ${JSON.stringify(
2609+
limits.steps
2610+
)}.`
2611+
);
2612+
isValid = false;
2613+
} else if (limits.steps > 25_000) {
2614+
diagnostics.warnings.push(
2615+
`"${field}" has a step limit of ${limits.steps}, which exceeds the production maximum of 25,000. This configuration may not work when deployed.`
2616+
);
2617+
}
2618+
}
2619+
validateAdditionalProperties(
2620+
diagnostics,
2621+
`${field}.limits`,
2622+
Object.keys(limits),
2623+
["steps"]
2624+
);
2625+
}
2626+
}
2627+
25872628
validateAdditionalProperties(diagnostics, field, Object.keys(value), [
25882629
"binding",
25892630
"name",
25902631
"class_name",
25912632
"script_name",
25922633
"remote",
2634+
"limits",
25932635
]);
25942636

25952637
return isValid;

‎packages/workers-utils/src/worker.ts‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ export interface CfWorkflow {
181181
script_name?: string;
182182
remote?: boolean;
183183
raw?: boolean;
184+
limits?: {
185+
steps?: number;
186+
};
184187
}
185188

186189
export interface CfQueue {

‎packages/workflows-shared/src/context.ts‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export class Context extends RpcTarget {
4747
#state: DurableObjectState;
4848

4949
#counters: Map<string, number> = new Map();
50+
#lifetimeStepCounter: number = 0;
5051

5152
constructor(engine: Engine, state: DurableObjectState) {
5253
super();
@@ -85,6 +86,15 @@ export class Context extends RpcTarget {
8586
stepConfig = {};
8687
}
8788

89+
this.#lifetimeStepCounter++;
90+
91+
const stepLimit = this.#engine.stepLimit;
92+
if (this.#lifetimeStepCounter > stepLimit) {
93+
throw new WorkflowFatalError(
94+
`The limit of ${stepLimit} steps has been reached. This limit can be changed in your worker configuration.`
95+
);
96+
}
97+
8898
if (!isValidStepName(name)) {
8999
// NOTE(lduarte): marking errors as user error allows the observability layer to avoid leaking
90100
// user errors to sentry while making everything more observable. `isUserError` is not serialized

‎packages/workflows-shared/src/engine.ts‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers";
2323

2424
interface Env {
2525
USER_WORKFLOW: WorkflowEntrypoint;
26+
STEP_LIMIT?: string; // JSON-encoded number from miniflare binding
2627
}
2728

2829
export type DatabaseWorkflow = {
@@ -74,6 +75,8 @@ const ENGINE_STATUS_KEY = "ENGINE_STATUS";
7475

7576
const EVENT_MAP_PREFIX = "EVENT_MAP";
7677

78+
export const DEFAULT_STEP_LIMIT = 10_000;
79+
7780
export class Engine extends DurableObject<Env> {
7881
logs: Array<unknown> = [];
7982

@@ -83,13 +86,19 @@ export class Engine extends DurableObject<Env> {
8386
workflowName: string | undefined;
8487
timeoutHandler: GracePeriodSemaphore;
8588
priorityQueue: TimePriorityQueue | undefined;
89+
stepLimit: number;
8690

8791
waiters: Map<string, Array<(event: Event | PromiseLike<Event>) => void>> =
8892
new Map();
8993
eventMap: Map<string, Array<Event>> = new Map();
9094

9195
constructor(state: DurableObjectState, env: Env) {
9296
super(state, env);
97+
98+
this.stepLimit = env.STEP_LIMIT
99+
? JSON.parse(env.STEP_LIMIT)
100+
: DEFAULT_STEP_LIMIT;
101+
93102
void this.ctx.blockConcurrencyWhile(async () => {
94103
this.ctx.storage.transactionSync(() => {
95104
try {

‎packages/workflows-shared/tests/engine.test.ts‎

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
} from "cloudflare:test";
66
import { NonRetryableError } from "cloudflare:workflows";
77
import { describe, it, vi } from "vitest";
8-
import { InstanceEvent, InstanceStatus } from "../src";
8+
import { DEFAULT_STEP_LIMIT, InstanceEvent, InstanceStatus } from "../src";
99
import type {
1010
DatabaseInstance,
1111
DatabaseVersion,
@@ -271,4 +271,118 @@ describe("Engine", () => {
271271
logs.logs.some((log) => log.event === InstanceEvent.WORKFLOW_START)
272272
).toBe(true);
273273
});
274+
275+
describe("step limits", () => {
276+
it("should enforce step limit when exceeded", async ({ expect }) => {
277+
const stepLimit = 3;
278+
279+
const engineStub = await runWorkflow(
280+
"STEP-LIMIT-EXCEEDED",
281+
async (_event, step) => {
282+
// Try to run more steps than the limit
283+
for (let i = 0; i < stepLimit + 1; i++) {
284+
await step.do(`step-${i}`, async () => `result-${i}`);
285+
}
286+
}
287+
);
288+
289+
// Set the step limit on the engine
290+
await runInDurableObject(engineStub, (engine) => {
291+
engine.stepLimit = stepLimit;
292+
});
293+
294+
// Re-init to run with the new limit
295+
await setWorkflowEntrypoint(engineStub, async (_event, step) => {
296+
for (let i = 0; i < stepLimit + 1; i++) {
297+
await step.do(`step-${i}`, async () => `result-${i}`);
298+
}
299+
});
300+
301+
const engineId = env.ENGINE.idFromName("STEP-LIMIT-EXCEEDED-2");
302+
const freshStub = env.ENGINE.get(engineId);
303+
304+
await runInDurableObject(freshStub, (engine) => {
305+
engine.stepLimit = stepLimit;
306+
});
307+
308+
await setWorkflowEntrypoint(freshStub, async (_event, step) => {
309+
for (let i = 0; i < stepLimit + 1; i++) {
310+
await step.do(`step-${i}`, async () => `result-${i}`);
311+
}
312+
});
313+
314+
await freshStub.init(
315+
12346,
316+
{} as DatabaseWorkflow,
317+
{} as DatabaseVersion,
318+
{ id: "STEP-LIMIT-EXCEEDED-2" } as DatabaseInstance,
319+
{
320+
payload: {},
321+
timestamp: new Date(),
322+
instanceId: "STEP-LIMIT-EXCEEDED-2",
323+
}
324+
);
325+
326+
const logs = (await freshStub.readLogs()) as EngineLogs;
327+
328+
expect(
329+
logs.logs.some((val) => val.event === InstanceEvent.WORKFLOW_FAILURE)
330+
).toBe(true);
331+
});
332+
333+
it("should succeed when steps are exactly at the limit", async ({
334+
expect,
335+
}) => {
336+
const stepLimit = 3;
337+
338+
const engineId = env.ENGINE.idFromName("STEP-LIMIT-AT-LIMIT");
339+
const freshStub = env.ENGINE.get(engineId);
340+
341+
await runInDurableObject(freshStub, (engine) => {
342+
engine.stepLimit = stepLimit;
343+
});
344+
345+
await setWorkflowEntrypoint(freshStub, async (_event, step) => {
346+
for (let i = 0; i < stepLimit; i++) {
347+
await step.do(`step-${i}`, async () => `result-${i}`);
348+
}
349+
return "done";
350+
});
351+
352+
await freshStub.init(
353+
12346,
354+
{} as DatabaseWorkflow,
355+
{} as DatabaseVersion,
356+
{ id: "STEP-LIMIT-AT-LIMIT" } as DatabaseInstance,
357+
{
358+
payload: {},
359+
timestamp: new Date(),
360+
instanceId: "STEP-LIMIT-AT-LIMIT",
361+
}
362+
);
363+
364+
const logs = (await freshStub.readLogs()) as EngineLogs;
365+
366+
expect(
367+
logs.logs.some((val) => val.event === InstanceEvent.WORKFLOW_SUCCESS)
368+
).toBe(true);
369+
expect(
370+
logs.logs.some((val) => val.event === InstanceEvent.WORKFLOW_FAILURE)
371+
).toBe(false);
372+
});
373+
374+
it("should use DEFAULT_STEP_LIMIT when no limit is configured", async ({
375+
expect,
376+
}) => {
377+
const engineId = env.ENGINE.idFromName("STEP-LIMIT-DEFAULT");
378+
const freshStub = env.ENGINE.get(engineId);
379+
380+
const stepLimit = await runInDurableObject(
381+
freshStub,
382+
(engine) => engine.stepLimit
383+
);
384+
385+
expect(stepLimit).toBe(DEFAULT_STEP_LIMIT);
386+
});
387+
});
274388
});

0 commit comments

Comments
 (0)