Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions agent-service/src/agent/texera-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import {
type ExecutionConfig,
} from "./tools/workflow-execution-tools";
import { assembleContext } from "./util/context-utils";
import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api";
import { compileWorkflowAsync } from "../api/compile-api";
import type { WorkflowCompilationResponse } from "../types/dto";
import { createLogger } from "../logger";
import type { Logger } from "pino";

Expand Down Expand Up @@ -560,15 +561,18 @@ export class TexeraAgent {
onStepFinish: async ({ text, toolCalls, toolResults, usage }) => {
stepIndex++;

// The AI SDK types tc.input / tr.output as `unknown` for dynamically
// registered tools; narrow to the shapes our tools actually produce
// (object args, string results — see tools/*).
const formattedToolCalls = toolCalls?.map(tc => ({
toolName: tc.toolName,
toolCallId: tc.toolCallId,
input: tc.input,
input: tc.input as Record<string, unknown>,
}));

const formattedToolResults = toolResults?.map(tr => ({
toolCallId: tr.toolCallId,
output: tr.output,
output: tr.output as string,
isError: !!(tr.output as any)?.error,
}));

Expand Down
14 changes: 7 additions & 7 deletions agent-service/src/agent/tools/result-formatting.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ describe("formatOperatorResult - input port metadata", () => {
makeOpInfo({
outputTuples: 1,
result: [{ a: 1 }],
inputPortShapes: [{ portIndex: 0, rows: 5, columns: 3 }],
inputPortShapes: [{ portIndex: 0, rows: 5 }],
}),
EMPTY_STATE
);
expect(out).toContain("Input operator(table shape): input0(5, 3)");
expect(out).toContain("Input operator(table shape): input0(5 rows)");
});

test("uses upstream operator id when an input link matches the port", () => {
Expand All @@ -176,11 +176,11 @@ describe("formatOperatorResult - input port metadata", () => {
makeOpInfo({
outputTuples: 4,
result: [{ a: 1, b: 2 }],
inputPortShapes: [{ portIndex: 0, rows: 10, columns: 2 }],
inputPortShapes: [{ portIndex: 0, rows: 10 }],
}),
state
);
expect(out).toContain("Input operator(table shape): upstream(10, 2)");
expect(out).toContain("Input operator(table shape): upstream(10 rows)");
});

test("sorts multiple input ports by portIndex regardless of input order", () => {
Expand All @@ -197,13 +197,13 @@ describe("formatOperatorResult - input port metadata", () => {
outputTuples: 1,
result: [{ a: 1 }],
inputPortShapes: [
{ portIndex: 1, rows: 2, columns: 2 },
{ portIndex: 0, rows: 1, columns: 1 },
{ portIndex: 1, rows: 2 },
{ portIndex: 0, rows: 1 },
],
}),
state
);
expect(out).toContain("Input operator(table shape): up0(1, 1), up1(2, 2)");
expect(out).toContain("Input operator(table shape): up0(1 rows), up1(2 rows)");
});
});

Expand Down
9 changes: 5 additions & 4 deletions agent-service/src/agent/tools/result-formatting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, w
return "(no result data)";
}

const jsonArray = opInfo.result as Record<string, any>[];
const jsonArray = opInfo.result;
const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : [];
const columns = headers.length;

