Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ def __init__(self, store: "DocumentStore", key: uuid.UUID, options: SessionOptio
self._deleted_entities: Union[
Set[DeletedEntitiesHolder.DeletedEntitiesEnumeratorResult], DeletedEntitiesHolder
] = DeletedEntitiesHolder()
self._pending_key_deletes: List[Dict] = []
self._deferred_commands: List[CommandData] = []
self._deferred_commands_map: Dict[IdTypeAndName, CommandData] = {}
self._ids_for_creating_forced_revisions: Dict[str, ForceRevisionStrategy] = CaseInsensitiveDict()
Expand Down Expand Up @@ -542,14 +543,14 @@ def remove_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs],
def add_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Append *event* to this session's before-delete listeners (fired via before_delete_invoke)."""
    self._before_delete.append(event)

def remove_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Unsubscribe a before-delete listener.

    Raises ValueError if *event* was never registered via add_before_delete.
    """
    self._before_delete.remove(event)

def remove_before_delete_entity(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Deprecated alias kept for backward compatibility; use remove_before_delete."""
    self.remove_before_delete(event)

def add_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Append *event* to this session's before-query listeners."""
    self._before_query.append(event)

def remove_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Unsubscribe a before-query listener.

    Raises ValueError if *event* was never registered via add_before_query.
    The pre-fix version appended instead of removing; the span also kept both
    statements, which would re-append the handler and then strip its first
    occurrence — silently reordering the listener list.
    """
    self._before_query.remove(event)

def before_store_invoke(self, before_store_event_args: BeforeStoreEventArgs):
for event in self._before_store:
Expand Down Expand Up @@ -837,12 +838,14 @@ def delete(self, key_or_entity: Union[str, object], expected_change_vector: Opti
change_vector = change_vector if self._use_optimistic_concurrency else None
if self._counters_by_doc_id:
self._counters_by_doc_id.pop(key, None)
self.defer(
DeleteCommandData(
key,
expected_change_vector or change_vector,
expected_change_vector or (document_info.change_vector if document_info is not None else None),
)
self._pending_key_deletes.append(
{
"key": key,
"cv": expected_change_vector or change_vector,
"etag_cv": expected_change_vector
or (document_info.change_vector if document_info is not None else None),
"entity": document_info.entity if document_info is not None else None,
}
)
return

Expand Down Expand Up @@ -902,6 +905,11 @@ def __store_internal(
f"Document id:{key}"
)

if any(e["key"] == key for e in self._pending_key_deletes):
raise InvalidOperationException(
f"Can't store document, it was already deleted in this session. Document id: {key}"
)

if entity in self._deleted_entities:
raise RuntimeError(f"Can't store object, it was already deleted in this session. Document id {key}")

Expand Down Expand Up @@ -954,11 +962,24 @@ def _store_entity_in_unit_of_work(
if key is not None:
self._documents_by_id[key] = document_info

def __prepare_for_key_deletes(self, result: "InMemoryDocumentSessionOperations.SaveChangesData") -> None:
    """Queue a DeleteCommandData for every delete registered by document key.

    The before-delete event fires here, during save_changes() preparation,
    rather than inside session.delete() — matching the C# client's
    PrepareForEntitiesDeletion timing so both delete paths behave alike.
    """
    pending = self._pending_key_deletes
    if not pending:
        return
    commands = result.session_commands
    for record in pending:
        event_args = BeforeDeleteEventArgs(self, record["key"], record["entity"])
        self.before_delete_invoke(event_args)
        commands.append(DeleteCommandData(record["key"], record["cv"], record["etag_cv"]))
    result.on_success.clear_pending_key_deletes()

def prepare_for_save_changes(self) -> SaveChangesData:
result = InMemoryDocumentSessionOperations.SaveChangesData(self)
deferred_commands_count = len(self._deferred_commands)

self.__prepare_for_entities_deletion(result, None)
self.__prepare_for_key_deletes(result)
self.__prepare_for_entities_puts(result)
self.__prepare_for_creating_revisions_from_ids(result)
self.__prepare_compare_exchange_entities(result)
Expand Down Expand Up @@ -1173,7 +1194,11 @@ def has_changes(self) -> bool:
if self._entity_changed(document, entity.value, None):
return True

return not len(self._deleted_entities) == 0 or not len(self._deferred_commands) == 0
return (
not len(self._deleted_entities) == 0
or not len(self._deferred_commands) == 0
or bool(self._pending_key_deletes)
)

def _what_changed(self) -> Dict[str, List[DocumentsChanges]]:
changes = {}
Expand Down Expand Up @@ -1913,6 +1938,7 @@ def __init__(self, session: InMemoryDocumentSessionOperations):
self.__documents_by_entity_to_remove: List = []
self.__document_infos_to_update: List[Tuple[DocumentInfo, dict]] = []
self.__clear_deleted_entities: bool = False
self.__clear_pending_key_deletes: bool = False

def remove_document_by_id(self, key: str):
    # Queue *key* for removal from the session's documents-by-id map.
    # NOTE(review): the queue appears to be consumed when a successful
    # save clears session state — the consuming code is outside this hunk.
    self.__documents_by_id_to_remove.append(key)
Expand All @@ -1939,8 +1965,14 @@ def clear_session_state_after_successful_save_changes(self):
if self.__clear_deleted_entities:
self.__session._deleted_entities.clear()

if self.__clear_pending_key_deletes:
self.__session._pending_key_deletes.clear()

self.__session._deferred_commands.clear()
self.__session._deferred_commands_map.clear()

def clear_deleted_entities(self) -> None:
    """Flag that the session's _deleted_entities set should be cleared once save_changes succeeds."""
    self.__clear_deleted_entities = True

