Skip to content

Commit 04b1007

Browse files
authored
feat: Pass source_doc_id in task completion logs (#664)
This commit addresses the issue where 'source_doc_id' was not being propagated in task completion (success/failure) logs emitted by the scheduler dispatcher. Changes made: - Added 'source_doc_id: str | None' field to the 'ScheduleLogForWebItem' schema in 'src/memos/mem_scheduler/schemas/message_schemas.py'. - Modified '_maybe_emit_task_completion' in 'src/memos/mem_scheduler/task_schedule_modules/dispatcher.py' to: - Extract 'source_doc_id' from 'ScheduleMessageItem.info'. - Pass 'source_doc_id' to the 'ScheduleLogForWebItem' constructor for both 'completed' and 'failed' task status events. This ensures better traceability and debugging for task completion events related to specific source documents. Co-authored-by: [email protected] <>
1 parent b6efb0c commit 04b1007

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

src/memos/mem_scheduler/schemas/message_schemas.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin):
157157
status: str | None = Field(
158158
default=None, description="Completion status of the task (e.g., 'completed', 'failed')"
159159
)
160+
source_doc_id: str | None = Field(default=None, description="Source document ID")
160161

161162
def debug_info(self) -> dict[str, Any]:
162163
"""Return structured debug information for logging purposes."""

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,20 @@ def _maybe_emit_task_completion(
295295
return
296296

297297
# messages in one batch can belong to different business task_ids; check each
298-
task_ids = {getattr(msg, "task_id", None) for msg in messages}
299-
task_ids.discard(None)
298+
task_ids = set()
299+
task_id_to_doc_id = {}
300+
301+
for msg in messages:
302+
tid = getattr(msg, "task_id", None)
303+
if tid:
304+
task_ids.add(tid)
305+
# Try to capture source_doc_id for this task if we haven't already
306+
if tid not in task_id_to_doc_id:
307+
info = msg.info or {}
308+
sid = info.get("source_doc_id")
309+
if sid:
310+
task_id_to_doc_id[tid] = sid
311+
300312
if not task_ids:
301313
return
302314

@@ -311,6 +323,7 @@ def _maybe_emit_task_completion(
311323
return
312324

313325
for task_id in task_ids:
326+
source_doc_id = task_id_to_doc_id.get(task_id)
314327
status_data = self.status_tracker.get_task_status_by_business_id(
315328
business_task_id=task_id, user_id=user_id
316329
)
@@ -332,6 +345,7 @@ def _maybe_emit_task_completion(
332345
to_memory_type="status",
333346
log_content=f"Task {task_id} completed",
334347
status="completed",
348+
source_doc_id=source_doc_id,
335349
)
336350
self.submit_web_logs(event)
337351

@@ -355,6 +369,7 @@ def _maybe_emit_task_completion(
355369
to_memory_type="status",
356370
log_content=f"Task {task_id} failed: {error_msg}",
357371
status="failed",
372+
source_doc_id=source_doc_id,
358373
)
359374
self.submit_web_logs(event)
360375
except Exception:

0 commit comments

Comments
 (0)