Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions internal/rows/arrowbased/batchloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (cft *cloudFetchDownloadTask) Run() {
downloadStart := time.Now()
data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps, cft.httpClient)
if err != nil {
cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
return
}

Expand All @@ -306,7 +306,7 @@ func (cft *cloudFetchDownloadTask) Run() {
data.Close() //nolint:errcheck,gosec // G104: close after reading data
downloadMs := time.Since(downloadStart).Milliseconds()
if err != nil {
cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
return
}

Expand All @@ -316,10 +316,21 @@ func (cft *cloudFetchDownloadTask) Run() {
cft.link.RowCount,
)

cft.resultChan <- cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs}
cft.sendResult(cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs})
}()
}

// sendResult hands result to whoever is receiving on cft.resultChan. It blocks
// until either the send completes or the task's context is done; in the
// latter case the result is discarded and the method simply returns.
//
// The context arm exists so that a download goroutine which finishes after the
// iterator has been closed does not stay parked forever on the unbuffered
// resultChan while holding on to the downloaded buffer (issue #356).
func (cft *cloudFetchDownloadTask) sendResult(result cloudFetchDownloadTaskResult) {
	cancelled := cft.ctx.Done()

	select {
	case <-cancelled:
		// Nobody is going to read this result any more; drop it.
	case cft.resultChan <- result:
		// Delivered to the consumer.
	}
}

// logCloudFetchSpeed calculates and logs download speed metrics
func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Duration, speedThresholdMbps float64) {
if contentLength > 0 && duration.Seconds() > 0 {
Expand Down
97 changes: 97 additions & 0 deletions internal/rows/arrowbased/batchloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -604,3 +607,97 @@ func generateMockArrowBytes(record arrow.Record) []byte {
}
return buf.Bytes()
}

// TestCloudFetchIterator_CloseReleasesInFlightDownloads reproduces issue #356:
// when the consumer closes the iterator while downloads are still in flight,
// goroutines that completed their HTTP fetch get permanently blocked sending
// to the unbuffered resultChan. They retain the downloaded buffers (Arrow
// allocations in earlier versions, raw bytes in current code) until process
// exit, producing a heap plateau that only releases on restart.
//
// The test schedules many concurrent downloads, lets them complete, and then
// closes the iterator without consuming the queued results. After Close
// returns, no cloudFetchDownloadTask goroutines must remain.
func TestCloudFetchIterator_CloseReleasesInFlightDownloads(t *testing.T) {
	arrowBytes := generateMockArrowBytes(generateArrowRecord())

	// Track in-flight downloads. The server signals each request as it starts
	// and waits on a release channel so the test can hold downloads in the
	// queued-but-not-yet-consumed state before closing the iterator.
	var inFlight atomic.Int64
	release := make(chan struct{})
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		inFlight.Add(1)
		<-release
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write(arrowBytes)
	}))
	defer server.Close()

	const nLinks = 20
	links := make([]*cli_service.TSparkArrowResultLink, nLinks)
	for i := range links {
		links[i] = &cli_service.TSparkArrowResultLink{
			FileLink:       server.URL,
			ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
			StartRowOffset: int64(i),
			RowCount:       1,
		}
	}

	cfg := config.WithDefaults()
	cfg.UseLz4Compression = false
	cfg.MaxDownloadThreads = 10

	bi, err := NewCloudBatchIterator(context.Background(), links, 0, nil, cfg, nil)
	if !assert.Nil(t, err) {
		// Without an iterator nothing below is meaningful; stop here instead
		// of panicking on an unusable value.
		t.FailNow()
	}

	// Kick off the first batch download. The iterator schedules
	// MaxDownloadThreads concurrent fetches behind the scenes. nextDone lets
	// the test join this goroutine so Next and Close never overlap.
	nextDone := make(chan struct{})
	go func() {
		defer close(nextDone)
		_, _ = bi.Next()
	}()

	// Wait for all MaxDownloadThreads goroutines to be blocked inside the
	// server handler (they've issued the GET and are waiting for the body).
	assert.Eventually(t, func() bool {
		return inFlight.Load() == int64(cfg.MaxDownloadThreads)
	}, 5*time.Second, 10*time.Millisecond, "expected %d in-flight downloads", cfg.MaxDownloadThreads)

	// Release the downloads so each goroutine finishes its HTTP read and
	// attempts to send its result on the unbuffered resultChan. Only the
	// first task's result will be read (by the Next() call above); the rest
	// will be queued, blocked on the send.
	close(release)

	// Join the Next() goroutine before touching the iterator again. The
	// handlers are already released at this point, so bailing out here cannot
	// wedge the deferred server.Close().
	select {
	case <-nextDone:
	case <-time.After(5 * time.Second):
		t.Fatal("bi.Next() did not return after the downloads were released")
	}

	// Give the remaining goroutines time to finish their HTTP work and reach
	// the channel send.
	time.Sleep(200 * time.Millisecond)

	// Close the iterator without consuming the remaining batches.
	bi.Close()

	// After Close, every cloudFetchDownloadTask goroutine must exit. We don't
	// compare against the total goroutine count because httptest keeps
	// persistent server/transport goroutines around — we look only for our
	// own download goroutines.
	drained := assert.Eventually(t, func() bool {
		return countDownloadTaskGoroutines() == 0
	}, 5*time.Second, 50*time.Millisecond,
		"cloudFetchDownloadTask goroutines leaked after Close")
	if !drained {
		// Message arguments to Eventually are evaluated before polling
		// starts, so take a fresh sample for the diagnostic.
		t.Logf("cloudFetchDownloadTask goroutines still alive: %d", countDownloadTaskGoroutines())
	}
}

// countDownloadTaskGoroutines returns the number of live goroutines whose
// stack dump mentions cloudFetchDownloadTask.Run. Used to detect the leak in
// issue #356.
//
// The dump produced by runtime.Stack separates goroutines with a blank line.
// Each goroutine is counted at most once, even when the marker shows up
// several times in its stanza (for example in both a "Run.func1" frame and
// the "created by ...Run" line), so the result is a goroutine count rather
// than an occurrence count.
func countDownloadTaskGoroutines() int {
	const marker = "cloudFetchDownloadTask).Run"

	// Grow the buffer until the full dump fits; runtime.Stack truncates
	// silently, so a completely filled buffer means "try again, larger".
	buf := make([]byte, 64*1024)
	for {
		n := runtime.Stack(buf, true)
		if n < len(buf) {
			buf = buf[:n]
			break
		}
		buf = make([]byte, 2*len(buf))
	}

	count := 0
	for _, stanza := range strings.Split(string(buf), "\n\n") {
		if strings.Contains(stanza, marker) {
			count++
		}
	}
	return count
}
Loading