Skip to content

Commit 681f078

Browse files
committed
feat: stream CLI logs to /python-cli-runs/* lifecycle endpoints
Buffer the CLI's own log records and POST them in 5s batches to a new register/upload/finalize lifecycle so the admin dashboard renders what the user saw in their terminal alongside the run's terminal status. New modules: - core/cli_run.py — register_cli_run / finalize_cli_run helpers - core/log_uploader.py — BatchedLogUploader (daemon-thread flusher, chunked under the 256KB cap, swallows network errors, drains on shutdown) and UploadingLogHandler routing log records to it - core/streaming.py — setup_streaming() wires both into the socketcli and socketdev loggers, forces them to DEBUG so uploads capture the full history regardless of local terminal verbosity, and returns a teardown callable for the caller to register with atexit - set_run_status() propagates the terminal status through the teardown; socketcli.py exception handlers call it for KeyboardInterrupt (cancelled), uncaught Exception (failure), and any SystemExit with a non-zero code (failure) so sys.exit() paths inside main_code surface correctly instead of defaulting to success Best-effort end-to-end: registration failures fall back to no-streaming and never block the scan. Opt out with --disable-server-log-streaming. Tested against local depscan with the matching /v0/python-cli-runs/* endpoints; 173 unit tests pass.
1 parent 206efe9 commit 681f078

8 files changed

Lines changed: 716 additions & 0 deletions

File tree

socketsecurity/config.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class CliConfig:
9292
ignore_commit_files: bool = False
9393
disable_blocking: bool = False
9494
disable_ignore: bool = False
95+
disable_server_log_streaming: bool = False
9596
strict_blocking: bool = False
9697
integration_type: IntegrationType = "api"
9798
integration_org_slug: Optional[str] = None
@@ -207,6 +208,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':
207208
'ignore_commit_files': args.ignore_commit_files,
208209
'disable_blocking': args.disable_blocking,
209210
'disable_ignore': args.disable_ignore,
211+
'disable_server_log_streaming': args.disable_server_log_streaming,
210212
'strict_blocking': args.strict_blocking,
211213
'integration_type': args.integration,
212214
'pending_head': args.pending_head,
@@ -716,6 +718,18 @@ def create_argument_parser() -> argparse.ArgumentParser:
716718
action="store_true",
717719
help=argparse.SUPPRESS
718720
)
721+
advanced_group.add_argument(
722+
"--disable-server-log-streaming",
723+
dest="disable_server_log_streaming",
724+
action="store_true",
725+
help="Disable streaming server-side log lines to the terminal during long-running CLI operations."
726+
)
727+
advanced_group.add_argument(
728+
"--disable_server_log_streaming",
729+
dest="disable_server_log_streaming",
730+
action="store_true",
731+
help=argparse.SUPPRESS
732+
)
719733
advanced_group.add_argument(
720734
"--strict-blocking",
721735
dest="strict_blocking",

socketsecurity/core/cli_run.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"""Lifecycle helpers for a CLI run on the Socket backend.
2+
3+
A "run" represents a single CLI invocation. `register_cli_run` opens it and
4+
returns a server-issued `run_id`; `finalize_cli_run` closes it on exit. The
5+
run_id keys the rows that `BatchedLogUploader` POSTs to
6+
`/python-cli-runs/<run_id>/logs` during the run so the dashboard can show
7+
what the user saw in their terminal.
8+
9+
Both calls are best-effort: failures fall back to no-streaming and never
10+
prevent the scan from running.
11+
"""
12+
13+
import json
14+
import logging
15+
from typing import Optional
16+
17+
from .cli_client import CliClient
18+
from .exceptions import APIFailure
19+
20+
log = logging.getLogger("socketcli")
21+
22+
23+
def register_cli_run(
24+
client: CliClient,
25+
client_version: str,
26+
integration: Optional[str] = None,
27+
) -> Optional[str]:
28+
payload = {"client_version": client_version}
29+
if integration:
30+
payload["integration"] = integration
31+
try:
32+
resp = client.request(
33+
path="python-cli-runs",
34+
method="POST",
35+
payload=json.dumps(payload),
36+
)
37+
except APIFailure as e:
38+
log.debug(f"cli-run register failed (streaming disabled): {e}")
39+
return None
40+
41+
try:
42+
body = resp.json()
43+
except (ValueError, json.JSONDecodeError) as e:
44+
log.debug(f"cli-run register: bad JSON body: {e}")
45+
return None
46+
47+
run_id = body.get("run_id")
48+
if not isinstance(run_id, str) or not run_id:
49+
log.debug(f"cli-run register: missing run_id in response: {body!r}")
50+
return None
51+
return run_id
52+
53+
54+
def finalize_cli_run(
55+
client: CliClient,
56+
run_id: str,
57+
status: str = "success",
58+
) -> None:
59+
try:
60+
client.request(
61+
path=f"python-cli-runs/{run_id}/finalize",
62+
method="POST",
63+
payload=json.dumps({"status": status}),
64+
)
65+
except Exception as e:
66+
log.debug(f"cli-run finalize failed (swallowed): {e}")
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
"""Buffer the CLI's local log records and POST them in batches to
2+
/python-cli-runs/<run_id>/logs so the dashboard's view of a CLI run
3+
mirrors what the user sees in their terminal.
4+
5+
Behavior:
6+
- daemon thread, 5s flush
7+
- swallow all network errors (debug log only)
8+
- skip empty buffers
9+
- drain on shutdown
10+
- at-most-once semantics (failed batches dropped, not retried)
11+
12+
A thread-local recursion guard prevents the uploader's own request-error
13+
log lines (emitted by `cli_client.py`'s `socketdev` logger) from being
14+
re-enqueued during a flush.
15+
"""
16+
17+
import json
18+
import logging
19+
import threading
20+
from datetime import datetime, timezone
21+
from typing import Optional
22+
23+
from .cli_client import CliClient
24+
25+
log = logging.getLogger(__name__)
26+
27+
_FLUSH_GUARD = threading.local()
28+
29+
_MAX_BATCH_BYTES = 256 * 1024 - 1024 # depscan body cap is 256KB; reserve headroom for envelope/headers
30+
31+
_LEVEL_MAP = {
32+
logging.DEBUG: "DEBUG",
33+
logging.INFO: "INFO",
34+
logging.WARNING: "WARN",
35+
logging.ERROR: "ERROR",
36+
logging.CRITICAL: "ERROR",
37+
}
38+
39+
40+
def _now_str() -> str:
41+
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
42+
43+
44+
class BatchedLogUploader:
45+
def __init__(
46+
self,
47+
client: CliClient,
48+
run_id: str,
49+
flush_interval: float = 5.0,
50+
):
51+
self._client = client
52+
self._run_id = run_id
53+
self._flush_interval = flush_interval
54+
self._buf: list = []
55+
self._lock = threading.Lock()
56+
self._stop = threading.Event()
57+
self._thread: Optional[threading.Thread] = None
58+
59+
def add(self, entry: dict) -> None:
60+
with self._lock:
61+
self._buf.append(entry)
62+
63+
def start(self) -> None:
64+
if self._thread is not None:
65+
return
66+
self._thread = threading.Thread(
67+
target=self._run,
68+
name=f"socket-log-uploader-{self._run_id[:8]}",
69+
daemon=True,
70+
)
71+
self._thread.start()
72+
73+
def stop(self, timeout: float = 2.0) -> None:
74+
if self._thread is None:
75+
self._flush()
76+
return
77+
self._stop.set()
78+
self._thread.join(timeout=timeout)
79+
self._thread = None
80+
self._flush()
81+
82+
def _run(self) -> None:
83+
while not self._stop.is_set():
84+
self._flush()
85+
self._stop.wait(self._flush_interval)
86+
87+
def _flush(self) -> None:
88+
with self._lock:
89+
if not self._buf:
90+
return
91+
entries = self._buf
92+
self._buf = []
93+
94+
_FLUSH_GUARD.active = True
95+
try:
96+
for chunk in _chunk_by_size(entries):
97+
try:
98+
self._client.request(
99+
path=f"python-cli-runs/{self._run_id}/logs",
100+
method="POST",
101+
payload=json.dumps({"logs": chunk}),
102+
)
103+
except Exception as e:
104+
log.debug(f"log upload failed (swallowed, {len(chunk)} entries dropped): {e}")
105+
finally:
106+
_FLUSH_GUARD.active = False
107+
108+
109+
def _chunk_by_size(entries: list) -> list:
110+
"""Split entries into chunks that each serialize to <= _MAX_BATCH_BYTES.
111+
Single entries that exceed the cap are dropped with a debug log."""
112+
chunks: list = []
113+
current: list = []
114+
envelope = len('{"logs":[]}')
115+
current_size = envelope
116+
for entry in entries:
117+
entry_size = len(json.dumps(entry)) + 1 # +1 for inter-entry comma
118+
if entry_size + envelope > _MAX_BATCH_BYTES:
119+
log.debug(f"log entry too large ({entry_size}B), dropped")
120+
continue
121+
if current and current_size + entry_size > _MAX_BATCH_BYTES:
122+
chunks.append(current)
123+
current = [entry]
124+
current_size = envelope + entry_size
125+
else:
126+
current.append(entry)
127+
current_size += entry_size
128+
if current:
129+
chunks.append(current)
130+
return chunks
131+
132+
133+
class UploadingLogHandler(logging.Handler):
134+
def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"):
135+
super().__init__()
136+
self._uploader = uploader
137+
self._context = context
138+
139+
def emit(self, record: logging.LogRecord) -> None:
140+
if getattr(_FLUSH_GUARD, "active", False):
141+
return
142+
try:
143+
self._uploader.add({
144+
"timestamp": _now_str(),
145+
"level": _LEVEL_MAP.get(record.levelno, "INFO"),
146+
"message": self.format(record),
147+
"context": self._context,
148+
})
149+
except Exception:
150+
self.handleError(record)

socketsecurity/core/streaming.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""Wire the server log streaming pipeline for one CLI run.
2+
3+
`setup_streaming` registers the run with the backend, attaches handlers that
4+
route the CLI's own log output through both the local terminal and a batched
5+
uploader, and forces the loggers into DEBUG so the upload captures everything
6+
regardless of local terminal verbosity.
7+
8+
Returns a teardown callable to invoke on exit (typically via `atexit.register`).
9+
Returns None if registration failed; in that case nothing was wired up.
10+
"""
11+
12+
import logging
13+
from typing import Callable, Optional
14+
15+
from .cli_client import CliClient
16+
from .cli_run import finalize_cli_run, register_cli_run
17+
from .log_uploader import BatchedLogUploader, UploadingLogHandler
18+
19+
_run_status: str = "success"
20+
21+
22+
def set_run_status(status: str) -> None:
23+
global _run_status
24+
_run_status = status
25+
26+
27+
def setup_streaming(
28+
*,
29+
client: CliClient,
30+
cli_logger: logging.Logger,
31+
sdk_logger: logging.Logger,
32+
client_version: str,
33+
integration: Optional[str],
34+
enable_debug: bool,
35+
) -> Optional[Callable[[], None]]:
36+
run_id = register_cli_run(
37+
client,
38+
client_version=client_version,
39+
integration=integration,
40+
)
41+
if not run_id:
42+
cli_logger.debug("server log streaming disabled (register failed)")
43+
return None
44+
45+
log_uploader = BatchedLogUploader(client, run_id)
46+
log_uploader.start()
47+
upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli")
48+
upload_handler.setFormatter(logging.Formatter("%(message)s"))
49+
50+
terminal_handler = logging.StreamHandler()
51+
terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO)
52+
terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
53+
54+
saved_levels = (cli_logger.level, sdk_logger.level)
55+
saved_propagate = (cli_logger.propagate, sdk_logger.propagate)
56+
cli_logger.setLevel(logging.DEBUG)
57+
sdk_logger.setLevel(logging.DEBUG)
58+
cli_logger.propagate = False
59+
sdk_logger.propagate = False
60+
cli_logger.addHandler(terminal_handler)
61+
sdk_logger.addHandler(terminal_handler)
62+
cli_logger.addHandler(upload_handler)
63+
sdk_logger.addHandler(upload_handler)
64+
65+
cli_logger.debug(f"server log streaming enabled (run_id={run_id})")
66+
67+
def teardown() -> None:
68+
cli_logger.removeHandler(upload_handler)
69+
sdk_logger.removeHandler(upload_handler)
70+
log_uploader.stop()
71+
finalize_cli_run(client, run_id, status=_run_status)
72+
cli_logger.removeHandler(terminal_handler)
73+
sdk_logger.removeHandler(terminal_handler)
74+
cli_logger.setLevel(saved_levels[0])
75+
sdk_logger.setLevel(saved_levels[1])
76+
cli_logger.propagate = saved_propagate[0]
77+
sdk_logger.propagate = saved_propagate[1]
78+
79+
return teardown

socketsecurity/socketcli.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import atexit
12
import json
23
import os
34
import sys
@@ -20,6 +21,7 @@
2021
from socketsecurity.core.messages import Messages
2122
from socketsecurity.core.scm_comments import Comments
2223
from socketsecurity.core.socket_config import SocketConfig
24+
from socketsecurity.core.streaming import set_run_status, setup_streaming
2325
from socketsecurity.output import OutputHandler
2426

2527
socket_logger, log = initialize_logging()
@@ -30,13 +32,19 @@ def cli():
3032
try:
3133
main_code()
3234
except KeyboardInterrupt:
35+
set_run_status("cancelled")
3336
log.info("Keyboard Interrupt detected, exiting")
3437
config = CliConfig.from_args() # Get current config
3538
if not config.disable_blocking:
3639
sys.exit(2)
3740
else:
3841
sys.exit(0)
42+
except SystemExit as e:
43+
if e.code:
44+
set_run_status("failure")
45+
raise
3946
except Exception as error:
47+
set_run_status("failure")
4048
log.error("Unexpected error when running the cli")
4149
log.error(error)
4250
traceback.print_exc()
@@ -89,6 +97,19 @@ def main_code():
8997
client = CliClient(socket_config)
9098
sdk.api.api_url = socket_config.api_url
9199
log.debug("loaded client")
100+
101+
if not config.disable_server_log_streaming:
102+
teardown = setup_streaming(
103+
client=client,
104+
cli_logger=log,
105+
sdk_logger=socket_logger,
106+
client_version=config.version,
107+
integration=config.integration_type,
108+
enable_debug=config.enable_debug,
109+
)
110+
if teardown:
111+
atexit.register(teardown)
112+
92113
core = Core(socket_config, sdk, config)
93114
log.debug("loaded core")
94115

0 commit comments

Comments
 (0)