Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from server_api.auth.database import get_db
from server_api.auth.router import get_current_user
from server_api.ehtool import router as ehtool_router
from server_api.workflow import router as workflow_router

from fastapi.staticfiles import StaticFiles
import os
Expand Down Expand Up @@ -77,6 +78,7 @@ def _ensure_chatbot():

app.include_router(auth_router.router)
app.include_router(ehtool_router.router, prefix="/eh", tags=["ehtool"])
app.include_router(workflow_router)

app.add_middleware(
CORSMiddleware,
Expand Down
3 changes: 3 additions & 0 deletions server_api/workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .metrics import router

__all__ = ["router"]
92 changes: 92 additions & 0 deletions server_api/workflow/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from collections import Counter
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional

from fastapi import APIRouter, Request

router = APIRouter()


def _to_datetime(value: Any) -> Optional[datetime]:
if isinstance(value, datetime):
return value
if not isinstance(value, str) or not value.strip():
return None

candidate = value.strip()
if candidate.endswith("Z"):
candidate = candidate[:-1] + "+00:00"

try:
return datetime.fromisoformat(candidate)
except ValueError:
return None


def _as_iso(value: Optional[datetime]) -> Optional[str]:
return value.isoformat() if value else None


def _compute_workflow_metrics(events: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
normalized_events: List[Dict[str, Any]] = [event for event in events if isinstance(event, dict)]

event_type_counts: Counter = Counter()
stage_transition_counts: Counter = Counter()
approvals = 0
rejections = 0
timestamps: List[datetime] = []

for event in normalized_events:
event_type = str(event.get("type") or "unknown")
event_type_counts[event_type] += 1

decision = str(event.get("decision") or "").strip().lower()
if decision == "approved":
approvals += 1
elif decision == "rejected":
rejections += 1

from_stage = event.get("from_stage")
to_stage = event.get("to_stage")
if from_stage is not None and to_stage is not None:
stage_transition_counts[f"{from_stage}->{to_stage}"] += 1

ts = _to_datetime(event.get("timestamp"))
if ts:
timestamps.append(ts)

decision_total = approvals + rejections
approval_rate = (approvals / decision_total) if decision_total else 0.0
rejection_rate = (rejections / decision_total) if decision_total else 0.0

first_ts = min(timestamps) if timestamps else None
last_ts = max(timestamps) if timestamps else None

return {
"event_counts": dict(sorted(event_type_counts.items())),
"approvals": {
"count": approvals,
"rejections": rejections,
"total": decision_total,
"approval_rate": approval_rate,
"rejection_rate": rejection_rate,
},
"stage_transitions": dict(sorted(stage_transition_counts.items())),
"timeline": {
"first_event_at": _as_iso(first_ts),
"last_event_at": _as_iso(last_ts),
},
}


@router.get("/api/workflows/{workflow_id}/metrics")
def get_workflow_metrics(workflow_id: str, request: Request):
workflow_events = getattr(request.app.state, "workflow_events", {}) or {}
events = workflow_events.get(workflow_id, [])
metrics = _compute_workflow_metrics(events)

return {
"workflow_id": workflow_id,
"event_total": len([event for event in events if isinstance(event, dict)]),
"metrics": metrics,
}
110 changes: 110 additions & 0 deletions tests/test_workflow_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import unittest

from fastapi import FastAPI
from fastapi.testclient import TestClient

from server_api.workflow.metrics import router as workflow_metrics_router


class WorkflowMetricsRouteTests(unittest.TestCase):
def setUp(self):
self.app = FastAPI()
self.app.include_router(workflow_metrics_router)
self.client = TestClient(self.app)

def tearDown(self):
self.app.state.workflow_events = {}

def test_metrics_endpoint_returns_safe_defaults_for_empty_workflow(self):
self.app.state.workflow_events = {"wf-empty": []}

response = self.client.get("/api/workflows/wf-empty/metrics")

self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["workflow_id"], "wf-empty")
self.assertEqual(payload["event_total"], 0)

metrics = payload["metrics"]
self.assertEqual(metrics["event_counts"], {})
self.assertEqual(metrics["stage_transitions"], {})
self.assertEqual(
metrics["approvals"],
{
"count": 0,
"rejections": 0,
"total": 0,
"approval_rate": 0.0,
"rejection_rate": 0.0,
},
)
self.assertIsNone(metrics["timeline"]["first_event_at"])
self.assertIsNone(metrics["timeline"]["last_event_at"])

def test_metrics_endpoint_matches_seeded_events(self):
self.app.state.workflow_events = {
"wf-123": [
{
"type": "stage_transition",
"from_stage": "draft",
"to_stage": "review",
"timestamp": "2026-04-10T10:00:00Z",
},
{
"type": "agent_action",
"timestamp": "2026-04-10T10:05:00Z",
},
{
"type": "approval",
"decision": "approved",
"timestamp": "2026-04-10T10:06:00Z",
},
{
"type": "approval",
"decision": "rejected",
"timestamp": "2026-04-10T10:07:00Z",
},
{
"type": "stage_transition",
"from_stage": "review",
"to_stage": "done",
"timestamp": "2026-04-10T10:08:00Z",
},
]
}

response = self.client.get("/api/workflows/wf-123/metrics")

self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["workflow_id"], "wf-123")
self.assertEqual(payload["event_total"], 5)

metrics = payload["metrics"]
self.assertEqual(
metrics["event_counts"],
{
"agent_action": 1,
"approval": 2,
"stage_transition": 2,
},
)
self.assertEqual(
metrics["stage_transitions"],
{
"draft->review": 1,
"review->done": 1,
},
)
self.assertEqual(metrics["approvals"]["count"], 1)
self.assertEqual(metrics["approvals"]["rejections"], 1)
self.assertEqual(metrics["approvals"]["total"], 2)
self.assertEqual(metrics["approvals"]["approval_rate"], 0.5)
self.assertEqual(metrics["approvals"]["rejection_rate"], 0.5)

self.assertEqual(metrics["timeline"]["first_event_at"], "2026-04-10T10:00:00+00:00")
self.assertEqual(metrics["timeline"]["last_event_at"], "2026-04-10T10:08:00+00:00")


if __name__ == "__main__":
unittest.main()
Loading