1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Querier: Add active API requests tracker logging to help with OOMKill troubleshooting. #7216
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
51 changes: 39 additions & 12 deletions pkg/api/handlers.go
@@ -32,6 +32,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/request_tracker"
)

const (
@@ -285,35 +286,61 @@ func NewQuerierHandler(

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)

requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
var instantQueryHandler http.Handler
var rangedQueryHandler http.Handler
var legacyAPIHandler http.Handler
if requestTracker != nil {
apiHandler = request_tracker.NewRequestWrapper(promRouter, requestTracker, &request_tracker.ApiExtractor{})
legacyAPIHandler = request_tracker.NewRequestWrapper(legacyPromRouter, requestTracker, &request_tracker.ApiExtractor{})
instantQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.InstantQueryHandler), requestTracker, &request_tracker.InstantQueryExtractor{})
rangedQueryHandler = request_tracker.NewRequestWrapper(queryAPI.Wrap(queryAPI.RangeQueryHandler), requestTracker, &request_tracker.RangedQueryExtractor{})

httpHeaderMiddleware := &HTTPHeaderMiddleware{
TargetHeaders: cfg.HTTPRequestHeadersToLog,
RequestIdHeader: cfg.RequestIdHeader,
}
apiHandler = httpHeaderMiddleware.Wrap(apiHandler)
legacyAPIHandler = httpHeaderMiddleware.Wrap(legacyAPIHandler)
instantQueryHandler = httpHeaderMiddleware.Wrap(instantQueryHandler)
rangedQueryHandler = httpHeaderMiddleware.Wrap(rangedQueryHandler)
} else {
apiHandler = promRouter
legacyAPIHandler = legacyPromRouter
instantQueryHandler = queryAPI.Wrap(queryAPI.InstantQueryHandler)
rangedQueryHandler = queryAPI.Wrap(queryAPI.RangeQueryHandler)
}

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(apiHandler)

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangedQueryHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyAPIHandler)

if cfg.buildInfoEnabled {
router.Path(path.Join(prefix, "/api/v1/status/buildinfo")).Methods("GET").Handler(promRouter)
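The routes above are registered behind the new request-tracker wrapper, which is itself wrapped by `HTTPHeaderMiddleware`, presumably so that the request ID and target headers are already in the request context by the time the extractor runs. `NewRequestTracker` and `NewRequestWrapper` are defined elsewhere in this PR and not shown in this hunk; the sketch below only illustrates the assumed record-on-start / release-on-finish pattern. `trackerSketch`, `wrapSketch`, `Insert`, and `Delete` are hypothetical names used for illustration, while `Extractor` is the interface added in `pkg/util/request_tracker/request_extractor.go` below.

```go
package request_tracker

import "net/http"

// trackerSketch models the behaviour assumed of the active-request tracker:
// record an entry when a request starts and release that slot when it
// finishes. Hypothetical interface for illustration only.
type trackerSketch interface {
	Insert(entry []byte) int // record an in-flight request, returning its slot
	Delete(index int)        // release the slot once the request completes
}

// wrapSketch mirrors the wrapping pattern used in the handlers above: extract
// a JSON description of the request, mark it active, and clear it when the
// inner handler returns.
func wrapSketch(next http.Handler, t trackerSketch, e Extractor) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		idx := t.Insert(e.Extract(r)) // mark the request as in flight
		defer t.Delete(idx)           // released even when the handler panics
		next.ServeHTTP(w, r)
	})
}
```

If the tracker persists these entries under `ActiveQueryTrackerDir` (the `"apis.active"` file passed to `NewRequestTracker` above), entries for requests still in flight at the moment of an OOMKill would survive the crash, which is what the CHANGELOG entry refers to.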
115 changes: 115 additions & 0 deletions pkg/util/request_tracker/request_extractor.go
@@ -0,0 +1,115 @@
package request_tracker

import (
"encoding/json"
"net/http"
"strings"
"time"
"unicode/utf8"

"github.com/cortexproject/cortex/pkg/util/requestmeta"
"github.com/cortexproject/cortex/pkg/util/users"
)

type Extractor interface {
Extract(r *http.Request) []byte
}

type ApiExtractor struct{}

type InstantQueryExtractor struct{}

type RangedQueryExtractor struct{}

func generateCommonMap(r *http.Request) map[string]interface{} {
ctx := r.Context()
entryMap := make(map[string]interface{})
entryMap["timestampSec"] = time.Now().Unix()
entryMap["Path"] = r.URL.Path
entryMap["Method"] = r.Method
entryMap["TenantID"], _ = users.TenantID(ctx)
entryMap["RequestID"] = requestmeta.RequestIdFromContext(ctx)
entryMap["UserAgent"] = r.Header.Get("User-Agent")
entryMap["DashboardUID"] = r.Header.Get("X-Dashboard-UID")
entryMap["PanelId"] = r.Header.Get("X-Panel-Id")

return entryMap
}

func (e *ApiExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["limit"] = r.URL.Query().Get("limit")
entryMap["start"] = r.URL.Query().Get("start")
entryMap["end"] = r.URL.Query().Get("end")

matches := r.URL.Query()["match[]"]
entryMap["numberOfMatches"] = len(matches)
matchesStr := strings.Join(matches, ",")

return generateJSONEntryWithTruncatedField(entryMap, "matches", matchesStr)
}

func (e *InstantQueryExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["time"] = r.URL.Query().Get("time")
return generateJSONEntryWithTruncatedField(entryMap, "query", r.URL.Query().Get("query"))
}

func (e *RangedQueryExtractor) Extract(r *http.Request) []byte {
entryMap := generateCommonMap(r)
entryMap["start"] = r.URL.Query().Get("start")
entryMap["end"] = r.URL.Query().Get("end")
entryMap["step"] = r.URL.Query().Get("step")
return generateJSONEntryWithTruncatedField(entryMap, "query", r.URL.Query().Get("query"))
}

func generateJSONEntry(entryMap map[string]interface{}) []byte {
jsonEntry, err := json.Marshal(entryMap)
if err != nil {
return []byte{}
}

return jsonEntry
}

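// generateJSONEntryWithTruncatedField marshals entryMap with fieldValue
// trimmed so that the complete JSON entry stays within maxEntrySize: it first
// measures the entry with an empty field, then gives the field whatever
// budget remains.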
func generateJSONEntryWithTruncatedField(entryMap map[string]interface{}, fieldName, fieldValue string) []byte {
entryMap[fieldName] = ""
minEntryJSON := generateJSONEntry(entryMap)
entryMap[fieldName] = trimForJsonMarshal(fieldValue, maxEntrySize-(len(minEntryJSON)+1))
return generateJSONEntry(entryMap)
}

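// trimStringByBytes truncates str to at most size bytes without splitting a
// multi-byte UTF-8 rune.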
func trimStringByBytes(str string, size int) string {
bytesStr := []byte(str)
trimIndex := len(bytesStr)
if size < len(bytesStr) {
for !utf8.RuneStart(bytesStr[size]) {
size--
}
trimIndex = size
}

return string(bytesStr[:trimIndex])
}

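// trimForJsonMarshal trims field so that its JSON-encoded form (quotes and
// escaping included) fits within size bytes, retrying the cut once to account
// for escaping overhead; it returns an empty string if the value still does
// not fit.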
func trimForJsonMarshal(field string, size int) string {
return trimForJsonMarshalRecursive(field, size, 0, size)
}

func trimForJsonMarshalRecursive(field string, size int, repeatCount int, repeatSize int) string {
// Should only repeat once: if the first cut misses, we over-cut slightly based on the encoded size and try again.
if repeatCount > 1 {
return ""
}

fieldTrimmed := trimStringByBytes(field, repeatSize)
fieldEncoded, err := json.Marshal(fieldTrimmed)
if err != nil {
return ""
}
if len(fieldEncoded) > size {
repeatSize = repeatSize - (len(fieldEncoded) - repeatSize)
return trimForJsonMarshalRecursive(fieldTrimmed, size, repeatCount+1, repeatSize)
}
return fieldTrimmed
}
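All three extractors follow the same shape: build the shared fields with `generateCommonMap`, then attach the one potentially unbounded field through `generateJSONEntryWithTruncatedField` so the whole entry stays within `maxEntrySize`. As a purely illustrative sketch reusing this file's helpers and imports, a further extractor would look like the following; `labelValuesExtractor` is a hypothetical name and is not part of this change.

```go
// labelValuesExtractor is a hypothetical extractor for
// /api/v1/label/{name}/values, shown only to illustrate the Extractor
// contract defined above; it is not part of this change.
type labelValuesExtractor struct{}

func (e *labelValuesExtractor) Extract(r *http.Request) []byte {
	entryMap := generateCommonMap(r)
	entryMap["start"] = r.URL.Query().Get("start")
	entryMap["end"] = r.URL.Query().Get("end")

	// match[] selectors can be arbitrarily long, so they go through the
	// truncating helper rather than straight into the map.
	matches := strings.Join(r.URL.Query()["match[]"], ",")
	return generateJSONEntryWithTruncatedField(entryMap, "matches", matches)
}
```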
90 changes: 90 additions & 0 deletions pkg/util/request_tracker/request_extractor_test.go
@@ -0,0 +1,90 @@
package request_tracker

import (
"encoding/json"
"net/http/httptest"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetSeriesExtractor(t *testing.T) {
extractor := &ApiExtractor{}
req := httptest.NewRequest("GET", "/api/v1/series", nil)
q := req.URL.Query()
q.Add("limit", "100")
q.Add("match[]", "up")
q.Add("match[]", "down")
req.URL.RawQuery = q.Encode()

result := extractor.Extract(req)
require.NotEmpty(t, result)

var data map[string]interface{}
require.NoError(t, json.Unmarshal(result, &data))

assert.Equal(t, "100", data["limit"])
assert.Equal(t, float64(2), data["numberOfMatches"])
assert.Contains(t, data["matches"], "up")
}

func TestInstantQueryExtractor(t *testing.T) {
extractor := &InstantQueryExtractor{}
req := httptest.NewRequest("GET", "/api/v1/query", nil)
q := req.URL.Query()
q.Add("query", "up{job=\"prometheus\"}")
q.Add("time", "1234567890")
req.URL.RawQuery = q.Encode()

result := extractor.Extract(req)
require.NotEmpty(t, result)

var data map[string]interface{}
require.NoError(t, json.Unmarshal(result, &data))

assert.Equal(t, "1234567890", data["time"])
assert.Equal(t, "up{job=\"prometheus\"}", data["query"])
}

func TestRangedQueryExtractor(t *testing.T) {
extractor := &RangedQueryExtractor{}
req := httptest.NewRequest("GET", "/api/v1/query_range", nil)
q := req.URL.Query()
q.Add("query", "rate(http_requests_total[5m])")
q.Add("start", "1000")
q.Add("end", "2000")
q.Add("step", "15")
req.URL.RawQuery = q.Encode()

result := extractor.Extract(req)
require.NotEmpty(t, result)

var data map[string]interface{}
require.NoError(t, json.Unmarshal(result, &data))

assert.Equal(t, "1000", data["start"])
assert.Equal(t, "2000", data["end"])
assert.Equal(t, "15", data["step"])
assert.Equal(t, "rate(http_requests_total[5m])", data["query"])
}

func TestLongQueryTruncate(t *testing.T) {
longQuery := strings.Repeat("metric_name{label=\"value\"} or ", maxEntrySize*2) + "final_metric"
req := httptest.NewRequest("GET", "/api/v1/query", nil)
q := req.URL.Query()
q.Add("query", longQuery)
q.Add("time", "1234567890")
req.URL.RawQuery = q.Encode()

extractor := &InstantQueryExtractor{}
extractedData := extractor.Extract(req)

require.NotEmpty(t, extractedData)
assert.True(t, len(extractedData) > 0)
assert.LessOrEqual(t, len(extractedData), maxEntrySize)
assert.Contains(t, string(extractedData), "metric_name")
assert.Contains(t, string(extractedData), "1234567890")
assert.NotContains(t, string(extractedData), "final_metric")
}