diff --git a/frontend/app/store/wshclientapi.ts b/frontend/app/store/wshclientapi.ts index d64f7f06b0..de6cd7efff 100644 --- a/frontend/app/store/wshclientapi.ts +++ b/frontend/app/store/wshclientapi.ts @@ -378,6 +378,12 @@ export class RpcApiType { return client.wshRpcCall("filerestorebackup", data, opts); } + // command "filestream" [call] + FileStreamCommand(client: WshClient, data: CommandFileStreamData, opts?: RpcOpts): Promise { + if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "filestream", data, opts); + return client.wshRpcCall("filestream", data, opts); + } + // command "filewrite" [call] FileWriteCommand(client: WshClient, data: FileData, opts?: RpcOpts): Promise { if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "filewrite", data, opts); @@ -720,6 +726,12 @@ export class RpcApiType { return client.wshRpcCall("remotefilemultiinfo", data, opts); } + // command "remotefilestream" [call] + RemoteFileStreamCommand(client: WshClient, data: CommandRemoteFileStreamData, opts?: RpcOpts): Promise { + if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "remotefilestream", data, opts); + return client.wshRpcCall("remotefilestream", data, opts); + } + // command "remotefiletouch" [call] RemoteFileTouchCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "remotefiletouch", data, opts); diff --git a/frontend/app/view/preview/preview-streaming.tsx b/frontend/app/view/preview/preview-streaming.tsx index 408da54de4..21726d9fbc 100644 --- a/frontend/app/view/preview/preview-streaming.tsx +++ b/frontend/app/view/preview/preview-streaming.tsx @@ -72,18 +72,14 @@ function StreamingPreview({ model }: SpecializedViewProps) { if (fileInfo.mimetype.startsWith("video/")) { return (
- +
); } if (fileInfo.mimetype.startsWith("audio/")) { return (
- +
); } diff --git a/frontend/app/view/term/termwrap.ts b/frontend/app/view/term/termwrap.ts index 05490ad466..909c8571f4 100644 --- a/frontend/app/view/term/termwrap.ts +++ b/frontend/app/view/term/termwrap.ts @@ -531,7 +531,7 @@ export class TermWrap { } } let resolve: () => void = null; - let prtn = new Promise((presolve, _) => { + const prtn = new Promise((presolve, _) => { resolve = presolve; }); this.terminal.write(data, () => { diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index f2ec1d2c64..193929ef4c 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -380,6 +380,13 @@ declare global { restoretofilename: string; }; + // wshrpc.CommandFileStreamData + type CommandFileStreamData = { + info: FileInfo; + byterange?: string; + streammeta: StreamMeta; + }; + // wshrpc.CommandGetMetaData type CommandGetMetaData = { oref: ORef; @@ -521,6 +528,13 @@ declare global { paths: string[]; }; + // wshrpc.CommandRemoteFileStreamData + type CommandRemoteFileStreamData = { + path: string; + byterange?: string; + streammeta: StreamMeta; + }; + // wshrpc.CommandRemoteListEntriesData type CommandRemoteListEntriesData = { path: string; diff --git a/pkg/remote/fileshare/wshfs/wshfs.go b/pkg/remote/fileshare/wshfs/wshfs.go index 06429bc8a2..80ab53e99b 100644 --- a/pkg/remote/fileshare/wshfs/wshfs.go +++ b/pkg/remote/fileshare/wshfs/wshfs.go @@ -61,12 +61,37 @@ func ReadStream(ctx context.Context, data wshrpc.FileData) <-chan wshrpc.RespOrE func readStream(conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { byteRange := "" if data.At != nil && data.At.Size > 0 { - byteRange = fmt.Sprintf("%d-%d", data.At.Offset, data.At.Offset+int64(data.At.Size)) + byteRange = fmt.Sprintf("%d-%d", data.At.Offset, data.At.Offset+int64(data.At.Size)-1) } streamFileData := wshrpc.CommandRemoteStreamFileData{Path: conn.Path, ByteRange: byteRange} return wshclient.RemoteStreamFileCommand(RpcClient, streamFileData, &wshrpc.RpcOpts{Route: wshutil.MakeConnectionRouteId(conn.Host)}) } +func GetConnectionRouteId(ctx context.Context, path string) (string, error) { + conn, err := parseConnection(ctx, path) + if err != nil { + return "", err + } + return wshutil.MakeConnectionRouteId(conn.Host), nil +} + +func FileStream(ctx context.Context, data wshrpc.CommandFileStreamData) (*wshrpc.FileInfo, error) { + if data.Info == nil { + return nil, fmt.Errorf("file info is required") + } + log.Printf("FileStream: %v", data.Info.Path) + conn, err := parseConnection(ctx, data.Info.Path) + if err != nil { + return nil, err + } + remoteData := wshrpc.CommandRemoteFileStreamData{ + Path: conn.Path, + ByteRange: data.ByteRange, + StreamMeta: data.StreamMeta, + } + return wshclient.RemoteFileStreamCommand(RpcClient, remoteData, &wshrpc.RpcOpts{Route: wshutil.MakeConnectionRouteId(conn.Host)}) +} + func ListEntries(ctx context.Context, path string, opts *wshrpc.FileListOpts) ([]*wshrpc.FileInfo, error) { log.Printf("ListEntries: %v", path) conn, err := parseConnection(ctx, path) diff --git a/pkg/util/fileutil/fileutil.go b/pkg/util/fileutil/fileutil.go index a26409e5e0..ce9663d521 100644 --- a/pkg/util/fileutil/fileutil.go +++ b/pkg/util/fileutil/fileutil.go @@ -1,10 +1,11 @@ -// Copyright 2025, Command Line Inc. +// Copyright 2026, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 package fileutil import ( "bytes" + "errors" "fmt" "io" "io/fs" @@ -18,6 +19,38 @@ import ( "github.com/wavetermdev/waveterm/pkg/wavebase" ) +type ByteRangeType struct { + All bool + Start int64 + End int64 // inclusive; only valid when OpenEnd is false + OpenEnd bool // true when range is "N-" (read from Start to EOF) +} + +func ParseByteRange(rangeStr string) (ByteRangeType, error) { + if rangeStr == "" { + return ByteRangeType{All: true}, nil + } + // handle open-ended range "N-" + if len(rangeStr) > 0 && rangeStr[len(rangeStr)-1] == '-' { + var start int64 + _, err := fmt.Sscanf(rangeStr, "%d-", &start) + if err != nil || start < 0 { + return ByteRangeType{}, errors.New("invalid byte range") + } + return ByteRangeType{Start: start, OpenEnd: true}, nil + } + var start, end int64 + _, err := fmt.Sscanf(rangeStr, "%d-%d", &start, &end) + if err != nil { + return ByteRangeType{}, errors.New("invalid byte range") + } + if start < 0 || end < 0 || start > end { + return ByteRangeType{}, errors.New("invalid byte range") + } + // End is inclusive (HTTP byte range semantics: bytes=0-999 means 1000 bytes) + return ByteRangeType{Start: start, End: end}, nil +} + func FixPath(path string) (string, error) { origPath := path var err error diff --git a/pkg/web/web.go b/pkg/web/web.go index 52e48ab48c..106db981e4 100644 --- a/pkg/web/web.go +++ b/pkg/web/web.go @@ -4,8 +4,6 @@ package web import ( - "bytes" - "context" "encoding/base64" "encoding/json" "fmt" @@ -17,6 +15,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "time" "github.com/google/uuid" @@ -28,12 +27,10 @@ import ( "github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs" "github.com/wavetermdev/waveterm/pkg/schema" "github.com/wavetermdev/waveterm/pkg/service" - "github.com/wavetermdev/waveterm/pkg/util/utilfn" + "github.com/wavetermdev/waveterm/pkg/util/fileutil" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/wshrpc" "github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient" - "github.com/wavetermdev/waveterm/pkg/wshrpc/wshserver" - "github.com/wavetermdev/waveterm/pkg/wshutil" ) type WebFnType = func(http.ResponseWriter, *http.Request) @@ -220,6 +217,7 @@ func serveTransparentGIF(w http.ResponseWriter) { } func handleLocalStreamFile(w http.ResponseWriter, r *http.Request, path string, no404 bool) { + http.NewResponseController(w).SetWriteDeadline(time.Time{}) if no404 { log.Printf("streaming file w/no404: %q\n", path) // use the custom response writer @@ -246,78 +244,84 @@ func handleLocalStreamFile(w http.ResponseWriter, r *http.Request, path string, } } -func handleRemoteStreamFile(w http.ResponseWriter, req *http.Request, conn string, path string, no404 bool) error { - client := wshserver.GetMainRpcClient() - streamFileData := wshrpc.CommandRemoteStreamFileData{Path: path} - route := wshutil.MakeConnectionRouteId(conn) - rpcOpts := &wshrpc.RpcOpts{Route: route, Timeout: 60 * 1000} - rtnCh := wshclient.RemoteStreamFileCommand(client, streamFileData, rpcOpts) - return handleRemoteStreamFileFromCh(w, req, path, rtnCh, rpcOpts.StreamCancelFn, no404) -} +func handleStreamFileFromReader(w http.ResponseWriter, r *http.Request, path string, no404 bool) error { + startTime := time.Now() + rangeHeader := r.Header.Get("Range") + log.Printf("stream-file path=%q range=%q\n", path, rangeHeader) -func handleRemoteStreamFileFromCh(w http.ResponseWriter, req *http.Request, path string, rtnCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], streamCancelFn func(context.Context) error, no404 bool) error { - firstPk := true - var fileInfo *wshrpc.FileInfo - loopDone := false - defer func() { - if loopDone { - return + writerRouteId, err := wshfs.GetConnectionRouteId(r.Context(), path) + if err != nil { + return err + } + + byteRange := "" + if rangeHeader != "" { + stripped := strings.TrimPrefix(rangeHeader, "bytes=") + br, parseErr := fileutil.ParseByteRange(stripped) + if parseErr != nil || br.All { + http.Error(w, "invalid range", http.StatusRequestedRangeNotSatisfiable) + return nil } - // if loop didn't finish naturally clear it out - utilfn.DrainChannelSafe(rtnCh, "handleRemoteStreamFile") + byteRange = stripped + } + + bareRpc := wshclient.GetBareRpcClient() + readerRouteId := wshclient.GetBareRpcClientRouteId() + reader, streamMeta := bareRpc.StreamBroker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024) + defer reader.Close() + go func() { + <-r.Context().Done() + reader.Close() }() - ctx := req.Context() - for { - select { - case <-ctx.Done(): - if streamCancelFn != nil { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - streamCancelFn(ctx) - } - return ctx.Err() - case respUnion, ok := <-rtnCh: - if !ok { - loopDone = true - return nil - } - if respUnion.Error != nil { - return respUnion.Error - } - if firstPk { - firstPk = false - if respUnion.Response.Info == nil { - return fmt.Errorf("stream file protocol error, fileinfo is empty") - } - fileInfo = respUnion.Response.Info - if fileInfo.NotFound { - if no404 { - serveTransparentGIF(w) - return nil - } else { - return fmt.Errorf("file not found: %q", path) - } - } - if fileInfo.IsDir { - return fmt.Errorf("cannot stream directory: %q", path) - } - w.Header().Set(ContentTypeHeaderKey, fileInfo.MimeType) - w.Header().Set(ContentLengthHeaderKey, fmt.Sprintf("%d", fileInfo.Size)) - continue - } - if respUnion.Response.Data64 == "" { - continue - } - decoder := base64.NewDecoder(base64.StdEncoding, bytes.NewReader([]byte(respUnion.Response.Data64))) - _, err := io.Copy(w, decoder) - if err != nil { - log.Printf("error streaming file %q: %v\n", path, err) - // not sure what to do here, the headers have already been sent. - // just return - return nil - } + + data := wshrpc.CommandFileStreamData{ + Info: &wshrpc.FileInfo{Path: path}, + ByteRange: byteRange, + StreamMeta: *streamMeta, + } + fileInfo, err := wshfs.FileStream(r.Context(), data) + if err != nil { + if no404 { + serveTransparentGIF(w) + return nil + } + return err + } + if fileInfo.NotFound { + if no404 { + serveTransparentGIF(w) + return nil + } + http.Error(w, fmt.Sprintf("file not found: %q", path), http.StatusNotFound) + return nil + } + if fileInfo.IsDir { + http.Error(w, fmt.Sprintf("cannot stream directory: %q", path), http.StatusBadRequest) + return nil + } + log.Printf("stream-file headers-ready path=%q time-to-headers=%v\n", path, time.Since(startTime)) + w.Header().Set(ContentTypeHeaderKey, fileInfo.MimeType) + w.Header().Set("Accept-Ranges", "bytes") + if byteRange != "" { + br, _ := fileutil.ParseByteRange(byteRange) + var rangeEnd int64 + if br.OpenEnd { + rangeEnd = fileInfo.Size - 1 + } else { + rangeEnd = br.End } + w.Header().Set(ContentLengthHeaderKey, fmt.Sprintf("%d", rangeEnd-br.Start+1)) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", br.Start, rangeEnd, fileInfo.Size)) + w.WriteHeader(http.StatusPartialContent) + } else { + w.Header().Set(ContentLengthHeaderKey, fmt.Sprintf("%d", fileInfo.Size)) + } + http.NewResponseController(w).SetWriteDeadline(time.Time{}) + _, copyErr := io.Copy(w, reader) + if copyErr != nil && r.Context().Err() == nil { + log.Printf("error streaming file %q: %v\n", path, copyErr) } + return nil } func handleStreamLocalFile(w http.ResponseWriter, r *http.Request) { @@ -338,13 +342,7 @@ func handleStreamFile(w http.ResponseWriter, r *http.Request) { } no404 := r.URL.Query().Get("no404") // path should already be formatted as a wsh:// URI (e.g. wsh://local/path or wsh://connection/path) - data := wshrpc.FileData{ - Info: &wshrpc.FileInfo{ - Path: path, - }, - } - rtnCh := wshfs.ReadStream(r.Context(), data) - err := handleRemoteStreamFileFromCh(w, r, path, rtnCh, nil, no404 != "") + err := handleStreamFileFromReader(w, r, path, no404 != "") if err != nil { log.Printf("error streaming file %q: %v\n", path, err) http.Error(w, fmt.Sprintf("error streaming file: %v", err), http.StatusInternalServerError) @@ -448,11 +446,15 @@ const schemaPrefix = "/schema/" func RunWebServer(listener net.Listener) { gr := mux.NewRouter() - // Create separate routers for different timeout requirements + // Streaming routes must be registered before the /wave/ prefix catch-all to bypass TimeoutHandler. + // http.TimeoutHandler buffers the entire response before flushing, which stalls streaming. + gr.HandleFunc("/wave/stream-local-file", WebFnWrap(WebFnOpts{AllowCaching: true}, handleStreamLocalFile)) + gr.HandleFunc("/wave/stream-file", WebFnWrap(WebFnOpts{AllowCaching: true}, handleStreamFile)) + gr.PathPrefix("/wave/stream-file/").HandlerFunc(WebFnWrap(WebFnOpts{AllowCaching: true}, handleStreamFile)) + gr.HandleFunc("/api/post-chat-message", WebFnWrap(WebFnOpts{AllowCaching: false}, aiusechat.WaveAIPostMessageHandler)) + + // Non-streaming /wave/ routes get timeout protection waveRouter := mux.NewRouter() - waveRouter.HandleFunc("/wave/stream-local-file", WebFnWrap(WebFnOpts{AllowCaching: true}, handleStreamLocalFile)) - waveRouter.HandleFunc("/wave/stream-file", WebFnWrap(WebFnOpts{AllowCaching: true}, handleStreamFile)) - waveRouter.PathPrefix("/wave/stream-file/").HandlerFunc(WebFnWrap(WebFnOpts{AllowCaching: true}, handleStreamFile)) waveRouter.HandleFunc("/wave/file", WebFnWrap(WebFnOpts{AllowCaching: false}, handleWaveFile)) waveRouter.HandleFunc("/wave/service", WebFnWrap(WebFnOpts{JsonErrors: true}, handleService)) waveRouter.HandleFunc("/wave/aichat", WebFnWrap(WebFnOpts{JsonErrors: true, AllowCaching: false}, aiusechat.WaveAIGetChatHandler)) @@ -460,13 +462,9 @@ func RunWebServer(listener net.Listener) { vdomRouter := mux.NewRouter() vdomRouter.HandleFunc("/vdom/{uuid}/{path:.*}", WebFnWrap(WebFnOpts{AllowCaching: true}, handleVDom)) - // Routes that need timeout handling gr.PathPrefix("/wave/").Handler(http.TimeoutHandler(waveRouter, HttpTimeoutDuration, "Timeout")) gr.PathPrefix("/vdom/").Handler(http.TimeoutHandler(vdomRouter, HttpTimeoutDuration, "Timeout")) - // Routes that should NOT have timeout handling (for streaming) - gr.HandleFunc("/api/post-chat-message", WebFnWrap(WebFnOpts{AllowCaching: false}, aiusechat.WaveAIPostMessageHandler)) - // Other routes without timeout gr.PathPrefix(schemaPrefix).Handler(http.StripPrefix(schemaPrefix, schema.GetSchemaHandler())) diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 103089144e..67ef8669ec 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -375,6 +375,12 @@ func FileRestoreBackupCommand(w *wshutil.WshRpc, data wshrpc.CommandFileRestoreB return err } +// command "filestream", wshserver.FileStreamCommand +func FileStreamCommand(w *wshutil.WshRpc, data wshrpc.CommandFileStreamData, opts *wshrpc.RpcOpts) (*wshrpc.FileInfo, error) { + resp, err := sendRpcRequestCallHelper[*wshrpc.FileInfo](w, "filestream", data, opts) + return resp, err +} + // command "filewrite", wshserver.FileWriteCommand func FileWriteCommand(w *wshutil.WshRpc, data wshrpc.FileData, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "filewrite", data, opts) @@ -717,6 +723,12 @@ func RemoteFileMultiInfoCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteFile return resp, err } +// command "remotefilestream", wshserver.RemoteFileStreamCommand +func RemoteFileStreamCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteFileStreamData, opts *wshrpc.RpcOpts) (*wshrpc.FileInfo, error) { + resp, err := sendRpcRequestCallHelper[*wshrpc.FileInfo](w, "remotefilestream", data, opts) + return resp, err +} + // command "remotefiletouch", wshserver.RemoteFileTouchCommand func RemoteFileTouchCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "remotefiletouch", data, opts) diff --git a/pkg/wshrpc/wshremote/wshremote_file.go b/pkg/wshrpc/wshremote/wshremote_file.go index 015e976b32..d796154f48 100644 --- a/pkg/wshrpc/wshremote/wshremote_file.go +++ b/pkg/wshrpc/wshremote/wshremote_file.go @@ -16,6 +16,7 @@ import ( "strings" "time" + "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/remote/connparse" "github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil" "github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs" @@ -31,28 +32,8 @@ const RemoteFileTransferSizeLimit = 32 * 1024 * 1024 var DisableRecursiveFileOpts = true -type ByteRangeType struct { - All bool - Start int64 - End int64 -} - -func parseByteRange(rangeStr string) (ByteRangeType, error) { - if rangeStr == "" { - return ByteRangeType{All: true}, nil - } - var start, end int64 - _, err := fmt.Sscanf(rangeStr, "%d-%d", &start, &end) - if err != nil { - return ByteRangeType{}, errors.New("invalid byte range") - } - if start < 0 || end < 0 || start > end { - return ByteRangeType{}, errors.New("invalid byte range") - } - return ByteRangeType{Start: start, End: end}, nil -} -func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange ByteRangeType)) error { +func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error { innerFilesEntries, err := os.ReadDir(path) if err != nil { return fmt.Errorf("cannot open dir %q: %w", path, err) @@ -63,9 +44,14 @@ func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, by } } else { if byteRange.Start < int64(len(innerFilesEntries)) { - realEnd := byteRange.End - if realEnd > int64(len(innerFilesEntries)) { + var realEnd int64 + if byteRange.OpenEnd { realEnd = int64(len(innerFilesEntries)) + } else { + realEnd = byteRange.End + 1 + if realEnd > int64(len(innerFilesEntries)) { + realEnd = int64(len(innerFilesEntries)) + } } innerFilesEntries = innerFilesEntries[byteRange.Start:realEnd] } else { @@ -94,7 +80,7 @@ func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, by return nil } -func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string, byteRange ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange ByteRangeType)) error { +func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error { fd, err := os.Open(path) if err != nil { return fmt.Errorf("cannot open file %q: %w", path, err) @@ -115,13 +101,13 @@ func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string } n, err := fd.Read(buf) if n > 0 { - if !byteRange.All && filePos+int64(n) > byteRange.End { - n = int(byteRange.End - filePos) + if !byteRange.All && !byteRange.OpenEnd && filePos+int64(n) > byteRange.End+1 { + n = int(byteRange.End + 1 - filePos) } filePos += int64(n) dataCallback(nil, buf[:n], byteRange) } - if !byteRange.All && filePos >= byteRange.End { + if !byteRange.All && !byteRange.OpenEnd && filePos >= byteRange.End+1 { break } if errors.Is(err, io.EOF) { @@ -134,8 +120,8 @@ func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string return nil } -func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrpc.CommandRemoteStreamFileData, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange ByteRangeType)) error { - byteRange, err := parseByteRange(data.ByteRange) +func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrpc.CommandRemoteStreamFileData, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error { + byteRange, err := fileutil.ParseByteRange(data.ByteRange) if err != nil { return err } @@ -164,9 +150,12 @@ func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrp func (impl *ServerImpl) RemoteStreamFileCommand(ctx context.Context, data wshrpc.CommandRemoteStreamFileData) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.FileData], 16) go func() { + defer func() { + panichandler.PanicHandler("RemoteStreamFileCommand", recover()) + }() defer close(ch) firstPk := true - err := impl.remoteStreamFileInternal(ctx, data, func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange ByteRangeType) { + err := impl.remoteStreamFileInternal(ctx, data, func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType) { resp := wshrpc.FileData{} fileInfoLen := len(fileInfo) if fileInfoLen > 1 || !firstPk { @@ -344,6 +333,9 @@ func (impl *ServerImpl) RemoteFileCopyCommand(ctx context.Context, data wshrpc.C func (impl *ServerImpl) RemoteListEntriesCommand(ctx context.Context, data wshrpc.CommandRemoteListEntriesData) chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteListEntriesRtnData] { ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteListEntriesRtnData], 16) go func() { + defer func() { + panichandler.PanicHandler("RemoteListEntriesCommand", recover()) + }() defer close(ch) path, err := wavebase.ExpandHomeDir(data.Path) if err != nil { @@ -680,6 +672,89 @@ func (*ServerImpl) RemoteWriteFileCommand(ctx context.Context, data wshrpc.FileD return nil } +func (impl *ServerImpl) RemoteFileStreamCommand(ctx context.Context, data wshrpc.CommandRemoteFileStreamData) (*wshrpc.FileInfo, error) { + wshRpc := wshutil.GetWshRpcFromContext(ctx) + if wshRpc == nil || wshRpc.StreamBroker == nil { + return nil, fmt.Errorf("no stream broker available") + } + + writer, err := wshRpc.StreamBroker.CreateStreamWriter(&data.StreamMeta) + if err != nil { + return nil, fmt.Errorf("error creating stream writer: %w", err) + } + + path, err := wavebase.ExpandHomeDir(data.Path) + if err != nil { + writer.CloseWithError(err) + return nil, err + } + cleanedPath := filepath.Clean(path) + + finfo, err := os.Stat(cleanedPath) + if err != nil { + writer.CloseWithError(err) + return nil, fmt.Errorf("cannot stat file %q: %w", data.Path, err) + } + if finfo.IsDir() { + writer.CloseWithError(fmt.Errorf("path is a directory")) + return nil, fmt.Errorf("cannot stream directory %q", data.Path) + } + + byteRange, err := fileutil.ParseByteRange(data.ByteRange) + if err != nil { + writer.CloseWithError(err) + return nil, err + } + + fileInfo := statToFileInfo(cleanedPath, finfo, true) + fileInfo.Path = data.Path + + go func() { + defer func() { + panichandler.PanicHandler("RemoteFileStreamCommand", recover()) + }() + defer writer.Close() + + file, err := os.Open(cleanedPath) + if err != nil { + writer.CloseWithError(fmt.Errorf("cannot open file %q: %w", data.Path, err)) + return + } + defer utilfn.GracefulClose(file, "RemoteFileStreamCommand", cleanedPath) + + if !byteRange.All && byteRange.Start > 0 { + if _, err := file.Seek(byteRange.Start, io.SeekStart); err != nil { + writer.CloseWithError(fmt.Errorf("cannot seek in file %q: %w", data.Path, err)) + return + } + } + + var src io.Reader = file + if !byteRange.All && !byteRange.OpenEnd { + src = io.LimitReader(file, byteRange.End-byteRange.Start+1) + } + + buf := make([]byte, 32*1024) + for { + n, readErr := src.Read(buf) + if n > 0 { + if _, writeErr := writer.Write(buf[:n]); writeErr != nil { + return + } + } + if readErr == io.EOF { + return + } + if readErr != nil { + writer.CloseWithError(fmt.Errorf("error reading file %q: %w", data.Path, readErr)) + return + } + } + }() + + return fileInfo, nil +} + func (*ServerImpl) RemoteFileDeleteCommand(ctx context.Context, data wshrpc.CommandDeleteFileData) error { expandedPath, err := wavebase.ExpandHomeDir(data.Path) if err != nil { diff --git a/pkg/wshrpc/wshrpctypes_file.go b/pkg/wshrpc/wshrpctypes_file.go index ccb93aa410..a43a68bc2e 100644 --- a/pkg/wshrpc/wshrpctypes_file.go +++ b/pkg/wshrpc/wshrpctypes_file.go @@ -25,10 +25,17 @@ type WshRpcFileInterface interface { FileListCommand(ctx context.Context, data FileListData) ([]*FileInfo, error) FileJoinCommand(ctx context.Context, paths []string) (*FileInfo, error) FileListStreamCommand(ctx context.Context, data FileListData) <-chan RespOrErrorUnion[CommandRemoteListEntriesRtnData] + // modern streaming interface + FileStreamCommand(ctx context.Context, data CommandFileStreamData) (*FileInfo, error) } type WshRpcRemoteFileInterface interface { + // old streaming inferface RemoteStreamFileCommand(ctx context.Context, data CommandRemoteStreamFileData) chan RespOrErrorUnion[FileData] + + // modern streaming interface + RemoteFileStreamCommand(ctx context.Context, data CommandRemoteFileStreamData) (*FileInfo, error) + RemoteFileCopyCommand(ctx context.Context, data CommandFileCopyData) (bool, error) RemoteListEntriesCommand(ctx context.Context, data CommandRemoteListEntriesData) chan RespOrErrorUnion[CommandRemoteListEntriesRtnData] RemoteFileInfoCommand(ctx context.Context, path string) (*FileInfo, error) @@ -130,6 +137,18 @@ type CommandRemoteStreamFileData struct { ByteRange string `json:"byterange,omitempty"` } +type CommandRemoteFileStreamData struct { + Path string `json:"path"` + ByteRange string `json:"byterange,omitempty"` + StreamMeta StreamMeta `json:"streammeta"` +} + +type CommandFileStreamData struct { + Info *FileInfo `json:"info"` + ByteRange string `json:"byterange,omitempty"` + StreamMeta StreamMeta `json:"streammeta"` +} + type CommandRemoteListEntriesData struct { Path string `json:"path"` Opts *FileListOpts `json:"opts,omitempty"` diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index b9d320f697..b2c533ff48 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -22,9 +22,9 @@ import ( "github.com/skratchdot/open-golang/open" "github.com/wavetermdev/waveterm/pkg/aiusechat" - "github.com/wavetermdev/waveterm/pkg/baseds" "github.com/wavetermdev/waveterm/pkg/aiusechat/chatstore" "github.com/wavetermdev/waveterm/pkg/aiusechat/uctypes" + "github.com/wavetermdev/waveterm/pkg/baseds" "github.com/wavetermdev/waveterm/pkg/blockcontroller" "github.com/wavetermdev/waveterm/pkg/blocklogger" "github.com/wavetermdev/waveterm/pkg/buildercontroller" @@ -392,6 +392,10 @@ func (ws *WshServer) FileReadStreamCommand(ctx context.Context, data wshrpc.File return wshfs.ReadStream(ctx, data) } +func (ws *WshServer) FileStreamCommand(ctx context.Context, data wshrpc.CommandFileStreamData) (*wshrpc.FileInfo, error) { + return wshfs.FileStream(ctx, data) +} + func (ws *WshServer) FileCopyCommand(ctx context.Context, data wshrpc.CommandFileCopyData) error { return wshfs.Copy(ctx, data) }