
Commit d3e2d3b

tangg555, fridayL, and glin93 authored
new feat of scheduler (#669)

* debug an error function name

* feat: Add DynamicCache compatibility for different transformers versions
  - Fix build_kv_cache method in hf.py to handle both old and new DynamicCache structures
  - Support new 'layers' attribute with key_cache/value_cache or keys/values
  - Maintain backward compatibility with direct key_cache/value_cache attributes
  - Add comprehensive error handling and logging for unsupported structures
  - Update move_dynamic_cache_htod function in kv.py for cross-version compatibility
  - Handle layers-based structure in newer transformers versions
  - Support alternative attribute names (keys/values vs key_cache/value_cache)
  - Preserve original functionality for older transformers versions
  - Add comprehensive tests for DynamicCache compatibility
  - Test activation memory update with mock DynamicCache layers
  - Verify layers attribute access across different transformers versions
  - Fix scheduler logger mock to include memory_manager attribute
  This resolves AttributeError issues when using different versions of the transformers library and ensures robust handling of DynamicCache objects.

* debug

* feat: implement APIAnalyzerForScheduler for memory operations
  - Add APIAnalyzerForScheduler class with search/add operations
  - Support requests and http.client with connection reuse
  - Include comprehensive error handling and dynamic configuration
  - Add English test suite with real-world conversation scenarios

* feat: Add search_ws API endpoint and enhance API analyzer functionality
  - Add search_ws endpoint in server_router.py for scheduler-enabled search
  - Fix missing imports: time module, SearchRequest class, and get_mos_product_instance function
  - Implement search_ws method in api_analyzer.py with HTTP client support
  - Add _search_ws_with_requests and _search_ws_with_http_client private methods
  - Include search_ws usage example in demonstration code
  - Enhance scheduler and dispatcher capabilities for improved memory management
  - Expand test coverage to ensure functionality stability
  This update primarily strengthens the memory scheduling system's search capabilities, giving users more flexible API options.

* fix: resolve test failures and warnings in test suite
  - Fix Pydantic serialization warning in test_memos_chen_tang_hello_world: add a warnings filter to suppress the UserWarning from Pydantic serialization
  - Fix KeyError: 'past_key_values' in test_build_kv_cache_and_generation: update the mock configuration to properly return forward_output with past_key_values, add DynamicCache version compatibility handling in test mocks, support both old and new transformers versions with layers/key_cache attributes, and improve assertion logic to check all model calls for required parameters
  - Update base_scheduler.py to use the centralized DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE constant: add the import from general_schemas and replace the hardcoded value 100 with the configurable constant (1000)
  All tests now pass with proper version compatibility handling.

* feat: add a test_robustness execution to test thread pool execution

* feat: optimize scheduler configuration and API search functionality
  - Add DEFAULT_TOP_K and DEFAULT_CONTEXT_WINDOW_SIZE global constants in general_schemas.py
  - Update base_scheduler.py to use global default values instead of hardcoded numbers
  - Fix SchedulerConfigFactory initialization issue by using keyword argument expansion
  - Resolve UnboundLocalError variable conflict in search_memories_ws function
  - Fix indentation and parameter issues in OptimizedScheduler search_for_api method
  - Improve code standardization and maintainability

* feat: Add Redis auto-initialization with fallback strategies
  - Add auto_initialize_redis() with config/env/local fallback
  - Move Redis logic from dispatcher_monitor to redis_service
  - Update base_scheduler to use auto initialization
  - Add proper resource cleanup and error handling

* feat: add database connection management to ORM module
  - Add MySQL engine loading from environment variables in BaseDBManager
  - Add Redis connection loading from environment variables in BaseDBManager
  - Enhance database configuration validation and error handling
  - Complete database adapter infrastructure for ORM module
  - Provide a unified database connection management interface
  This update provides comprehensive database connection management for the mem_scheduler module, supporting dynamic MySQL and Redis configuration loading from environment variables and establishing a reliable data persistence foundation for scheduling and API services.

* remove part of test

* feat: add Redis-based ORM with multiprocess synchronization
  - Add RedisDBManager and RedisLockableORM classes
  - Implement atomic locking mechanism for concurrent access
  - Add merge functionality for different object types
  - Include comprehensive test suite and examples
  - Fix Redis key type conflicts in lock operations

* fix: resolve scheduler module import and Redis integration issues
* revise naive memcube creation in server router
* remove long-running tests in test_scheduler
* remove redis test which needs .env
* refactor all code related to mixture search with scheduler

* fix: resolve Redis API synchronization issues and implement search API with reranker
  - Migrate running_entries to running_task_ids across the codebase
  - Update sync_search_data method to properly handle TaskRunningStatus
  - Correct variable naming and logic in the API synchronization flow
  - Implement search API endpoint with reranker functionality
  - Update test files to reflect the new running_task_ids convention
  - Ensure proper Redis state management for concurrent tasks

* remove a test for api module
* revise to pass the test suite
* address some bugs to make mix_search run normally
* modify code according to evaluation logs
* feat: Optimize mixture search and enhance API client

* feat: Add conversation_turn tracking for session-based memory search
  - Add conversation_turn field to APIMemoryHistoryEntryItem schema with default value 0
  - Implement session counter in OptimizedScheduler to track turn count per session_id
  - Update sync_search_data method to accept and store the conversation_turn parameter
  - Maintain session history with LRU eviction (max 5 sessions)
  - Rename conversation_id to session_id for consistency with the request object
  - Enable direct access to session_id from search requests
  This feature tracks conversation turns within the same session, providing better context for memory retrieval and search history management.

* address time bug in monitor
* revise simple tree
* add mode to evaluation client; rewrite print to logger.info in db files
* feat: 1. add redis queue for scheduler; 2. finish the code related to mix search and fine search
* debug the working memory code
* addressed a range of bugs to make the scheduler run correctly
* remove test_dispatch_parallel test
* change print to logger.info
* adjusted the core code related to fine and mixture apis
* feat: create a task queue to wrap the local queue and redis queue; the queue now splits the FIFO into multiple queues for different users; addressed a range of bugs
* fix bugs: debug bugs about internet trigger
* debug get searcher mode
* feat: add manual internet
* Fix: fix code format
* feat: add strategy for fine search
* debug redis queue
* debug redis queue
* fix bugs: completely addressed bugs about redis queue
* refactor: add searcher to handler_init; remove info log from task_queue
* refactor: modify analyzer
* refactor: revise locomo_eval to support LLMs other than gpt-4o-mini
* feat: develop advanced searcher with deep search
* feat: finish a complete version of deep search
* refactor: refactor deep search feature, now only allowing one-round deep search
* feat: implement get_tasks_status; completed tasks are not recorded yet and remain to be developed
* debugging merged code; searching memories has bugs
* change logging level
* debug api evaluation
* fix bugs: change top to top_k
* change log
* refactor: rewrite deep search to make it work better
* change num_users
* feat: developed and tested task broker and orchestrator
* Fix: Include task_id in ScheduleMessageItem serialization
* Fix(Scheduler): Correct event log creation and task_id serialization
* Feat(Scheduler): Add conditional detailed logging for KB updates; Fix(Scheduler): Correct create_event_log indentation
* Fix(Scheduler): Correct create_event_log call sites. Reverts the previous incorrect fix to scheduler_logger.py and correctly fixes the TypeError at the call sites in general_scheduler.py by removing the invalid 'log_content' kwarg and adding the missing memory_type kwargs.
* Fix(Scheduler): Deserialize task_id in ScheduleMessageItem.from_dict. This completes the fix for the task_id loss: the 'to_dict' method was previously fixed to serialize the task_id, but the corresponding 'from_dict' method was not updated to deserialize it, causing the value to be lost when messages were read from the queue.
* Refactor(Config): Centralize RabbitMQ config override logic. Moves all environment variable override logic into initialize_rabbitmq for a single source of truth. This ensures Nacos-provided environment variables for all RabbitMQ settings are respected over file configurations. Also removes now-redundant logging from the publish method.
* Revert "Refactor(Config): Centralize RabbitMQ config override logic". This reverts commit b8cc42a.
* Fix(Redis): Convert None task_id to empty string during serialization. Resolves DataError in Redis Streams when task_id is None by serializing it as an empty string instead of None, which Redis does not support. Applies to the ScheduleMessageItem.to_dict method.
* Feat(Log): Add diagnostic log to /product/add endpoint. Adds an INFO-level diagnostic log message at the beginning of the create_memory function to help verify code deployment.
* Feat(Log): Add comprehensive diagnostic logs for /product/add flow. Introduces detailed INFO-level diagnostic logs across the entire call chain for the /product/add API endpoint. These logs include relevant context, such as full request bodies, message items before scheduler submission, and messages before RabbitMQ publication, to aid in debugging deployment discrepancies and tracing data flow, especially concerning task_id propagation. Logs added/enhanced in:
  - src/memos/api/routers/product_router.py
  - src/memos/api/handlers/add_handler.py
  - src/memos/multi_mem_cube/single_cube.py
  - src/memos/mem_os/core.py
  - src/memos/mem_scheduler/general_scheduler.py
  - src/memos/mem_scheduler/base_scheduler.py
  - src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
* Feat(Log): Add comprehensive diagnostic logs for /product/add flow and apply ruff formatting. Same diagnostic logs as above, plus automatic code formatting with ruff format on all modified files.
* Fix(rabbitmq): Use env vars for KB updates and improve logging
* Fix(rabbitmq): Explicitly use MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME and empty routing key for KB updates
* Fix(add_handler): Update diagnostic log timestamp
* Fix(add_handler): Update diagnostic log timestamp again (auto-updated)
* Update default scheduler redis stream prefix
* Update diagnostic timestamp in add handler
* Allow optional log_content in scheduler event log
* feat: new examples to test scheduler
* feat: fair scheduler and refactor of search function
* fix bugs: address bugs caused by outdated test code
* feat: add task_schedule_monitor
* fix: handle nil mem_cube in scheduler message consumers
* fix bugs: response messages changed in memos code
* refactor: revise task queue so it deals with pending tasks when no task remains
* refactor: revise mixture search and scheduler logger
* Fix scheduler task tracking
* fix bugs: address AI review issues
* fix bugs: address RabbitMQ initialization failure when running pytest
* fix(scheduler): Correct dispatcher task and future tracking
* Remove dump.rdb
* fix bugs: revised message ack logic; refactor add-log function
* fix bugs: change Chinese comments to English
* fix indent error in logger
* fix bugs: addressed issues caused by multiprocessing code obtaining the same pending tasks
* addMemory/updateMemory log
* fix bugs: modify redis queue logic to make it run as expected
* feat: add a default mem cube initialization for scheduler
* address scheduler init bug
* feat(scheduler): Propagate trace_id across process boundaries for mem_scheduler logs (#592). This commit addresses the issue where 'trace_id' was missing from logs generated by the 'mem_scheduler' module, especially when tasks were executed in separate processes. The changes implement manual propagation of 'trace_id' from the message producer to the consumer (a minimal sketch of this pattern follows the commit metadata below):
  1. **Schema Update**: Added an optional 'trace_id' field to 'ScheduleMessageItem' in 'src/memos/mem_scheduler/schemas/message_schemas.py' to allow 'trace_id' to be carried within messages.
  2. **Producer-side Capture**: Modified 'src/memos/mem_scheduler/task_schedule_modules/task_queue.py' to capture the current 'trace_id' and embed it into the 'ScheduleMessageItem' before messages are enqueued.
  3. **Consumer-side Context Re-establishment**: Updated 'src/memos/mem_scheduler/task_schedule_modules/dispatcher.py' to extract the 'trace_id' from incoming messages and re-establish the logging context using 'RequestContext' for each task's execution. This ensures all logs within a task's scope correctly include its associated 'trace_id', even when crossing process boundaries.
  This approach ensures robust and accurate tracing of tasks within the scheduler, enhancing observability and debugging.
  Co-authored-by: [email protected] <>
* fix bugs: redis queue allows re-fetching pending tasks that exceed the idle time
* fix(scheduler): Correct lazy-loading logic for mem_cube property
* Add MONITOR_EVENT logs for scheduler lifecycle
* fix: Resolve Ruff linting and formatting issues
* Handle dequeue timestamp without pydantic errors
* feat: orchestrator adds task priority; move task labels into task_schemas; add synchronous execution option in dispatcher
* feat: more logs for debugging
* fix bugs: address some bugs
* refactor: remove logger info in pref add function
* refactor: change redis queue to periodically refresh pending tasks
* feat: a faster and better redis queue
* refactor: remove cleanup in redis queue
* feat: allow directly executing a task if its priority is level 1
* refactor: refactor log_add_handler and redis queue to make the code run better
* fix bugs: fix the bug in _process_chat_data
* fix: use message item_id for task status updates instead of execution id
* style: format dispatcher.py with ruff
* chore: emit dequeue for immediate tasks
* fix: resolve ruff UP038 in base_scheduler.py
* feat: add scheduler queue status endpoint
* fix: lazy-init redis in queue status handler
* fix: unwrap queue wrapper for redis status
* fix bugs: fix a bug causing no schedule memory
* feat: add a new env variable to set stream_prefix in redis; make the add-path hallucination filter improve the quality of added memories
* fix bugs: update start_listening in redis_queue
* refactor: revise polardb and scheduler init
* feat: time task_broker; add a hallucination filter for simple struct add
* feat & fix bugs: redis scheduler supports periodically refreshing active streams and deleting inactive streams; fix xautoclaim bugs
* refactor: revise the code according to llm suggestions
* address ruff
* modify examples
* feat: process chunks from redis streams
* refactor: update add operation

---------

Co-authored-by: fridayL <[email protected]>
Co-authored-by: [email protected] <>
Co-authored-by: Zehao Lin <[email protected]>
Co-authored-by: chunyu li <[email protected]>
1 parent 5b76b01 commit d3e2d3b
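
The trace_id propagation described in the #592 entry of the commit message follows a carry-and-restore pattern: capture the id on the producer side, ship it inside the message, and rebuild the logging context on the consumer side. A minimal sketch of that pattern, assuming a contextvar-backed logging context; apart from the ScheduleMessageItem and trace_id names, everything here is illustrative rather than the repo's actual API:

# Sketch: carry trace_id in the message, restore it in the consumer.
import contextvars
import uuid
from dataclasses import dataclass

_current_trace_id = contextvars.ContextVar("trace_id", default=None)


@dataclass
class ScheduleMessageItem:
    content: str
    trace_id: str | None = None  # optional field, per the schema update


def enqueue(message: ScheduleMessageItem, queue: list) -> None:
    # Producer side: contextvars do not cross process boundaries, so the
    # trace_id must be captured and embedded before the message is enqueued.
    message.trace_id = _current_trace_id.get() or str(uuid.uuid4())
    queue.append(message)


def consume(message: ScheduleMessageItem) -> None:
    # Consumer side: re-establish the logging context so every log line
    # emitted while handling this task carries the originating trace_id.
    token = _current_trace_id.set(message.trace_id)
    try:
        handle(message)  # hypothetical task handler
    finally:
        _current_trace_id.reset(token)


def handle(message: ScheduleMessageItem) -> None:
    print(f"[trace_id={_current_trace_id.get()}] processing {message.content}")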

File tree

7 files changed: +525 −245 lines

examples/mem_scheduler/task_stop_rerun.py

Lines changed: 3 additions & 2 deletions
@@ -25,9 +25,9 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
         task_id = str(msg.item_id)
         file_path = tmp_dir / f"{task_id}.txt"
         try:
-            print(f"writing {file_path}...")
-            file_path.write_text(f"Task {task_id} processed.\n")
             sleep(5)
+            file_path.write_text(f"Task {task_id} processed.\n")
+            print(f"writing {file_path} done")
         except Exception as e:
             print(f"Failed to write {file_path}: {e}")

@@ -89,4 +89,5 @@ def submit_tasks():

 # 7. Stop the scheduler
 print("Stopping the scheduler...")
+sleep(5)
 mem_scheduler.stop()

src/memos/mem_reader/simple_struct.py

Lines changed: 80 additions & 47 deletions
@@ -453,7 +453,7 @@ def get_memory(
     @staticmethod
     def _parse_hallucination_filter_response(text: str) -> tuple[bool, dict[int, dict]]:
         """Parse index-keyed JSON from hallucination filter response.
-        Expected shape: { "0": {"if_delete": bool, "rewritten memory content": str}, ... }
+        Expected shape: { "0": {"delete": bool, "rewritten": str, "reason": str}, ... }
         Returns (success, parsed_dict) with int keys.
         """
         try:
@@ -476,54 +476,82 @@ def _parse_hallucination_filter_response(text: str) -> tuple[bool, dict[int, dic
                 continue
             if not isinstance(v, dict):
                 continue
-            delete_flag = v.get("delete_flag")
-            rewritten = v.get("rewritten memory content", "")
-            if isinstance(delete_flag, bool) and isinstance(rewritten, str):
-                result[idx] = {"delete_flag": delete_flag, "rewritten memory content": rewritten}
+            delete_flag = v.get("delete")
+            rewritten = v.get("rewritten", "")
+            reason = v.get("reason", "")
+            if (
+                isinstance(delete_flag, bool)
+                and isinstance(rewritten, str)
+                and isinstance(reason, str)
+            ):
+                result[idx] = {"delete": delete_flag, "rewritten": rewritten, "reason": reason}

         return (len(result) > 0), result

     def filter_hallucination_in_memories(
-        self, user_messages: list[str], memory_list: list[list[TextualMemoryItem]]
-    ):
-        filtered_memory_list = []
-        for group in memory_list:
-            try:
-                flat_memories = [one.memory for one in group]
-                template = PROMPT_MAPPING["hallucination_filter"]
-                prompt_args = {
-                    "user_messages_inline": "\n".join(user_messages),
-                    "memories_inline": json.dumps(flat_memories, ensure_ascii=False, indent=2),
-                }
-                prompt = template.format(**prompt_args)
+        self, user_messages: list[str], memory_list: list[TextualMemoryItem]
+    ) -> list[TextualMemoryItem]:
+        flat_memories = [one.memory for one in memory_list]
+        template = PROMPT_MAPPING["hallucination_filter"]
+        prompt_args = {
+            "user_messages_inline": "\n".join([f"- {memory}" for memory in user_messages]),
+            "memories_inline": json.dumps(
+                {str(i): memory for i, memory in enumerate(flat_memories)},
+                ensure_ascii=False,
+                indent=2,
+            ),
+        }
+        prompt = template.format(**prompt_args)

-                # Optionally run filter and parse the output
-                try:
-                    raw = self.llm.generate(prompt)
-                    success, parsed = self._parse_hallucination_filter_response(raw)
-                    logger.info(f"Hallucination filter parsed successfully: {success}")
-                    new_mem_list = []
-                    if success:
-                        logger.info(f"Hallucination filter result: {parsed}")
-                        for mem_idx, (delete_flag, rewritten_mem_content) in parsed.items():
-                            if not delete_flag:
-                                group[mem_idx].memory = rewritten_mem_content
-                                new_mem_list.append(group[mem_idx])
-                        filtered_memory_list.append(new_mem_list)
-                        logger.info(
-                            f"Successfully transform origianl memories from {group} to {new_mem_list}."
-                        )
-                    else:
+        # Optionally run filter and parse the output
+        try:
+            raw = self.llm.generate([{"role": "user", "content": prompt}])
+            success, parsed = self._parse_hallucination_filter_response(raw)
+            logger.info(
+                f"[filter_hallucination_in_memories] Hallucination filter parsed successfully: {success}"
+            )
+            if success:
+                logger.info(f"Hallucination filter result: {parsed}")
+                total = len(memory_list)
+                keep_flags = [True] * total
+                for mem_idx, content in parsed.items():
+                    # Validate index bounds
+                    if not isinstance(mem_idx, int) or mem_idx < 0 or mem_idx >= total:
                         logger.warning(
-                            "Hallucination filter parsing failed or returned empty result."
+                            f"[filter_hallucination_in_memories] Ignoring out-of-range index: {mem_idx}"
                         )
-                except Exception as e:
-                    logger.error(f"Hallucination filter execution error: {e}", stack_info=True)
-                    filtered_memory_list.append(group)
-            except Exception:
-                logger.error("Fail to filter memories", stack_info=True)
-                filtered_memory_list.append(group)
-        return filtered_memory_list
+                        continue
+
+                    delete_flag = content.get("delete", False)
+                    rewritten = content.get("rewritten", None)
+                    reason = content.get("reason", "")
+
+                    logger.info(
+                        f"[filter_hallucination_in_memories] index={mem_idx}, delete={delete_flag}, rewritten='{(rewritten or '')[:100]}', reason='{reason[:120]}'"
+                    )
+
+                    if delete_flag is True and rewritten is not None:
+                        # Mark for deletion
+                        keep_flags[mem_idx] = False
+                    else:
+                        # Apply rewrite if provided (safe-by-default: keep item when not mentioned or delete=False)
+                        try:
+                            if isinstance(rewritten, str):
+                                memory_list[mem_idx].memory = rewritten
+                        except Exception as e:
+                            logger.warning(
+                                f"[filter_hallucination_in_memories] Failed to apply rewrite for index {mem_idx}: {e}"
+                            )
+
+                # Build result, preserving original order; keep items not mentioned by LLM by default
+                new_mem_list = [memory_list[i] for i in range(total) if keep_flags[i]]
+                return new_mem_list
+            else:
+                logger.warning("Hallucination filter parsing failed or returned empty result.")
+        except Exception as e:
+            logger.error(f"Hallucination filter execution error: {e}", stack_info=True)

+        return memory_list

     def _read_memory(
         self, messages: list[MessagesType], type: str, info: dict[str, Any], mode: str = "fine"
@@ -572,11 +600,16 @@ def _read_memory(

         if os.getenv("SIMPLE_STRUCT_ADD_FILTER", "false") == "true":
             # Build inputs
-            user_messages = [msg.content for msg in messages if msg.role == "user"]
-            memory_list = self.filter_hallucination_in_memories(
-                user_messages=user_messages, memory_list=memory_list
-            )
-
+            new_memory_list = []
+            for unit_messages, unit_memory_list in zip(messages, memory_list, strict=False):
+                unit_user_messages = [
+                    msg["content"] for msg in unit_messages if msg["role"] == "user"
+                ]
+                unit_memory_list = self.filter_hallucination_in_memories(
+                    user_messages=unit_user_messages, memory_list=unit_memory_list
+                )
+                new_memory_list.append(unit_memory_list)
+            memory_list = new_memory_list
         return memory_list

     def fine_transfer_simple_mem(
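
For reference, the filter's contract: the LLM receives the user's messages plus an index-keyed map of memories and must answer with index-keyed JSON of the shape documented in the docstring above. A minimal sketch of that round trip; parse_filter_response is a simplified stand-in for _parse_hallucination_filter_response, and the sample response is fabricated for illustration:

import json

# Index-keyed response shape the parser expects: each key is a memory index,
# each value carries a delete flag, an optional rewrite, and a reason.
raw_response = """
{
  "0": {"delete": false, "rewritten": "User prefers tea over coffee.", "reason": "grounded in the user's messages"},
  "1": {"delete": true, "rewritten": "", "reason": "not supported by any user message"}
}
"""


def parse_filter_response(text: str) -> tuple[bool, dict[int, dict]]:
    # Tolerant parse: accept string indices, skip malformed entries.
    result: dict[int, dict] = {}
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return False, {}
    for k, v in data.items():
        try:
            idx = int(k)
        except (TypeError, ValueError):
            continue
        if not isinstance(v, dict):
            continue
        delete_flag = v.get("delete")
        rewritten = v.get("rewritten", "")
        reason = v.get("reason", "")
        if isinstance(delete_flag, bool) and isinstance(rewritten, str) and isinstance(reason, str):
            result[idx] = {"delete": delete_flag, "rewritten": rewritten, "reason": reason}
    return (len(result) > 0), result


ok, parsed = parse_filter_response(raw_response)
assert ok and parsed[1]["delete"] is True  # index 1 is marked for deletion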

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 21 additions & 21 deletions
@@ -66,7 +66,7 @@ def __init__(self, config: GeneralSchedulerConfig):
     def long_memory_update_process(
         self, user_id: str, mem_cube_id: str, messages: list[ScheduleMessageItem]
     ):
-        mem_cube = self.current_mem_cube
+        mem_cube = self.mem_cube

         # update query monitors
         for msg in messages:
@@ -109,8 +109,8 @@ def long_memory_update_process(

             query_db_manager = self.monitor.query_monitors[user_id][mem_cube_id]
             query_db_manager.obj.put(item=item)
-            # Sync with database after adding new item
-            query_db_manager.sync_with_orm()
+        # Sync with database after adding new item
+        query_db_manager.sync_with_orm()
         logger.debug(
             f"Queries in monitor for user_id={user_id}, mem_cube_id={mem_cube_id}: {query_db_manager.obj.get_queries_with_timesort()}"
         )
@@ -162,7 +162,7 @@ def long_memory_update_process(
             label=QUERY_TASK_LABEL,
             user_id=user_id,
             mem_cube_id=mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
         )

     def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
@@ -249,7 +249,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
             to_memory_type=NOT_APPLICABLE_TYPE,
             user_id=msg.user_id,
             mem_cube_id=msg.mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=[
                 {
                     "content": f"[User] {msg.content}",
@@ -305,7 +305,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
             to_memory_type=NOT_APPLICABLE_TYPE,
             user_id=msg.user_id,
             mem_cube_id=msg.mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=[
                 {
                     "content": f"[Assistant] {msg.content}",
@@ -338,7 +338,7 @@ def log_add_messages(self, msg: ScheduleMessageItem):
         try:
             # This mem_item represents the NEW content that was just added/processed
             mem_item: TextualMemoryItem | None = None
-            mem_item = self.current_mem_cube.text_mem.get(
+            mem_item = self.mem_cube.text_mem.get(
                 memory_id=memory_id, user_name=msg.mem_cube_id
             )
             if mem_item is None:
@@ -352,8 +352,8 @@ def log_add_messages(self, msg: ScheduleMessageItem):
             original_item_id = None

             # Only check graph_store if a key exists and the text_mem has a graph_store
-            if key and hasattr(self.current_mem_cube.text_mem, "graph_store"):
-                candidates = self.current_mem_cube.text_mem.graph_store.get_by_metadata(
+            if key and hasattr(self.mem_cube.text_mem, "graph_store"):
+                candidates = self.mem_cube.text_mem.graph_store.get_by_metadata(
                     [
                         {"field": "key", "op": "=", "value": key},
                         {
@@ -368,7 +368,7 @@ def log_add_messages(self, msg: ScheduleMessageItem):
                 original_item_id = candidates[0]
                 # Crucial step: Fetch the original content for updates
                 # This `get` is for the *existing* memory that will be updated
-                original_mem_item = self.current_mem_cube.text_mem.get(
+                original_mem_item = self.mem_cube.text_mem.get(
                     memory_id=original_item_id, user_name=msg.mem_cube_id
                 )
                 original_content = original_mem_item.memory
@@ -481,7 +481,7 @@ def send_add_log_messages_to_local_env(
             to_memory_type=LONG_TERM_MEMORY_TYPE,
             user_id=msg.user_id,
             mem_cube_id=msg.mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=add_content_legacy,
             metadata=add_meta_legacy,
             memory_len=len(add_content_legacy),
@@ -496,7 +496,7 @@ def send_add_log_messages_to_local_env(
             to_memory_type=LONG_TERM_MEMORY_TYPE,
             user_id=msg.user_id,
             mem_cube_id=msg.mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=update_content_legacy,
             metadata=update_meta_legacy,
             memory_len=len(update_content_legacy),
@@ -562,7 +562,7 @@ def send_add_log_messages_to_cloud_env(
             to_memory_type=LONG_TERM_MEMORY_TYPE,
             user_id=msg.user_id,
             mem_cube_id=msg.mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=kb_log_content,
             metadata=None,
             memory_len=len(kb_log_content),
@@ -577,7 +577,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
         if not messages:
             return
         message = messages[0]
-        mem_cube = self.current_mem_cube
+        mem_cube = self.mem_cube

         user_id = message.user_id
         mem_cube_id = message.mem_cube_id
@@ -744,9 +744,9 @@ def process_message(message: ScheduleMessageItem):
            try:
                user_id = message.user_id
                mem_cube_id = message.mem_cube_id
-                mem_cube = self.current_mem_cube
+                mem_cube = self.mem_cube
                if mem_cube is None:
-                    logger.warning(
+                    logger.error(
                        f"mem_cube is None for user_id={user_id}, mem_cube_id={mem_cube_id}, skipping processing",
                        stack_info=True,
                    )
@@ -923,7 +923,7 @@ def _process_memories_with_reader(
             to_memory_type=LONG_TERM_MEMORY_TYPE,
             user_id=user_id,
             mem_cube_id=mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=kb_log_content,
             metadata=None,
             memory_len=len(kb_log_content),
@@ -968,7 +968,7 @@ def _process_memories_with_reader(
             to_memory_type=LONG_TERM_MEMORY_TYPE,
             user_id=user_id,
             mem_cube_id=mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=add_content_legacy,
             metadata=add_meta_legacy,
             memory_len=len(add_content_legacy),
@@ -1036,7 +1036,7 @@ def _process_memories_with_reader(
             to_memory_type=LONG_TERM_MEMORY_TYPE,
             user_id=user_id,
             mem_cube_id=mem_cube_id,
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             memcube_log_content=kb_log_content,
             metadata=None,
             memory_len=len(kb_log_content),
@@ -1054,7 +1054,7 @@ def process_message(message: ScheduleMessageItem):
            try:
                user_id = message.user_id
                mem_cube_id = message.mem_cube_id
-                mem_cube = self.current_mem_cube
+                mem_cube = self.mem_cube
                if mem_cube is None:
                    logger.warning(
                        f"mem_cube is None for user_id={user_id}, mem_cube_id={mem_cube_id}, skipping processing"
@@ -1284,7 +1284,7 @@ def _pref_add_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non

        def process_message(message: ScheduleMessageItem):
            try:
-                mem_cube = self.current_mem_cube
+                mem_cube = self.mem_cube
                if mem_cube is None:
                    logger.warning(
                        f"mem_cube is None for user_id={message.user_id}, mem_cube_id={message.mem_cube_id}, skipping processing"

src/memos/mem_scheduler/optimized_scheduler.py

Lines changed: 1 addition & 1 deletion
@@ -230,7 +230,7 @@ def update_search_memories_to_redis(
         memories: list[TextualMemoryItem] = self.search_memories(
             search_req=APISearchRequest(**content_dict["search_req"]),
             user_context=UserContext(**content_dict["user_context"]),
-            mem_cube=self.current_mem_cube,
+            mem_cube=self.mem_cube,
             mode=SearchMode.FAST,
         )
         formatted_memories = [format_textual_memory_item(data) for data in memories]

src/memos/mem_scheduler/schemas/task_schemas.py

Lines changed: 10 additions & 0 deletions
@@ -60,6 +60,16 @@ class TaskPriorityLevel(Enum):
 # Interval in seconds for batching and cleaning up deletions (xdel)
 DEFAULT_DELETE_CLEANUP_INTERVAL_SEC = 30.0

+# Inactivity threshold for stream deletion
+# Delete streams whose last message ID timestamp is older than this threshold.
+# Unit: seconds. Default: 1 day.
+DEFAULT_STREAM_INACTIVITY_DELETE_SECONDS = 86_400.0
+
+# Recency threshold for active streams
+# Consider a stream "active" if its last message is within this window.
+# Unit: seconds. Default: 30 minutes.
+DEFAULT_STREAM_RECENT_ACTIVE_SECONDS = 1_800.0
+

 # task queue
 DEFAULT_STREAM_KEY_PREFIX = os.getenv(
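
These two thresholds drive the periodic stream refresh noted in the commit message: streams with a recent last message stay active, and long-idle streams are deleted. Because Redis stream IDs embed a millisecond timestamp, a cleanup pass can classify a stream from XINFO STREAM alone. A minimal sketch assuming redis-py; classify_stream is an illustrative helper, not the repo's API:

import time

import redis

DEFAULT_STREAM_INACTIVITY_DELETE_SECONDS = 86_400.0  # 1 day
DEFAULT_STREAM_RECENT_ACTIVE_SECONDS = 1_800.0  # 30 minutes


def classify_stream(r: redis.Redis, stream_key: str) -> str:
    """Return 'active', 'idle', or 'expired' based on the stream's last message ID."""
    info = r.xinfo_stream(stream_key)  # raises ResponseError if the key is missing
    last_id = info["last-generated-id"]  # e.g. b"1718000000000-0"
    if isinstance(last_id, bytes):
        last_id = last_id.decode()
    last_ms = int(last_id.split("-")[0])  # stream IDs embed a ms timestamp
    age_sec = time.time() - last_ms / 1000.0
    if age_sec <= DEFAULT_STREAM_RECENT_ACTIVE_SECONDS:
        return "active"
    if age_sec >= DEFAULT_STREAM_INACTIVITY_DELETE_SECONDS:
        return "expired"  # caller may r.delete(stream_key)
    return "idle"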
