Merged

Changes from all commits (169 commits)
11b63e6
debug an error function name
tangg555 Oct 20, 2025
72e8f39
feat: Add DynamicCache compatibility for different transformers versions
tangg555 Oct 20, 2025
5702870
feat: implement APIAnalyzerForScheduler for memory operations
tangg555 Oct 21, 2025
4655b41
feat: Add search_ws API endpoint and enhance API analyzer functionality
tangg555 Oct 21, 2025
c20736c
fix: resolve test failures and warnings in test suite
tangg555 Oct 21, 2025
da72e7e
feat: add a test_robustness execution to test thread pool execution
tangg555 Oct 21, 2025
5b9b1e4
feat: optimize scheduler configuration and API search functionality
tangg555 Oct 22, 2025
6dac11e
feat: Add Redis auto-initialization with fallback strategies
tangg555 Oct 22, 2025
a207bf4
feat: add database connection management to ORM module
tangg555 Oct 24, 2025
8c1cc04
remove part of test
tangg555 Oct 24, 2025
f2b0da4
feat: add Redis-based ORM with multiprocess synchronization
tangg555 Oct 24, 2025
f0e8aab
fix: resolve scheduler module import and Redis integration issues
tangg555 Oct 24, 2025
731f00d
revise naive memcube creation in server router
tangg555 Oct 25, 2025
6d442fb
remove long-time tests in test_scheduler
tangg555 Oct 25, 2025
157f858
remove redis test which needs .env
tangg555 Oct 25, 2025
c483011
refactor all code related to mixture search with scheduler
tangg555 Oct 25, 2025
b81b82e
fix: resolve Redis API synchronization issues and implement search AP…
tangg555 Oct 26, 2025
90d1a0b
remove a test for api module
tangg555 Oct 26, 2025
1de72cf
revise to pass the test suite
tangg555 Oct 26, 2025
c72858e
addressed all conflicts
tangg555 Oct 27, 2025
3245376
address some bugs to make mix_search run normally
tangg555 Oct 27, 2025
57482cf
modify codes according to evaluation logs
tangg555 Oct 27, 2025
e4b8313
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 27, 2025
011d248
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
8c8d672
feat: Optimize mixture search and enhance API client
tangg555 Oct 28, 2025
aabad8d
feat: Add conversation_turn tracking for session-based memory search
tangg555 Oct 28, 2025
3faa5c3
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
c6376cd
address time bug in monitor
tangg555 Oct 29, 2025
bd0b234
revise simple tree
tangg555 Oct 29, 2025
5332d12
add mode to evaluation client; rewrite print to logger.info in db files
tangg555 Oct 29, 2025
aee13ba
feat: 1. add redis queue for scheduler 2. finish the code related to …
tangg555 Nov 5, 2025
f957967
debug the working memory code
tangg555 Nov 5, 2025
f520cca
addressed conflicts to merge
tangg555 Nov 5, 2025
a3f6636
addressed a range of bugs to make scheduler run correctly
tangg555 Nov 5, 2025
47e9851
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 5, 2025
161af12
remove test_dispatch_parallel test
tangg555 Nov 5, 2025
1d8d14b
change print to logger.info
tangg555 Nov 5, 2025
00e3a75
addressed conflicts
tangg555 Nov 6, 2025
2852e56
adjusted the core code related to fine and mixture apis
tangg555 Nov 17, 2025
5d3cf45
addressed conflicts
tangg555 Nov 17, 2025
ab71f17
feat: create task queue to wrap local queue and redis queue. queue no…
tangg555 Nov 18, 2025
7665cda
fix bugs: debug bugs about internet trigger
tangg555 Nov 18, 2025
3559323
debug get searcher mode
tangg555 Nov 18, 2025
7c8e0d0
feat: add manual internet
fridayL Nov 18, 2025
27b0971
Merge branch 'feat/redis_scheduler' of https://github.com/MemTensor/M…
fridayL Nov 18, 2025
94d456b
Fix: fix code format
fridayL Nov 18, 2025
87b5358
feat: add strategy for fine search
tangg555 Nov 18, 2025
127fdc7
debug redis queue
tangg555 Nov 18, 2025
0911ced
debug redis queue
tangg555 Nov 18, 2025
d1a7261
fix bugs: completely addressed bugs about redis queue
tangg555 Nov 18, 2025
232be6f
refactor: add searcher to handler_init; remove info log from task_queue
tangg555 Nov 19, 2025
d16a7c8
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 19, 2025
bc7236f
refactor: modify analyzer
tangg555 Nov 19, 2025
afaf8df
refactor: revise locomo_eval to make it support llm other than gpt-4o…
tangg555 Nov 19, 2025
0b02d3c
feat: develop advanced searcher with deep search
tangg555 Nov 20, 2025
2097eae
feat: finish a complete version of deep search
tangg555 Nov 21, 2025
aff2932
refactor: refactor deep search feature, now only allowing one-round d…
tangg555 Nov 24, 2025
4226a77
feat: implement the feature of get_tasks_status, but completed tasks …
tangg555 Nov 24, 2025
e27483c
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 24, 2025
51964ec
debugging merged code; searching memories has bugs
tangg555 Nov 24, 2025
1e28ee5
change logging level
tangg555 Nov 24, 2025
e0001ea
debug api evaluation
tangg555 Nov 24, 2025
bae7022
fix bugs: change top to top_k
tangg555 Nov 24, 2025
d6cf824
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 24, 2025
742df4e
change log
tangg555 Nov 24, 2025
9b310c4
refactor: rewrite deep search to make it work better
tangg555 Nov 25, 2025
7e4cfc5
change num_users
tangg555 Nov 26, 2025
c0cadac
feat: developed and tested task broker and orchestrator
tangg555 Nov 26, 2025
e0eb490
Fix: Include task_id in ScheduleMessageItem serialization
Nov 29, 2025
2606fc7
Fix(Scheduler): Correct event log creation and task_id serialization
Nov 29, 2025
b3a6f1b
Feat(Scheduler): Add conditional detailed logging for KB updates
Nov 29, 2025
4b2cc2f
Fix(Scheduler): Correct create_event_log call sites
Nov 29, 2025
d8726ec
Fix(Scheduler): Deserialize task_id in ScheduleMessageItem.from_dict
Nov 29, 2025
b8cc42a
Refactor(Config): Centralize RabbitMQ config override logic
Nov 29, 2025
b6ebee6
Revert "Refactor(Config): Centralize RabbitMQ config override logic"
Nov 29, 2025
702d3e1
Fix(Redis): Convert None task_id to empty string during serialization
Nov 29, 2025
975e585
Feat(Log): Add diagnostic log to /product/add endpoint
Nov 29, 2025
bceaf68
Merge branch 'dev' into hotfix/task-id-loss
glin93 Nov 29, 2025
82a95c4
Feat(Log): Add comprehensive diagnostic logs for /product/add flow
Nov 29, 2025
c5631cc
Feat(Log): Add comprehensive diagnostic logs for /product/add flow an…
Nov 29, 2025
600fe24
Fix(rabbitmq): Use env vars for KB updates and improve logging
Nov 29, 2025
1da7c71
Fix(rabbitmq): Explicitly use MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME and…
Nov 29, 2025
f32399b
Fix(add_handler): Update diagnostic log timestamp
Nov 29, 2025
42fea63
Fix(add_handler): Update diagnostic log timestamp again (auto-updated)
Nov 29, 2025
003a169
Update default scheduler redis stream prefix
Nov 29, 2025
6b5d5c6
Update diagnostic timestamp in add handler
Nov 29, 2025
5339b08
Allow optional log_content in scheduler event log
Nov 29, 2025
e1304c1
feat: new examples to test scheduler
tangg555 Dec 1, 2025
045d154
feat: fair scheduler and refactor of search function
tangg555 Dec 1, 2025
85611c8
Merge remote-tracking branch 'upstream/hotfix/task-id-loss' into dev
tangg555 Dec 1, 2025
4aaeb54
fix bugs: address bugs caused by outdated test code
tangg555 Dec 1, 2025
480c8e3
feat: add task_schedule_monitor
tangg555 Dec 1, 2025
c49a498
fix: handle nil mem_cube in scheduler message consumers
Dec 2, 2025
b772d88
fix bugs: response message changed in memos code
tangg555 Dec 1, 2025
630c21c
refactor: revise task queue to allow it to deal with pending tasks wh…
tangg555 Dec 1, 2025
198aade
refactor: revise mixture search and scheduler logger
tangg555 Dec 2, 2025
5ba44d6
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 2, 2025
28e1368
Fix scheduler task tracking
Dec 2, 2025
173bebc
fix bugs: address ai review issues
tangg555 Dec 2, 2025
9708f4d
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 2, 2025
6746563
fix bugs: address rabbitmq initialization failed when doing pytest
tangg555 Dec 2, 2025
9613258
fix(scheduler): Correct dispatcher task and future tracking
Dec 2, 2025
8d7053b
Remove dump.rdb
tangg555 Dec 2, 2025
73b4711
fix bugs: revised message ack logic; refactored add log function
tangg555 Dec 2, 2025
11654e5
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 2, 2025
bcd5d8f
fix bugs: change Chinese notation to English
tangg555 Dec 2, 2025
5b6cd2e
fix indent error in logger
Dec 2, 2025
48a3e9d
fix bugs: addressed the issues caused by multiprocessing codes obtain…
tangg555 Dec 2, 2025
13abb20
addMemory/updateMemory log
Dec 2, 2025
eeb31ed
Merge origin/dev into scheduler
Dec 2, 2025
9198b85
fix bugs: modify redis queue logic to make it run as expected
tangg555 Dec 2, 2025
4c10503
feat: add a default mem cube initialization for scheduler
tangg555 Dec 2, 2025
3ac309d
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 2, 2025
fb6ddc5
Merge remote-tracking branch 'upstream/scheduler' into dev
tangg555 Dec 2, 2025
cc7bc86
address scheduler init bug
tangg555 Dec 3, 2025
c7ecdae
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 3, 2025
5994a27
feat(scheduler): Propagate trace_id across process boundaries for mem…
tangg555 Dec 3, 2025
bd19c4c
fix bugs: redis queue allows re-getting pending tasks which exceed i…
tangg555 Dec 3, 2025
3fd189f
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 3, 2025
c7090c8
fix(scheduler): Correct lazy-loading logic for mem_cube property
Dec 3, 2025
30109f6
Add MONITOR_EVENT logs for scheduler lifecycle
Dec 3, 2025
5850d7a
fix: Resolve Ruff linting and formatting issues
Dec 3, 2025
57c45dc
Handle dequeue timestamp without pydantic errors
Dec 3, 2025
7ed1f42
feat: orchestrator add task priority; move task labels into task_sche…
tangg555 Dec 4, 2025
3b927e2
feat: more logs for debug
tangg555 Dec 4, 2025
b56f57a
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 4, 2025
4e65f58
fix bugs: address some bugs
tangg555 Dec 4, 2025
d3dd54d
refactor: remove logger info in pref add function
tangg555 Dec 4, 2025
9be93fb
refactor: change redis queue to periodically refresh pending tasks
tangg555 Dec 5, 2025
41ea8b7
feat: a faster and better redis queue
tangg555 Dec 5, 2025
99f61f3
refactor: remove cleanup in redis queue
tangg555 Dec 5, 2025
d03f960
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 7, 2025
f892678
feat: allow directly executing a task if task priority is level 1
tangg555 Dec 7, 2025
0a522b3
refactor: refactor log_add_handler and redis queue to make the code r…
tangg555 Dec 7, 2025
3280f25
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 7, 2025
1ddfe9c
fix bugs: fix the bug in _process_chat_data
tangg555 Dec 7, 2025
2a10311
fix: use message item_id for task status updates instead of execution id
Dec 7, 2025
d18a917
style: format dispatcher.py with ruff
Dec 7, 2025
8d4c854
chore: emit dequeue for immediate tasks
Dec 7, 2025
6476442
fix: resolve ruff UP038 in base_scheduler.py
Dec 7, 2025
257b7f6
feat: add scheduler queue status endpoint
Dec 7, 2025
6048b2b
fix: lazy-init redis in queue status handler
Dec 7, 2025
4d9cef4
fix: unwrap queue wrapper for redis status
Dec 7, 2025
248e86a
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 7, 2025
5d83f7c
Merge remote-tracking branch 'upstream/scheduler' into dev
tangg555 Dec 7, 2025
dce178f
fix bugs: fix a bug causing no schedule memory
tangg555 Dec 7, 2025
73a190e
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 7, 2025
0e0dcb5
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 8, 2025
7b9db93
feat: add a new env variable to set stream_prefix in redis; make add …
tangg555 Dec 8, 2025
e0f2be5
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 8, 2025
f95e3ba
fix bugs: update start_listening in redis_queue
tangg555 Dec 8, 2025
b44d6a7
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 8, 2025
b1237d6
refactor: revise polardb and scheduler init
tangg555 Dec 8, 2025
ce144f4
Merge branch 'dev' into scheduler
fridayL Dec 8, 2025
7fa718b
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 8, 2025
9487eb6
feat: time task_broker; add a hallucination filter for simple struct add
tangg555 Dec 9, 2025
d70239b
feat & fix bugs: redis scheduler supports periodically refreshing active …
tangg555 Dec 9, 2025
2aba3f5
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 9, 2025
b229d28
Merge remote-tracking branch 'upstream/scheduler' into dev
tangg555 Dec 9, 2025
23137c6
refactor: revise the code according to llm suggestions
tangg555 Dec 9, 2025
15ac0a2
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 9, 2025
b03d26e
address ruff
tangg555 Dec 9, 2025
bf8a0be
modify examples
tangg555 Dec 9, 2025
705ea33
Merge branch 'dev' into scheduler
glin93 Dec 9, 2025
12342fb
feat: process chunks from redis streams
tangg555 Dec 9, 2025
b33fc30
Merge remote-tracking branch 'upstream/scheduler' into dev
tangg555 Dec 9, 2025
87e2fef
refactor: update add operation
tangg555 Dec 9, 2025
259fda2
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 9, 2025
675ddde
Merge branch 'dev' into scheduler
tangg555 Dec 9, 2025
5 changes: 3 additions & 2 deletions examples/mem_scheduler/task_stop_rerun.py
@@ -25,9 +25,9 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
task_id = str(msg.item_id)
file_path = tmp_dir / f"{task_id}.txt"
try:
print(f"writing {file_path}...")
file_path.write_text(f"Task {task_id} processed.\n")
sleep(5)
file_path.write_text(f"Task {task_id} processed.\n")
print(f"writing {file_path} done")
except Exception as e:
print(f"Failed to write {file_path}: {e}")

@@ -89,4 +89,5 @@ def submit_tasks():

# 7. Stop the scheduler
print("Stopping the scheduler...")
sleep(5)
mem_scheduler.stop()
127 changes: 80 additions & 47 deletions src/memos/mem_reader/simple_struct.py
@@ -453,7 +453,7 @@ def get_memory(
@staticmethod
def _parse_hallucination_filter_response(text: str) -> tuple[bool, dict[int, dict]]:
"""Parse index-keyed JSON from hallucination filter response.
Expected shape: { "0": {"if_delete": bool, "rewritten memory content": str}, ... }
Expected shape: { "0": {"delete": bool, "rewritten": str, "reason": str}, ... }
Returns (success, parsed_dict) with int keys.
"""
try:
@@ -476,54 +476,82 @@ def _parse_hallucination_filter_response(text: str) -> tuple[bool, dict[int, dic
continue
if not isinstance(v, dict):
continue
delete_flag = v.get("delete_flag")
rewritten = v.get("rewritten memory content", "")
if isinstance(delete_flag, bool) and isinstance(rewritten, str):
result[idx] = {"delete_flag": delete_flag, "rewritten memory content": rewritten}
delete_flag = v.get("delete")
rewritten = v.get("rewritten", "")
reason = v.get("reason", "")
if (
isinstance(delete_flag, bool)
and isinstance(rewritten, str)
and isinstance(reason, str)
):
result[idx] = {"delete": delete_flag, "rewritten": rewritten, "reason": reason}

return (len(result) > 0), result

def filter_hallucination_in_memories(
self, user_messages: list[str], memory_list: list[list[TextualMemoryItem]]
):
filtered_memory_list = []
for group in memory_list:
try:
flat_memories = [one.memory for one in group]
template = PROMPT_MAPPING["hallucination_filter"]
prompt_args = {
"user_messages_inline": "\n".join(user_messages),
"memories_inline": json.dumps(flat_memories, ensure_ascii=False, indent=2),
}
prompt = template.format(**prompt_args)
self, user_messages: list[str], memory_list: list[TextualMemoryItem]
) -> list[TextualMemoryItem]:
flat_memories = [one.memory for one in memory_list]
template = PROMPT_MAPPING["hallucination_filter"]
prompt_args = {
"user_messages_inline": "\n".join([f"- {memory}" for memory in user_messages]),
"memories_inline": json.dumps(
{str(i): memory for i, memory in enumerate(flat_memories)},
ensure_ascii=False,
indent=2,
),
}
prompt = template.format(**prompt_args)

# Optionally run filter and parse the output
try:
raw = self.llm.generate(prompt)
success, parsed = self._parse_hallucination_filter_response(raw)
logger.info(f"Hallucination filter parsed successfully: {success}")
new_mem_list = []
if success:
logger.info(f"Hallucination filter result: {parsed}")
for mem_idx, (delete_flag, rewritten_mem_content) in parsed.items():
if not delete_flag:
group[mem_idx].memory = rewritten_mem_content
new_mem_list.append(group[mem_idx])
filtered_memory_list.append(new_mem_list)
logger.info(
f"Successfully transform origianl memories from {group} to {new_mem_list}."
)
else:
# Optionally run filter and parse the output
try:
raw = self.llm.generate([{"role": "user", "content": prompt}])
success, parsed = self._parse_hallucination_filter_response(raw)
logger.info(
f"[filter_hallucination_in_memories] Hallucination filter parsed successfully: {success}"
)
if success:
logger.info(f"Hallucination filter result: {parsed}")
total = len(memory_list)
keep_flags = [True] * total
for mem_idx, content in parsed.items():
# Validate index bounds
if not isinstance(mem_idx, int) or mem_idx < 0 or mem_idx >= total:
logger.warning(
"Hallucination filter parsing failed or returned empty result."
f"[filter_hallucination_in_memories] Ignoring out-of-range index: {mem_idx}"
)
except Exception as e:
logger.error(f"Hallucination filter execution error: {e}", stack_info=True)
filtered_memory_list.append(group)
except Exception:
logger.error("Fail to filter memories", stack_info=True)
filtered_memory_list.append(group)
return filtered_memory_list
continue

delete_flag = content.get("delete", False)
rewritten = content.get("rewritten", None)
reason = content.get("reason", "")

logger.info(
f"[filter_hallucination_in_memories] index={mem_idx}, delete={delete_flag}, rewritten='{(rewritten or '')[:100]}', reason='{reason[:120]}'"
)

if delete_flag is True and rewritten is not None:
# Mark for deletion
keep_flags[mem_idx] = False
else:
# Apply rewrite if provided (safe-by-default: keep item when not mentioned or delete=False)
try:
if isinstance(rewritten, str):
memory_list[mem_idx].memory = rewritten
except Exception as e:
logger.warning(
f"[filter_hallucination_in_memories] Failed to apply rewrite for index {mem_idx}: {e}"
)

# Build result, preserving original order; keep items not mentioned by LLM by default
new_mem_list = [memory_list[i] for i in range(total) if keep_flags[i]]
return new_mem_list
else:
logger.warning("Hallucination filter parsing failed or returned empty result.")
except Exception as e:
logger.error(f"Hallucination filter execution error: {e}", stack_info=True)

return memory_list

def _read_memory(
self, messages: list[MessagesType], type: str, info: dict[str, Any], mode: str = "fine"
@@ -572,11 +572,16 @@ def _read_memory(

if os.getenv("SIMPLE_STRUCT_ADD_FILTER", "false") == "true":
# Build inputs
user_messages = [msg.content for msg in messages if msg.role == "user"]
memory_list = self.filter_hallucination_in_memories(
user_messages=user_messages, memory_list=memory_list
)

new_memory_list = []
for unit_messages, unit_memory_list in zip(messages, memory_list, strict=False):
unit_user_messages = [
msg["content"] for msg in unit_messages if msg["role"] == "user"
]
unit_memory_list = self.filter_hallucination_in_memories(
user_messages=unit_user_messages, memory_list=unit_memory_list
)
new_memory_list.append(unit_memory_list)
memory_list = new_memory_list
return memory_list

def fine_transfer_simple_mem(
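The new filter contract in this file is easiest to see end to end: the LLM returns an index-keyed JSON object, entries marked delete are dropped, entries carrying a rewritten string are updated in place, and anything the model does not mention is kept by default. Below is a minimal standalone sketch of that keep/rewrite/delete flow; apply_hallucination_filter and the sample payload are illustrative only, not code from this PR, and the sketch simplifies the diff's extra guard that only deletes when a rewritten value is also present.

import json

def apply_hallucination_filter(memories: list[str], raw_response: str) -> list[str]:
    """Sketch of the index-keyed filter contract: drop entries marked
    delete, rewrite when a replacement string is given, keep the rest."""
    parsed = json.loads(raw_response)
    keep_flags = [True] * len(memories)
    rewritten = list(memories)
    for key, verdict in parsed.items():
        try:
            idx = int(key)
        except (TypeError, ValueError):
            continue
        if not isinstance(verdict, dict) or not (0 <= idx < len(memories)):
            continue  # ignore malformed keys and out-of-range indices
        if verdict.get("delete") is True:
            keep_flags[idx] = False
        elif isinstance(verdict.get("rewritten"), str):
            rewritten[idx] = verdict["rewritten"]
    return [m for i, m in enumerate(rewritten) if keep_flags[i]]

# Illustrative payload in the shape the prompt asks for:
raw = json.dumps({
    "0": {"delete": False, "rewritten": "User lives in Paris.", "reason": "grounded"},
    "1": {"delete": True, "rewritten": "", "reason": "not supported by the chat"},
})
print(apply_hallucination_filter(["User lives in paris", "User owns a yacht"], raw))
# -> ['User lives in Paris.']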
42 changes: 21 additions & 21 deletions src/memos/mem_scheduler/general_scheduler.py
@@ -66,7 +66,7 @@ def __init__(self, config: GeneralSchedulerConfig):
def long_memory_update_process(
self, user_id: str, mem_cube_id: str, messages: list[ScheduleMessageItem]
):
mem_cube = self.current_mem_cube
mem_cube = self.mem_cube

# update query monitors
for msg in messages:
@@ -109,8 +109,8 @@ def long_memory_update_process(

query_db_manager = self.monitor.query_monitors[user_id][mem_cube_id]
query_db_manager.obj.put(item=item)
# Sync with database after adding new item
query_db_manager.sync_with_orm()
# Sync with database after adding new item
query_db_manager.sync_with_orm()
logger.debug(
f"Queries in monitor for user_id={user_id}, mem_cube_id={mem_cube_id}: {query_db_manager.obj.get_queries_with_timesort()}"
)
@@ -162,7 +162,7 @@ def long_memory_update_process(
label=QUERY_TASK_LABEL,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
)

def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
@@ -249,7 +249,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
to_memory_type=NOT_APPLICABLE_TYPE,
user_id=msg.user_id,
mem_cube_id=msg.mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=[
{
"content": f"[User] {msg.content}",
@@ -305,7 +305,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
to_memory_type=NOT_APPLICABLE_TYPE,
user_id=msg.user_id,
mem_cube_id=msg.mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=[
{
"content": f"[Assistant] {msg.content}",
@@ -338,7 +338,7 @@ def log_add_messages(self, msg: ScheduleMessageItem):
try:
# This mem_item represents the NEW content that was just added/processed
mem_item: TextualMemoryItem | None = None
mem_item = self.current_mem_cube.text_mem.get(
mem_item = self.mem_cube.text_mem.get(
memory_id=memory_id, user_name=msg.mem_cube_id
)
if mem_item is None:
@@ -352,8 +352,8 @@ def log_add_messages(self, msg: ScheduleMessageItem):
original_item_id = None

# Only check graph_store if a key exists and the text_mem has a graph_store
if key and hasattr(self.current_mem_cube.text_mem, "graph_store"):
candidates = self.current_mem_cube.text_mem.graph_store.get_by_metadata(
if key and hasattr(self.mem_cube.text_mem, "graph_store"):
candidates = self.mem_cube.text_mem.graph_store.get_by_metadata(
[
{"field": "key", "op": "=", "value": key},
{
@@ -368,7 +368,7 @@ def log_add_messages(self, msg: ScheduleMessageItem):
original_item_id = candidates[0]
# Crucial step: Fetch the original content for updates
# This `get` is for the *existing* memory that will be updated
original_mem_item = self.current_mem_cube.text_mem.get(
original_mem_item = self.mem_cube.text_mem.get(
memory_id=original_item_id, user_name=msg.mem_cube_id
)
original_content = original_mem_item.memory
@@ -481,7 +481,7 @@ def send_add_log_messages_to_local_env(
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=msg.user_id,
mem_cube_id=msg.mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=add_content_legacy,
metadata=add_meta_legacy,
memory_len=len(add_content_legacy),
@@ -496,7 +496,7 @@ def send_add_log_messages_to_local_env(
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=msg.user_id,
mem_cube_id=msg.mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=update_content_legacy,
metadata=update_meta_legacy,
memory_len=len(update_content_legacy),
@@ -562,7 +562,7 @@ def send_add_log_messages_to_cloud_env(
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=msg.user_id,
mem_cube_id=msg.mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=kb_log_content,
metadata=None,
memory_len=len(kb_log_content),
@@ -577,7 +577,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
if not messages:
return
message = messages[0]
mem_cube = self.current_mem_cube
mem_cube = self.mem_cube

user_id = message.user_id
mem_cube_id = message.mem_cube_id
@@ -744,9 +744,9 @@ def process_message(message: ScheduleMessageItem):
try:
user_id = message.user_id
mem_cube_id = message.mem_cube_id
mem_cube = self.current_mem_cube
mem_cube = self.mem_cube
if mem_cube is None:
logger.warning(
logger.error(
f"mem_cube is None for user_id={user_id}, mem_cube_id={mem_cube_id}, skipping processing",
stack_info=True,
)
@@ -923,7 +923,7 @@ def _process_memories_with_reader(
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=kb_log_content,
metadata=None,
memory_len=len(kb_log_content),
@@ -968,7 +968,7 @@ def _process_memories_with_reader(
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=add_content_legacy,
metadata=add_meta_legacy,
memory_len=len(add_content_legacy),
@@ -1036,7 +1036,7 @@ def _process_memories_with_reader(
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
memcube_log_content=kb_log_content,
metadata=None,
memory_len=len(kb_log_content),
@@ -1054,7 +1054,7 @@ def process_message(message: ScheduleMessageItem):
try:
user_id = message.user_id
mem_cube_id = message.mem_cube_id
mem_cube = self.current_mem_cube
mem_cube = self.mem_cube
if mem_cube is None:
logger.warning(
f"mem_cube is None for user_id={user_id}, mem_cube_id={mem_cube_id}, skipping processing"
@@ -1284,7 +1284,7 @@ def _pref_add_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non

def process_message(message: ScheduleMessageItem):
try:
mem_cube = self.current_mem_cube
mem_cube = self.mem_cube
if mem_cube is None:
logger.warning(
f"mem_cube is None for user_id={message.user_id}, mem_cube_id={message.mem_cube_id}, skipping processing"
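Beyond the blanket rename of self.current_mem_cube to self.mem_cube throughout this file, commit c7090c8 refers to mem_cube as a lazy-loading property. The property itself is not visible in this view, so the following is only a sketch of the usual pattern, with _load_default_mem_cube as a hypothetical loader name:

class SchedulerWithLazyMemCube:
    """Sketch of a lazy-loading mem_cube property; not the PR's actual class."""

    def __init__(self):
        self._mem_cube = None  # nothing loaded until first access

    @property
    def mem_cube(self):
        # First read triggers the load; later reads reuse the cached cube.
        if self._mem_cube is None:
            self._mem_cube = self._load_default_mem_cube()
        return self._mem_cube

    @mem_cube.setter
    def mem_cube(self, value):
        self._mem_cube = value

    def _load_default_mem_cube(self):
        raise NotImplementedError("hypothetical: build or fetch the default mem cube")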
2 changes: 1 addition & 1 deletion src/memos/mem_scheduler/optimized_scheduler.py
@@ -230,7 +230,7 @@ def update_search_memories_to_redis(
memories: list[TextualMemoryItem] = self.search_memories(
search_req=APISearchRequest(**content_dict["search_req"]),
user_context=UserContext(**content_dict["user_context"]),
mem_cube=self.current_mem_cube,
mem_cube=self.mem_cube,
mode=SearchMode.FAST,
)
formatted_memories = [format_textual_memory_item(data) for data in memories]
10 changes: 10 additions & 0 deletions src/memos/mem_scheduler/schemas/task_schemas.py
@@ -60,6 +60,16 @@ class TaskPriorityLevel(Enum):
# Interval in seconds for batching and cleaning up deletions (xdel)
DEFAULT_DELETE_CLEANUP_INTERVAL_SEC = 30.0

# Inactivity threshold for stream deletion
# Delete streams whose last message ID timestamp is older than this threshold.
# Unit: seconds. Default: 1 day.
DEFAULT_STREAM_INACTIVITY_DELETE_SECONDS = 86_400.0

# Recency threshold for active streams
# Consider a stream "active" if its last message is within this window.
# Unit: seconds. Default: 30 minutes.
DEFAULT_STREAM_RECENT_ACTIVE_SECONDS = 1_800.0


# task queue
DEFAULT_STREAM_KEY_PREFIX = os.getenv(
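The two new thresholds above pair naturally with the fact that Redis stream IDs embed a millisecond timestamp: the age of a stream's last-generated ID says whether the stream is recently active or stale enough to delete. The diff does not show where the scheduler applies them, so the sketch below is an assumed usage with the redis-py client, and classify_stream is a hypothetical helper:

import time

import redis  # assumption: redis-py client

DEFAULT_STREAM_INACTIVITY_DELETE_SECONDS = 86_400.0  # 1 day
DEFAULT_STREAM_RECENT_ACTIVE_SECONDS = 1_800.0  # 30 minutes

def stream_age_seconds(r: redis.Redis, stream_key: str) -> float | None:
    """Seconds since the stream's last entry, derived from its ID (ms-seq format)."""
    try:
        info = r.xinfo_stream(stream_key)
    except redis.ResponseError:
        return None  # stream does not exist
    last_id = info["last-generated-id"]
    if isinstance(last_id, bytes):
        last_id = last_id.decode()
    last_ms = int(last_id.split("-")[0])
    return time.time() - last_ms / 1000.0

def classify_stream(r: redis.Redis, stream_key: str) -> str:
    age = stream_age_seconds(r, stream_key)
    if age is None:
        return "missing"
    if age <= DEFAULT_STREAM_RECENT_ACTIVE_SECONDS:
        return "active"  # saw a message within the last 30 minutes
    if age >= DEFAULT_STREAM_INACTIVITY_DELETE_SECONDS:
        return "deletable"  # idle for over a day; caller may r.delete(stream_key)
    return "idle"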