
Commit edf7a36

dom96 and elithrar authored and committed
Adds Python examples to queues/configuration/ (#26870)
1 parent 15e2f5c commit edf7a36

3 files changed: +222 −9 lines changed

src/content/docs/queues/configuration/batching-retries.mdx

Lines changed: 119 additions & 7 deletions
@@ -5,7 +5,7 @@ sidebar:
   order: 2
 ---
 
-import { WranglerConfig } from "~/components";
+import { WranglerConfig, TypeScriptExample, Tabs, TabItem } from "~/components";
 
 ## Batching
 
@@ -62,7 +62,9 @@ You can acknowledge individual messages within a batch by explicitly acknowledgi
 
 To explicitly acknowledge a message as delivered, call the `ack()` method on the message.
 
-```ts title="index.js"
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
+```ts
 export default {
 	async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
 		for (const msg of batch.messages) {
@@ -73,10 +75,26 @@ export default {
 	},
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+from workers import WorkerEntrypoint
+
+class Default(WorkerEntrypoint):
+    async def queue(self, batch):
+        for msg in batch.messages:
+            # TODO: do something with the message
+            # Explicitly acknowledge the message as delivered
+            msg.ack()
+```
+</TabItem>
+</Tabs>
 
 You can also call `retry()` to explicitly force a message to be redelivered in a subsequent batch. This is referred to as "negative acknowledgement". This can be particularly useful when you want to process the rest of the messages in that batch without throwing an error that would force the entire batch to be redelivered.
 
-```ts title="index.ts"
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
+```ts
 export default {
 	async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
 		for (const msg of batch.messages) {
@@ -86,6 +104,19 @@ export default {
 	},
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+from workers import WorkerEntrypoint
+
+class Default(WorkerEntrypoint):
+    async def queue(self, batch):
+        for msg in batch.messages:
+            # TODO: do something with the message that fails
+            msg.retry()
+```
+</TabItem>
+</Tabs>
 
 You can also acknowledge or negatively acknowledge messages at a batch level with `ackAll()` and `retryAll()`. Calling `ackAll()` on the batch of messages (`MessageBatch`) delivered to your consumer Worker has the same behaviour as a consumer Worker that successfully returns (does not throw an error).
 
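The `ackAll()`/`retryAll()` paragraph above ships without a snippet in this commit. A minimal editorial sketch (not part of the commit; `BatchLike` is a local stand-in for the runtime's `MessageBatch` type):

```ts
// Sketch: batch-level acknowledgement. `ackAll()` marks every message
// in the batch as delivered; `retryAll()` marks all of them for redelivery.
type BatchLike = {
	messages: { body: unknown }[];
	ackAll(): void;
	retryAll(): void;
};

const consumer = {
	async queue(batch: BatchLike): Promise<void> {
		try {
			for (const msg of batch.messages) {
				// ...process each message...
			}
			// Every message succeeded: acknowledge the whole batch at once.
			batch.ackAll();
		} catch {
			// A failure mid-batch: ask for the whole batch to be redelivered.
			batch.retryAll();
		}
	},
};

export default consumer;
```

This has the same effect as returning normally (or throwing), but makes the outcome explicit.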
@@ -133,6 +164,8 @@ Configuring delivery and retry delays via the `wrangler` CLI or when [developing
 
 To delay a message or batch of messages when sending to a queue, you can provide a `delaySeconds` parameter when sending a message.
 
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
 ```ts
 // Delay a singular message by 600 seconds (10 minutes)
 await env.YOUR_QUEUE.send(message, { delaySeconds: 600 });
@@ -144,6 +177,21 @@ await env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 300 });
 // If there is a global delay configured on the queue, ignore it.
 await env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 0 });
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+# Delay a singular message by 600 seconds (10 minutes)
+await env.YOUR_QUEUE.send(message, delaySeconds=600)
+
+# Delay a batch of messages by 300 seconds (5 minutes)
+await env.YOUR_QUEUE.sendBatch(messages, delaySeconds=300)
+
+# Do not delay this message.
+# If there is a global delay configured on the queue, ignore it.
+await env.YOUR_QUEUE.sendBatch(messages, delaySeconds=0)
+```
+</TabItem>
+</Tabs>
 
 You can also configure a default, global delay on a per-queue basis by passing `--delivery-delay-secs` when creating a queue via the `wrangler` CLI:
 
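The wrangler invocation the sentence above introduces falls outside this hunk. As a hedged sketch, using the `--delivery-delay-secs` flag named in the sentence (the queue name is a placeholder):

```sh
# "my-queue" is hypothetical; 600 seconds = a 10-minute default delay
npx wrangler queues create my-queue --delivery-delay-secs 600
```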
@@ -158,7 +206,9 @@ When [consuming messages from a queue](/queues/reference/how-queues-works/#consu
 
 To delay an individual message within a batch:
 
-```ts title="index.ts"
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
+```ts
 export default {
 	async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
 		for (const msg of batch.messages) {
@@ -169,10 +219,26 @@ export default {
 	},
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+from workers import WorkerEntrypoint
+
+class Default(WorkerEntrypoint):
+    async def queue(self, batch):
+        for msg in batch.messages:
+            # Mark for retry and delay a singular message
+            # by 3600 seconds (1 hour)
+            msg.retry(delaySeconds=3600)
+```
+</TabItem>
+</Tabs>
 
 To delay a batch of messages:
 
-```ts title="index.ts"
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
+```ts
 export default {
 	async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
 		// Mark for retry and delay a batch of messages
@@ -181,6 +247,19 @@ export default {
 	},
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+from workers import WorkerEntrypoint
+
+class Default(WorkerEntrypoint):
+    async def queue(self, batch):
+        # Mark for retry and delay a batch of messages
+        # by 600 seconds (10 minutes)
+        batch.retryAll(delaySeconds=600)
+```
+</TabItem>
+</Tabs>
 
 You can also choose to set a default retry delay to any messages that are retried due to either implicit failure or when calling `retry()` explicitly. This is set at the consumer level, and is supported in both push-based (Worker) and pull-based (HTTP) consumers.
 
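No configuration snippet for that consumer-level default appears within this hunk. A sketch of what the Wrangler configuration might look like; the `retry_delay` key name is an assumption here, not taken from this commit:

```toml
# Sketch: consumer-level default retry delay (key name assumed, value in seconds)
[[queues.consumers]]
queue = "my-queue"   # placeholder queue name
retry_delay = 300    # retried messages are redelivered after ~5 minutes
```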
@@ -233,6 +312,8 @@ Each message delivered to a consumer includes an `attempts` property that tracks
 
 For example, to generate an [exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) for a message, you can create a helper function that calculates this for you:
 
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
 ```ts
 const calculateExponentialBackoff = (
 	attempts: number,
@@ -241,10 +322,20 @@ const calculateExponentialBackoff = (
 	return baseDelaySeconds ** attempts;
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+def calculate_exponential_backoff(attempts, base_delay_seconds):
+    return base_delay_seconds ** attempts
+```
+</TabItem>
+</Tabs>
 
 In your consumer, you then pass the value of `msg.attempts` and your desired delay factor as the argument to `delaySeconds` when calling `retry()` on an individual message:
 
-```ts title="index.ts"
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
+```ts
 const BASE_DELAY_SECONDS = 30;
 
 export default {
@@ -262,9 +353,30 @@ export default {
 	},
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+from workers import WorkerEntrypoint
+
+BASE_DELAY_SECONDS = 30
+
+class Default(WorkerEntrypoint):
+    async def queue(self, batch):
+        for msg in batch.messages:
+            # Mark for retry and delay a singular message
+            # by an exponentially growing delay
+            msg.retry(
+                delaySeconds=calculate_exponential_backoff(
+                    msg.attempts,
+                    BASE_DELAY_SECONDS,
+                )
+            )
+```
+</TabItem>
+</Tabs>
 
 ## Related
 
 - Review the [JavaScript API](/queues/configuration/javascript-apis/) documentation for Queues.
 - Learn more about [How Queues Works](/queues/reference/how-queues-works/).
-- Understand the [metrics available](/queues/observability/metrics/) for your queues, including backlog and delayed message counts.
+- Understand the [metrics available](/queues/observability/metrics/) for your queues, including backlog and delayed message counts.
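As a quick sanity check of the backoff helper added above (this block is editorial, not part of the commit): with the 30-second base used in the consumer example, the delay grows very quickly.

```python
# Same helper as in the diff above
def calculate_exponential_backoff(attempts, base_delay_seconds):
    return base_delay_seconds ** attempts

# Delays for the first three retries with a 30-second base
for attempts in (1, 2, 3):
    print(attempts, calculate_exponential_backoff(attempts, 30))
# attempt 1 -> 30 s, attempt 2 -> 900 s (15 min), attempt 3 -> 27000 s (7.5 h)
```

In practice a cap (for example, `min(delay, some_maximum)`) may be worth adding, since the uncapped curve passes seven hours by the third attempt.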

src/content/docs/queues/configuration/javascript-apis.mdx

Lines changed: 51 additions & 1 deletion
@@ -9,7 +9,7 @@ head:
 
 ---
 
-import { Type } from "~/components";
+import { Type, TypeScriptExample, Tabs, TabItem } from "~/components";
 
 Cloudflare Queues is integrated with [Cloudflare Workers](/workers). To send and receive messages, you must use a Worker.
 
@@ -23,6 +23,8 @@ These APIs allow a producer Worker to send messages to a Queue.
 
 An example of writing a single message to a Queue:
 
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
 ```ts
 type Environment = {
 	readonly MY_QUEUE: Queue;
@@ -39,9 +41,28 @@ export default {
 	},
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+from pyodide.ffi import to_js
+from workers import Response, WorkerEntrypoint
+
+class Default(WorkerEntrypoint):
+    async def fetch(self, request):
+        await self.env.MY_QUEUE.send(to_js({
+            "url": request.url,
+            "method": request.method,
+            "headers": dict(request.headers),
+        }))
+        return Response("Sent!")
+```
+</TabItem>
+</Tabs>
 
 The Queues API also supports writing multiple messages at once:
 
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
 ```ts
 const sendResultsToQueue = async (results: Array<any>, env: Environment) => {
 	const batch: MessageSendRequest[] = results.map((value) => ({
@@ -50,6 +71,21 @@ const sendResultsToQueue = async (results: Array<any>, env: Environment) => {
 	await env.queue.sendBatch(batch);
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+import json
+from pyodide.ffi import to_js
+
+async def send_results_to_queue(results, env):
+    batch = [
+        {"body": json.dumps(value)}
+        for value in results
+    ]
+    await env.queue.sendBatch(to_js(batch))
+```
+</TabItem>
+</Tabs>
 
 ### `Queue`
 
@@ -170,6 +206,8 @@ If the `queue()` function throws, or the promise returned by it or any of the pr
 
 :::
 
+<Tabs syncKey="workersExamples">
+<TypeScriptExample filename="index.ts" omitTabs={true}>
 ```ts
 export default {
 	async queue(
@@ -183,6 +221,18 @@ export default {
 	},
 };
 ```
+</TypeScriptExample>
+<TabItem label="Python" icon="seti:python">
+```python
+from workers import WorkerEntrypoint
+
+class Default(WorkerEntrypoint):
+    async def queue(self, batch):
+        for message in batch.messages:
+            print("Received", message)
+```
+</TabItem>
+</Tabs>
 
 The `env` and `ctx` fields are as [documented in the Workers documentation](/workers/reference/migrate-to-module-workers/).
 