From af73dff4f81c90a8afad4856046f1eaeb18888e2 Mon Sep 17 00:00:00 2001 From: Jiadong Bai <43344272+bobbai00@users.noreply.github.com> Date: Wed, 24 Jun 2026 00:19:49 -0700 Subject: [PATCH 1/2] refactor(agent-service): centralize backend DTOs, metadata, endpoint config, and auth Consolidate the agent-service type/config layer. Stacked on the WS PR (#5751); types/execution.ts is left untouched (its redesign is a separate PR). - types/dto.ts: backend request/response DTOs (workflow persistence, compiling-service responses, WorkflowFatalError). - types/metadata.ts: operator-metadata types extracted from api/backend-api. - config/endpoints.ts: getServiceEndpoints() centralizing backend base URLs. - auth/jwt.ts: relocated from api/auth-api; update import paths. - Replace `any` with precise types across src/types/*; type ReActStep tool/message fields precisely. - Consume the centralized types at call sites (clients, server, agent utils). - Expand test coverage (jwt, workflow-system-metadata, compile/workflow clients); rename *.test.ts -> *.spec.ts. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_012qFkyrpTd5PrkNBPcBeo4Q --- agent-service/src/agent/texera-agent.ts | 10 +- agent-service/src/agent/util/context-utils.ts | 2 +- .../util/workflow-system-metadata.spec.ts | 165 ++++++++++++++++++ .../agent/util/workflow-system-metadata.ts | 12 +- agent-service/src/api/backend-api.ts | 49 ++---- agent-service/src/api/compile-api.spec.ts | 97 ++++++++++ agent-service/src/api/compile-api.ts | 15 +- agent-service/src/api/index.ts | 2 +- agent-service/src/api/workflow-api.spec.ts | 111 ++++++++++++ agent-service/src/api/workflow-api.ts | 21 +-- agent-service/src/auth/jwt.spec.ts | 98 +++++++++++ .../src/{api/auth-api.ts => auth/jwt.ts} | 0 agent-service/src/config/endpoints.ts | 39 +++++ agent-service/src/server.ts | 2 +- agent-service/src/types/agent.ts | 7 +- agent-service/src/types/dto.ts | 62 +++++++ agent-service/src/types/index.ts | 2 + agent-service/src/types/metadata.ts | 74 ++++++++ agent-service/src/types/workflow.ts | 6 +- 19 files changed, 680 insertions(+), 94 deletions(-) create mode 100644 agent-service/src/agent/util/workflow-system-metadata.spec.ts create mode 100644 agent-service/src/api/compile-api.spec.ts create mode 100644 agent-service/src/api/workflow-api.spec.ts create mode 100644 agent-service/src/auth/jwt.spec.ts rename agent-service/src/{api/auth-api.ts => auth/jwt.ts} (100%) create mode 100644 agent-service/src/config/endpoints.ts create mode 100644 agent-service/src/types/dto.ts create mode 100644 agent-service/src/types/metadata.ts diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index ccd0545919a..a5caeef0e7b 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -48,7 +48,8 @@ import { type ExecutionConfig, } from "./tools/workflow-execution-tools"; import { assembleContext } from "./util/context-utils"; -import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api"; +import { compileWorkflowAsync } from "../api/compile-api"; +import type { WorkflowCompilationResponse } from "../types/dto"; import { createLogger } from "../logger"; import type { Logger } from "pino"; @@ -560,15 +561,18 @@ export class TexeraAgent { onStepFinish: async ({ text, toolCalls, toolResults, usage }) => { stepIndex++; + // The AI SDK types tc.input / tr.output as `unknown` for dynamically + // registered tools; narrow to the shapes our tools actually produce + // (object args, string results — see tools/*). const formattedToolCalls = toolCalls?.map(tc => ({ toolName: tc.toolName, toolCallId: tc.toolCallId, - input: tc.input, + input: tc.input as Record, })); const formattedToolResults = toolResults?.map(tr => ({ toolCallId: tr.toolCallId, - output: tr.output, + output: tr.output as string, isError: !!(tr.output as any)?.error, })); diff --git a/agent-service/src/agent/util/context-utils.ts b/agent-service/src/agent/util/context-utils.ts index 195692cbf50..04e801e3aa7 100644 --- a/agent-service/src/agent/util/context-utils.ts +++ b/agent-service/src/agent/util/context-utils.ts @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai"; import type { WorkflowState } from "../workflow-state"; import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow"; import type { ReActStep } from "../../types/agent"; -import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api"; +import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/dto"; import { extractOperatorInputPortSchemaMap } from "./workflow-utils"; import { createLogger } from "../../logger"; diff --git a/agent-service/src/agent/util/workflow-system-metadata.spec.ts b/agent-service/src/agent/util/workflow-system-metadata.spec.ts new file mode 100644 index 00000000000..cc2976850f4 --- /dev/null +++ b/agent-service/src/agent/util/workflow-system-metadata.spec.ts @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { + WorkflowSystemMetadata, + formatValidationErrors, + formatCompactSchemaForError, +} from "./workflow-system-metadata"; +import type { OperatorMetadata, OperatorSchema } from "../../types/metadata"; + +function operator(overrides: Partial & Pick): OperatorSchema { + return { + operatorVersion: "1", + jsonSchema: {}, + additionalMetadata: { + userFriendlyName: overrides.operatorType, + operatorGroupName: "Test", + inputPorts: [], + outputPorts: [], + }, + ...overrides, + }; +} + +function metadataWith(...operators: OperatorSchema[]): OperatorMetadata { + return { operators, groups: [] }; +} + +// Filter exercises ref-inlining + key filtering; Limit is a clean Ajv-validatable schema. +const FILTER = operator({ + operatorType: "Filter", + additionalMetadata: { + userFriendlyName: "Filter", + operatorGroupName: "Filter", + inputPorts: [], + outputPorts: [], + operatorDescription: "Filters rows", + }, + jsonSchema: { + properties: { + attribute: { $ref: "#/definitions/AttributeName" }, + limit: { type: "integer", propertyOrder: 5 }, + dummyPropertyList: { type: "array" }, + }, + definitions: { + AttributeName: { type: "string", title: "Attribute" }, + PortDescription: { type: "object" }, + }, + required: ["attribute"], + }, +}); + +const LIMIT = operator({ + operatorType: "Limit", + jsonSchema: { type: "object", properties: { limit: { type: "integer" } }, required: ["limit"] }, +}); + +function loaded(...operators: OperatorSchema[]): WorkflowSystemMetadata { + const meta = new WorkflowSystemMetadata(); + meta.loadFromMetadata(metadataWith(...operators)); + return meta; +} + +describe("WorkflowSystemMetadata.loadFromMetadata", () => { + test("indexes operators by type", () => { + const meta = loaded(FILTER, LIMIT); + expect(meta.getOperatorCount()).toBe(2); + expect(meta.operatorTypeExists("Filter")).toBe(true); + expect(meta.operatorTypeExists("Nope")).toBe(false); + expect(meta.getSchema("Filter")).toEqual(FILTER.jsonSchema); + expect(meta.getAllOperatorTypes()).toEqual({ Filter: "Filters rows", Limit: "Limit" }); + }); + + test("getDescription falls back to userFriendlyName when no description", () => { + const meta = loaded(FILTER, LIMIT); + expect(meta.getDescription("Filter")).toBe("Filters rows"); + expect(meta.getDescription("Limit")).toBe("Limit"); + expect(meta.getDescription("Unknown")).toBe(""); + }); +}); + +describe("WorkflowSystemMetadata.getCompactSchema", () => { + test("returns null for an unknown operator type", () => { + expect(loaded(FILTER).getCompactSchema("Nope")).toBeNull(); + }); + + test("inlines $refs, strips noise keys, and drops filtered properties", () => { + const compact = loaded(FILTER).getCompactSchema("Filter"); + expect(compact).not.toBeNull(); + // $ref resolved to the AttributeName definition. + expect(compact!.properties.attribute).toEqual({ type: "string", title: "Attribute" }); + // propertyOrder is in COMPACT_SCHEMA_EXCLUDED_KEYS and is stripped. + expect(compact!.properties.limit).toEqual({ type: "integer" }); + // dummyPropertyList is in FILTERED_PROPERTY_KEYS and is removed. + expect(compact!.properties).not.toHaveProperty("dummyPropertyList"); + expect(compact!.required).toEqual(["attribute"]); + }); +}); + +describe("WorkflowSystemMetadata.getAllSchemasAsJson", () => { + test("emits filtered properties and definitions as JSON", () => { + const parsed = JSON.parse(loaded(FILTER).getAllSchemasAsJson()); + expect(Object.keys(parsed.Filter.properties)).toEqual(["attribute", "limit"]); // dummyPropertyList filtered + expect(parsed.Filter.definitions).toHaveProperty("AttributeName"); + expect(parsed.Filter.definitions).not.toHaveProperty("PortDescription"); // filtered definition + expect(parsed.Filter.required).toEqual(["attribute"]); + }); +}); + +describe("WorkflowSystemMetadata.validateOperatorProperties", () => { + test("accepts properties that satisfy the schema", () => { + expect(loaded(LIMIT).validateOperatorProperties("Limit", { limit: 5 })).toEqual({ isValid: true }); + }); + + test("reports the missing required property", () => { + const result = loaded(LIMIT).validateOperatorProperties("Limit", {}); + expect(result.isValid).toBe(false); + expect(result.isValid ? {} : result.messages).toHaveProperty("limit"); + }); + + test("rejects an unknown operator type", () => { + const result = loaded(LIMIT).validateOperatorProperties("Nope", {}); + expect(result.isValid).toBe(false); + expect(result.isValid ? "" : result.messages.error).toContain("Unknown operator type"); + }); +}); + +describe("formatValidationErrors", () => { + test("returns empty string when valid", () => { + expect(formatValidationErrors({ isValid: true })).toBe(""); + }); + + test("joins messages as 'key: msg'", () => { + expect(formatValidationErrors({ isValid: false, messages: { limit: "is required", attribute: "bad" } })).toBe( + "limit: is required; attribute: bad" + ); + }); +}); + +describe("formatCompactSchemaForError", () => { + test("renders only the required properties", () => { + const formatted = formatCompactSchemaForError({ + properties: { a: { type: "string" }, b: { type: "integer" } }, + required: ["a"], + }); + expect(formatted).toBe('required: [a], properties: {"a":{"type":"string"}}'); + }); +}); diff --git a/agent-service/src/agent/util/workflow-system-metadata.ts b/agent-service/src/agent/util/workflow-system-metadata.ts index 9269a0cff7c..da4fcdf3630 100644 --- a/agent-service/src/agent/util/workflow-system-metadata.ts +++ b/agent-service/src/agent/util/workflow-system-metadata.ts @@ -20,23 +20,13 @@ import Ajv from "ajv"; import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api"; import type { ValidationError, Validation } from "../../types/workflow"; +import type { OperatorSchemaInfo, CompactOperatorSchema } from "../../types/metadata"; import { createLogger } from "../../logger"; const log = createLogger("WorkflowSystemMetadata"); export type { ValidationError, Validation } from "../../types/workflow"; -interface OperatorSchemaInfo { - properties: any; - required: any; - definitions: any; -} - -interface CompactOperatorSchema { - properties: Record; - required: string[]; -} - const FILTERED_PROPERTY_KEYS = ["dummyPropertyList"]; const FILTERED_DEFINITION_KEYS = [ diff --git a/agent-service/src/api/backend-api.ts b/agent-service/src/api/backend-api.ts index ffd2c59433f..56e5ada4d64 100644 --- a/agent-service/src/api/backend-api.ts +++ b/agent-service/src/api/backend-api.ts @@ -18,6 +18,16 @@ */ import { env } from "../config/env"; +import type { OperatorMetadata } from "../types/metadata"; + +export type { + InputPortInfo, + OutputPortInfo, + OperatorAdditionalMetadata, + OperatorSchema, + GroupInfo, + OperatorMetadata, +} from "../types/metadata"; interface BackendConfig { apiEndpoint: string; @@ -37,45 +47,6 @@ export function getBackendConfig(): BackendConfig { return { ...currentConfig }; } -export interface InputPortInfo { - displayName?: string; - disallowMultiLinks?: boolean; - dependencies?: { id: number; internal: boolean }[]; -} - -export interface OutputPortInfo { - displayName?: string; -} - -interface OperatorAdditionalMetadata { - userFriendlyName: string; - operatorGroupName: string; - operatorDescription?: string; - inputPorts: InputPortInfo[]; - outputPorts: OutputPortInfo[]; - dynamicInputPorts?: boolean; - dynamicOutputPorts?: boolean; - supportReconfiguration?: boolean; - allowPortCustomization?: boolean; -} - -export interface OperatorSchema { - operatorType: string; - jsonSchema: any; - additionalMetadata: OperatorAdditionalMetadata; - operatorVersion: string; -} - -interface GroupInfo { - groupName: string; - children?: GroupInfo[] | null; -} - -export interface OperatorMetadata { - operators: OperatorSchema[]; - groups: GroupInfo[]; -} - export async function fetchOperatorMetadata(): Promise { const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`; const response = await fetch(url); diff --git a/agent-service/src/api/compile-api.spec.ts b/agent-service/src/api/compile-api.spec.ts new file mode 100644 index 00000000000..270463db20a --- /dev/null +++ b/agent-service/src/api/compile-api.spec.ts @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { afterEach, describe, expect, mock, test } from "bun:test"; +import { compileWorkflowAsync } from "./compile-api"; +import type { LogicalPlan } from "../types/workflow"; + +const realFetch = globalThis.fetch; +afterEach(() => { + globalThis.fetch = realFetch; +}); + +const plan: LogicalPlan = { + operators: [{ operatorID: "op1", operatorType: "Filter" }], + links: [], +}; + +describe("compileWorkflowAsync", () => { + test("POSTs to /api/compile and returns the parsed compilation response", async () => { + // operatorErrors uses the proto-accurate WorkflowFatalError shape (type is the enum name string). + const responseBody = { + physicalPlan: { nodes: [] }, + operatorOutputSchemas: {}, + operatorErrors: { + op1: { + type: "COMPILATION_ERROR", + message: "bad attribute", + details: "stack", + operatorId: "op1", + workerId: "", + timestamp: { seconds: 1, nanos: 0 }, + }, + }, + }; + const fn = mock(async () => ({ + ok: true, + status: 200, + statusText: "OK", + json: async () => responseBody, + text: async () => "", + })); + globalThis.fetch = fn as unknown as typeof fetch; + + const result = await compileWorkflowAsync(plan); + + const [url, init] = fn.mock.calls[0] as unknown as [string, RequestInit]; + expect(url).toEndWith("/api/compile"); + expect(init.method).toBe("POST"); + expect(JSON.parse(init.body as string)).toEqual({ + operators: plan.operators, + links: plan.links, + opsToReuseResult: [], + opsToViewResult: [], + }); + expect(result).not.toBeNull(); + expect(result!.operatorErrors.op1.type).toBe("COMPILATION_ERROR"); + expect(result!.operatorErrors.op1.message).toBe("bad attribute"); + }); + + test("returns null on a non-ok response", async () => { + const fn = mock(async () => ({ + ok: false, + status: 400, + statusText: "Bad Request", + json: async () => ({}), + text: async () => "compile error", + })); + globalThis.fetch = fn as unknown as typeof fetch; + + expect(await compileWorkflowAsync(plan)).toBeNull(); + }); + + test("returns null when the request throws", async () => { + const fn = mock(async () => { + throw new Error("network down"); + }); + globalThis.fetch = fn as unknown as typeof fetch; + + expect(await compileWorkflowAsync(plan)).toBeNull(); + }); +}); diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-api.ts index 8ffd27fd52c..defd02344a1 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-api.ts @@ -18,7 +18,8 @@ */ import { getBackendConfig } from "./backend-api"; -import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow"; +import type { LogicalPlan } from "../types/workflow"; +import type { WorkflowCompilationResponse } from "../types/dto"; import { createLogger } from "../logger"; const log = createLogger("CompileAPI"); @@ -30,18 +31,6 @@ export interface SchemaAttribute { export type PortSchema = ReadonlyArray; -export interface WorkflowFatalError { - type: string; - message: string; - operatorId?: string; -} - -export interface WorkflowCompilationResponse { - physicalPlan?: any; - operatorOutputSchemas: Record; - operatorErrors: Record; -} - export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise { const config = getBackendConfig(); const url = `${config.compileEndpoint}/api/compile`; diff --git a/agent-service/src/api/index.ts b/agent-service/src/api/index.ts index eca292d7ffe..1efba63ed67 100644 --- a/agent-service/src/api/index.ts +++ b/agent-service/src/api/index.ts @@ -20,5 +20,5 @@ export * from "./backend-api"; export * from "./execution-api"; export * from "./workflow-api"; -export * from "./auth-api"; +export * from "../auth/jwt"; export * from "./compile-api"; diff --git a/agent-service/src/api/workflow-api.spec.ts b/agent-service/src/api/workflow-api.spec.ts new file mode 100644 index 00000000000..8a8ecb3ed69 --- /dev/null +++ b/agent-service/src/api/workflow-api.spec.ts @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { afterEach, describe, expect, mock, test } from "bun:test"; +import { persistWorkflow, retrieveWorkflow } from "./workflow-api"; +import type { WorkflowContent } from "../types/workflow"; + +const realFetch = globalThis.fetch; +afterEach(() => { + globalThis.fetch = realFetch; +}); + +interface FakeResponseInit { + ok: boolean; + status?: number; + statusText?: string; + json?: unknown; + text?: string; +} + +function mockFetch(init: FakeResponseInit) { + const fn = mock(async () => ({ + ok: init.ok, + status: init.status ?? (init.ok ? 200 : 500), + statusText: init.statusText ?? "", + json: async () => init.json, + text: async () => init.text ?? "", + })); + globalThis.fetch = fn as unknown as typeof fetch; + return fn; +} + +const content: WorkflowContent = { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400 }, +}; + +function lastCall(fn: ReturnType): [string, RequestInit] { + return fn.mock.calls[0] as unknown as [string, RequestInit]; +} + +describe("persistWorkflow", () => { + test("POSTs to /workflow/persist with bearer auth and a stringified content body", async () => { + const fn = mockFetch({ ok: true, json: { wid: 1, name: "wf", content: JSON.stringify(content) } }); + + const result = await persistWorkflow("tok", 1, "wf", content, "desc"); + + const [url, init] = lastCall(fn); + expect(url).toEndWith("/api/workflow/persist"); + expect(init.method).toBe("POST"); + expect((init.headers as Record).Authorization).toBe("Bearer tok"); + expect(JSON.parse(init.body as string)).toEqual({ + wid: 1, + name: "wf", + description: "desc", + content: JSON.stringify(content), + isPublic: false, + }); + // The stringified content in the response is parsed back into an object. + expect(result.content).toEqual(content); + }); + + test("throws with status detail on a non-ok response", () => { + mockFetch({ ok: false, status: 500, statusText: "Server Error", text: "boom" }); + expect(persistWorkflow("tok", 1, "wf", content)).rejects.toThrow("Failed to persist workflow: 500"); + }); +}); + +describe("retrieveWorkflow", () => { + test("GETs /workflow/:wid with bearer auth and parses stringified content", async () => { + const fn = mockFetch({ ok: true, json: { wid: 7, name: "wf", content: JSON.stringify(content) } }); + + const result = await retrieveWorkflow("tok", 7); + + const [url, init] = lastCall(fn); + expect(url).toEndWith("/api/workflow/7"); + expect(init.method).toBe("GET"); + expect((init.headers as Record).Authorization).toBe("Bearer tok"); + expect(result.content).toEqual(content); + }); + + test("leaves an already-parsed content object untouched", async () => { + mockFetch({ ok: true, json: { wid: 7, name: "wf", content } }); + const result = await retrieveWorkflow("tok", 7); + expect(result.content).toEqual(content); + }); + + test("throws with status detail on a non-ok response", () => { + mockFetch({ ok: false, status: 404, statusText: "Not Found", text: "missing" }); + expect(retrieveWorkflow("tok", 7)).rejects.toThrow("Failed to retrieve workflow: 404"); + }); +}); diff --git a/agent-service/src/api/workflow-api.ts b/agent-service/src/api/workflow-api.ts index 7a96f979a1c..39387563b6e 100644 --- a/agent-service/src/api/workflow-api.ts +++ b/agent-service/src/api/workflow-api.ts @@ -18,26 +18,9 @@ */ import { getBackendConfig } from "./backend-api"; -import { createAuthHeaders } from "./auth-api"; +import { createAuthHeaders } from "../auth/jwt"; import type { WorkflowContent } from "../types/workflow"; - -export interface Workflow { - wid: number; - name: string; - description?: string; - content: WorkflowContent; - creationTime?: number; - lastModifiedTime?: number; - isPublished?: boolean; -} - -interface WorkflowPersistRequest { - wid?: number; - name: string; - description?: string; - content: string; - isPublic?: boolean; -} +import type { Workflow, WorkflowPersistRequest } from "../types/dto"; const WORKFLOW_BASE_URL = "workflow"; diff --git a/agent-service/src/auth/jwt.spec.ts b/agent-service/src/auth/jwt.spec.ts new file mode 100644 index 00000000000..4ba403dcb41 --- /dev/null +++ b/agent-service/src/auth/jwt.spec.ts @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { extractUserFromToken, validateToken, extractBearerToken, createAuthHeaders } from "./jwt"; + +// Encode segments as base64url (no padding, `-`/`_` alphabet) to match real JWTs. +function makeToken(payload: Record): string { + const encode = (o: Record) => Buffer.from(JSON.stringify(o)).toString("base64url"); + return `${encode({ alg: "none", typ: "JWT" })}.${encode(payload)}.signature`; +} + +const nowSeconds = () => Math.floor(Date.now() / 1000); + +describe("extractUserFromToken", () => { + test("maps JWT claims onto a UserInfo", () => { + const token = makeToken({ userId: 7, sub: "alice", email: "alice@example.com", role: "ADMIN" }); + expect(extractUserFromToken(token)).toEqual({ uid: 7, name: "alice", email: "alice@example.com", role: "ADMIN" }); + }); + + test("defaults missing email and role", () => { + const token = makeToken({ userId: 1, sub: "bob" }); + expect(extractUserFromToken(token)).toEqual({ uid: 1, name: "bob", email: "", role: "REGULAR" }); + }); + + test("throws on a malformed token", () => { + expect(() => extractUserFromToken("not-a-jwt")).toThrow("Failed to decode JWT"); + }); + + test("decodes a token whose base64url payload contains -/_ characters", () => { + const token = makeToken({ userId: 9, sub: "a~?>>", email: "x@y.z" }); + // Guard that this case stays meaningful: the payload segment must use the url-safe alphabet. + expect(token.split(".")[1]).toMatch(/[-_]/); + expect(extractUserFromToken(token)).toEqual({ uid: 9, name: "a~?>>", email: "x@y.z", role: "REGULAR" }); + }); +}); + +describe("validateToken", () => { + test("accepts a token expiring in the future", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() + 3600 }))).toBe(true); + }); + + test("rejects an expired token", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() - 3600 }))).toBe(false); + }); + + test("treats a token without exp as valid", () => { + expect(validateToken(makeToken({ sub: "a" }))).toBe(true); + }); + + test("rejects a malformed token", () => { + expect(validateToken("garbage")).toBe(false); + }); +}); + +describe("extractBearerToken", () => { + test("extracts the token from a Bearer header", () => { + expect(extractBearerToken("Bearer abc.def.ghi")).toBe("abc.def.ghi"); + }); + + test("is case-insensitive on the scheme", () => { + expect(extractBearerToken("bearer xyz")).toBe("xyz"); + }); + + test("returns undefined for a non-Bearer scheme", () => { + expect(extractBearerToken("Basic abc")).toBeUndefined(); + }); + + test("returns undefined when the token is missing", () => { + expect(extractBearerToken("Bearer")).toBeUndefined(); + }); + + test("returns undefined for an absent header", () => { + expect(extractBearerToken(undefined)).toBeUndefined(); + }); +}); + +describe("createAuthHeaders", () => { + test("builds bearer auth headers", () => { + expect(createAuthHeaders("tok")).toEqual({ Authorization: "Bearer tok", "Content-Type": "application/json" }); + }); +}); diff --git a/agent-service/src/api/auth-api.ts b/agent-service/src/auth/jwt.ts similarity index 100% rename from agent-service/src/api/auth-api.ts rename to agent-service/src/auth/jwt.ts diff --git a/agent-service/src/config/endpoints.ts b/agent-service/src/config/endpoints.ts new file mode 100644 index 00000000000..cab8c7ff20c --- /dev/null +++ b/agent-service/src/config/endpoints.ts @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { env } from "./env"; + +/** Base URLs of the backend services this agent service talks to. */ +export interface ServiceEndpoints { + apiEndpoint: string; + modelsEndpoint: string; + compileEndpoint: string; + executionEndpoint: string; +} + +const endpoints: ServiceEndpoints = { + apiEndpoint: env.TEXERA_DASHBOARD_SERVICE_ENDPOINT, + modelsEndpoint: env.LLM_ENDPOINT, + compileEndpoint: env.WORKFLOW_COMPILING_SERVICE_ENDPOINT, + executionEndpoint: env.WORKFLOW_EXECUTION_SERVICE_ENDPOINT, +}; + +export function getServiceEndpoints(): ServiceEndpoints { + return { ...endpoints }; +} diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 030d27b95bf..3d0afc0c85b 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -23,7 +23,7 @@ import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; -import { extractBearerToken, extractUserFromToken, validateToken } from "./api/auth-api"; +import { extractBearerToken, extractUserFromToken, validateToken } from "./auth/jwt"; import { retrieveWorkflow } from "./api/workflow-api"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 694b51785fd..03c3524a837 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -17,6 +17,7 @@ * under the License. */ +import type { ModelMessage } from "ai"; import type { WorkflowContent } from "./workflow"; export enum AgentState { @@ -48,15 +49,15 @@ export interface ReActStep { toolCalls?: Array<{ toolName: string; toolCallId: string; - input: any; + input: Record; }>; toolResults?: Array<{ toolCallId: string; - output: any; + output: string; isError?: boolean; }>; usage?: TokenUsage; - inputMessages?: any[]; + inputMessages?: ModelMessage[]; messageSource?: "chat" | "feedback"; beforeWorkflowContent?: WorkflowContent; afterWorkflowContent?: WorkflowContent; diff --git a/agent-service/src/types/dto.ts b/agent-service/src/types/dto.ts new file mode 100644 index 00000000000..f740d930dc1 --- /dev/null +++ b/agent-service/src/types/dto.ts @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// DTOs: request/response bodies exchanged with backend services. Distinct from +// domain types (workflow.ts, execution.ts, agent.ts) which model in-memory +// state, and from ws.ts which carries this service's own WebSocket frames. + +import type { WorkflowContent, OperatorPortSchemaMap } from "./workflow"; + +// --- Dashboard Service: workflow persistence --- + +export interface Workflow { + wid: number; + name: string; + description?: string; + content: WorkflowContent; + creationTime?: number; + lastModifiedTime?: number; + isPublished?: boolean; +} + +export interface WorkflowPersistRequest { + wid?: number; + name: string; + description?: string; + content: string; + isPublic?: boolean; +} + +// --- Workflow Compiling Service --- + +export interface WorkflowFatalError { + // FatalErrorType enum name, e.g. "COMPILATION_ERROR" | "EXECUTION_FAILURE". + type: string; + message: string; + details?: string; + operatorId?: string; + workerId?: string; + timestamp?: { seconds: number; nanos: number }; +} + +export interface WorkflowCompilationResponse { + physicalPlan?: unknown; + operatorOutputSchemas: Record; + operatorErrors: Record; +} diff --git a/agent-service/src/types/index.ts b/agent-service/src/types/index.ts index 498f5a9c9af..64227cc909f 100644 --- a/agent-service/src/types/index.ts +++ b/agent-service/src/types/index.ts @@ -19,5 +19,7 @@ export * from "./workflow"; export * from "./execution"; +export * from "./metadata"; export * from "./agent"; +export * from "./dto"; export * from "./ws"; diff --git a/agent-service/src/types/metadata.ts b/agent-service/src/types/metadata.ts new file mode 100644 index 00000000000..49a3bdeed98 --- /dev/null +++ b/agent-service/src/types/metadata.ts @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Operator metadata shapes served by the Dashboard Service +// (`/api/resources/operator-metadata`) and the compact variants the agent +// derives from them for prompts and validation. + +export interface InputPortInfo { + displayName?: string; + disallowMultiLinks?: boolean; + dependencies?: { id: number; internal: boolean }[]; +} + +export interface OutputPortInfo { + displayName?: string; +} + +export interface OperatorAdditionalMetadata { + userFriendlyName: string; + operatorGroupName: string; + operatorDescription?: string; + inputPorts: InputPortInfo[]; + outputPorts: OutputPortInfo[]; + dynamicInputPorts?: boolean; + dynamicOutputPorts?: boolean; + supportReconfiguration?: boolean; + allowPortCustomization?: boolean; +} + +export interface OperatorSchema { + operatorType: string; + jsonSchema: Record; + additionalMetadata: OperatorAdditionalMetadata; + operatorVersion: string; +} + +export interface GroupInfo { + groupName: string; + children?: GroupInfo[] | null; +} + +export interface OperatorMetadata { + operators: OperatorSchema[]; + groups: GroupInfo[]; +} + +/** Full per-operator schema slice surfaced to debugging/inspection callers. */ +export interface OperatorSchemaInfo { + properties: Record; + required: string[]; + definitions: Record; +} + +/** Reduced operator schema (refs inlined, noise stripped) used in prompts and errors. */ +export interface CompactOperatorSchema { + properties: Record; + required: string[]; +} diff --git a/agent-service/src/types/workflow.ts b/agent-service/src/types/workflow.ts index 52c6493cf5f..241b4d9e83c 100644 --- a/agent-service/src/types/workflow.ts +++ b/agent-service/src/types/workflow.ts @@ -52,7 +52,7 @@ export interface OperatorPredicate { readonly operatorID: string; readonly operatorType: string; readonly operatorVersion: string; - readonly operatorProperties: Record; + readonly operatorProperties: Record; readonly inputPorts: PortDescription[]; readonly outputPorts: PortDescription[]; readonly dynamicInputPorts?: boolean; @@ -67,7 +67,7 @@ export interface OperatorPredicate { export interface LogicalOperator { readonly operatorID: string; readonly operatorType: string; - readonly [key: string]: any; + readonly [key: string]: unknown; } export interface OperatorLink { @@ -131,7 +131,7 @@ export interface OperatorDetail { operatorId: string; operatorType: string; customDisplayName?: string; - operatorProperties: Record; + operatorProperties: Record; inputPorts: PortDescription[]; outputPorts: PortDescription[]; } From 8d739200ce088b581022f498e2443bc08fe763cc Mon Sep 17 00:00:00 2001 From: Jiadong Bai <43344272+bobbai00@users.noreply.github.com> Date: Sun, 28 Jun 2026 16:04:42 -0700 Subject: [PATCH 2/2] save changes --- .../src/agent/tools/result-formatting.spec.ts | 14 +- .../src/agent/tools/result-formatting.ts | 9 +- .../agent/tools/workflow-execution-tools.ts | 34 ++-- agent-service/src/agent/util/context-utils.ts | 2 +- .../agent/util/workflow-system-metadata.ts | 9 +- agent-service/src/api/backend-api.ts | 23 +-- agent-service/src/api/compile-api.spec.ts | 73 +++++-- agent-service/src/api/compile-api.ts | 24 +-- agent-service/src/api/execution-api.ts | 38 ---- agent-service/src/api/index.ts | 1 - agent-service/src/api/workflow-api.ts | 41 ++-- agent-service/src/auth/jwt.ts | 22 ++- agent-service/src/config/endpoints.ts | 11 ++ agent-service/src/server.spec.ts | 1 - agent-service/src/server.ts | 82 ++++---- agent-service/src/types/agent.ts | 11 +- agent-service/src/types/dto.ts | 56 +++++- agent-service/src/types/execution.ts | 20 +- agent-service/src/types/workflow.ts | 13 +- .../agent-interaction.component.html | 30 --- .../agent-interaction.component.ts | 55 ------ .../agent-chat/agent-chat.component.ts | 29 +-- .../react-step-detail-modal.component.html | 64 +----- .../react-step-detail-modal.component.spec.ts | 19 -- .../react-step-detail-modal.component.ts | 14 -- .../workflow-editor.component.html | 3 +- .../workflow-editor.component.ts | 71 +------ .../workspace/service/agent/agent-types.ts | 12 -- .../service/agent/agent.service.spec.ts | 2 +- .../workspace/service/agent/agent.service.ts | 183 ++++++------------ 30 files changed, 349 insertions(+), 617 deletions(-) delete mode 100644 agent-service/src/api/execution-api.ts diff --git a/agent-service/src/agent/tools/result-formatting.spec.ts b/agent-service/src/agent/tools/result-formatting.spec.ts index e6d1afdf2e3..37ae8d7043f 100644 --- a/agent-service/src/agent/tools/result-formatting.spec.ts +++ b/agent-service/src/agent/tools/result-formatting.spec.ts @@ -158,11 +158,11 @@ describe("formatOperatorResult - input port metadata", () => { makeOpInfo({ outputTuples: 1, result: [{ a: 1 }], - inputPortShapes: [{ portIndex: 0, rows: 5, columns: 3 }], + inputPortShapes: [{ portIndex: 0, rows: 5 }], }), EMPTY_STATE ); - expect(out).toContain("Input operator(table shape): input0(5, 3)"); + expect(out).toContain("Input operator(table shape): input0(5 rows)"); }); test("uses upstream operator id when an input link matches the port", () => { @@ -176,11 +176,11 @@ describe("formatOperatorResult - input port metadata", () => { makeOpInfo({ outputTuples: 4, result: [{ a: 1, b: 2 }], - inputPortShapes: [{ portIndex: 0, rows: 10, columns: 2 }], + inputPortShapes: [{ portIndex: 0, rows: 10 }], }), state ); - expect(out).toContain("Input operator(table shape): upstream(10, 2)"); + expect(out).toContain("Input operator(table shape): upstream(10 rows)"); }); test("sorts multiple input ports by portIndex regardless of input order", () => { @@ -197,13 +197,13 @@ describe("formatOperatorResult - input port metadata", () => { outputTuples: 1, result: [{ a: 1 }], inputPortShapes: [ - { portIndex: 1, rows: 2, columns: 2 }, - { portIndex: 0, rows: 1, columns: 1 }, + { portIndex: 1, rows: 2 }, + { portIndex: 0, rows: 1 }, ], }), state ); - expect(out).toContain("Input operator(table shape): up0(1, 1), up1(2, 2)"); + expect(out).toContain("Input operator(table shape): up0(1 rows), up1(2 rows)"); }); }); diff --git a/agent-service/src/agent/tools/result-formatting.ts b/agent-service/src/agent/tools/result-formatting.ts index 5ed4aacc5d4..d1a1277ffa4 100644 --- a/agent-service/src/agent/tools/result-formatting.ts +++ b/agent-service/src/agent/tools/result-formatting.ts @@ -30,14 +30,14 @@ export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, w return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; + const jsonArray = opInfo.result; const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; const columns = headers.length; const isViz = jsonArray.length > 0 && jsonArray[0]["__is_visualization__"] === true; const serializableArray = isViz ? jsonArray.map(row => { - const cleaned: Record = {}; + const cleaned: Record = {}; for (const key of Object.keys(row)) { if (key === "__is_visualization__") continue; if (key === "html-content" || key === "json-content") { @@ -89,14 +89,15 @@ function formatInputOutputMetadata( .sort((a, b) => a.portIndex - b.portIndex) .map(p => { const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; + // The backend reports only a row count per input port (no column count). + return `${name}(${p.rows} rows)`; }) .join(", "); return `Input operator(table shape): ${inputPart}\n${outputLine}`; } -function jsonToTableFormat(jsonResult: Record[]): string { +function jsonToTableFormat(jsonResult: Record[]): string { if (!jsonResult || jsonResult.length === 0) return ""; const hasRowIndex = "__row_index__" in jsonResult[0]; diff --git a/agent-service/src/agent/tools/workflow-execution-tools.ts b/agent-service/src/agent/tools/workflow-execution-tools.ts index 78c6cfa3d55..31ab252ee3d 100644 --- a/agent-service/src/agent/tools/workflow-execution-tools.ts +++ b/agent-service/src/agent/tools/workflow-execution-tools.ts @@ -21,10 +21,9 @@ import { z } from "zod"; import { tool } from "ai"; import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; import type { WorkflowState } from "../workflow-state"; -import { getBackendConfig } from "../../api/backend-api"; -import { env } from "../../config/env"; -import type { LogicalPlan, LogicalLink } from "../../api/execution-api"; -import type { OperatorInfo, SyncExecutionResult } from "../../types/execution"; +import { executionEndpointFor } from "../../config/endpoints"; +import type { LogicalPlan, LogicalLink, LogicalOperator } from "../../types/workflow"; +import type { OperatorInfo, SyncExecutionResult, SyncExecutionRequest } from "../../types/execution"; import { WorkflowSystemMetadata } from "../util/workflow-system-metadata"; import { DEFAULT_AGENT_SETTINGS } from "../../types/agent"; import { createLogger } from "../../logger"; @@ -87,7 +86,7 @@ interface OperatorValidation { messages: Record; } -function validateOperatorSchema(operatorType: string, operatorProperties: Record): OperatorValidation { +function validateOperatorSchema(operatorType: string, operatorProperties: Record): OperatorValidation { const metadataStore = WorkflowSystemMetadata.getInstance(); const validation = metadataStore.validateOperatorProperties(operatorType, operatorProperties); return validation.isValid ? { isValid: true, messages: {} } : { isValid: false, messages: validation.messages }; @@ -183,7 +182,7 @@ function buildLogicalPlan(workflowState: WorkflowState, opsToViewResult?: string const useSubDAG = opsToViewResult && opsToViewResult.length === 1; const targetOperatorId = useSubDAG ? opsToViewResult[0] : undefined; - let operatorsList: { operatorID: string; operatorType: string; [key: string]: any }[]; + let operatorsList: LogicalOperator[]; let linksList: LogicalLink[]; const getInputPortOrdinal = (operatorID: string, inputPortID: string): number => { @@ -256,23 +255,16 @@ async function executeWorkflowHttp( logicalPlan: LogicalPlan, options: { abortSignal?: AbortSignal } = {} ): Promise { - const backendConfig = getBackendConfig(); - const workflowId = config.workflowId; const computingUnitId = config.computingUnitId ?? 0; - // In k8s each computing unit is a separate pod, so the endpoint varies per cuid. - const executionEndpoint = env.EXECUTION_ENDPOINT_TEMPLATE - ? env.EXECUTION_ENDPOINT_TEMPLATE.replace("{cuid}", String(computingUnitId)) - : backendConfig.executionEndpoint; - - const url = `${executionEndpoint}/api/execution/${workflowId}/${computingUnitId}/run`; + const url = `${executionEndpointFor(computingUnitId)}/api/execution/${workflowId}/${computingUnitId}/run`; const timeoutSeconds = config.executionTimeoutMs ? Math.ceil(config.executionTimeoutMs / 1000) : Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000); - const request = { + const request: SyncExecutionRequest = { executionName: "agent-execution", logicalPlan: { operators: logicalPlan.operators, @@ -355,7 +347,8 @@ function formatInputOutput( .sort((a, b) => a.portIndex - b.portIndex) .map(p => { const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; + // The backend reports only a row count per input port (no column count). + return `${name}(${p.rows} rows)`; }) .join(", "); @@ -393,7 +386,7 @@ function formatExecutionError( return lines.join("\n"); } -function jsonToTableFormat(jsonResult: Record[]): string { +function jsonToTableFormat(jsonResult: Record[]): string { if (!jsonResult || jsonResult.length === 0) return ""; const hasRowIndex = jsonResult.length > 0 && "__row_index__" in jsonResult[0]; @@ -471,10 +464,7 @@ export async function executeOperatorAndFormat( }); if (!result.success) { - const compilationErrors = - result.state === "CompilationFailed" || result.state === "ValidationFailed" - ? result.compilationErrors - : undefined; + const compilationErrors = result.state === "CompilationFailed" ? result.compilationErrors : undefined; const operatorErrors = result.state === "Failed" @@ -519,7 +509,7 @@ export async function executeOperatorAndFormat( return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; + const jsonArray = opInfo.result; const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; const columns = headers.length; diff --git a/agent-service/src/agent/util/context-utils.ts b/agent-service/src/agent/util/context-utils.ts index 04e801e3aa7..60cc0b6d0b7 100644 --- a/agent-service/src/agent/util/context-utils.ts +++ b/agent-service/src/agent/util/context-utils.ts @@ -176,7 +176,7 @@ function serializeDag( ); const outputSchemas = compilationResult?.operatorOutputSchemas ?? {}; - const compilationErrors = compilationResult?.operatorErrors ?? {}; + const compilationErrors = compilationResult?.type === "failure" ? compilationResult.operatorErrors : {}; lines.push("## Operators"); lines.push(""); diff --git a/agent-service/src/agent/util/workflow-system-metadata.ts b/agent-service/src/agent/util/workflow-system-metadata.ts index da4fcdf3630..cbf85f068c5 100644 --- a/agent-service/src/agent/util/workflow-system-metadata.ts +++ b/agent-service/src/agent/util/workflow-system-metadata.ts @@ -20,7 +20,7 @@ import Ajv from "ajv"; import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api"; import type { ValidationError, Validation } from "../../types/workflow"; -import type { OperatorSchemaInfo, CompactOperatorSchema } from "../../types/metadata"; +import type { OperatorSchemaInfo, CompactOperatorSchema, OperatorAdditionalMetadata } from "../../types/metadata"; import { createLogger } from "../../logger"; const log = createLogger("WorkflowSystemMetadata"); @@ -134,9 +134,10 @@ export class WorkflowSystemMetadata { return instance; } + // jsonSchema blobs are dynamically traversed/inlined, so they stay loosely typed. private schemas: Map = new Map(); private descriptions: Map = new Map(); - private additionalMetadata: Map = new Map(); + private additionalMetadata: Map = new Map(); private initialized = false; async initializeFromBackend(): Promise { @@ -174,7 +175,7 @@ export class WorkflowSystemMetadata { return this.descriptions.get(operatorType) || ""; } - getAdditionalMetadata(operatorType: string): any | undefined { + getAdditionalMetadata(operatorType: string): OperatorAdditionalMetadata | undefined { return this.additionalMetadata.get(operatorType); } @@ -212,7 +213,7 @@ export class WorkflowSystemMetadata { return this.schemas.has(operatorType); } - validateOperatorProperties(operatorType: string, properties: Record): Validation { + validateOperatorProperties(operatorType: string, properties: Record): Validation { const schema = this.schemas.get(operatorType); if (!schema) { return { isValid: false, messages: { error: `Unknown operator type: ${operatorType}` } }; diff --git a/agent-service/src/api/backend-api.ts b/agent-service/src/api/backend-api.ts index 56e5ada4d64..206960339ec 100644 --- a/agent-service/src/api/backend-api.ts +++ b/agent-service/src/api/backend-api.ts @@ -17,7 +17,7 @@ * under the License. */ -import { env } from "../config/env"; +import { getServiceEndpoints } from "../config/endpoints"; import type { OperatorMetadata } from "../types/metadata"; export type { @@ -29,26 +29,9 @@ export type { OperatorMetadata, } from "../types/metadata"; -interface BackendConfig { - apiEndpoint: string; - modelsEndpoint: string; - compileEndpoint: string; - executionEndpoint: string; -} - -const currentConfig: BackendConfig = { - apiEndpoint: env.TEXERA_DASHBOARD_SERVICE_ENDPOINT, - modelsEndpoint: env.LLM_ENDPOINT, - compileEndpoint: env.WORKFLOW_COMPILING_SERVICE_ENDPOINT, - executionEndpoint: env.WORKFLOW_EXECUTION_SERVICE_ENDPOINT, -}; - -export function getBackendConfig(): BackendConfig { - return { ...currentConfig }; -} - export async function fetchOperatorMetadata(): Promise { - const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`; + const { apiEndpoint } = getServiceEndpoints(); + const url = `${apiEndpoint}/api/resources/operator-metadata`; const response = await fetch(url); if (!response.ok) { diff --git a/agent-service/src/api/compile-api.spec.ts b/agent-service/src/api/compile-api.spec.ts index 270463db20a..3a5f0cba752 100644 --- a/agent-service/src/api/compile-api.spec.ts +++ b/agent-service/src/api/compile-api.spec.ts @@ -32,32 +32,22 @@ const plan: LogicalPlan = { }; describe("compileWorkflowAsync", () => { - test("POSTs to /api/compile and returns the parsed compilation response", async () => { - // operatorErrors uses the proto-accurate WorkflowFatalError shape (type is the enum name string). - const responseBody = { - physicalPlan: { nodes: [] }, - operatorOutputSchemas: {}, - operatorErrors: { - op1: { - type: "COMPILATION_ERROR", - message: "bad attribute", - details: "stack", - operatorId: "op1", - workerId: "", - timestamp: { seconds: 1, nanos: 0 }, - }, - }, - }; + function mockJson(body: unknown) { const fn = mock(async () => ({ ok: true, status: 200, statusText: "OK", - json: async () => responseBody, + json: async () => body, text: async () => "", })); globalThis.fetch = fn as unknown as typeof fetch; + return fn; + } - const result = await compileWorkflowAsync(plan); + test("POSTs the logical plan to /api/compile", async () => { + const fn = mockJson({ type: "success", physicalPlan: { nodes: [] }, operatorOutputSchemas: {} }); + + await compileWorkflowAsync(plan); const [url, init] = fn.mock.calls[0] as unknown as [string, RequestInit]; expect(url).toEndWith("/api/compile"); @@ -68,9 +58,52 @@ describe("compileWorkflowAsync", () => { opsToReuseResult: [], opsToViewResult: [], }); + }); + + test("parses a success response", async () => { + mockJson({ type: "success", physicalPlan: { nodes: [] }, operatorOutputSchemas: {} }); + + const result = await compileWorkflowAsync(plan); + expect(result).not.toBeNull(); - expect(result!.operatorErrors.op1.type).toBe("COMPILATION_ERROR"); - expect(result!.operatorErrors.op1.message).toBe("bad attribute"); + const compiled = result!; + expect(compiled.type).toBe("success"); + if (compiled.type === "success") { + expect(compiled.physicalPlan).toEqual({ nodes: [] }); + } + }); + + test("parses a failure response with proto-accurate operator errors", async () => { + // operatorErrors uses the WorkflowFatalError shape (type is the enum name string). + mockJson({ + type: "failure", + operatorOutputSchemas: {}, + operatorErrors: { + op1: { + type: "COMPILATION_ERROR", + message: "bad attribute", + details: "stack", + operatorId: "op1", + workerId: "", + timestamp: { seconds: 1, nanos: 0 }, + }, + }, + }); + + const result = await compileWorkflowAsync(plan); + + expect(result).not.toBeNull(); + const compiled = result!; + expect(compiled.type).toBe("failure"); + if (compiled.type === "failure") { + expect(compiled.operatorErrors.op1.type).toBe("COMPILATION_ERROR"); + expect(compiled.operatorErrors.op1.message).toBe("bad attribute"); + } + }); + + test("returns null on an unrecognized response shape", async () => { + mockJson({ somethingElse: true }); + expect(await compileWorkflowAsync(plan)).toBeNull(); }); test("returns null on a non-ok response", async () => { diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-api.ts index defd02344a1..5921b4302cb 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-api.ts @@ -17,23 +17,16 @@ * under the License. */ -import { getBackendConfig } from "./backend-api"; +import { getServiceEndpoints } from "../config/endpoints"; import type { LogicalPlan } from "../types/workflow"; import type { WorkflowCompilationResponse } from "../types/dto"; import { createLogger } from "../logger"; const log = createLogger("CompileAPI"); -export interface SchemaAttribute { - attributeName: string; - attributeType: "string" | "integer" | "double" | "boolean" | "long" | "timestamp" | "binary"; -} - -export type PortSchema = ReadonlyArray; - export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise { - const config = getBackendConfig(); - const url = `${config.compileEndpoint}/api/compile`; + const { compileEndpoint } = getServiceEndpoints(); + const url = `${compileEndpoint}/api/compile`; const body = { operators: logicalPlan.operators, @@ -55,7 +48,16 @@ export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise { - const config = getBackendConfig(); - const url = `${config.apiEndpoint}/api/${WORKFLOW_BASE_URL}/persist`; + const { apiEndpoint } = getServiceEndpoints(); + const url = `${apiEndpoint}/api/${WORKFLOW_BASE_URL}/persist`; const response = await fetch(url, { method: "POST", @@ -51,16 +68,12 @@ export async function persistWorkflow( throw new Error(`Failed to persist workflow: ${response.status} ${response.statusText} - ${errorText}`); } - const data = (await response.json()) as Workflow; - if (typeof data.content === "string") { - data.content = JSON.parse(data.content as unknown as string); - } - return data; + return toWorkflow((await response.json()) as WorkflowPojo); } export async function retrieveWorkflow(token: string, wid: number): Promise { - const config = getBackendConfig(); - const url = `${config.apiEndpoint}/api/${WORKFLOW_BASE_URL}/${wid}`; + const { apiEndpoint } = getServiceEndpoints(); + const url = `${apiEndpoint}/api/${WORKFLOW_BASE_URL}/${wid}`; const response = await fetch(url, { method: "GET", @@ -72,9 +85,5 @@ export async function retrieveWorkflow(token: string, wid: number): Promise { warnings: [], consoleLogs: [], totalRowCount: 2, - resultStatistics: {}, }, }, ], diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 3d0afc0c85b..d606f826044 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -22,18 +22,20 @@ import { cors } from "@elysiajs/cors"; import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; -import { getBackendConfig } from "./api/backend-api"; import { extractBearerToken, extractUserFromToken, validateToken } from "./auth/jwt"; import { retrieveWorkflow } from "./api/workflow-api"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; +import { getServiceEndpoints } from "./config/endpoints"; import { createLogger } from "./logger"; const log = createLogger("Server"); const wsLog = createLogger("WS"); import type { AgentInfo, + AgentDetail, AgentDelegateConfig, + AgentSettings, CreateAgentRequest, UpdateAgentSettingsRequest, AgentSettingsApi, @@ -43,6 +45,7 @@ import { AgentState, OperatorResultSerializationMode } from "./types/agent"; import type { WsClientCommand, WsServerEvent } from "./types/ws"; import { WsServerSnapshotEvent, WsServerStepEvent, WsServerStatusEvent, WsServerErrorEvent } from "./types/ws"; import type { OperatorResultSummary } from "./types/execution"; +import type { ErrorResponse } from "./types/dto"; const agentStore = new Map(); let agentCounter = 0; @@ -53,10 +56,10 @@ async function createAgentInstance( customName?: string ): Promise<{ agentId: string; agent: TexeraAgent }> { const agentId = `agent-${++agentCounter}`; - const config = getBackendConfig(); + const { modelsEndpoint } = getServiceEndpoints(); const openai = createOpenAI({ - baseURL: `${config.modelsEndpoint}/api`, + baseURL: `${modelsEndpoint}/api`, // The LLM gateway (access-control-service) enforces a REGULAR/ADMIN-role // JWT (apache/texera#5421) and injects the LiteLLM master key downstream, // so the delegating user's JWT is the only credential this service sends. @@ -102,19 +105,26 @@ async function createAgentInstance( return { agentId, agent }; } -function getAgentInfo(agentId: string, agent: TexeraAgent): AgentInfo { - const agentSettings = agent.getSettings(); - const settingsApi: AgentSettingsApi = { - maxOperatorResultCharLimit: agentSettings.maxOperatorResultCharLimit, - maxOperatorResultCellCharLimit: agentSettings.maxOperatorResultCellCharLimit, - operatorResultSerializationMode: agentSettings.operatorResultSerializationMode, - toolTimeoutSeconds: Math.round(agentSettings.toolTimeoutMs / 1000), - executionTimeoutMinutes: Math.round(agentSettings.executionTimeoutMs / 60000), - disabledTools: Array.from(agentSettings.disabledTools), - maxSteps: agentSettings.maxSteps, - allowedOperatorTypes: agentSettings.allowedOperatorTypes, +// Project the internal AgentSettings onto the API/wire shape (ms -> s/min, Set +// -> array). Single source for every route that returns settings. +function toAgentSettingsApi(settings: AgentSettings): AgentSettingsApi { + return { + maxOperatorResultCharLimit: settings.maxOperatorResultCharLimit, + maxOperatorResultCellCharLimit: settings.maxOperatorResultCellCharLimit, + operatorResultSerializationMode: settings.operatorResultSerializationMode, + toolTimeoutSeconds: Math.round(settings.toolTimeoutMs / 1000), + executionTimeoutMinutes: Math.round(settings.executionTimeoutMs / 60000), + disabledTools: Array.from(settings.disabledTools), + maxSteps: settings.maxSteps, + allowedOperatorTypes: settings.allowedOperatorTypes, }; +} + +function errorResponse(message: string): ErrorResponse { + return { error: message }; +} +function getAgentInfo(agentId: string, agent: TexeraAgent): AgentInfo { const delegateConfig = agent.getDelegateConfig(); return { @@ -132,7 +142,7 @@ function getAgentInfo(agentId: string, agent: TexeraAgent): AgentInfo { computingUnitId: delegateConfig.computingUnitId, } : undefined, - settings: settingsApi, + settings: toAgentSettingsApi(agent.getSettings()), }; } @@ -162,10 +172,10 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) // Body schema violations and malformed JSON are client errors, not 500s. if (code === "VALIDATION" || code === "PARSE") { set.status = 400; - return { error: errorMessage || "Invalid request body" }; + return errorResponse(errorMessage || "Invalid request body"); } set.status = ERROR_STATUS[errorMessage] ?? 500; - return { error: errorMessage || "Internal server error" }; + return errorResponse(errorMessage || "Internal server error"); }) .get("/", () => { const agentList = Array.from(agentStore.entries()).map(([id, agent]) => getAgentInfo(id, agent)); @@ -255,14 +265,14 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) ...getAgentInfo(id, agent), workflow: agent.getWorkflowState().getWorkflowContent(), stepCount: agent.getReActSteps().length, - }; + } satisfies AgentDetail; }) .delete("/:id", ({ params: { id }, set }) => { const agent = agentStore.get(id); if (!agent) { set.status = 404; - return { error: "Agent not found" }; + return errorResponse("Agent not found"); } agent.destroy(); @@ -320,17 +330,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) .get("/:id/settings", ({ params: { id } }) => { const agent = getAgent(id); - const agentSettings = agent.getSettings(); - return { - maxOperatorResultCharLimit: agentSettings.maxOperatorResultCharLimit, - maxOperatorResultCellCharLimit: agentSettings.maxOperatorResultCellCharLimit, - operatorResultSerializationMode: agentSettings.operatorResultSerializationMode, - toolTimeoutSeconds: Math.round(agentSettings.toolTimeoutMs / 1000), - executionTimeoutMinutes: Math.round(agentSettings.executionTimeoutMs / 60000), - disabledTools: Array.from(agentSettings.disabledTools), - maxSteps: agentSettings.maxSteps, - allowedOperatorTypes: agentSettings.allowedOperatorTypes, - }; + return toAgentSettingsApi(agent.getSettings()); }) .patch( @@ -362,17 +362,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) allowedOperatorTypes: settings.allowedOperatorTypes, }); - const agentSettings = agent.getSettings(); - return { - maxOperatorResultCharLimit: agentSettings.maxOperatorResultCharLimit, - maxOperatorResultCellCharLimit: agentSettings.maxOperatorResultCellCharLimit, - operatorResultSerializationMode: agentSettings.operatorResultSerializationMode, - toolTimeoutSeconds: Math.round(agentSettings.toolTimeoutMs / 1000), - executionTimeoutMinutes: Math.round(agentSettings.executionTimeoutMs / 60000), - disabledTools: Array.from(agentSettings.disabledTools), - maxSteps: agentSettings.maxSteps, - allowedOperatorTypes: agentSettings.allowedOperatorTypes, - }; + return toAgentSettingsApi(agent.getSettings()); }, { body: t.Object({ @@ -405,7 +395,6 @@ function getOperatorResultSummaries(agent: TexeraAgent): Record) { console.log(""); console.log("Environment:"); - console.log(` LLM_ENDPOINT: ${getBackendConfig().modelsEndpoint}`); - console.log(` WORKFLOW_COMPILING_SERVICE_ENDPOINT: ${getBackendConfig().compileEndpoint}`); - console.log(` TEXERA_DASHBOARD_SERVICE_ENDPOINT: ${getBackendConfig().apiEndpoint}`); + const endpoints = getServiceEndpoints(); + console.log(` LLM_ENDPOINT: ${endpoints.modelsEndpoint}`); + console.log(` WORKFLOW_COMPILING_SERVICE_ENDPOINT: ${endpoints.compileEndpoint}`); + console.log(` TEXERA_DASHBOARD_SERVICE_ENDPOINT: ${endpoints.apiEndpoint}`); console.log(""); console.log("Features:"); console.log(" - Auto-persistence with debounce (500ms)"); diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 03c3524a837..d599014a8f3 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -109,11 +109,14 @@ export const DEFAULT_AGENT_SETTINGS: Omit = { ], }; +/** Mirrors the backend UserRoleEnum. */ +export type UserRole = "INACTIVE" | "RESTRICTED" | "REGULAR" | "ADMIN"; + export interface UserInfo { uid: number; name: string; email: string; - role: string; + role: UserRole; } export interface AgentDelegateConfig { @@ -145,6 +148,12 @@ export interface AgentInfo { settings?: AgentSettingsApi; } +/** `GET /agents/:id` response: an AgentInfo enriched with its loaded workflow. */ +export interface AgentDetail extends AgentInfo { + workflow?: WorkflowContent; + stepCount: number; +} + export interface CreateAgentRequest { modelType: string; name?: string; diff --git a/agent-service/src/types/dto.ts b/agent-service/src/types/dto.ts index f740d930dc1..c16f465cfd3 100644 --- a/agent-service/src/types/dto.ts +++ b/agent-service/src/types/dto.ts @@ -25,6 +25,12 @@ import type { WorkflowContent, OperatorPortSchemaMap } from "./workflow"; // --- Dashboard Service: workflow persistence --- +/** + * Parsed, in-memory workflow returned by the workflow client functions. The + * backend serializes `content` as a JSON string; the client decodes it into a + * WorkflowContent before returning, so this is distinct from the raw wire + * shapes below. + */ export interface Workflow { wid: number; name: string; @@ -32,7 +38,35 @@ export interface Workflow { content: WorkflowContent; creationTime?: number; lastModifiedTime?: number; +} + +/** + * Raw JOOQ Workflow POJO returned by `POST /workflow/persist`. `content` is a + * JSON string and the published flag is `isPublic` (the column name). + */ +export interface WorkflowPojo { + wid: number; + name: string; + description?: string; + content: string; + creationTime?: number; + lastModifiedTime?: number; + isPublic?: boolean; +} + +/** + * `GET /workflow/{wid}` response wrapper. `content` is a JSON string and the + * published flag is renamed to `isPublished`; it also adds `readonly`. + */ +export interface WorkflowWithPrivilege { + wid: number; + name: string; + description?: string; + content: string; + creationTime?: number; + lastModifiedTime?: number; isPublished?: boolean; + readonly?: boolean; } export interface WorkflowPersistRequest { @@ -55,8 +89,26 @@ export interface WorkflowFatalError { timestamp?: { seconds: number; nanos: number }; } -export interface WorkflowCompilationResponse { - physicalPlan?: unknown; +// `POST /api/compile` returns a Jackson polymorphic type discriminated by +// `type`: a success carries the physical plan, a failure carries per-operator +// errors. Both carry the output schemas computed so far. +export interface WorkflowCompilationSuccess { + type: "success"; + physicalPlan: unknown; operatorOutputSchemas: Record; +} + +export interface WorkflowCompilationFailure { + type: "failure"; operatorErrors: Record; + operatorOutputSchemas: Record; +} + +export type WorkflowCompilationResponse = WorkflowCompilationSuccess | WorkflowCompilationFailure; + +// --- Shared HTTP envelopes --- + +/** Error body returned by the agent-service REST routes. */ +export interface ErrorResponse { + error: string; } diff --git a/agent-service/src/types/execution.ts b/agent-service/src/types/execution.ts index d638a889d47..bb6ea05fc65 100644 --- a/agent-service/src/types/execution.ts +++ b/agent-service/src/types/execution.ts @@ -17,15 +17,17 @@ * under the License. */ +import type { LogicalPlan, WorkflowSettings } from "./workflow"; + interface ConsoleMessage { msgType: string; + title: string; message: string; } interface PortShape { portIndex: number; rows: number; - columns: number; } export interface OperatorInfo { @@ -34,14 +36,25 @@ export interface OperatorInfo { outputTuples: number; inputPortShapes?: PortShape[]; resultMode: string; - result?: Record[]; + result?: Record[]; totalRowCount?: number; displayedRows?: number; truncated?: boolean; consoleLogs?: ConsoleMessage[]; error?: string; warnings?: string[]; - resultStatistics?: Record; +} + +/** `POST /api/execution/{wid}/{cuid}/run` request body (SyncExecutionRequest). */ +export interface SyncExecutionRequest { + executionName: string; + logicalPlan: LogicalPlan; + // Optional on the backend (Scala Option); the agent omits it. + workflowSettings?: WorkflowSettings; + targetOperatorIds: string[]; + timeoutSeconds: number; + maxOperatorResultCharLimit: number; + maxOperatorResultCellCharLimit: number; } export interface SyncExecutionResult { @@ -68,5 +81,4 @@ export interface OperatorResultSummary { consoleLogCount?: number; totalRowCount?: number; sampleRecords?: Record[]; - resultStatistics?: Record; } diff --git a/agent-service/src/types/workflow.ts b/agent-service/src/types/workflow.ts index 241b4d9e83c..1e5e64bfffe 100644 --- a/agent-service/src/types/workflow.ts +++ b/agent-service/src/types/workflow.ts @@ -116,7 +116,18 @@ export interface WorkflowContent { readonly settings: WorkflowSettings; } -type AttributeType = "string" | "integer" | "double" | "boolean" | "long" | "timestamp" | "binary"; +// Mirrors the backend AttributeType enum (its Jackson @JsonValue strings). ANY +// serializes as "" and is not emitted in compiled output schemas. +type AttributeType = + | "string" + | "integer" + | "double" + | "boolean" + | "long" + | "timestamp" + | "binary" + | "large_binary" + | ""; export interface SchemaAttribute { readonly attributeName: string; diff --git a/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html b/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html index cd2f1e0f52a..d0a1e083ffa 100644 --- a/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html +++ b/frontend/src/app/workspace/component/agent/agent-interaction/agent-interaction.component.html @@ -55,36 +55,6 @@ - -
-
- - Column Statistics -
-
-
-
- {{ col.column }} - {{ col.dataType }} -
-
- - {{ s.key }}: {{ s.value }} - -
-
-
-
-
[]; - @Input() resultStatistics?: Record; public availableAgents: Array<{ id: string; name: string; isConnected: boolean }> = []; public selectedAgentId: string | null = null; @@ -210,60 +209,6 @@ export class AgentInteractionComponent implements OnInit, OnChanges { return col; } - /** - * Parse resultStatistics into displayable column stats. - * Each entry in resultStatistics is a JSON string with { data_type, statistics: { ... } }. - */ - public getParsedColumnStats(): Array<{ - column: string; - dataType: string; - stats: Array<{ key: string; value: string }>; - }> { - if (!this.resultStatistics) return []; - const sampleCols = this.getSampleColumns().filter(c => !c.startsWith("_") || !c.includes("row_index")); - const columns = sampleCols.length > 0 ? sampleCols : Object.keys(this.resultStatistics); - const result: Array<{ column: string; dataType: string; stats: Array<{ key: string; value: string }> }> = []; - const excludedKeys = new Set(["count", "std", "p25", "median", "p75"]); - - for (const colName of columns) { - const statsJson = this.resultStatistics[colName]; - if (!statsJson) continue; - try { - const parsed = JSON.parse(statsJson); - const dataType: string = parsed.data_type ?? "unknown"; - const statistics: Record = parsed.statistics ?? {}; - const statEntries: Array<{ key: string; value: string }> = []; - - for (const [key, value] of Object.entries(statistics)) { - if (value === undefined || excludedKeys.has(key)) continue; - if (key === "top_10" && typeof value === "object") { - const topEntries = Object.entries(value as Record) - .slice(0, 5) - .map(([k, v]) => `${k}: ${v}`) - .join(", "); - statEntries.push({ key: "top values", value: topEntries }); - } else if (value === null || String(value) === "null") { - statEntries.push({ key, value: "NaN" }); - } else if (typeof value !== "object") { - const formatted = - typeof value === "number" && !Number.isInteger(value) - ? Number(value.toPrecision(4)).toString() - : String(value); - statEntries.push({ key, value: formatted }); - } - } - result.push({ column: colName, dataType, stats: statEntries }); - } catch { - // skip unparseable - } - } - return result; - } - - public hasColumnStats(): boolean { - return this.getParsedColumnStats().length > 0; - } - public getDisplayRows(): Array<{ record?: Record; isEllipsis: boolean }> { if (!this.sampleRecords || this.sampleRecords.length === 0) return []; const rowIndexKey = Object.keys(this.sampleRecords[0]).find(k => k.startsWith("_") && k.includes("row_index")); diff --git a/frontend/src/app/workspace/component/agent/agent-panel/agent-chat/agent-chat.component.ts b/frontend/src/app/workspace/component/agent/agent-panel/agent-chat/agent-chat.component.ts index 55b6c6a3f66..5dd6c890a80 100644 --- a/frontend/src/app/workspace/component/agent/agent-panel/agent-chat/agent-chat.component.ts +++ b/frontend/src/app/workspace/component/agent/agent-panel/agent-chat/agent-chat.component.ts @@ -320,8 +320,6 @@ export class AgentChatComponent implements OnInit, AfterViewChecked, OnDestroy, } this.hoveredMessageIndex = index; - const hoveredStep = index !== null && index >= 0 ? this.visibleSteps[index] : null; - this.agentService.setHoveredMessage(this.agentInfo.id, hoveredStep); } public showResponseDetails(response: ReActStep): void { @@ -385,20 +383,6 @@ export class AgentChatComponent implements OnInit, AfterViewChecked, OnDestroy, return toolResult.output || toolResult.result || toolResult; } - public getToolOperatorAccess( - response: ReActStep, - toolCallIndex: number - ): { viewedOperatorIds: string[]; modifiedOperatorIds: string[] } | null { - if (!response.operatorAccess) { - return null; - } - return response.operatorAccess.get(toolCallIndex) || null; - } - - public hasOperatorAccess(response: ReActStep): boolean { - return !!response.operatorAccess && response.operatorAccess.size > 0; - } - public sendMessage(): void { if (!this.currentMessage.trim() || !this.canSendMessage()) { return; @@ -505,18 +489,7 @@ export class AgentChatComponent implements OnInit, AfterViewChecked, OnDestroy, .pipe(untilDestroyed(this)) .subscribe({ next: (steps: ReActStep[]) => { - // Convert steps to plain objects (handle Map -> object for operatorAccess) - const exportSteps = steps.map(step => { - const plain: any = { ...step }; - if (step.operatorAccess) { - const accessObj: Record = {}; - step.operatorAccess.forEach((value, key) => { - accessObj[key] = value; - }); - plain.operatorAccess = accessObj; - } - return plain; - }); + const exportSteps = steps.map(step => ({ ...step })); const exportData = { agentId: this.agentInfo.id, diff --git a/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.html b/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.html index 4a0973e7ac8..0bebcf1043a 100644 --- a/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.html +++ b/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.html @@ -351,67 +351,6 @@

{{ formatResult(getToolResult(step, idx)) }}

-
-
- Operator Access: -
-
-
- - - VIEWED: - - - {{ opId }} - -
-
- - - ADDED: - - - {{ opId }} - -
-
- - - MODIFIED: - - - {{ opId }} - -
-
-
@@ -422,8 +361,7 @@

(!step.content || step.content.trim().length === 0) && !step.usage && (!step.inputMessages || step.inputMessages.length === 0) && - (!step.toolCalls || step.toolCalls.length === 0) && - !hasOperatorAccess(step) + (!step.toolCalls || step.toolCalls.length === 0) " style="text-align: center; padding: 40px; color: #8c8c8c"> { }); }); - describe("getToolOperatorAccess / hasOperatorAccess", () => { - it("returns null when the step has no operator access", () => { - expect(component.getToolOperatorAccess({} as any, 0)).toBeNull(); - expect(component.hasOperatorAccess({} as any)).toBe(false); - }); - - it("returns the access entry for a tool-call index", () => { - const access = { viewedOperatorIds: ["v"], addedOperatorIds: [], modifiedOperatorIds: [] }; - const step = { operatorAccess: new Map([[0, access]]) } as any; - expect(component.getToolOperatorAccess(step, 0)).toBe(access); - expect(component.getToolOperatorAccess(step, 1)).toBeNull(); - expect(component.hasOperatorAccess(step)).toBe(true); - }); - - it("reports no access for an empty map", () => { - expect(component.hasOperatorAccess({ operatorAccess: new Map() } as any)).toBe(false); - }); - }); - describe("getMessageRoleColor", () => { it("maps known roles and falls back to default", () => { expect(component.getMessageRoleColor("user")).toBe("blue"); diff --git a/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.ts b/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.ts index b9a08a19f49..e5bf457b0d0 100644 --- a/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.ts +++ b/frontend/src/app/workspace/component/agent/agent-panel/react-step-detail-modal/react-step-detail-modal.component.ts @@ -86,20 +86,6 @@ export class ReActStepDetailModalComponent { return toolResult.output || toolResult.result || toolResult; } - public getToolOperatorAccess( - step: ReActStep, - toolCallIndex: number - ): { viewedOperatorIds: string[]; addedOperatorIds: string[]; modifiedOperatorIds: string[] } | null { - if (!step.operatorAccess) { - return null; - } - return step.operatorAccess.get(toolCallIndex) || null; - } - - public hasOperatorAccess(step: ReActStep): boolean { - return !!step.operatorAccess && step.operatorAccess.size > 0; - } - /** * Get tag color for a message role. */ diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.html b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.html index ed0b7cc748b..b5f9cfef9af 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.html +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.html @@ -40,8 +40,7 @@ + [sampleRecords]="getOperatorSampleRecords(chatPopoverOperator.operatorId)"> diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index 230b867c11e..066579d45a9 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -190,7 +190,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy this.handleOperatorStatisticsUpdate(); this.handleRegionEvents(); this.handleOperatorSuggestionHighlightEvent(); - this.handleAgentHoverHighlight(); this.handleElementDelete(); this.handleElementSelectAll(); this.handleElementCopy(); @@ -1518,70 +1517,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy }); } - /** - * Handle agent hover highlighting to show "viewed", "added", and "modified" labels on operators - */ - private handleAgentHoverHighlight(): void { - const setupAgentHoverSubscription = () => { - this.agentService - .getAllAgents() - .pipe(untilDestroyed(this)) - .subscribe(agents => { - agents.forEach(agent => { - // Subscribe to each agent's hover operators stream - this.agentService - .getHoveredMessageOperatorsObservable(agent.id) - .pipe(untilDestroyed(this)) - .subscribe(({ viewedOperatorIds, addedOperatorIds, modifiedOperatorIds }) => { - // Clear all previous labels first - this.clearAllAgentActionLabels(); - - // Show "viewed" labels on viewed operators - viewedOperatorIds.forEach(operatorId => { - if (this.workflowActionService.getTexeraGraph().hasOperator(operatorId)) { - this.jointUIService.showAgentActionLabel(this.paper, operatorId, "viewed", agent.name); - } - }); - - // Show "added" labels on added operators - addedOperatorIds.forEach(operatorId => { - if (this.workflowActionService.getTexeraGraph().hasOperator(operatorId)) { - this.jointUIService.showAgentActionLabel(this.paper, operatorId, "added", agent.name); - } - }); - - // Show "modified" labels on modified operators - modifiedOperatorIds.forEach(operatorId => { - if (this.workflowActionService.getTexeraGraph().hasOperator(operatorId)) { - this.jointUIService.showAgentActionLabel(this.paper, operatorId, "modified", agent.name); - } - }); - }); - }); - }); - }; - - // Subscribe to agent changes to set up hover subscriptions - this.agentService.agentChange$.pipe(untilDestroyed(this)).subscribe(() => { - setupAgentHoverSubscription(); - }); - - // Initial setup - setupAgentHoverSubscription(); - } - - /** - * Clear all agent action labels from all operators - */ - private clearAllAgentActionLabels(): void { - this.workflowActionService - .getTexeraGraph() - .getAllOperators() - .forEach(op => { - this.jointUIService.hideAgentActionLabel(this.paper, op.operatorID); - }); - } - /** * Handle the chat button click on operators. * Opens a chat popover for the operator to interact with agents. @@ -1696,14 +1631,10 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy this.changeDetectorRef.detectChanges(); } - getOperatorSampleRecords(operatorId: string): Record[] | undefined { + getOperatorSampleRecords(operatorId: string): Record[] | undefined { return this.operatorSummaries.get(operatorId)?.sampleRecords; } - getOperatorResultStatistics(operatorId: string): Record | undefined { - return this.operatorSummaries.get(operatorId)?.resultStatistics; - } - isOperatorVisualization(operatorId: string): boolean { return this.operatorSummaries.get(operatorId)?.sampleRecords?.[0]?.["__is_visualization__"] === true; } diff --git a/frontend/src/app/workspace/service/agent/agent-types.ts b/frontend/src/app/workspace/service/agent/agent-types.ts index c687de472a2..05531c84540 100644 --- a/frontend/src/app/workspace/service/agent/agent-types.ts +++ b/frontend/src/app/workspace/service/agent/agent-types.ts @@ -22,16 +22,6 @@ import type { ModelMessage } from "ai"; // Re-export ModelMessage for use in other modules export type { ModelMessage }; -/** - * Operator access information for a tool call. - * Tracks which operators were viewed, added, or modified. - */ -export interface ToolOperatorAccess { - viewedOperatorIds: string[]; - addedOperatorIds: string[]; - modifiedOperatorIds: string[]; -} - /** * Agent lifecycle state. */ @@ -64,8 +54,6 @@ export interface ReActStep { }; /** Messages array sent to the LLM for this step (only when context optimization is active) */ inputMessages?: any[]; - // Map from tool call index to operator access information - operatorAccess?: Map; // Versioning fields: /** Unique step ID string for tree references */ diff --git a/frontend/src/app/workspace/service/agent/agent.service.spec.ts b/frontend/src/app/workspace/service/agent/agent.service.spec.ts index 1f9bcd82591..7bfb2eba75a 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.spec.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.spec.ts @@ -103,7 +103,7 @@ describe("AgentService", () => { const req = httpMock.expectOne(r => r.method === "GET" && r.url === "/api/agents/agent-1/operator-results"); req.flush({ results: { - "op-1": { sampleRecords: [{ a: 1 }], resultStatistics: { a: "{}" } }, + "op-1": { sampleRecords: [{ a: 1 }] }, }, }); diff --git a/frontend/src/app/workspace/service/agent/agent.service.ts b/frontend/src/app/workspace/service/agent/agent.service.ts index 5e7c254f22c..2cef404e8fb 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.ts @@ -103,14 +103,13 @@ export interface OperatorResultSummary { state: string; inputTuples: number; outputTuples: number; - inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; + inputPortShapes?: { portIndex: number; rows: number }[]; outputColumns?: number; error?: string; warnings?: string[]; consoleLogCount?: number; totalRowCount?: number; - sampleRecords?: Record[]; - resultStatistics?: Record; + sampleRecords?: Record[]; } interface ApiAgentInfo { @@ -120,7 +119,6 @@ interface ApiAgentInfo { state: string; createdAt: string; delegate?: { - userToken: string; userInfo: { uid: number; name: string; email: string; role: string }; workflowId?: number; workflowName?: string; @@ -132,21 +130,65 @@ interface ApiAgentListResponse { agents: ApiAgentInfo[]; } +/** + * Wire shape of a ReAct step as received from the agent-service. Mirrors the + * backend `ReActStep` DTO (timestamp is a number on the wire — the frontend + * converts it to a `Date`). Kept permissive on nested tool call/result/message + * payloads, which the converter handles defensively. + */ +interface ApiReActStep { + id?: string; + parentId?: string; + messageId: string; + stepId?: number; + timestamp: number; + role?: "user" | "agent"; + content?: string; + isBegin?: boolean; + isEnd?: boolean; + toolCalls?: any[]; + toolResults?: any[]; + usage?: ReActStep["usage"]; + inputMessages?: any[]; + messageSource?: string; + beforeWorkflowContent?: any; + afterWorkflowContent?: any; +} + interface ApiReActStepsResponse { - steps: any[]; + steps: ApiReActStep[]; state: string; } -interface ApiMessageResponse { - response: string; - steps: any[]; - usage: { inputTokens: number; outputTokens: number; totalTokens: number }; - stats: any; - stopped: boolean; - error?: string; - workflow: any; +/** + * Server -> client WebSocket frames consumed over `/agents/:id/react`. Mirrors + * the agent-service `WsServerEvent` discriminated union (see + * `agent-service/src/types/ws/server.ts`). + */ +interface WsServerSnapshotEvent { + type: "WsServerSnapshotEvent"; + state: string; + steps: ApiReActStep[]; + headId: string | null; +} + +interface WsServerStepEvent { + type: "WsServerStepEvent"; + step: ApiReActStep; +} + +interface WsServerStatusEvent { + type: "WsServerStatusEvent"; + state: string; +} + +interface WsServerErrorEvent { + type: "WsServerErrorEvent"; + error: string; } +type WsServerEvent = WsServerSnapshotEvent | WsServerStepEvent | WsServerStatusEvent | WsServerErrorEvent; + interface LiteLLMModel { id: string; object: string; @@ -165,11 +207,6 @@ interface LiteLLMModelsResponse { interface AgentStateTracking { stateSubject: BehaviorSubject; reActStepsSubject: BehaviorSubject; - hoveredMessageSubject: BehaviorSubject<{ - viewedOperatorIds: string[]; - addedOperatorIds: string[]; - modifiedOperatorIds: string[]; - }>; /** Current HEAD step ID in the version tree */ headIdSubject: BehaviorSubject; workflowSubject: BehaviorSubject; @@ -310,16 +347,7 @@ export class AgentService { * Convert API ReActStep to frontend ReActStep format. * The backend now sends ReActSteps in the aligned format, so minimal conversion is needed. */ - private convertApiReActStep(apiStep: any): ReActStep { - // Convert operator access from object to Map if present - let operatorAccess: Map | undefined; - if (apiStep.operatorAccess) { - operatorAccess = new Map(); - for (const [key, value] of Object.entries(apiStep.operatorAccess)) { - operatorAccess.set(parseInt(key), value); - } - } - + private convertApiReActStep(apiStep: ApiReActStep): ReActStep { return { messageId: apiStep.messageId, stepId: apiStep.stepId || 0, @@ -337,7 +365,6 @@ export class AgentService { })), usage: apiStep.usage, inputMessages: apiStep.inputMessages, - operatorAccess, // Versioning fields id: apiStep.id || `${apiStep.messageId}-${apiStep.stepId || 0}`, parentId: apiStep.parentId, @@ -358,11 +385,6 @@ export class AgentService { tracking = { stateSubject: new BehaviorSubject(AgentState.UNAVAILABLE), reActStepsSubject: new BehaviorSubject([]), - hoveredMessageSubject: new BehaviorSubject<{ - viewedOperatorIds: string[]; - addedOperatorIds: string[]; - modifiedOperatorIds: string[]; - }>({ viewedOperatorIds: [], addedOperatorIds: [], modifiedOperatorIds: [] }), headIdSubject: new BehaviorSubject(null), workflowSubject: new BehaviorSubject(null), workflowId, @@ -447,7 +469,7 @@ export class AgentService { /** * Handle incoming WebSocket messages */ - private handleWebSocketMessage(agentId: string, tracking: AgentStateTracking, message: any): void { + private handleWebSocketMessage(agentId: string, tracking: AgentStateTracking, message: WsServerEvent): void { switch (message.type) { case "WsServerSnapshotEvent": // Initial state and steps @@ -455,22 +477,13 @@ export class AgentService { tracking.stateSubject.next(this.mapStateToAgentState(message.state)); } if (message.steps && Array.isArray(message.steps)) { - const steps = message.steps.map((s: any) => this.convertApiReActStep(s)); + const steps = message.steps.map(s => this.convertApiReActStep(s)); tracking.reActStepsSubject.next(steps); } // Handle initial HEAD pointer if (message.headId !== undefined) { tracking.headIdSubject.next(message.headId); } - // Handle initial workflow content from agent service (ground truth) - if (message.workflowContent) { - tracking.wsWorkflowActive = true; - const workflow: Workflow = { - ...(message.workflowMetadata || tracking.workflowSubject.getValue() || {}), - content: message.workflowContent, - }; - tracking.workflowSubject.next(workflow as Workflow); - } break; case "WsServerStepEvent": @@ -536,7 +549,7 @@ export class AgentService { break; default: - console.warn("Unknown agent WebSocket message type:", message.type); + console.warn("Unknown agent WebSocket message type:", (message as { type?: string }).type); } } @@ -898,7 +911,7 @@ export class AgentService { return this.http .get(`${this.AGENT_API_BASE}/agents/${agentId}/react-steps`, this.agentHeaders(agentId)) .pipe( - map(response => response.steps.map((s: any) => this.convertApiReActStep(s))), + map(response => response.steps.map(s => this.convertApiReActStep(s))), catchError(() => of([])) ); } @@ -1017,78 +1030,6 @@ export class AgentService { ); } - /** - * Set hovered message (local UI state). - */ - public setHoveredMessage(agentId: string, step: ReActStep | null): void { - const tracking = this.agentStateTracking.get(agentId); - if (tracking) { - if (step && step.operatorAccess) { - const viewedOperatorIds: string[] = []; - const addedOperatorIds: string[] = []; - const modifiedOperatorIds: string[] = []; - - step.operatorAccess.forEach(access => { - viewedOperatorIds.push(...access.viewedOperatorIds); - addedOperatorIds.push(...access.addedOperatorIds); - modifiedOperatorIds.push(...access.modifiedOperatorIds); - }); - - tracking.hoveredMessageSubject.next({ - viewedOperatorIds: [...new Set(viewedOperatorIds)], - addedOperatorIds: [...new Set(addedOperatorIds)], - modifiedOperatorIds: [...new Set(modifiedOperatorIds)], - }); - } else { - tracking.hoveredMessageSubject.next({ - viewedOperatorIds: [], - addedOperatorIds: [], - modifiedOperatorIds: [], - }); - } - } - } - - /** - * Get hovered message operators observable. - */ - public getHoveredMessageOperatorsObservable( - agentId: string - ): Observable<{ viewedOperatorIds: string[]; addedOperatorIds: string[]; modifiedOperatorIds: string[] }> { - const tracking = this.getOrCreateStateTracking(agentId); - return tracking.hoveredMessageSubject.asObservable(); - } - - /** - * Get ReActSteps that viewed or modified a specific operator. - */ - public getReActStepsByOperatorAccess( - agentId: string, - operatorId: string - ): Observable<{ viewedBy: ReActStep[]; modifiedBy: ReActStep[] }> { - return this.getReActSteps(agentId).pipe( - map(allSteps => { - const viewedBy: ReActStep[] = []; - const modifiedBy: ReActStep[] = []; - - for (const step of allSteps) { - if (step.operatorAccess) { - step.operatorAccess.forEach(access => { - if (access.viewedOperatorIds.includes(operatorId) && !viewedBy.includes(step)) { - viewedBy.push(step); - } - if (access.modifiedOperatorIds.includes(operatorId) && !modifiedBy.includes(step)) { - modifiedBy.push(step); - } - }); - } - } - - return { viewedBy, modifiedBy }; - }) - ); - } - /** * Get workflow observable for an agent. * This observable emits the full Workflow object from the backend database @@ -1186,11 +1127,11 @@ export class AgentService { public getStepsByOperatorIds(agentId: string, operatorIds: string[]): Observable<{ steps: ReActStep[] }> { return this.http .post<{ - steps: ReActStep[]; + steps: ApiReActStep[]; }>(`${this.AGENT_API_BASE}/agents/${agentId}/steps-by-operators`, { operatorIds }, this.agentHeaders(agentId)) .pipe( map(response => ({ - steps: response.steps.map((s: any) => this.convertApiReActStep(s)), + steps: response.steps.map(s => this.convertApiReActStep(s)), })), catchError(() => of({