Commit c02e906
RequestContextHTTPClient + Server implementation in local mode (#457)
`RequestContextHTTPClient` is a picklable `RequestContext` implementation that provides all `RequestContext` functionality in any Python thread and subprocess. It exchanges small HTTP requests with an HTTP server that implements presigning of read/write BLOBs, committing multipart uploads to the blob store, adding request metrics, and sending function progress updates. The BLOBs themselves are not sent to the server; the blob store client is invoked directly by `RequestContextHTTPClient`.

User code can use `RequestContextHTTPClient` in a thread or subprocess by passing the result of `RequestContext.get()` as an argument when creating a `Thread` or `Process`, as sketched below and demonstrated in the added tests. When the user passes `RequestContextHTTPClient` into a `Thread`, the context is passed by reference; `RequestContextHTTPClient` is thread-safe, so this is okay. When the user passes `RequestContextHTTPClient` into a `Process`, the Python runtime pickles `RequestContextHTTPClient` and unpickles a clone in the subprocess, so the subprocess uses a clone of the original. `RequestContextHTTPClient` carries all the data required to re-establish the HTTP connection to the server and to tell the server which allocation and request its HTTP requests belong to.

This PR adds the HTTP server implementation for local mode; the remote (FE) side HTTP server implementation will be done in a separate PR. New integration tests cover all `RequestContext` functionality working correctly when accessed from a user function, a thread, and a subprocess.

Each `LocalRunner` (one per request) uses its own HTTP server. Each HTTP server listens on a unique ephemeral port that the request's `RequestContextHTTPClient` connects to, so multiple local runners can exist at the same time without any problems.

---

> [!NOTE]
> Introduces a picklable RequestContext HTTP client/server for local mode with thread/process support, refactors BLOB store and internal logging, and updates FE/local runner and tests accordingly.
>
> - **Request Context (local mode)**:
>   - Add `RequestContextHTTPClient` (picklable, thread-safe) and a lightweight local HTTP server/router with handlers for `progress`, `metrics`, and request state (`prepare_read`, `prepare_write`, `commit_write`).
>   - Local handlers print progress/metrics and serve file-backed BLOBs for request state.
> - **BLOB Store**:
>   - Introduce SDK `BLOB`/`BLOBChunk` pydantic models and `BLOBStore` with local FS and S3 backends; parallel chunked `get`/`put` with `InternalLogger`.
>   - Remove the old local blob store; add a local `SerializedValueStore` wrapper.
> - **Logging**:
>   - Replace `FunctionExecutorLogger` with a picklable `InternalLogger`; unify usage across SDK/FE.
> - **Local Runner**:
>   - Use a per-request local HTTP server + `RequestContextHTTPClient`, the new value store, and a cleanup lifecycle; set the multiprocessing start method via `setup_multiprocessing`.
> - **Function Executor**:
>   - Migrate to the new `InternalLogger`/`BLOBStore`; add blob proto↔SDK converters; validate progress attributes; minor proto doc tweak.
> - **Runtime hooks & multiprocessing**:
>   - Add `_raise_multiprocessing_usage_error`; initialize the spawn start method; improve SDK usage errors outside function threads.
> - **Cloud events**:
>   - Move CloudEvents helpers under `applications` and update imports; clarify JSON serialization errors.
> - **Tests**:
>   - Add/adjust tests for the blob store (local/S3) and request context (metrics/progress/state/request_id) in function, thread, and subprocess scenarios; update benchmarks.
> - **Version**: bump to `0.2.89`.
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit e487a23.</sup>
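A minimal sketch of the thread/subprocess usage pattern described above. The import path and the `metrics` accessor on the context are assumptions for illustration; the actual interfaces appear in the `request_context.py` diff further down.

```python
import multiprocessing
import threading

from tensorlake.applications import RequestContext  # import path is an assumption


def report(ctx: RequestContext) -> None:
    # Works in a thread (the context is shared by reference and is thread-safe)
    # and in a subprocess (the context is pickled and the unpickled clone
    # re-establishes the HTTP connection to the per-request server using the
    # allocation/request data it carries).
    ctx.metrics.counter("items_processed")
    ctx.metrics.timer("step_duration_sec", 0.25)


def my_function() -> None:
    ctx = RequestContext.get()

    worker = threading.Thread(target=report, args=(ctx,))
    worker.start()
    worker.join()

    child = multiprocessing.Process(target=report, args=(ctx,))
    child.start()
    child.join()
```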
1 parent 2781c43 commit c02e906

59 files changed (+2205, -522 lines)

pyproject.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "tensorlake"
-version = "0.2.88"
+version = "0.2.89"
 description = "Tensorlake SDK for Document Ingestion API and Serverless Applications"
 readme = "README.md"
 authors = ["Tensorlake Inc. <[email protected]>"]
```
Lines changed: 4 additions & 0 deletions

```diff
@@ -0,0 +1,4 @@
+from .blob import BLOB, BLOBChunk
+from .blob_store import BLOBStore
+
+__all__ = ["BLOB", "BLOBChunk", "BLOBStore"]
```
Lines changed: 22 additions & 0 deletions

```diff
@@ -0,0 +1,22 @@
+from pydantic import BaseModel
+
+
+class BLOBChunk(BaseModel):
+    # URI of the chunk.
+    # S3 URI if the data is stored in S3.
+    # Starts with "file://" prefix if the data is stored on a local file system.
+    uri: str
+    # Actual size of chunk data if the BLOB has data.
+    # Requested chunk size if the BLOB has no data yet (can be used for data upload).
+    size: int
+    # None if the BLOB has data.
+    # ETag of the chunk data (generated by BLOB store) if the BLOB chunk was used to upload data.
+    etag: str | None
+
+
+class BLOB(BaseModel):
+    # ID of the BLOB
+    id: str
+    # Ordered chunks of the BLOB if the BLOB has data.
+    # Ordered chunks of the BLOB that can be used to upload data if the BLOB has no data yet.
+    chunks: list[BLOBChunk]
```
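For illustration, a `BLOB` prepared for upload might be built from these models as follows; the import path is assumed from the `__all__` above, and the id, URIs, and sizes are made up:

```python
from tensorlake.applications.blob_store import BLOB, BLOBChunk

# A BLOB with no data yet: each chunk carries the requested chunk size and no etag.
# After BLOBStore.put() the returned BLOB holds the actual uploaded sizes and the
# etags generated by the blob store.
empty_blob = BLOB(
    id="example-blob",
    chunks=[
        BLOBChunk(uri="file:///tmp/example-blob.0", size=4 * 1024 * 1024, etag=None),
        BLOBChunk(uri="file:///tmp/example-blob.1", size=4 * 1024 * 1024, etag=None),
    ],
)
```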

src/tensorlake/function_executor/blob_store/blob_store.py renamed to src/tensorlake/applications/blob_store/blob_store.py

Lines changed: 31 additions & 25 deletions
```diff
@@ -1,11 +1,11 @@
 from concurrent.futures import FIRST_EXCEPTION, Future, ThreadPoolExecutor, wait
 from dataclasses import dataclass
-from typing import List
+from typing import Any

-from tensorlake.applications import InternalError
+from tensorlake.applications.interface.exceptions import InternalError
+from tensorlake.applications.internal_logger import InternalLogger

-from ..logger import FunctionExecutorLogger
-from ..proto.function_executor_pb2 import BLOB, BLOBChunk
+from .blob import BLOB, BLOBChunk
 from .local_fs_blob_store import LocalFSBLOBStore
 from .s3_blob_store import S3BLOBStore

@@ -27,10 +27,13 @@ class _ChunkInfo:
 class BLOBStore:
     """Dispatches generic BLOB store calls to their real backends.

-    Implements chunking."""
+    Implements chunking. Thread-safe. Picklable.
+    """

-    def __init__(self, available_cpu_count: int, logger: FunctionExecutorLogger):
+    def __init__(self, available_cpu_count: int):
         """Creates a BLOB store that uses the supplied BLOB stores."""
+        self._available_cpu_count: int = available_cpu_count
+
         max_io_workers: int = min(
             available_cpu_count * _IO_WORKER_THREADS_PER_AVAILABLE_CPU,
             _MAX_WORKER_THREADS,
@@ -40,15 +43,20 @@ def __init__(self, available_cpu_count: int, logger: FunctionExecutorLogger):
         )
         self._local: LocalFSBLOBStore = LocalFSBLOBStore()
         self._s3: S3BLOBStore = S3BLOBStore(io_workers_count=max_io_workers)
-        logger.info(
-            "BLOBStore initialized",
-            available_cpu_count=available_cpu_count,
-            max_io_workers=max_io_workers,
+
+    def __getstate__(self):
+        """Get the state for pickling."""
+        return {
+            "available_cpu_count": self._available_cpu_count,
+        }
+
+    def __setstate__(self, state: dict[str, Any]):
+        """Set the state for unpickling."""
+        self.__init__(
+            available_cpu_count=state["available_cpu_count"],
         )

-    def get(
-        self, blob: BLOB, offset: int, size: int, logger: FunctionExecutorLogger
-    ) -> bytes:
+    def get(self, blob: BLOB, offset: int, size: int, logger: InternalLogger) -> bytes:
         """Returns binary data stored in BLOB with the supplied URI at the supplied offset.

         Raises InternalError on error.
@@ -60,7 +68,7 @@ def get(

         # Read data from BLOB chunks in parallel until all data is read.
         # Minimize data copying by not creating any intermediate bytes/bytearray objects.
-        read_chunk_futures: List[Future] = []
+        read_chunk_futures: list[Future] = []
         destination: bytearray = bytearray(size)
         destination_view: memoryview = memoryview(destination)
         read_offset: int = offset
@@ -110,7 +118,7 @@ def _read_into(
         blob_uri: str,
         blob_read_offset: int,
         destination: memoryview,
-        logger: FunctionExecutorLogger,
+        logger: InternalLogger,
     ) -> bytes:
         if _is_file_uri(blob_uri):
             self._local.get(
@@ -127,9 +135,7 @@ def _read_into(
                 logger=logger,
             )

-    def put(
-        self, blob: BLOB, data: List[bytes], logger: FunctionExecutorLogger
-    ) -> BLOB:
+    def put(self, blob: BLOB, data: list[bytes], logger: InternalLogger) -> BLOB:
         """Stores the supplied binary data into the supplied BLOB starting from its very beginning.

         Overwrites BLOB. Raises Exception on error.
@@ -150,14 +156,14 @@ def put(
         # Write data to BLOB chunks in parallel until all data is written.
         # Minimize data copying by not creating any intermediate bytes/bytearray objects.
         data_read_offset: int = 0
-        write_chunk_futures: List[Future] = []
-        uploaded_chunk_sizes: List[int] = []
+        write_chunk_futures: list[Future] = []
+        uploaded_chunk_sizes: list[int] = []

         data_ix: int = 0
         read_offset_inside_data: int = 0
         for chunk in blob.chunks:
             chunk: BLOBChunk
-            chunk_data: List[memoryview] = []
+            chunk_data: list[memoryview] = []
             chunk_data_size: int = 0
             chunk_offset: int = data_read_offset
             if data_ix == len(data):
@@ -198,6 +204,7 @@ def put(
         wait(write_chunk_futures, return_when=FIRST_EXCEPTION)
         uploaded_blob: BLOB = BLOB(
             id=blob.id,
+            chunks=[],
         )
         for ix, future in enumerate(write_chunk_futures):
             if future.exception() is not None:
@@ -206,8 +213,7 @@ def put(
                 ) from future.exception()
             # The futures list is ordered by the chunk index, so appending here preserves
             # the original chunks order.
-            uploaded_chunk: BLOBChunk = BLOBChunk()
-            uploaded_chunk.CopyFrom(blob.chunks[ix])
+            uploaded_chunk: BLOBChunk = blob.chunks[ix].model_copy()
             uploaded_chunk.size = uploaded_chunk_sizes[ix]
             uploaded_chunk.etag = future.result()
             uploaded_blob.chunks.append(uploaded_chunk)
@@ -218,8 +224,8 @@ def _write_chunk(
         self,
         chunk_uri: str,
         chunk_offset: int,
-        source: List[memoryview],
-        logger: FunctionExecutorLogger,
+        source: list[memoryview],
+        logger: InternalLogger,
     ) -> str:
         if _is_file_uri(chunk_uri):
             return self._local.put(
```
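The `__getstate__`/`__setstate__` pair above is what makes the store picklable: only `available_cpu_count` crosses the pickle boundary, and the clone rebuilds its thread pool and backends by re-running `__init__`. A quick sketch of the round trip (the import path is an assumption):

```python
import pickle

from tensorlake.applications.blob_store import BLOBStore

store = BLOBStore(available_cpu_count=4)

# Only {"available_cpu_count": 4} is serialized; the unpickled clone recreates
# its ThreadPoolExecutor, LocalFSBLOBStore, and S3BLOBStore from scratch.
clone: BLOBStore = pickle.loads(pickle.dumps(store))
```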

src/tensorlake/function_executor/blob_store/local_fs_blob_store.py renamed to src/tensorlake/applications/blob_store/local_fs_blob_store.py

Lines changed: 4 additions & 5 deletions
```diff
@@ -3,9 +3,8 @@
 import os.path
 from typing import List

-from tensorlake.applications import InternalError
-
-from ..logger import FunctionExecutorLogger
+from tensorlake.applications.interface.exceptions import InternalError
+from tensorlake.applications.internal_logger import InternalLogger


 class LocalFSBLOBStore:
@@ -16,7 +15,7 @@ def get(
         uri: str,
         offset: int,
         destination: memoryview,
-        logger: FunctionExecutorLogger,
+        logger: InternalLogger,
     ) -> None:
         """Reads binary data stored in file at the supplied URI and offset into the destination memoryview.

@@ -40,7 +39,7 @@ def put(
         uri: str,
         offset: int,
         source: List[memoryview],
-        logger: FunctionExecutorLogger,
+        logger: InternalLogger,
     ) -> str:
         """Stores the supplied memoryviews of binary data in a file at the supplied URI and offset.

```

src/tensorlake/function_executor/blob_store/s3_blob_store.py renamed to src/tensorlake/applications/blob_store/s3_blob_store.py

Lines changed: 23 additions & 11 deletions
```diff
@@ -2,11 +2,10 @@

 import httpx

-from tensorlake.applications import InternalError
+from tensorlake.applications.interface.exceptions import InternalError
+from tensorlake.applications.internal_logger import InternalLogger
 from tensorlake.utils.retries import exponential_backoff

-from ..logger import FunctionExecutorLogger
-
 # Customers can upload and download large files, allow up to 1 hour per S3 operation.
 _S3_OPERATION_TIMEOUT_SEC = 1 * 60 * 60  # 1 hour
 # Keep established connections around for up to 1 hour to maximize download/upload throughput
@@ -35,7 +34,7 @@ def get(
         uri: str,
         offset: int,
         destination: memoryview,
-        logger: FunctionExecutorLogger,
+        logger: InternalLogger,
     ) -> bytes:
         """Reads binary data stored in S3 object at the supplied URI and offset into the destination memoryview.

@@ -57,10 +56,10 @@ def on_retry(
                 # because we're in streaming mode.
                 e.response.read()
                 response = e.response.text
-            except Exception as e:
+            except Exception as extract_response_exception:
                 logger.error(
                     "failed to extract status code and response from failed S3 get response",
-                    exc_info=e,
+                    exc_info=extract_response_exception,
                 )

             # The URI can be presigned, it should not be logged as it provides access to customer data.
@@ -105,10 +104,23 @@ def get_with_retries() -> bytes:
         except httpx.HTTPStatusError as e:
             # The URI can be presigned, it should not be logged as it provides access to customer data.
             # Response text doesn't contain the URI, so it can be logged.
+            status_code: str = "None"
+            response: str = "None"
+            try:
+                status_code = e.response.status_code
+                # .read() is required before accessing .text
+                # because we're in streaming mode.
+                e.response.read()
+                response = e.response.text
+            except Exception as extract_response_exception:
+                logger.error(
+                    "failed to extract status code and response from failed S3 get response",
+                    exc_info=extract_response_exception,
+                )
             logger.error(
                 "failed to get S3 object",
-                status_code=e.response.status_code,
-                response=e.response.text,
+                status_code=status_code,
+                response=response,
                 offset=offset,
                 size=len(destination),
             )
@@ -129,7 +141,7 @@ def put(
         self,
         uri: str,
         source: List[memoryview],
-        logger: FunctionExecutorLogger,
+        logger: InternalLogger,
     ) -> str:
         """Stores the supplied memoryviews in a S3 object at the supplied URI.

@@ -152,10 +164,10 @@ def on_retry(
                 if isinstance(e, httpx.HTTPStatusError):
                     status_code = e.response.status_code
                     response = e.response.text
-            except Exception as e:
+            except Exception as extract_response_exception:
                 logger.error(
                     "failed to extract status code and response from failed S3 put response",
-                    exc_info=e,
+                    exc_info=extract_response_exception,
                )

             logger.error(
```
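The extract-then-log pattern repeated above exists because the S3 download and upload run through httpx in streaming mode, where an error response's body is not read automatically. A standalone sketch of that httpx behavior (the URL is a placeholder, and this is not the store's actual retry logic):

```python
import httpx

try:
    with httpx.Client() as client:
        with client.stream("GET", "https://example.com/some-object") as response:
            response.raise_for_status()
            payload = response.read()
except httpx.HTTPStatusError as error:
    # In streaming mode the response body has not been read yet, so .text is
    # unavailable until .read() is called on the error response.
    error.response.read()
    print(error.response.status_code, error.response.text)
```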

src/tensorlake/function_executor/cloud_events.py renamed to src/tensorlake/applications/cloud_events.py

Lines changed: 4 additions & 2 deletions
```diff
@@ -3,7 +3,7 @@
 import uuid
 from typing import Any

-from tensorlake.applications import SerializationError
+from .interface.exceptions import SerializationError


 def print_cloud_event(
@@ -14,6 +14,8 @@ def print_cloud_event(
 ) -> None:
     """
     Takes a dictionary representing an event produced by the executor, wraps it in a CloudEvent and prints it to stdout.
+
+    Raises SerializationError: If the event cannot be serialized to JSON.
     """
     print(_serialize_json(new_cloud_event(event, type, source, message)), flush=True)

@@ -56,7 +58,7 @@ def _serialize_json(obj: dict[str, Any]) -> str:
         obj: The dictionary to serialize

     Returns:
-        A JSON-serializable version of the object as a string
+        A version of the object serialized into a JSON string

     Raises:
         SerializationError: If the object cannot be serialized to JSON
```
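For context on the new `Raises` note: the kind of failure that surfaces as `SerializationError` is an event payload containing a value JSON cannot represent. The payload below is purely illustrative; how `_serialize_json` detects the failure internally is not shown in this diff.

```python
import json

event = {"type": "function.progress", "data": {"handle": object()}}

try:
    json.dumps(event)
except TypeError as error:
    # print_cloud_event reports this kind of failure as SerializationError
    # per the docstring above.
    print(f"event is not JSON-serializable: {error}")
```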

src/tensorlake/applications/interface/request_context.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -32,14 +32,14 @@ def get(self, key: str, default: Any | None = None) -> Any | None:
 class RequestMetrics:
     """Abstract interface for reporting application request metrics."""

-    def timer(self, name: str, value: float):
+    def timer(self, name: str, value: float) -> None:
         """Records a duration metric with the supplied name and value.

         Raises TensorlakeError on error.
         """
         raise InternalError("RequestMetrics subclasses must implement timer method.")

-    def counter(self, name: str, value: int = 1):
+    def counter(self, name: str, value: int = 1) -> None:
         """Adds the supplied value to the counter with the supplied name.

         If the counter does not exist, it is created with the supplied value.
@@ -68,7 +68,8 @@ def update(
             attributes: A dictionary of key/value string pairs to pass to the progress update

         Raises:
-            SerializationError: If attributes cannot be serialized to JSON.
+            SDKUsageError: If attributes is not a dictionary of string key/value pairs or None.
+            TensorlakeError: On other errors.
         """
         raise InternalError("FunctionProgress subclasses must implement update method.")

```
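The abstract methods above raise `InternalError` unless overridden. A hypothetical in-memory `RequestMetrics` implementation (purely illustrative, not part of this commit; the module path is assumed from the file path above) might look like:

```python
from tensorlake.applications.interface.request_context import RequestMetrics


class InMemoryMetrics(RequestMetrics):
    """Hypothetical RequestMetrics subclass that keeps metrics in dictionaries."""

    def __init__(self) -> None:
        self.timers: dict[str, float] = {}
        self.counters: dict[str, int] = {}

    def timer(self, name: str, value: float) -> None:
        # Record the duration metric under its name.
        self.timers[name] = value

    def counter(self, name: str, value: int = 1) -> None:
        # Create the counter with the supplied value if missing, otherwise add to it.
        self.counters[name] = self.counters.get(name, 0) + value
```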

0 commit comments