Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions ravendb/documents/session/document_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,21 +367,21 @@ def _load_internal(

return load_operation.get_documents(object_type)

def _load_internal_stream(self, keys: List[str], operation: LoadOperation, stream: Optional[bytes] = None) -> None:
def _load_internal_stream(self, keys: List[str], operation: LoadOperation, stream=None) -> None:
operation.by_keys(keys)

command = operation.create_request()

if command:
self._request_executor.execute_command(command, self.session_info)

if stream:
if stream is not None:
try:
result = command.result
stream_to_dict = json.loads(stream.decode("utf-8"))
result.__dict__.update(stream_to_dict)
except IOError as e:
raise RuntimeError(f"Unable to serialize returned value into stream {e.args[0]}", e)
data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8")
stream.write(data)
except Exception as e:
raise RuntimeError("Unable to serialize returned value into stream") from e
else:
operation.set_result(command.result)

Expand All @@ -404,16 +404,19 @@ def load_starting_with(
def load_starting_with_into_stream(
self,
id_prefix: str,
output,
matches: str = None,
start: int = 0,
page_size: int = 25,
exclude: str = None,
start_after: str = None,
) -> bytes:
) -> None:
if id_prefix is None:
raise ValueError("Arg 'id_prefix' is cannot be None.")
return self._load_starting_with_into_stream_internal(
id_prefix, LoadStartingWithOperation(self), matches, start, page_size, exclude, start_after
raise ValueError("id_prefix cannot be None.")
if output is None:
raise ValueError("output cannot be None")
self._load_starting_with_into_stream_internal(
id_prefix, LoadStartingWithOperation(self), output, matches, start, page_size, exclude, start_after
)

def _load_starting_with_internal(
Expand All @@ -437,23 +440,23 @@ def _load_starting_with_into_stream_internal(
self,
id_prefix: str,
operation: LoadStartingWithOperation,
output,
matches: str,
start: int,
page_size: int,
exclude: str,
start_after: str,
) -> bytes:
) -> None:
operation.with_start_with(id_prefix, matches, start, page_size, exclude, start_after)
command = operation.create_request()
bytes_result = None
if command:
self.request_executor.execute_command(command, self.session_info)
try:
result = command.result
bytes_result = json.dumps(result.to_json()).encode("utf-8")
data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8")
output.write(data)
except Exception as e:
raise RuntimeError("Unable sto serialize returned value into stream") from e
return bytes_result
raise RuntimeError("Unable to serialize returned value into stream") from e

def document_query_from_index_type(self, index_type: Type[_TIndex], object_type: Type[_T]) -> DocumentQuery[_T]:
try:
Expand Down Expand Up @@ -873,19 +876,22 @@ def load_starting_with(
def load_starting_with_into_stream(
self,
id_prefix: str,
output,
matches: str = None,
start: int = 0,
page_size: int = 25,
exclude: str = None,
start_after: str = None,
) -> bytes:
) -> None:
return self._session.load_starting_with_into_stream(
id_prefix, matches, start, page_size, exclude, start_after
id_prefix, output, matches, start, page_size, exclude, start_after
)

def load_into_stream(self, keys: List[str], output: bytes) -> None:
def load_into_stream(self, keys: List[str], output) -> None:
if keys is None:
raise ValueError("Keys cannot be None")
if output is None:
raise ValueError("output cannot be None")

self._session._load_internal_stream(keys, LoadOperation(self._session), output)

Expand Down Expand Up @@ -1172,8 +1178,21 @@ def _yield_result(self, query: AbstractDocumentQuery, enumerator: Iterator[Dict]
object_type=query.query_class,
)

def stream_into(self): # query: Union[DocumentQuery, RawDocumentQuery], output: iter):
pass
def stream_into(self, query: AbstractDocumentQuery, output) -> None:
    """Execute *query* as a streaming operation and write its raw results into *output*.

    The payload written is a single JSON object of the form '{"Results":[...]}'
    encoded as UTF-8 bytes, mirroring the C# IAdvancedSessionOperations.StreamInto().

    :param query: the query whose results should be streamed
    :param output: a writable binary stream (e.g. io.BytesIO)
    """
    operation = StreamOperation(self._session)
    request = operation.create_request(query.index_query)

    self.request_executor.execute_command(request, self.session_info)

    # set_result yields a context-managed iterator over the raw result documents.
    with operation.set_result(request.result) as results:
        output.write(b'{"Results":[')
        for index, document in enumerate(results):
            if index > 0:
                output.write(b",")
            output.write(json.dumps(document).encode("utf-8"))
        output.write(b"]}")

def conditional_load(
self, key: str, change_vector: str, object_type: Type[_T] = None
Expand Down
6 changes: 6 additions & 0 deletions ravendb/documents/session/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2757,6 +2757,9 @@ def suggest_using(
self._suggest_using(suggestion_or_builder)
return SuggestionDocumentQuery(self)

def to_stream(self, output) -> None:
    """Execute this query and stream its results into *output* (a writable binary stream)."""
    advanced = self._the_session.advanced
    advanced.stream_into(self, output)


class RawDocumentQuery(Generic[_T], AbstractDocumentQuery[_T]):
def __init__(self, object_type: Type[_T], session: InMemoryDocumentSessionOperations, raw_query: str):
Expand Down Expand Up @@ -2834,6 +2837,9 @@ def projection(self, projection_behavior: ProjectionBehavior) -> RawDocumentQuer
self._projection(projection_behavior)
return self

def to_stream(self, output) -> None:
    """Execute this raw query and stream its results into *output* (a writable binary stream)."""
    advanced = self._the_session.advanced
    advanced.stream_into(self, output)


class DocumentQueryCustomizationDelegate(DocumentQueryCustomization):
def __init__(self, query: AbstractDocumentQuery):
Expand Down
106 changes: 106 additions & 0 deletions ravendb/tests/issue_tests/test_RDBC_1037.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
RDBC-1037: load_into_stream writes to BytesIO output; load_starting_with_into_stream
takes an output parameter and writes to it.

C# reference: IAdvancedSessionOperations.LoadIntoStream() / LoadStartingWithIntoStream()
"""

import io
import json
import unittest

from ravendb.tests.test_base import TestBase


class TestLoadIntoStreamUnit(unittest.TestCase):
    """Unit tests - verify the public API surface without a live server."""

    def test_load_into_stream_method_exists_on_advanced(self):
        from ravendb.documents.session.document_session import DocumentSession

        advanced_cls = DocumentSession._Advanced
        self.assertTrue(hasattr(advanced_cls, "load_into_stream"))

    def test_load_starting_with_into_stream_method_exists(self):
        from ravendb.documents.session.document_session import DocumentSession

        advanced_cls = DocumentSession._Advanced
        self.assertTrue(hasattr(advanced_cls, "load_starting_with_into_stream"))


class TestLoadIntoStream(TestBase):
    """Integration tests - require a live server.

    The fixture document class and the stream-parsing assertions were
    duplicated in every test; they are factored out into the nested
    ``Doc`` class and the ``_read_results`` helper below.
    """

    class Doc:
        # Named "Doc" (not "_Doc") so the inferred collection stays "Docs",
        # identical to the original per-test local class.
        def __init__(self, name: str = None):
            self.name = name

    def setUp(self):
        super().setUp()
        self.store = self.get_document_store()

    def tearDown(self):
        super().tearDown()
        self.store.close()

    def _read_results(self, output) -> list:
        """Rewind *output*, parse its JSON payload, and return the 'Results' list."""
        output.seek(0)
        data = json.loads(output.read())
        self.assertIn("Results", data)
        return data["Results"]

    def test_load_into_stream_writes_to_bytesio(self):
        with self.store.open_session() as session:
            session.store(self.Doc("alpha"), "docs/1")
            session.store(self.Doc("beta"), "docs/2")
            session.save_changes()

        output = io.BytesIO()
        with self.store.open_session() as session:
            session.advanced.load_into_stream(["docs/1", "docs/2"], output)

        results = self._read_results(output)
        names = [r["name"] for r in results]
        self.assertIn("alpha", names)
        self.assertIn("beta", names)

    def test_load_into_stream_single_document(self):
        with self.store.open_session() as session:
            session.store(self.Doc("gamma"), "docs/3")
            session.save_changes()

        output = io.BytesIO()
        with self.store.open_session() as session:
            session.advanced.load_into_stream(["docs/3"], output)

        results = self._read_results(output)
        self.assertEqual(1, len(results))
        self.assertEqual("gamma", results[0]["name"])

    def test_load_starting_with_into_stream_writes_to_bytesio(self):
        with self.store.open_session() as session:
            session.store(self.Doc("one"), "prefix/1")
            session.store(self.Doc("two"), "prefix/2")
            session.save_changes()

        output = io.BytesIO()
        with self.store.open_session() as session:
            session.advanced.load_starting_with_into_stream("prefix/", output)

        results = self._read_results(output)
        self.assertEqual(2, len(results))
        # Set equality checks both membership and absence of duplicates,
        # matching the original discard-until-empty loop.
        self.assertEqual({"one", "two"}, {r["name"] for r in results})


# Allow running this test module directly, outside pytest/unittest discovery.
if __name__ == "__main__":
    unittest.main()
126 changes: 126 additions & 0 deletions ravendb/tests/issue_tests/test_RDBC_1038.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
RDBC-1038: stream_into() implemented; DocumentQuery.to_stream() delegates to session.

C# reference: IDocumentSession.Advanced.StreamInto(), IDocumentQuery.ToStream()

StreamInto writes {"Results": [...]} JSON to the output stream, matching the C#
QueryIntoStream test which does JObject.Load(new JsonTextReader(new StreamReader(stream)))
and asserts json.GetValue("Results").Children().Count() == expected, then verifies
each result's field value.
"""

import io
import json
import unittest

from ravendb.tests.test_base import TestBase


class TestStreamIntoUnit(unittest.TestCase):
    """Unit tests - verify the public API surface without a live server."""

    def test_stream_into_method_exists_on_advanced(self):
        from ravendb.documents.session.document_session import DocumentSession

        advanced_cls = DocumentSession._Advanced
        self.assertTrue(hasattr(advanced_cls, "stream_into"))

    def test_to_stream_method_exists_on_document_query(self):
        from ravendb.documents.session.query import DocumentQuery

        self.assertTrue(hasattr(DocumentQuery, "to_stream"))

    def test_to_stream_method_exists_on_raw_document_query(self):
        from ravendb.documents.session.query import RawDocumentQuery

        self.assertTrue(hasattr(RawDocumentQuery, "to_stream"))

    def test_stream_into_rejects_non_query_argument(self):
        from ravendb.documents.session.document_session import DocumentSession

        # Build a bare _Advanced instance without running __init__.
        bare_advanced = object.__new__(DocumentSession._Advanced)
        bare_advanced._session = object()
        with self.assertRaises(AttributeError):
            bare_advanced.stream_into("not_a_query", io.BytesIO())


class TestStreamInto(TestBase):
    """Integration tests - require a live server.

    The seek/parse/assert verification block was copy-pasted in all three
    tests; it is factored out into ``_assert_streamed_results``.
    """

    def setUp(self):
        super().setUp()
        self.store = self.get_document_store()

    def tearDown(self):
        super().tearDown()
        self.store.close()

    def _seed_documents(self):
        """Store three Doc documents (doc0..doc2) and return the Doc class."""

        class Doc:
            # Named "Doc" so the inferred collection is "Docs" — this must
            # match the raw query "FROM Docs" used below.
            def __init__(self, name: str = None, value: int = 0):
                self.name = name
                self.value = value

        with self.store.open_session() as session:
            for i in range(3):
                session.store(Doc(f"doc{i}", i), f"docs/{i}")
            session.save_changes()
        return Doc

    def _assert_streamed_results(self, output, expected_names):
        """Parse *output* as JSON and assert 'Results' holds exactly *expected_names*."""
        output.seek(0)
        data = json.loads(output.read())
        self.assertIn("Results", data)
        self.assertEqual(len(expected_names), len(data["Results"]))
        # Set equality checks both membership and absence of duplicates,
        # matching the original discard-until-empty loop.
        self.assertEqual(set(expected_names), {r["name"] for r in data["Results"]})

    def test_stream_into_writes_results_json(self):
        Doc = self._seed_documents()

        output = io.BytesIO()
        with self.store.open_session() as session:
            query = session.query(object_type=Doc)
            session.advanced.stream_into(query, output)

        self._assert_streamed_results(output, {"doc0", "doc1", "doc2"})

    def test_document_query_to_stream(self):
        Doc = self._seed_documents()

        output = io.BytesIO()
        with self.store.open_session() as session:
            query = session.query(object_type=Doc)
            query.to_stream(output)

        self._assert_streamed_results(output, {"doc0", "doc1", "doc2"})

    def test_raw_document_query_to_stream(self):
        self._seed_documents()

        output = io.BytesIO()
        with self.store.open_session() as session:
            query = session.advanced.raw_query("FROM Docs")
            query.to_stream(output)

        self._assert_streamed_results(output, {"doc0", "doc1", "doc2"})


# Allow running this test module directly, outside pytest/unittest discovery.
if __name__ == "__main__":
    unittest.main()
Loading