def clear_pending_key_deletes(self) -> None:
    """Flag that the session's _pending_key_deletes list should be cleared once save_changes succeeds."""
    self.__clear_pending_key_deletes = True
147 changes: 144 additions & 3 deletions ravendb/documents/store/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ def operations(self) -> OperationExecutor:
def open_session(self, database: Optional[str] = None, session_options: Optional = None):
pass

def add_before_store(self, event: Callable[[BeforeStoreEventArgs], None]):
    """Register a store-wide before-store listener."""
    self.__before_store.append(event)

def remove_before_store(self, event: Callable[[BeforeStoreEventArgs], None]):
    """Unregister a before-store listener; raises ValueError if it was never added."""
    self.__before_store.remove(event)

def add_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs], None]):
    """Register a store-wide after-save-changes listener."""
    self.__after_save_changes.append(event)

def remove_after_save_changes(self, event: Callable[[AfterSaveChangesEventArgs], None]):
    """Unregister an after-save-changes listener; raises ValueError if it was never added."""
    self.__after_save_changes.remove(event)

def add_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Register a store-wide before-delete listener."""
    self.__before_delete.append(event)

def remove_before_delete(self, event: Callable[[BeforeDeleteEventArgs], None]):
    """Unregister a before-delete listener; raises ValueError if it was never added."""
    self.__before_delete.remove(event)

def add_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Register a store-wide before-query listener."""
    self.__before_query.append(event)

def remove_before_query(self, event: Callable[[BeforeQueryEventArgs], None]):
    """Unregister a before-query listener; raises ValueError if it was never added."""
    self.__before_query.remove(event)

def add_on_session_creation(self, event: Callable[[SessionCreatedEventArgs], None]):
self.__on_session_creation.append(event)

Expand Down Expand Up @@ -313,7 +337,7 @@ def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] =
self.urls = [urls] if isinstance(urls, str) else urls
self.database = database
self.__request_executors: Dict[str, Lazy[RequestExecutor]] = CaseInsensitiveDict()
# todo: aggressive cache
self.__aggressive_cache_changes: Dict[str, "DocumentStore._AggressiveCacheEviction"] = {}
self.__maintenance_operation_executor: Optional[MaintenanceOperationExecutor] = None
self.__operation_executor: Optional[OperationExecutor] = None
# todo: database smuggler
Expand Down Expand Up @@ -379,7 +403,9 @@ def close(self):
for event in self.__before_close:
event()

# todo: evict items from cache based on changes
for eviction in list(self.__aggressive_cache_changes.values()):
eviction.close()
self.__aggressive_cache_changes.clear()

