diff --git a/ravendb/documents/session/document_session.py b/ravendb/documents/session/document_session.py index 327da2ea..9fad8ec6 100644 --- a/ravendb/documents/session/document_session.py +++ b/ravendb/documents/session/document_session.py @@ -367,7 +367,7 @@ def _load_internal( return load_operation.get_documents(object_type) - def _load_internal_stream(self, keys: List[str], operation: LoadOperation, stream: Optional[bytes] = None) -> None: + def _load_internal_stream(self, keys: List[str], operation: LoadOperation, stream=None) -> None: operation.by_keys(keys) command = operation.create_request() @@ -375,13 +375,13 @@ def _load_internal_stream(self, keys: List[str], operation: LoadOperation, strea if command: self._request_executor.execute_command(command, self.session_info) - if stream: + if stream is not None: try: result = command.result - stream_to_dict = json.loads(stream.decode("utf-8")) - result.__dict__.update(stream_to_dict) - except IOError as e: - raise RuntimeError(f"Unable to serialize returned value into stream {e.args[0]}", e) + data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8") + stream.write(data) + except Exception as e: + raise RuntimeError("Unable to serialize returned value into stream") from e else: operation.set_result(command.result) @@ -404,16 +404,19 @@ def load_starting_with( def load_starting_with_into_stream( self, id_prefix: str, + output, matches: str = None, start: int = 0, page_size: int = 25, exclude: str = None, start_after: str = None, - ) -> bytes: + ) -> None: if id_prefix is None: - raise ValueError("Arg 'id_prefix' is cannot be None.") - return self._load_starting_with_into_stream_internal( - id_prefix, LoadStartingWithOperation(self), matches, start, page_size, exclude, start_after + raise ValueError("id_prefix cannot be None.") + if output is None: + raise ValueError("output cannot be None") + self._load_starting_with_into_stream_internal( + id_prefix, LoadStartingWithOperation(self), output, matches, start, page_size, exclude, start_after ) def _load_starting_with_internal( @@ -437,23 +440,23 @@ def _load_starting_with_into_stream_internal( self, id_prefix: str, operation: LoadStartingWithOperation, + output, matches: str, start: int, page_size: int, exclude: str, start_after: str, - ) -> bytes: + ) -> None: operation.with_start_with(id_prefix, matches, start, page_size, exclude, start_after) command = operation.create_request() - bytes_result = None if command: self.request_executor.execute_command(command, self.session_info) try: result = command.result - bytes_result = json.dumps(result.to_json()).encode("utf-8") + data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8") + output.write(data) except Exception as e: - raise RuntimeError("Unable sto serialize returned value into stream") from e - return bytes_result + raise RuntimeError("Unable to serialize returned value into stream") from e def document_query_from_index_type(self, index_type: Type[_TIndex], object_type: Type[_T]) -> DocumentQuery[_T]: try: @@ -873,19 +876,22 @@ def load_starting_with( def load_starting_with_into_stream( self, id_prefix: str, + output, matches: str = None, start: int = 0, page_size: int = 25, exclude: str = None, start_after: str = None, - ) -> bytes: + ) -> None: return self._session.load_starting_with_into_stream( - id_prefix, matches, start, page_size, exclude, start_after + id_prefix, output, matches, start, page_size, exclude, start_after ) - def load_into_stream(self, keys: List[str], output: bytes) -> None: + def load_into_stream(self, keys: List[str], output) -> None: if keys is None: raise ValueError("Keys cannot be None") + if output is None: + raise ValueError("output cannot be None") self._session._load_internal_stream(keys, LoadOperation(self._session), output) @@ -1172,8 +1178,21 @@ def _yield_result(self, query: AbstractDocumentQuery, enumerator: Iterator[Dict] object_type=query.query_class, ) - def stream_into(self): # query: Union[DocumentQuery, RawDocumentQuery], output: iter): - pass + def stream_into(self, query: AbstractDocumentQuery, output) -> None: + stream_operation = StreamOperation(self._session) + command = stream_operation.create_request(query.index_query) + + self.request_executor.execute_command(command, self.session_info) + + with stream_operation.set_result(command.result) as result_iter: + output.write(b'{"Results":[') + first = True + for item in result_iter: + if not first: + output.write(b",") + output.write(json.dumps(item).encode("utf-8")) + first = False + output.write(b"]}") def conditional_load( self, key: str, change_vector: str, object_type: Type[_T] = None diff --git a/ravendb/documents/session/query.py b/ravendb/documents/session/query.py index fdb5b4a7..a9efbf7a 100644 --- a/ravendb/documents/session/query.py +++ b/ravendb/documents/session/query.py @@ -2757,6 +2757,9 @@ def suggest_using( self._suggest_using(suggestion_or_builder) return SuggestionDocumentQuery(self) + def to_stream(self, output) -> None: + self._the_session.advanced.stream_into(self, output) + class RawDocumentQuery(Generic[_T], AbstractDocumentQuery[_T]): def __init__(self, object_type: Type[_T], session: InMemoryDocumentSessionOperations, raw_query: str): @@ -2834,6 +2837,9 @@ def projection(self, projection_behavior: ProjectionBehavior) -> RawDocumentQuer self._projection(projection_behavior) return self + def to_stream(self, output) -> None: + self._the_session.advanced.stream_into(self, output) + class DocumentQueryCustomizationDelegate(DocumentQueryCustomization): def __init__(self, query: AbstractDocumentQuery): diff --git a/ravendb/tests/issue_tests/test_RDBC_1037.py b/ravendb/tests/issue_tests/test_RDBC_1037.py new file mode 100644 index 00000000..25d3f22e --- /dev/null +++ b/ravendb/tests/issue_tests/test_RDBC_1037.py @@ -0,0 +1,106 @@ +""" +RDBC-1037: load_into_stream writes to BytesIO output; load_starting_with_into_stream + takes an output parameter and writes to it. + +C# reference: IAdvancedSessionOperations.LoadIntoStream() / LoadStartingWithIntoStream() +""" + +import io +import json +import unittest + +from ravendb.tests.test_base import TestBase + + +class TestLoadIntoStreamUnit(unittest.TestCase): + """Unit tests — no server required.""" + + def test_load_into_stream_method_exists_on_advanced(self): + from ravendb.documents.session.document_session import DocumentSession + + self.assertTrue(hasattr(DocumentSession._Advanced, "load_into_stream")) + + def test_load_starting_with_into_stream_method_exists(self): + from ravendb.documents.session.document_session import DocumentSession + + self.assertTrue(hasattr(DocumentSession._Advanced, "load_starting_with_into_stream")) + + +class TestLoadIntoStream(TestBase): + """Integration tests — require a live server.""" + + def setUp(self): + super().setUp() + self.store = self.get_document_store() + + def tearDown(self): + super().tearDown() + self.store.close() + + def test_load_into_stream_writes_to_bytesio(self): + class Doc: + def __init__(self, name: str = None): + self.name = name + + with self.store.open_session() as session: + session.store(Doc("alpha"), "docs/1") + session.store(Doc("beta"), "docs/2") + session.save_changes() + + output = io.BytesIO() + with self.store.open_session() as session: + session.advanced.load_into_stream(["docs/1", "docs/2"], output) + + output.seek(0) + data = json.loads(output.read()) + self.assertIn("Results", data) + names = [r["name"] for r in data["Results"]] + self.assertIn("alpha", names) + self.assertIn("beta", names) + + def test_load_into_stream_single_document(self): + class Doc: + def __init__(self, name: str = None): + self.name = name + + with self.store.open_session() as session: + session.store(Doc("gamma"), "docs/3") + session.save_changes() + + output = io.BytesIO() + with self.store.open_session() as session: + session.advanced.load_into_stream(["docs/3"], output) + + output.seek(0) + data = json.loads(output.read()) + self.assertIn("Results", data) + self.assertEqual(1, len(data["Results"])) + self.assertEqual("gamma", data["Results"][0]["name"]) + + def test_load_starting_with_into_stream_writes_to_bytesio(self): + class Doc: + def __init__(self, name: str = None): + self.name = name + + with self.store.open_session() as session: + session.store(Doc("one"), "prefix/1") + session.store(Doc("two"), "prefix/2") + session.save_changes() + + output = io.BytesIO() + with self.store.open_session() as session: + session.advanced.load_starting_with_into_stream("prefix/", output) + + output.seek(0) + data = json.loads(output.read()) + self.assertIn("Results", data) + self.assertEqual(2, len(data["Results"])) + expected_names = {"one", "two"} + for r in data["Results"]: + self.assertIn(r["name"], expected_names) + expected_names.discard(r["name"]) + self.assertEqual(0, len(expected_names)) + + +if __name__ == "__main__": + unittest.main() diff --git a/ravendb/tests/issue_tests/test_RDBC_1038.py b/ravendb/tests/issue_tests/test_RDBC_1038.py new file mode 100644 index 00000000..220c368b --- /dev/null +++ b/ravendb/tests/issue_tests/test_RDBC_1038.py @@ -0,0 +1,126 @@ +""" +RDBC-1038: stream_into() implemented; DocumentQuery.to_stream() delegates to session. + +C# reference: IDocumentSession.Advanced.StreamInto(), IDocumentQuery.ToStream() + +StreamInto writes {"Results": [...]} JSON to the output stream, matching the C# +QueryIntoStream test which does JObject.Load(new JsonTextReader(new StreamReader(stream))) +and asserts json.GetValue("Results").Children().Count() == expected, then verifies +each result's field value. +""" + +import io +import json +import unittest + +from ravendb.tests.test_base import TestBase + + +class TestStreamIntoUnit(unittest.TestCase): + """Unit tests — no server required.""" + + def test_stream_into_method_exists_on_advanced(self): + from ravendb.documents.session.document_session import DocumentSession + + self.assertTrue(hasattr(DocumentSession._Advanced, "stream_into")) + + def test_to_stream_method_exists_on_document_query(self): + from ravendb.documents.session.query import DocumentQuery + + self.assertTrue(hasattr(DocumentQuery, "to_stream")) + + def test_to_stream_method_exists_on_raw_document_query(self): + from ravendb.documents.session.query import RawDocumentQuery + + self.assertTrue(hasattr(RawDocumentQuery, "to_stream")) + + def test_stream_into_rejects_non_query_argument(self): + from ravendb.documents.session.document_session import DocumentSession + + advanced = object.__new__(DocumentSession._Advanced) + advanced._session = object() + with self.assertRaises(AttributeError): + advanced.stream_into("not_a_query", io.BytesIO()) + + +class TestStreamInto(TestBase): + """Integration tests — require a live server.""" + + def setUp(self): + super().setUp() + self.store = self.get_document_store() + + def tearDown(self): + super().tearDown() + self.store.close() + + def _seed_documents(self): + class Doc: + def __init__(self, name: str = None, value: int = 0): + self.name = name + self.value = value + + with self.store.open_session() as session: + for i in range(3): + d = Doc(f"doc{i}", i) + session.store(d, f"docs/{i}") + session.save_changes() + return Doc + + def test_stream_into_writes_results_json(self): + Doc = self._seed_documents() + + output = io.BytesIO() + with self.store.open_session() as session: + query = session.query(object_type=Doc) + session.advanced.stream_into(query, output) + + output.seek(0) + data = json.loads(output.read()) + self.assertIn("Results", data) + self.assertEqual(3, len(data["Results"])) + expected_names = {"doc0", "doc1", "doc2"} + for r in data["Results"]: + self.assertIn(r["name"], expected_names) + expected_names.discard(r["name"]) + self.assertEqual(0, len(expected_names)) + + def test_document_query_to_stream(self): + Doc = self._seed_documents() + + output = io.BytesIO() + with self.store.open_session() as session: + query = session.query(object_type=Doc) + query.to_stream(output) + + output.seek(0) + data = json.loads(output.read()) + self.assertIn("Results", data) + self.assertEqual(3, len(data["Results"])) + expected_names = {"doc0", "doc1", "doc2"} + for r in data["Results"]: + self.assertIn(r["name"], expected_names) + expected_names.discard(r["name"]) + self.assertEqual(0, len(expected_names)) + + def test_raw_document_query_to_stream(self): + self._seed_documents() + + output = io.BytesIO() + with self.store.open_session() as session: + query = session.advanced.raw_query("FROM Docs") + query.to_stream(output) + + output.seek(0) + data = json.loads(output.read()) + self.assertIn("Results", data) + self.assertEqual(3, len(data["Results"])) + expected_names = {"doc0", "doc1", "doc2"} + for r in data["Results"]: + self.assertIn(r["name"], expected_names) + expected_names.discard(r["name"]) + self.assertEqual(0, len(expected_names)) + + +if __name__ == "__main__": + unittest.main() diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/test_load_into_stream.py b/ravendb/tests/jvm_migrated_tests/client_tests/test_load_into_stream.py index 5ecc36ec..689464f3 100644 --- a/ravendb/tests/jvm_migrated_tests/client_tests/test_load_into_stream.py +++ b/ravendb/tests/jvm_migrated_tests/client_tests/test_load_into_stream.py @@ -1,3 +1,4 @@ +import io import json from ravendb import DocumentStore @@ -27,14 +28,32 @@ def _insert_employee(name: str = None): _insert_employee("Michael") session.save_changes() + def test_can_load_by_ids_into_stream(self): + self.insert_data(self.store) + # employees/1-A, employees/4-A, employees/7-A are the 1st, 4th, and 7th inserted docs + ids = ["employees/1-A", "employees/4-A", "employees/7-A"] + output = io.BytesIO() + with self.store.open_session() as session: + session.advanced.load_into_stream(ids, output) + output.seek(0) + json_node = json.loads(output.read().decode("utf-8")) + result = GetDocumentsResult.from_json(json_node) + self.assertEqual(3, len(result.results)) + names = ["Aviv", "Maxim", "Michael"] + for name_from_results in [r["first_name"] for r in result.results]: + self.assertIn(name_from_results, names) + names.remove(name_from_results) + def test_can_load_starting_with_into_stream(self): self.insert_data(self.store) + output = io.BytesIO() with self.store.open_session() as session: - stream = session.advanced.load_starting_with_into_stream("employees/") - json_node = json.loads(stream.decode("utf-8")) - result = GetDocumentsResult.from_json(json_node) - self.assertEqual(7, len(result.results)) - names = ["Aviv", "Iftah", "Tal", "Maxim", "Karmel", "Grisha", "Michael"] - for name_from_results in [result["first_name"] for result in result.results]: - self.assertIn(name_from_results, names) - names.remove(name_from_results) + session.advanced.load_starting_with_into_stream("employees/", output) + output.seek(0) + json_node = json.loads(output.read().decode("utf-8")) + result = GetDocumentsResult.from_json(json_node) + self.assertEqual(7, len(result.results)) + names = ["Aviv", "Iftah", "Tal", "Maxim", "Karmel", "Grisha", "Michael"] + for name_from_results in [result["first_name"] for result in result.results]: + self.assertIn(name_from_results, names) + names.remove(name_from_results)