Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
bac05b5
feat(finworld): Added AgentScope learning protocol and OpenJudge eval…
TaoShuchang Jan 16, 2026
ba41164
Merge remote-tracking branch 'origin/main' into dev/shuchang
TaoShuchang Jan 16, 2026
c7ca8c7
Precommit fix (#4)
binary-husky Jan 16, 2026
7f2b017
fix test bench import
binary-husky Jan 16, 2026
9dd3c42
refactor(finworld): Replace agent protocol and unify configuration up…
TaoShuchang Jan 17, 2026
757f8a1
feat(finworld): Added FinWorld training environment configuration scr…
TaoShuchang Jan 18, 2026
079e4bd
refactor(utils): Remove unused extract and compute functions `extract…
TaoShuchang Jan 18, 2026
bcce8f0
refactor(finworld): Replace the old model with OpenJudge, update eval…
TaoShuchang Jan 18, 2026
4662d63
feat(task_reader): Support data reading of type jsonl_with_env_service
TaoShuchang Jan 19, 2026
de81c1d
feat(core): add finworld task reader support to framework
TaoShuchang Jan 19, 2026
248acc4
feat(finworld): implement specialized data reader and openjudge-based…
TaoShuchang Jan 19, 2026
9d651fd
refactor(finworld): optimize configuration templates and prompt engin…
TaoShuchang Jan 19, 2026
7475ecc
chore(finworld): update launch scripts and add variant experiment scr…
TaoShuchang Jan 19, 2026
b95d491
Merge remote-tracking branch 'origin/main' into dev/shuchang
TaoShuchang Jan 19, 2026
f20ab91
feat(finworld): Added support for multi-machine, multi-GPU training s…
TaoShuchang Jan 19, 2026
ea87d4b
chore(git): ignore finworld/yaml/*
TaoShuchang Jan 20, 2026
3082bca
fix(metrics): Fix and enhance the compatibility and debugging output …
TaoShuchang Jan 20, 2026
ef44b63
fix(metrics): Remove debug prints and synchronize reward statistics
TaoShuchang Jan 20, 2026
0889483
chore: "Stop tracking existing yaml files in tutorial directory"
TaoShuchang Jan 20, 2026
db7114c
fix(task_runner): Synchronize reward_stats to log_metrics
TaoShuchang Jan 20, 2026
5a25550
refactor(script): Refactored the finworld training script, integratin…
TaoShuchang Jan 20, 2026
623b7d9
Refactor(deep_finance): Replace and remove finworld-related implement…
TaoShuchang Jan 20, 2026
0aaab86
refactor(deepfinance): Rename and unify DeepFinance module and config…
TaoShuchang Jan 20, 2026
04f4959
refactor(tutorial): Optimize dynamic generation logic for configurati…
TaoShuchang Jan 20, 2026
d0ff68b
fix(deep_finance): argparse: with-deepfinance
TaoShuchang Jan 20, 2026
1c356d7
Merge remote-tracking branch 'origin/main' into dev/shuchang
TaoShuchang Jan 20, 2026
37dcbcc
fix(tutorial): Fixed issues with multi-machine training environment v…
TaoShuchang Jan 20, 2026
529ae7e
fix(env): Corrected the assignment logic for reward and info when ret…
TaoShuchang Jan 20, 2026
f4eb231
chore(config): Update example_deep_finance configuration and clean up…
TaoShuchang Jan 20, 2026
1e07515
Refactor(metric): Optimize tool metric calculation and data saving logic
TaoShuchang Jan 20, 2026
08ba184
fix(metric_helper): fix tool cache metric
TaoShuchang Jan 20, 2026
3d55692
fix little bug
TaoShuchang Jan 21, 2026
a478827
fix(utils): Suppress httpx AsyncClient.aclose() exception warnings
TaoShuchang Jan 21, 2026
88be3e4
comments to english
binary-husky Jan 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,8 @@ datasets
tutorial2
site
dump.rdb


tutorial/example_deep_finance/yaml/*
tutorial/example_deep_finance/config/*
tutorial/example_deep_finance/scripts/*
4 changes: 2 additions & 2 deletions ajet/backbone/trainer_verl.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ def fit(self): # noqa: C901
}
)
save_trajectory_as_json_file(context_tracker_arr, self.global_steps, self.config, prefix="train")
update_metrics(context_tracker_arr, metrics)
update_metrics(context_tracker_arr, metrics, prefix="train_")
if self.config.ajet.execute_test: # apply a test probe
from swanlab.data.run.main import get_run

Expand Down Expand Up @@ -1047,7 +1047,7 @@ def eval_dataset(self, target_dataset, target_dataset_name, mode, epoch):
"mean_reward": sum(rewards) / len(rewards) if rewards else 0,
}
save_trajectory_as_json_file(ctx_trackers, self.global_steps, self.config, prefix="eval")
update_metrics(ctx_trackers, val_metrics)
update_metrics(ctx_trackers, val_metrics, prefix="eval_")
print_dict(
val_metrics,
narrow=True,
Expand Down
3 changes: 2 additions & 1 deletion ajet/backbone/warm_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import asyncio
import logging
import os
from ajet.utils.async_utils import apply_httpx_aclose_patch
from ajet.utils.async_utils import apply_httpx_aclose_patch, suppress_httpx_aclose_exception
apply_httpx_aclose_patch()
suppress_httpx_aclose_exception()


def init_parallel_rollout_logger(experiment_name):
Expand Down
5 changes: 2 additions & 3 deletions ajet/context_tracker/base_tracker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import List, Tuple, Union
from typing import List, Union, Tuple, Dict, Optional
from typing import Any, Dict, List, Optional, Tuple, Union
from ajet.schema.task import WorkflowTask

from ajet.schema.extended_msg import (
Expand Down Expand Up @@ -141,7 +140,7 @@ def __init__(self, config, tokenizer, workflow_task: WorkflowTask, **kwargs):
self.already_mad_flag: bool = False
self.round_cnt = 0
self.generation_prompt_token = None
self.log_metrics: Optional[Dict[str, Union[float, List[float]]]] = None # Initialize workflow_metadata to store tool statistics
self.log_metrics: Optional[Dict[str, Union[float, List[float], Dict[str, Any]]]] = None # Initialize workflow_metadata to store tool statistics

assert (
self.config.ajet.data.max_prompt_length
Expand Down
8 changes: 4 additions & 4 deletions ajet/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ def parse_args():
help="Launch appworld",
)
parser.add_argument(
"--with-finworld",
"--with-deepfinance",
action="store_true",
default=False,
help="Launch finworld",
help="Launch deepfinance",
)
parser.add_argument(
"--with-webshop",
Expand Down Expand Up @@ -303,8 +303,8 @@ def main():
if args.with_appworld:
pty_launch("appworld")

if args.with_finworld:
pty_launch("finworld")
if args.with_deepfinance:
pty_launch("deepfinance")

if args.with_crafters:
pty_launch("crafters")
Expand Down
2 changes: 1 addition & 1 deletion ajet/schema/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ class WorkflowOutput(BaseModel):
reward: Union[float, List[float], None] = Field(default=None)
is_success: Union[bool, None] = Field(default=None)
metadata: Dict[str, Any] = Field(default_factory=dict)
log_metrics: Dict[str, Union[float, List[float]]] = Field(default_factory=dict)
log_metrics: Dict[str, Union[float, List[float], Dict[str, Any]]] = Field(default_factory=dict)
4 changes: 4 additions & 0 deletions ajet/task_reader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ def __init__(self, reader_type, reader_config):
self.task_reader = DataGeneratorTaskReader(reader_config)
elif task_reader_type == "random_dummy":
self.task_reader = RandomDummyTaskReader(reader_config)
elif task_reader_type == "deep_finance":
# deep_finance: load message from JSON file and assemble init_messages, tool calls go through env_service
from tutorial.example_deep_finance.deep_finance_reader import DeepFinanceReader
self.task_reader = DeepFinanceReader(reader_config)
else:
raise ValueError(f"Unsupported task reader type: {task_reader_type}")

Expand Down
38 changes: 34 additions & 4 deletions ajet/task_rollout/resource_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __enter__(self):
self.tokenizer = self.workflow_task.tokenizer
self.llm_inference_fn = self.workflow_task.llm_inference_fn
self.observation_window = self.workflow_task.observation_window
if self.config.ajet.task_reader.type == "env_service":
if self.config.ajet.task_reader.type in ("env_service", "deep_finance"):
url = self.config.ajet.task_reader.env_service.env_url
env_type = self.config.ajet.task_reader.env_service.env_type
self.env = EnvClientNg(base_url=url)
Expand Down Expand Up @@ -74,7 +74,9 @@ def _initialize_environment_and_messages(self) -> List[dict]:
Exception: If environment creation fails or required task data is missing
"""

if self.config.ajet.task_reader.type == "env_service":
reader_type = self.config.ajet.task_reader.type

if reader_type == "env_service":
if self.env is None:
raise ValueError("Environment client is None but env_service type is specified")
try:
Expand All @@ -95,6 +97,32 @@ def _initialize_environment_and_messages(self) -> List[dict]:
if self.env is not None:
self.env.release_instance(self.workflow_task.episode_uuid)
raise e
elif reader_type == "deep_finance":
# deep_finance: call create_instance to register instance, but use init_messages assembled by the reader
if self.env is None:
raise ValueError("Environment client is None but deep_finance type is specified")
try:
# call create_instance, let the server create an instance, so that subsequent step() can work
self.env.create_instance(
env_type=self.env_type,
task_id=self.task_id,
instance_id=self.workflow_task.episode_uuid,
params=self.env_params,
)
# Do not use the returned state, directly use the init_messages assembled by the reader
task = self.workflow_task.task
if task.init_messages:
init_messages = task.init_messages
else:
assert task.main_query, "deep_finance requires init_messages or main_query."
init_messages = [{"role": "user", "content": task.main_query}]
except Exception as e:
logger.bind(exception=True).exception(
f"encounter exception in env_worker.create_instance~ error={e.args}"
)
if self.env is not None:
self.env.release_instance(self.workflow_task.episode_uuid)
raise e
else:
task = self.workflow_task.task
if task.init_messages:
Expand Down Expand Up @@ -177,11 +205,15 @@ def step(self, action: dict) -> Tuple[str, float, bool, dict]:
action=action,
)
obs = ""
reward = 0
info = {}
assert isinstance(env_output, dict)

if isinstance(env_output["state"], list):
# 1. If state is a list (new standard format), pass through directly
obs = env_output["state"]
reward = env_output["reward"]
info = env_output["info"]
else:
# 2. If state is a dict (old format or error)
if ("content" not in env_output["state"]) and ("error" in env_output["state"]):
Expand All @@ -191,8 +223,6 @@ def step(self, action: dict) -> Tuple[str, float, bool, dict]:
else:
obs = env_output["state"]["content"]

reward = 0
info = {}
terminate = env_output["is_terminated"]
return obs, reward, terminate, info # type: ignore

Expand Down
3 changes: 3 additions & 0 deletions ajet/task_runner/general_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def execute(self, workflow_task: WorkflowTask) -> BaseContextTracker:
else:
raw_reward, is_success = self.get_judge().compute_reward(workflow_task, workflow_output)

if "reward_stats" in workflow_output.metadata:
workflow_output.log_metrics["reward_stats"] = workflow_output.metadata["reward_stats"]

workflow_task.gym_env = None # clear gym env client reference to avoid serialization issue

assert not isinstance(
Expand Down
48 changes: 48 additions & 0 deletions ajet/utils/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import concurrent.futures
import logging
from typing import Any

def run_async_coroutine_with_timeout(coro, timeout: int = 3600) -> Any:
Expand Down Expand Up @@ -68,3 +69,50 @@ def _patched_del(self) -> None:
print("Applied httpx aclose patch.")
except ImportError:
pass


def suppress_httpx_aclose_exception():
    """
    Suppress the 'Task exception was never retrieved' error from httpx AsyncClient.aclose().

    This error occurs when the event loop is closed before the AsyncClient is properly closed.

    Two independent mechanisms are installed:

    1. If an event loop is running in the calling thread, a custom asyncio
       exception handler is set on it. The handler drops the specific
       ``RuntimeError`` variants matched below and forwards everything else to
       ``loop.default_exception_handler``.
    2. A ``logging.Filter`` is attached to the root logger and to the
       ``'asyncio'`` logger. It rejects records whose rendered message matches
       the same patterns.

    The function is safe to call more than once: a filter is attached to each
    logger at most once, so repeated calls do not stack duplicate filters.
    """

    def _is_transport_closed_noise(text):
        # Shared predicate so the exception handler and the logging filter
        # cannot drift apart.
        if 'unable to perform operation on' in text and 'the handler is closed' in text:
            return True
        return 'TCPTransport' in text and 'closed' in text

    # Custom exception handler for asyncio
    def custom_exception_handler(loop, context):
        exception = context.get('exception')

        # Only the specific RuntimeError raised while closing is suppressed.
        if isinstance(exception, RuntimeError) and _is_transport_closed_noise(str(exception)):
            return

        # For other exceptions, use the default handler
        loop.default_exception_handler(context)

    # Attach the handler only when a loop is already running in this thread.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: there is nothing to attach the handler to, and it is
        # NOT installed later. Only the logging filters below are active then.
        running_loop = None
    if running_loop is not None:
        running_loop.set_exception_handler(custom_exception_handler)

    # Also filter the logging output for this specific error
    class HttpxAcloseFilter(logging.Filter):
        # Marker used to recognise a filter installed by an earlier call, since
        # this class object is re-created on every invocation.
        _httpx_aclose_filter = True

        def filter(self, record):
            try:
                msg = record.getMessage()
            except Exception:
                # A malformed record (args not matching the format string) must
                # not make the filter raise at the caller's log site; let it
                # through for the handlers to deal with.
                return True
            if 'Task exception was never retrieved' in msg and 'aclose' in msg:
                return False
            return not _is_transport_closed_noise(msg)

    # Apply filter to root logger and asyncio logger (once per logger)
    for target in (logging.getLogger(), logging.getLogger('asyncio')):
        already_installed = any(
            getattr(existing, '_httpx_aclose_filter', False) for existing in target.filters
        )
        if not already_installed:
            target.addFilter(HttpxAcloseFilter())
2 changes: 1 addition & 1 deletion ajet/utils/env_service_client/env_client_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def retry_call(
class EnvClient:
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url.rstrip("/")
self.timeout = 30.0
self.timeout = 300.0

def _make_request(
self,
Expand Down
6 changes: 3 additions & 3 deletions ajet/utils/metric_helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ def save_trajectory_as_json_file(ctx_trackers, global_steps, config, prefix):
if config.ajet.trainer_common.save_trajectory_as_json_file:
save_trajectory_as_json(ctx_trackers, global_steps, prefix)

def update_metrics(context_tracker_arr, metrics:dict):
tool_metrics = compute_tool_metrics_from_trajectories(context_tracker_arr)
reward_metrics = compute_reward_metrics_from_trajectories(context_tracker_arr)
def update_metrics(context_tracker_arr, metrics:dict, prefix):
tool_metrics = compute_tool_metrics_from_trajectories(context_tracker_arr, prefix)
reward_metrics = compute_reward_metrics_from_trajectories(context_tracker_arr, prefix)
if tool_metrics:
metrics.update(tool_metrics)
if reward_metrics:
Expand Down
Loading
Loading