while len(self.__database_changes) > 0:
self.__database_changes.popitem()[1].close()
Expand Down Expand Up @@ -529,7 +555,122 @@ def initialize(self) -> DocumentStore:
self._initialized = True
return self

# todo: aggressively cache
def aggressively_cache_for(
    self,
    cache_duration: datetime.timedelta,
    database: Optional[str] = None,
    mode: Optional["AggressiveCacheMode"] = None,
) -> "DocumentStore._AggressiveCacheContext":
    """Return a context manager enabling aggressive caching for *cache_duration*.

    *database* defaults to this store's effective database and *mode* to
    AggressiveCacheMode.TRACK_CHANGES. Unless the mode is
    DO_NOT_TRACK_CHANGES, a server change listener is registered so that
    remote changes invalidate the cache.
    """
    # Local import, kept local as in the original (presumably to avoid an
    # import cycle — confirm against ravendb.http.misc).
    from ravendb.http.misc import AggressiveCacheOptions, AggressiveCacheMode

    self.assert_initialized()
    effective_db = self.get_effective_database(database)
    effective_mode = AggressiveCacheMode.TRACK_CHANGES if mode is None else mode
    executor = self.get_request_executor(effective_db)
    if effective_mode != AggressiveCacheMode.DO_NOT_TRACK_CHANGES:
        self._listen_to_changes_and_update_cache(effective_db)
    return DocumentStore._AggressiveCacheContext(
        executor, AggressiveCacheOptions(cache_duration, effective_mode)
    )

def _listen_to_changes_and_update_cache(self, database: str) -> None:
    """Ensure exactly one _AggressiveCacheEviction subscriber exists for *database*.

    Idempotent and thread-safe: an unlocked check, construction outside the
    lock, then a re-check under __add_change_lock guarantee a single stored
    eviction object per database.
    """
    # Fast path: already registered.
    if database in self.__aggressive_cache_changes:
        return
    # Create the eviction object BEFORE acquiring the lock.
    # _AggressiveCacheEviction.__init__ calls store.changes() which also
    # acquires __add_change_lock, so constructing inside the lock would deadlock
    # (threading.Lock is not reentrant). If two threads race here both will
    # build an eviction object, but only one will be stored; the loser is
    # discarded via close().
    eviction = DocumentStore._AggressiveCacheEviction(self, database)
    with self.__add_change_lock:
        if database not in self.__aggressive_cache_changes:
            self.__aggressive_cache_changes[database] = eviction
        else:
            eviction.close()  # another thread won the race

def disable_aggressive_caching(self, database: Optional[str] = None) -> "DocumentStore._AggressiveCacheContext":
    """Return a context manager that suspends aggressive caching.

    A None options payload makes _AggressiveCacheContext install
    aggressive_caching = None on the executor for the with-block's duration.
    """
    self.assert_initialized()
    target_db = self.get_effective_database(database)
    executor = self.get_request_executor(target_db)
    return DocumentStore._AggressiveCacheContext(executor, None)

class _AggressiveCacheContext:
    """Context manager that temporarily installs aggressive-cache options.

    On entry the executor's current ``aggressive_caching`` value is saved and
    replaced with ``options`` (``None`` disables aggressive caching); on exit
    the saved value is restored unconditionally, even if the body raised.
    """

    def __init__(self, request_executor, options):
        self._request_executor = request_executor
        self._options = options
        self._old_options = None

    def __enter__(self):
        executor = self._request_executor
        self._old_options = executor.aggressive_caching
        executor.aggressive_caching = self._options
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore whatever was active before entry; exceptions propagate.
        self._request_executor.aggressive_caching = self._old_options

