Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/server/main-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ func shutdownActivityUpdate() {

func createMainWshClient() {
rpc := wshserver.GetMainRpcClient()
wshfs.RpcClient = rpc
wshutil.DefaultRouter.RegisterTrustedLeaf(rpc, wshutil.DefaultRoute)
wps.Broker.SetClient(wshutil.DefaultRouter)
localInitialEnv := envutil.PruneInitialEnv(envutil.SliceToMap(os.Environ()))
Expand All @@ -393,6 +392,8 @@ func createMainWshClient() {
localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, remoteImpl, "conn:local")
go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName)
wshutil.DefaultRouter.RegisterTrustedLeaf(localConnWsh, wshutil.MakeConnectionRouteId(wshrpc.LocalConnName))
wshfs.RpcClient = localConnWsh
wshfs.RpcClientRouteId = wshutil.MakeConnectionRouteId(wshrpc.LocalConnName)
}

func grabAndRemoveEnvVars() error {
Expand Down
11 changes: 7 additions & 4 deletions cmd/wsh/cmd/wshcmd-connserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func runListener(listener net.Listener, router *wshutil.WshRouter) {
}
}

func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, error) {
func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, string, error) {
routeId := wshutil.MakeConnectionRouteId(connServerConnName)
rpcCtx := wshrpc.RpcContext{
RouteId: routeId,
Expand All @@ -196,7 +196,7 @@ func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName stri

connServerClient := wshutil.MakeWshRpc(rpcCtx, wshremote.MakeRemoteRpcServerImpl(os.Stdout, router, bareClient, false, connServerInitialEnv, sockName), routeId)
router.RegisterTrustedLeaf(connServerClient, routeId)
return connServerClient, nil
return connServerClient, routeId, nil
}

func serverRunRouter() error {
Expand Down Expand Up @@ -236,11 +236,12 @@ func serverRunRouter() error {
sockName := getRemoteDomainSocketName()

// setup the connserver rpc client first
client, err := setupConnServerRpcClientWithRouter(router, sockName)
client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client
wshfs.RpcClientRouteId = bareRouteId

log.Printf("trying to get JWT public key")

Expand Down Expand Up @@ -360,11 +361,12 @@ func serverRunRouterDomainSocket(jwtToken string) error {
log.Printf("got JWT public key")

// now setup the connserver rpc client
client, err := setupConnServerRpcClientWithRouter(router, sockName)
client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client
wshfs.RpcClientRouteId = bareRouteId

// set up the local domain socket listener for local wsh commands
unixListener, err := MakeRemoteUnixListener()
Expand Down Expand Up @@ -402,6 +404,7 @@ func serverRunNormal(jwtToken string) error {
return err
}
wshfs.RpcClient = RpcClient
wshfs.RpcClientRouteId = RpcClientRouteId
WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn)
go func() {
defer func() {
Expand Down
38 changes: 34 additions & 4 deletions cmd/wsh/cmd/wshcmd-file-util.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2025, Command Line Inc.
// Copyright 2026, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

package cmd
Expand All @@ -12,10 +12,10 @@ import (
"strings"

"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
)

func convertNotFoundErr(err error) error {
Expand Down Expand Up @@ -91,8 +91,38 @@ func streamWriteToFile(fileData wshrpc.FileData, reader io.Reader) error {
}

func streamReadFromFile(ctx context.Context, fileData wshrpc.FileData, writer io.Writer) error {
ch := wshclient.FileReadStreamCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
return fsutil.ReadFileStreamToWriter(ctx, ch, writer)
broker := RpcClient.StreamBroker
if broker == nil {
return fmt.Errorf("stream broker not available")
}
if fileData.Info == nil {
return fmt.Errorf("file info is required")
}
readerRouteId := RpcClientRouteId
if readerRouteId == "" {
return fmt.Errorf("no route id available")
}
conn, err := connparse.ParseURI(fileData.Info.Path)
if err != nil {
return fmt.Errorf("parsing file path: %w", err)
}
writerRouteId := wshutil.MakeConnectionRouteId(conn.Host)
reader, streamMeta := broker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024)
defer reader.Close()
go func() {
<-ctx.Done()
reader.Close()
}()
data := wshrpc.CommandFileStreamData{
Info: fileData.Info,
StreamMeta: *streamMeta,
}
_, err = wshclient.FileStreamCommand(RpcClient, data, nil)
if err != nil {
return fmt.Errorf("starting file stream: %w", err)
}
_, err = io.Copy(writer, reader)
return err
}

func fixRelativePaths(path string) (string, error) {
Expand Down
5 changes: 0 additions & 5 deletions cmd/wsh/cmd/wshcmd-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,6 @@ func fileCatRun(cmd *cobra.Command, args []string) error {
return err
}

_, err = checkFileSize(path, MaxFileSize)
if err != nil {
return err
}

fileData := wshrpc.FileData{
Info: &wshrpc.FileInfo{
Path: path}}
Expand Down
11 changes: 9 additions & 2 deletions cmd/wsh/cmd/wshcmd-root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var WrappedStdout io.Writer = &WrappedWriter{dest: os.Stdout}
var WrappedStderr io.Writer = &WrappedWriter{dest: os.Stderr}
var RpcClient *wshutil.WshRpc
var RpcContext wshrpc.RpcContext
var RpcClientRouteId string
var UsingTermWshMode bool
var blockArg string
var WshExitCode int
Expand Down Expand Up @@ -140,7 +141,12 @@ func setupRpcClientWithToken(swapTokenStr string) (wshrpc.CommandAuthenticateRtn
if err != nil {
return rtn, fmt.Errorf("error setting up domain socket rpc client: %w", err)
}
return wshclient.AuthenticateTokenCommand(RpcClient, wshrpc.CommandAuthenticateTokenData{Token: token.Token}, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
rtn, err = wshclient.AuthenticateTokenCommand(RpcClient, wshrpc.CommandAuthenticateTokenData{Token: token.Token}, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
if err != nil {
return rtn, err
}
RpcClientRouteId = rtn.RouteId
return rtn, nil
}

// returns the wrapped stdin and a new rpc client (that wraps the stdin input and stdout output)
Expand All @@ -158,10 +164,11 @@ func setupRpcClient(serverImpl wshutil.ServerImpl, jwtToken string) error {
if err != nil {
return fmt.Errorf("error setting up domain socket rpc client: %v", err)
}
_, err = wshclient.AuthenticateCommand(RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
authRtn, err := wshclient.AuthenticateCommand(RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
if err != nil {
return fmt.Errorf("error authenticating: %v", err)
}
RpcClientRouteId = authRtn.RouteId
blockId := os.Getenv("WAVETERM_BLOCKID")
if blockId != "" {
peerInfo := fmt.Sprintf("domain:block:%s", blockId)
Expand Down
12 changes: 0 additions & 12 deletions frontend/app/store/wshclientapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,6 @@ export class RpcApiType {
return client.wshRpcCall("fileread", data, opts);
}

// command "filereadstream" [responsestream]
FileReadStreamCommand(client: WshClient, data: FileData, opts?: RpcOpts): AsyncGenerator<FileData, void, boolean> {
if (this.mockClient) return this.mockClient.mockWshRpcStream(client, "filereadstream", data, opts);
return client.wshRpcStream("filereadstream", data, opts);
}

// command "filerestorebackup" [call]
FileRestoreBackupCommand(client: WshClient, data: CommandFileRestoreBackupData, opts?: RpcOpts): Promise<void> {
if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "filerestorebackup", data, opts);
Expand Down Expand Up @@ -780,12 +774,6 @@ export class RpcApiType {
return client.wshRpcStream("remotestreamcpudata", null, opts);
}

// command "remotestreamfile" [responsestream]
RemoteStreamFileCommand(client: WshClient, data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator<FileData, void, boolean> {
if (this.mockClient) return this.mockClient.mockWshRpcStream(client, "remotestreamfile", data, opts);
return client.wshRpcStream("remotestreamfile", data, opts);
}

// command "remoteterminatejobmanager" [call]
RemoteTerminateJobManagerCommand(client: WshClient, data: CommandRemoteTerminateJobManagerData, opts?: RpcOpts): Promise<void> {
if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "remoteterminatejobmanager", data, opts);
Expand Down
27 changes: 12 additions & 15 deletions frontend/app/view/preview/preview-directory.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

import { ContextMenuModel } from "@/app/store/contextmenu";
import { useWaveEnv } from "@/app/waveenv/waveenv";
import { globalStore } from "@/app/store/jotaiStore";
import { TabRpcClient } from "@/app/store/wshrpcutil";
import { useWaveEnv } from "@/app/waveenv/waveenv";
import { checkKeyPressed, isCharacterKeyEvent } from "@/util/keyutil";
import { PLATFORM, PlatformMacOS } from "@/util/platformutil";
import { addOpenMenuItems } from "@/util/previewutil";
Expand Down Expand Up @@ -112,7 +112,6 @@ function DirectoryTable({
newDirectory,
}: DirectoryTableProps) {
const env = useWaveEnv<PreviewEnv>();
const searchActive = useAtomValue(model.directorySearchActive);
const fullConfig = useAtomValue(env.atoms.fullConfigAtom);
const defaultSort = useAtomValue(env.getSettingsKeyAtom("preview:defaultsort")) ?? "name";
const setErrorMsg = useSetAtom(model.errorMsgAtom);
Expand Down Expand Up @@ -587,28 +586,26 @@ function DirectoryPreview({ model }: DirectoryPreviewProps) {
useEffect(
() =>
fireAndForget(async () => {
let entries: FileInfo[];
const entries: FileInfo[] = [];
try {
const file = await env.rpc.FileReadCommand(
TabRpcClient,
{
info: {
path: await model.formatRemoteUri(dirPath, globalStore.get),
},
},
null
);
entries = file.entries ?? [];
if (file?.info && file.info.dir && file.info?.path !== file.info?.dir) {
const remotePath = await model.formatRemoteUri(dirPath, globalStore.get);
const stream = env.rpc.FileListStreamCommand(TabRpcClient, { path: remotePath }, null);
for await (const chunk of stream) {
if (chunk?.fileinfo) {
entries.push(...chunk.fileinfo);
}
}
if (finfo?.dir && finfo?.path !== finfo?.dir) {
entries.unshift({
name: "..",
path: file?.info?.dir,
path: finfo.dir,
isdir: true,
modtime: new Date().getTime(),
mimetype: "directory",
});
}
} catch (e) {
console.error("Directory Read Error", e);
setErrorMsg({
status: "Cannot Read Directory",
text: `${e}`,
Expand Down
1 change: 1 addition & 0 deletions frontend/app/view/preview/previewenv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type PreviewEnv = WaveEnvSubset<{
ConnEnsureCommand: WaveEnv["rpc"]["ConnEnsureCommand"];
FileInfoCommand: WaveEnv["rpc"]["FileInfoCommand"];
FileReadCommand: WaveEnv["rpc"]["FileReadCommand"];
FileListStreamCommand: WaveEnv["rpc"]["FileListStreamCommand"];
FileWriteCommand: WaveEnv["rpc"]["FileWriteCommand"];
FileMoveCommand: WaveEnv["rpc"]["FileMoveCommand"];
FileDeleteCommand: WaveEnv["rpc"]["FileDeleteCommand"];
Expand Down
61 changes: 16 additions & 45 deletions frontend/preview/mock/mockfilesystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,23 @@ const MockDirMimeType = "directory";
const MockDirMode = 0o040755;
const MockFileMode = 0o100644;
const MockDirectoryChunkSize = 128;
const MockFileChunkSize = 64 * 1024;
const MockBaseModTime = Date.parse("2026-03-10T09:00:00.000Z");
const TinyPngBytes = Uint8Array.from([
0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x04, 0x00, 0x00, 0x00, 0xb5, 0x1c, 0x0c,
0x02, 0x00, 0x00, 0x00, 0x0b, 0x49, 0x44, 0x41, 0x54, 0x78, 0xda, 0x63, 0xfc, 0xff, 0x1f, 0x00,
0x03, 0x03, 0x01, 0xff, 0xa5, 0xf8, 0x8f, 0xb1, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44,
0xae, 0x42, 0x60, 0x82,
0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, 0x00,
0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x04, 0x00, 0x00, 0x00, 0xb5, 0x1c, 0x0c, 0x02, 0x00, 0x00, 0x00, 0x0b, 0x49,
0x44, 0x41, 0x54, 0x78, 0xda, 0x63, 0xfc, 0xff, 0x1f, 0x00, 0x03, 0x03, 0x01, 0xff, 0xa5, 0xf8, 0x8f, 0xb1, 0x00,
0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44, 0xae, 0x42, 0x60, 0x82,
]);
const TinyJpegBytes = Uint8Array.from([
0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01,
0x00, 0x01, 0x00, 0x00, 0xff, 0xdb, 0x00, 0x43, 0x00, 0x03, 0x02, 0x02, 0x03, 0x02, 0x02, 0x03,
0x03, 0x03, 0x03, 0x04, 0x03, 0x03, 0x04, 0x05, 0x08, 0x05, 0x05, 0x04, 0x04, 0x05, 0x0a, 0x07,
0x07, 0x06, 0x08, 0x0c, 0x0a, 0x0c, 0x0c, 0x0b, 0x0a, 0x0b, 0x0b, 0x0d, 0x0e, 0x12, 0x10, 0x0d,
0x0e, 0x11, 0x0e, 0x0b, 0x0b, 0x10, 0x16, 0x10, 0x11, 0x13, 0x14, 0x15, 0x15, 0x15, 0x0c, 0x0f,
0x17, 0x18, 0x16, 0x14, 0x18, 0x12, 0x14, 0x15, 0x14, 0xff, 0xc0, 0x00, 0x0b, 0x08, 0x00, 0x01,
0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0xff, 0xc4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xff, 0xc4, 0x00, 0x14,
0x10, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3f, 0x00, 0xbf, 0xff, 0xd9,
0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00,
0x00, 0xff, 0xdb, 0x00, 0x43, 0x00, 0x03, 0x02, 0x02, 0x03, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, 0x04, 0x03, 0x03,
0x04, 0x05, 0x08, 0x05, 0x05, 0x04, 0x04, 0x05, 0x0a, 0x07, 0x07, 0x06, 0x08, 0x0c, 0x0a, 0x0c, 0x0c, 0x0b, 0x0a,
0x0b, 0x0b, 0x0d, 0x0e, 0x12, 0x10, 0x0d, 0x0e, 0x11, 0x0e, 0x0b, 0x0b, 0x10, 0x16, 0x10, 0x11, 0x13, 0x14, 0x15,
0x15, 0x15, 0x0c, 0x0f, 0x17, 0x18, 0x16, 0x14, 0x18, 0x12, 0x14, 0x15, 0x14, 0xff, 0xc0, 0x00, 0x0b, 0x08, 0x00,
0x01, 0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0xff, 0xc4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xff, 0xc4, 0x00, 0x14, 0x10, 0x01, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01,
0x00, 0x00, 0x3f, 0x00, 0xbf, 0xff, 0xd9,
]);

type MockFsEntry = {
Expand Down Expand Up @@ -61,7 +58,6 @@ export type MockFilesystem = {
fileRead: (data: FileData) => Promise<FileData>;
fileList: (data: FileListData) => Promise<FileInfo[]>;
fileJoin: (paths: string[]) => Promise<FileInfo>;
fileReadStream: (data: FileData) => AsyncGenerator<FileData, void, boolean>;
fileListStream: (data: FileListData) => AsyncGenerator<CommandRemoteListEntriesRtnData, void, boolean>;
};

Expand Down Expand Up @@ -492,33 +488,9 @@ export function makeMockFilesystem(): MockFilesystem {
}
return toFileInfo(entry);
};
const fileReadStream = async function* (data: FileData): AsyncGenerator<FileData, void, boolean> {
const info = await fileInfo(data);
yield { info };
if (info.notfound) {
return;
}
const entry = getEntry(info.path);
if (entry.isdir) {
const dirEntries = (childrenByDir.get(entry.path) ?? []).map((child) => toFileInfo(child));
for (let idx = 0; idx < dirEntries.length; idx += MockDirectoryChunkSize) {
yield { entries: dirEntries.slice(idx, idx + MockDirectoryChunkSize) };
}
return;
}
if (entry.content == null || entry.content.byteLength === 0) {
return;
}
const { offset, end } = getReadRange(data, entry.content.byteLength);
for (let currentOffset = offset; currentOffset < end; currentOffset += MockFileChunkSize) {
const chunkEnd = Math.min(currentOffset + MockFileChunkSize, end);
yield {
data64: arrayToBase64(entry.content.slice(currentOffset, chunkEnd)),
at: { offset: currentOffset, size: chunkEnd - currentOffset },
};
}
};
const fileListStream = async function* (data: FileListData): AsyncGenerator<CommandRemoteListEntriesRtnData, void, boolean> {
const fileListStream = async function* (
data: FileListData
): AsyncGenerator<CommandRemoteListEntriesRtnData, void, boolean> {
const fileInfos = await fileList(data);
for (let idx = 0; idx < fileInfos.length; idx += MockDirectoryChunkSize) {
yield { fileinfo: fileInfos.slice(idx, idx + MockDirectoryChunkSize) };
Expand All @@ -535,7 +507,6 @@ export function makeMockFilesystem(): MockFilesystem {
fileRead,
fileList,
fileJoin,
fileReadStream,
fileListStream,
};
}
Expand Down
Loading
Loading