const isViz = jsonArray.length > 0 && jsonArray[0]["__is_visualization__"] === true;
const serializableArray = isViz
? jsonArray.map(row => {
const cleaned: Record<string, any> = {};
const cleaned: Record<string, unknown> = {};
for (const key of Object.keys(row)) {
if (key === "__is_visualization__") continue;
if (key === "html-content" || key === "json-content") {
Expand Down Expand Up @@ -89,14 +89,15 @@ function formatInputOutputMetadata(
.sort((a, b) => a.portIndex - b.portIndex)
.map(p => {
const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`;
return `${name}(${p.rows}, ${p.columns})`;
// The backend reports only a row count per input port (no column count).
return `${name}(${p.rows} rows)`;
})
.join(", ");

return `Input operator(table shape): ${inputPart}\n${outputLine}`;
}

function jsonToTableFormat(jsonResult: Record<string, any>[]): string {
function jsonToTableFormat(jsonResult: Record<string, unknown>[]): string {
if (!jsonResult || jsonResult.length === 0) return "";

const hasRowIndex = "__row_index__" in jsonResult[0];
Expand Down
34 changes: 12 additions & 22 deletions agent-service/src/agent/tools/workflow-execution-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import { z } from "zod";
import { tool } from "ai";
import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility";
import type { WorkflowState } from "../workflow-state";
import { getBackendConfig } from "../../api/backend-api";
import { env } from "../../config/env";
import type { LogicalPlan, LogicalLink } from "../../api/execution-api";
import type { OperatorInfo, SyncExecutionResult } from "../../types/execution";
import { executionEndpointFor } from "../../config/endpoints";
import type { LogicalPlan, LogicalLink, LogicalOperator } from "../../types/workflow";
import type { OperatorInfo, SyncExecutionResult, SyncExecutionRequest } from "../../types/execution";
import { WorkflowSystemMetadata } from "../util/workflow-system-metadata";
import { DEFAULT_AGENT_SETTINGS } from "../../types/agent";
import { createLogger } from "../../logger";
Expand Down Expand Up @@ -87,7 +86,7 @@ interface OperatorValidation {
messages: Record<string, string>;
}

function validateOperatorSchema(operatorType: string, operatorProperties: Record<string, any>): OperatorValidation {
function validateOperatorSchema(operatorType: string, operatorProperties: Record<string, unknown>): OperatorValidation {
const metadataStore = WorkflowSystemMetadata.getInstance();
const validation = metadataStore.validateOperatorProperties(operatorType, operatorProperties);
return validation.isValid ? { isValid: true, messages: {} } : { isValid: false, messages: validation.messages };
Expand Down Expand Up @@ -183,7 +182,7 @@ function buildLogicalPlan(workflowState: WorkflowState, opsToViewResult?: string
const useSubDAG = opsToViewResult && opsToViewResult.length === 1;
const targetOperatorId = useSubDAG ? opsToViewResult[0] : undefined;

let operatorsList: { operatorID: string; operatorType: string; [key: string]: any }[];
let operatorsList: LogicalOperator[];
let linksList: LogicalLink[];

const getInputPortOrdinal = (operatorID: string, inputPortID: string): number => {
Expand Down Expand Up @@ -256,23 +255,16 @@ async function executeWorkflowHttp(
logicalPlan: LogicalPlan,
options: { abortSignal?: AbortSignal } = {}
): Promise<SyncExecutionResult> {
const backendConfig = getBackendConfig();

const workflowId = config.workflowId;
const computingUnitId = config.computingUnitId ?? 0;

// In k8s each computing unit is a separate pod, so the endpoint varies per cuid.
const executionEndpoint = env.EXECUTION_ENDPOINT_TEMPLATE
? env.EXECUTION_ENDPOINT_TEMPLATE.replace("{cuid}", String(computingUnitId))
: backendConfig.executionEndpoint;

const url = `${executionEndpoint}/api/execution/${workflowId}/${computingUnitId}/run`;
const url = `${executionEndpointFor(computingUnitId)}/api/execution/${workflowId}/${computingUnitId}/run`;

const timeoutSeconds = config.executionTimeoutMs
? Math.ceil(config.executionTimeoutMs / 1000)
: Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000);

const request = {
const request: SyncExecutionRequest = {
executionName: "agent-execution",
logicalPlan: {
operators: logicalPlan.operators,
Expand Down Expand Up @@ -355,7 +347,8 @@ function formatInputOutput(
.sort((a, b) => a.portIndex - b.portIndex)
.map(p => {
const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`;
return `${name}(${p.rows}, ${p.columns})`;
// The backend reports only a row count per input port (no column count).
return `${name}(${p.rows} rows)`;
})
.join(", ");

Expand Down Expand Up @@ -393,7 +386,7 @@ function formatExecutionError(
return lines.join("\n");
}

function jsonToTableFormat(jsonResult: Record<string, any>[]): string {
function jsonToTableFormat(jsonResult: Record<string, unknown>[]): string {
if (!jsonResult || jsonResult.length === 0) return "";

const hasRowIndex = jsonResult.length > 0 && "__row_index__" in jsonResult[0];
Expand Down Expand Up @@ -471,10 +464,7 @@ export async function executeOperatorAndFormat(
});

if (!result.success) {
const compilationErrors =
result.state === "CompilationFailed" || result.state === "ValidationFailed"
? result.compilationErrors
: undefined;
const compilationErrors = result.state === "CompilationFailed" ? result.compilationErrors : undefined;

const operatorErrors =
result.state === "Failed"
Expand Down Expand Up @@ -519,7 +509,7 @@ export async function executeOperatorAndFormat(
return "(no result data)";
}

const jsonArray = opInfo.result as Record<string, any>[];
const jsonArray = opInfo.result;
const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : [];
const columns = headers.length;

Expand Down
4 changes: 2 additions & 2 deletions agent-service/src/agent/util/context-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai";
import type { WorkflowState } from "../workflow-state";
import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow";
import type { ReActStep } from "../../types/agent";
import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api";
import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/dto";
import { extractOperatorInputPortSchemaMap } from "./workflow-utils";
import { createLogger } from "../../logger";

Expand Down Expand Up @@ -176,7 +176,7 @@ function serializeDag(
);

const outputSchemas = compilationResult?.operatorOutputSchemas ?? {};
const compilationErrors = compilationResult?.operatorErrors ?? {};
const compilationErrors = compilationResult?.type === "failure" ? compilationResult.operatorErrors : {};

lines.push("## Operators");
lines.push("");
Expand Down
165 changes: 165 additions & 0 deletions agent-service/src/agent/util/workflow-system-metadata.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { describe, expect, test } from "bun:test";
import {
WorkflowSystemMetadata,
formatValidationErrors,
formatCompactSchemaForError,
} from "./workflow-system-metadata";
import type { OperatorMetadata, OperatorSchema } from "../../types/metadata";

function operator(overrides: Partial<OperatorSchema> & Pick<OperatorSchema, "operatorType">): OperatorSchema {
return {
operatorVersion: "1",
jsonSchema: {},
additionalMetadata: {
userFriendlyName: overrides.operatorType,
operatorGroupName: "Test",
inputPorts: [],
outputPorts: [],
},
...overrides,
};
}

function metadataWith(...operators: OperatorSchema[]): OperatorMetadata {
return { operators, groups: [] };
}

// Filter exercises ref-inlining + key filtering; Limit is a clean Ajv-validatable schema.
const FILTER = operator({
operatorType: "Filter",
additionalMetadata: {
userFriendlyName: "Filter",
operatorGroupName: "Filter",
inputPorts: [],
outputPorts: [],
operatorDescription: "Filters rows",
},
jsonSchema: {
properties: {
attribute: { $ref: "#/definitions/AttributeName" },
limit: { type: "integer", propertyOrder: 5 },
dummyPropertyList: { type: "array" },
},
definitions: {
AttributeName: { type: "string", title: "Attribute" },
PortDescription: { type: "object" },
},
required: ["attribute"],
},
});

const LIMIT = operator({
operatorType: "Limit",
jsonSchema: { type: "object", properties: { limit: { type: "integer" } }, required: ["limit"] },
});

function loaded(...operators: OperatorSchema[]): WorkflowSystemMetadata {
const meta = new WorkflowSystemMetadata();
meta.loadFromMetadata(metadataWith(...operators));
return meta;
}

describe("WorkflowSystemMetadata.loadFromMetadata", () => {
test("indexes operators by type", () => {
const meta = loaded(FILTER, LIMIT);
expect(meta.getOperatorCount()).toBe(2);
expect(meta.operatorTypeExists("Filter")).toBe(true);
expect(meta.operatorTypeExists("Nope")).toBe(false);
expect(meta.getSchema("Filter")).toEqual(FILTER.jsonSchema);
expect(meta.getAllOperatorTypes()).toEqual({ Filter: "Filters rows", Limit: "Limit" });
});

test("getDescription falls back to userFriendlyName when no description", () => {
const meta = loaded(FILTER, LIMIT);
expect(meta.getDescription("Filter")).toBe("Filters rows");
expect(meta.getDescription("Limit")).toBe("Limit");
expect(meta.getDescription("Unknown")).toBe("");
});
});

describe("WorkflowSystemMetadata.getCompactSchema", () => {
test("returns null for an unknown operator type", () => {
expect(loaded(FILTER).getCompactSchema("Nope")).toBeNull();
});

test("inlines $refs, strips noise keys, and drops filtered properties", () => {
const compact = loaded(FILTER).getCompactSchema("Filter");
expect(compact).not.toBeNull();
// $ref resolved to the AttributeName definition.
expect(compact!.properties.attribute).toEqual({ type: "string", title: "Attribute" });
// propertyOrder is in COMPACT_SCHEMA_EXCLUDED_KEYS and is stripped.
expect(compact!.properties.limit).toEqual({ type: "integer" });
// dummyPropertyList is in FILTERED_PROPERTY_KEYS and is removed.
expect(compact!.properties).not.toHaveProperty("dummyPropertyList");
expect(compact!.required).toEqual(["attribute"]);
});
});

describe("WorkflowSystemMetadata.getAllSchemasAsJson", () => {
test("emits filtered properties and definitions as JSON", () => {
const parsed = JSON.parse(loaded(FILTER).getAllSchemasAsJson());
expect(Object.keys(parsed.Filter.properties)).toEqual(["attribute", "limit"]); // dummyPropertyList filtered
expect(parsed.Filter.definitions).toHaveProperty("AttributeName");
expect(parsed.Filter.definitions).not.toHaveProperty("PortDescription"); // filtered definition
expect(parsed.Filter.required).toEqual(["attribute"]);
});
});

describe("WorkflowSystemMetadata.validateOperatorProperties", () => {
test("accepts properties that satisfy the schema", () => {
expect(loaded(LIMIT).validateOperatorProperties("Limit", { limit: 5 })).toEqual({ isValid: true });
});

test("reports the missing required property", () => {
const result = loaded(LIMIT).validateOperatorProperties("Limit", {});
expect(result.isValid).toBe(false);
expect(result.isValid ? {} : result.messages).toHaveProperty("limit");
});

test("rejects an unknown operator type", () => {
const result = loaded(LIMIT).validateOperatorProperties("Nope", {});
expect(result.isValid).toBe(false);
expect(result.isValid ? "" : result.messages.error).toContain("Unknown operator type");
});
});

describe("formatValidationErrors", () => {
test("returns empty string when valid", () => {
expect(formatValidationErrors({ isValid: true })).toBe("");
});

test("joins messages as 'key: msg'", () => {
expect(formatValidationErrors({ isValid: false, messages: { limit: "is required", attribute: "bad" } })).toBe(
"limit: is required; attribute: bad"
);
});
});

describe("formatCompactSchemaForError", () => {
test("renders only the required properties", () => {
const formatted = formatCompactSchemaForError({
properties: { a: { type: "string" }, b: { type: "integer" } },
required: ["a"],
});
expect(formatted).toBe('required: [a], properties: {"a":{"type":"string"}}');
});
});
Loading
Loading