class _AggressiveCacheEviction:
    """Invalidates the HTTP cache whenever the server reports relevant changes.

    Python port of C#'s EvictItemsFromCacheBasedOnChanges. Every relevant
    document/index notification bumps ``cache.generation``; a ReleaseCacheItem
    captured by an in-flight execute() then reports
    ``might_have_been_modified == True`` (its saved generation is stale), so
    TRACK_CHANGES mode re-validates against the server instead of serving the
    cached body.

    A connection error also bumps the generation: once we can no longer hear
    the server, assuming "something changed" costs one extra round-trip,
    whereas assuming nothing changed risks serving stale data indefinitely.
    """

    def __init__(self, store: "DocumentStore", database: str):
        from ravendb.changes.observers import ActionObserver
        from ravendb.changes.types import DocumentChangeType, IndexChangeTypes

        self._request_executor = store.get_request_executor(database)
        self._changes = store.changes(database)
        self._unsubscribers: List[Callable[[], None]] = []

        # Bind the live cache object once; the closures below capture it so
        # they keep working even if RequestExecutor.cache were ever replaced.
        cache = self._request_executor.cache

        def bump_generation() -> None:
            cache.generation += 1

        def handle_document(change) -> None:
            # Only Put and Delete can alter what queries return.
            if change.type_of_change in (DocumentChangeType.PUT, DocumentChangeType.DELETE):
                bump_generation()

        def handle_index(change) -> None:
            # BatchCompleted => fresh index results exist; IndexRemoved =>
            # cached queries may reference a now-missing index.
            if change.type_of_change in (IndexChangeTypes.BATCH_COMPLETED, IndexChangeTypes.INDEX_REMOVED):
                bump_generation()

        # subscribe_with_observer (rather than subscribe) attaches an on_error
        # hook: plain subscribe() builds an ActionObserver with no on_error,
        # so a WebSocket disconnect would be swallowed silently and the
        # aggressive cache would serve stale data forever afterwards.
        document_observer = ActionObserver(on_next=handle_document, on_error=lambda e: bump_generation())
        index_observer = ActionObserver(on_next=handle_index, on_error=lambda e: bump_generation())
        self._unsubscribers.append(
            self._changes.for_all_documents().subscribe_with_observer(document_observer)
        )
        self._unsubscribers.append(
            self._changes.for_all_indexes().subscribe_with_observer(index_observer)
        )

    def close(self) -> None:
        """Detach every change subscription; safe to call more than once."""
        for unsubscribe in self._unsubscribers:
            try:
                unsubscribe()
            except Exception:
                # Best-effort teardown: a failing unsubscriber must not block
                # the remaining ones from being detached.
                pass
        self._unsubscribers.clear()

def bulk_insert(self, database_name: str = None, options: BulkInsertOptions = None) -> BulkInsertOperation:
self.assert_initialized()
Expand Down
30 changes: 3 additions & 27 deletions ravendb/http/http_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def age(self) -> datetime.timedelta:

@property
def might_have_been_modified(self) -> bool:
    """True when the cached entry is absent or its generation is stale."""
    cached = self.item
    # A missing item short-circuits the generation check: treat it as
    # modified so the caller re-fetches instead of dereferencing None.
    return cached is None or cached.generation != self.__cache_generation

def close(self):
Expand Down Expand Up @@ -73,7 +75,7 @@ def __setitem__(self, key, value):
self.__items.__setitem__(key, value)

def __getitem__(self, item):
    """Return the cached value stored under *item* (KeyError if absent).

    Delegates to the backing dict. The pre-fix version performed the lookup
    without returning it, so every subscript read yielded None; the diff
    residue here kept both statements, doing the lookup twice and discarding
    the first result.
    """
    return self.__items.__getitem__(item)

def close(self):
    # Drop every cached entry; the cache object itself stays usable.
    self.__items.clear()
Expand Down Expand Up @@ -108,29 +110,3 @@ def set_not_found(self, url: str, aggressively_cached: bool) -> None:
{ItemFlags.AGGRESSIVELY_CACHED, ItemFlags.NOT_FOUND} if aggressively_cached else {ItemFlags.NOT_FOUND}
)
self.__items[url] = http_cache_item

class ReleaseCacheItem:
def __init__(self, item: HttpCacheItem = None):
self.item: Union[None, HttpCacheItem] = item
self.__cache_generation = item.cache.generation if item else 0

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def not_modified(self) -> None:
if self.item is not None:
self.item.last_server_update = datetime.datetime.now()
self.item.generation = self.__cache_generation

@property
def age(self) -> datetime.timedelta:
if self.item is None:
return datetime.timedelta.max
return datetime.datetime.now() - self.item.last_server_update

@property
def might_have_been_modified(self) -> bool:
return self.item.generation != self.__cache_generation
Loading