Add generic connection pooling framework and enable PostgreSQL scaler to reuse DB connections #7248
Conversation
Thank you for your contribution! 🙏 Please understand that we will do our best to review your PR and give you feedback as soon as possible, but please bear with us if it takes a little longer than expected. While you are waiting, make sure to:

Once the initial tests are successful, a KEDA member will ensure that the e2e tests are run. Once the e2e tests have completed successfully, the PR may be merged at a later date. Please be patient. Learn more about our contribution guide.
Hey @JorTurFer 👋 This is my first contribution, so I’d really appreciate your guidance and any feedback you can share. Thanks a lot for all your help so far!
Hey @JorTurFer ,
Regardless of the content of the PR (which I didn't look at yet), there are definitely a number of jobs that have failed. You might want to look into those. |
Hello @rickbrouwer ,

Hello @rickbrouwer @JorTurFer
/run-e2e postgresql |
Pull request overview
This PR introduces a generic connection pooling framework for KEDA and implements it for the PostgreSQL scaler to address connection exhaustion issues when dealing with high ScaledObject counts. The implementation uses pgxpool for connection management and provides a reusable pooling infrastructure that can be extended to other database-based scalers in the future.
Key Changes:
- New connection pool management framework with reference counting and thread-safe access via sync.Map (see the sketch after this list)
- PostgreSQL scaler migration from per-query connections to shared connection pools
- Optional ConfigMap-based configuration for pool parameters with live reload support
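To make the reference-counting idea concrete, here is a minimal sketch of such a manager. This is a hypothetical shape, not the PR's actual code: the PR uses sync.Map, while this sketch guards a plain map with a single mutex for brevity.

```go
package connectionpool

import (
	"sync"
)

// pooledEntry tracks a shared pool and how many scalers currently hold it.
type pooledEntry struct {
	refCount int
	pool     any // e.g. *pgxpool.Pool
}

var (
	mu    sync.Mutex
	pools = map[string]*pooledEntry{}
)

// GetOrCreate returns the pool registered under poolKey, creating it with
// createFn on first use, and increments the reference count on success.
func GetOrCreate(poolKey string, createFn func() (any, error)) (any, error) {
	mu.Lock()
	defer mu.Unlock()
	if e, ok := pools[poolKey]; ok {
		e.refCount++
		return e.pool, nil
	}
	p, err := createFn()
	if err != nil {
		return nil, err // nothing was registered, so there is nothing to release
	}
	pools[poolKey] = &pooledEntry{refCount: 1, pool: p}
	return p, nil
}

// Release decrements the reference count and closes the pool once unused.
func Release(poolKey string, closeFn func(pool any)) {
	mu.Lock()
	defer mu.Unlock()
	e, ok := pools[poolKey]
	if !ok {
		return
	}
	if e.refCount--; e.refCount <= 0 {
		closeFn(e.pool)
		delete(pools, poolKey)
	}
}
```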
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 18 comments.
| File | Description |
|---|---|
| pkg/scalers/connectionpool/manager.go | Core pooling manager with reference counting for safe pool lifecycle management |
| pkg/scalers/connectionpool/postgresql_pool.go | PostgreSQL-specific pool implementation wrapping pgxpool |
| pkg/scalers/connectionpool/config.go | Configuration loader with file watching for live ConfigMap updates |
| pkg/scalers/connectionpool/connectionpool_test.go | Unit tests for pool creation, reuse, and concurrent access |
| pkg/scalers/postgresql_scaler.go | Updated PostgreSQL scaler to use connection pools instead of direct connections |
| pkg/scalers/postgresql_scaler_test.go | Updated test to match new scaler struct signature |
| cmd/operator/main.go | Added configuration path flag and pool initialization at operator startup |
```go
}
defer watcher.Close()

_ = watcher.Add(configPath)
```
Copilot AI · Dec 3, 2025
Silent error on watcher registration: Line 68 ignores the error from watcher.Add(configPath) using _ =. If adding the watch fails (e.g., file doesn't exist yet), the watcher will silently do nothing and configuration updates will never be detected. Check and log this error to help diagnose configuration issues.
Suggested change:
```diff
- _ = watcher.Add(configPath)
+ if err := watcher.Add(configPath); err != nil {
+ 	logger.Error(err, "Failed to add config file to watcher", "path", configPath)
+ 	return
+ }
```
```diff
  }

- db, err := sql.Open("pgx", connectionString)
+ maxConns, err := strconv.ParseInt(connectionpool.LookupConfigValue("postgres", fmt.Sprintf("%s.%s", meta.Host, meta.DBName)), 10, 32)
```
Copilot AI · Dec 3, 2025
The poolKey format is inconsistent: line 172 creates host:port/dbname while line 181 looks up postgres.host.dbname (missing port). This mismatch means configuration lookups will always fail. Consider standardizing the format or ensuring both use the same pattern.
Suggested change:
```diff
- maxConns, err := strconv.ParseInt(connectionpool.LookupConfigValue("postgres", fmt.Sprintf("%s.%s", meta.Host, meta.DBName)), 10, 32)
+ maxConns, err := strconv.ParseInt(connectionpool.LookupConfigValue("postgres", poolKey), 10, 32)
```
```go
}
defer watcher.Close()

_ = watcher.Add(configPath)
```
Copilot AI · Dec 3, 2025
ConfigMap watcher may miss updates: The fsnotify watcher on line 68 watches the file path directly, but Kubernetes ConfigMaps are typically mounted as symlinks that get replaced atomically. When a ConfigMap updates, Kubernetes creates a new directory and updates the symlink, which means the watcher on the old file path will stop receiving events. Consider watching the parent directory or the symlink target using filepath.EvalSymlinks() to handle ConfigMap updates correctly.
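A hedged sketch of that fix — watching the parent directory so the atomic "..data" symlink swap still produces events. watchConfig and reload are hypothetical names; the fsnotify usage follows the library's documented API:

```go
package connectionpool

import (
	"path/filepath"

	"github.com/fsnotify/fsnotify"
)

// watchConfig (hypothetical helper) watches the directory that contains
// configPath rather than the file itself, so the symlink swap Kubernetes
// performs on ConfigMap updates is still observed.
func watchConfig(configPath string, reload func()) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()

	// Watching the parent directory survives the atomic replace of the file.
	if err := watcher.Add(filepath.Dir(configPath)); err != nil {
		return err
	}

	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return nil
			}
			// ConfigMap updates surface as events on the "..data" symlink.
			if filepath.Clean(event.Name) == filepath.Clean(configPath) ||
				filepath.Base(event.Name) == "..data" {
				reload()
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return nil
			}
			return err
		}
	}
}
```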
```go
func loadConfig() {
	data, err := os.ReadFile(configPath)
	if err != nil {
		logger.V(1).Info("No pool config found;", "path", configPath, "err", err)
```
Copilot AI · Dec 3, 2025
Misleading error message: The log message "No pool config found;" on line 31 includes a semicolon and incomplete context. When the file doesn't exist (which is a valid scenario per the PR description), this reads as an error but should communicate that defaults will be used. Consider: "Pool config file not found, using defaults" or similar phrasing without the trailing semicolon.
Suggested change:
```diff
- logger.V(1).Info("No pool config found;", "path", configPath, "err", err)
+ logger.V(1).Info("Pool config file not found, using defaults", "path", configPath, "err", err)
```
```go
}

// LookupConfigValue returns config for a scaler/resource identifier.
// Keys are structured for eg. as <scaler>.<identifier>, e.g., "postgres.dbserver.db".
```
Copilot AI · Dec 3, 2025
Incomplete documentation: The comment on line 90 says keys are structured as <scaler>.<identifier>, with example postgres.dbserver.db, but this doesn't match how the code actually constructs keys. At line 172 in postgresql_scaler.go, the key is host:port/dbname (no "postgres" prefix), and at line 181 it's postgres.host.dbname (still doesn't match the documented format). Update the documentation to accurately reflect the actual key structure used throughout the code.
Suggested change:
```diff
- // Keys are structured for eg. as <scaler>.<identifier>, e.g., "postgres.dbserver.db".
+ // Keys are typically structured as "<scaler>.<identifier>" (e.g., "postgres.host.dbname") or as "host:port/dbname", depending on the caller.
```
```go
_, err := GetOrCreate(key, createFn)
if err == nil {
	t.Log("expected error during pool creation")
```
Copilot AI · Dec 3, 2025
Inverted test logic: Line 124 checks if err == nil but logs "expected error during pool creation", meaning the test expects an error but doesn't fail when no error occurs. This should be if err == nil { t.Fatal("expected error during pool creation, but got none") } to properly validate the error case.
Suggested change:
```diff
- t.Log("expected error during pool creation")
+ t.Fatal("expected error during pool creation, but got none")
```
```go
if err := pgPool.Ping(ctx); err != nil {
	logger.Error(err, "Failed to ping PostgreSQL. Releasing bad connection pool", "poolKey", poolKey)
	connectionpool.Release(poolKey)
```
Copilot AI · Dec 3, 2025
Error handling issue: If the pool creation fails initially (line 187-193), and then Release() is called with the poolKey (line 199), the release attempts to decrement a reference count that was never incremented. This happens because GetOrCreate returns an error but doesn't add the entry to poolMap, yet the error path still calls Release(poolKey). This can lead to incorrect reference counting or silent failures. Consider only calling Release() when GetOrCreate succeeds.
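A hedged sketch of the flow Copilot is describing, reusing the PR's names (GetOrCreate, Release, poolKey) with an assumed signature — Release only runs once a reference actually exists:

```go
// Assumed flow; the PR's exact GetOrCreate signature may differ.
pgPool, err := connectionpool.GetOrCreate(poolKey, createFn)
if err != nil {
	// Creation failed, so no reference was ever counted: do not call Release.
	return nil, "", fmt.Errorf("error creating postgreSQL pool: %w", err)
}
if err := pgPool.Ping(ctx); err != nil {
	// The pool exists and this caller holds a reference, so releasing is correct.
	connectionpool.Release(poolKey)
	return nil, "", fmt.Errorf("error pinging postgreSQL: %w", err)
}
return pgPool, connectionString, nil
```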
```diff
  if s.connection != nil {
  	if s.podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
  		if s.metadata.azureAuthContext.token.ExpiresOn.Before(time.Now()) {
  			s.logger.Info("The Azure Access Token expired, retrieving a new Azure Access Token and instantiating a new Postgres connection object.")
  			s.connection.Close()
  			newConnection, _, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
  			if err != nil {
  				return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
  			}
  			s.connection = newConnection
  		}
  	}

- 	err := s.connection.QueryRowContext(ctx, s.metadata.Query).Scan(&id)
+ 	err := s.connection.QueryRow(ctx, s.metadata.Query).Scan(&id)
  	if err != nil {
  		s.logger.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
  		return 0, fmt.Errorf("could not query postgreSQL: %w", err)
  	}
  }
  return id, nil
```
Copilot AI · Dec 3, 2025
The nil check for s.connection at line 214 creates a critical issue: if connection is nil (e.g., due to wrong credentials), getActiveNumber() will skip the query entirely and return id=0, nil, making KEDA think the metric is zero. This could trigger incorrect scaling decisions. Instead, return an error when connection is nil to prevent misleading metrics.
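A minimal sketch of that fail-fast alternative, assuming the PR's scaler fields (s.connection, s.metadata.Query):

```go
// Sketch: surface the missing connection as an error instead of a zero metric.
if s.connection == nil {
	return 0, fmt.Errorf("postgreSQL connection was never established, cannot query metric")
}
var id int64
if err := s.connection.QueryRow(ctx, s.metadata.Query).Scan(&id); err != nil {
	return 0, fmt.Errorf("could not query postgreSQL: %w", err)
}
return id, nil
```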
```go
s.connection.Close()
newConnection, _, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
if err != nil {
	return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
s.connection = newConnection
```
Copilot AI · Dec 3, 2025
Race condition in Azure token refresh: Lines 218-223 close the old connection and create a new one, updating s.connection. However, concurrent calls to getActiveNumber() could be using the old connection while it's being closed. Consider using a mutex to protect the connection replacement, or use atomic pointers to ensure thread-safe connection swapping.
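One way to do this, sketched with a hypothetical connLock (sync.RWMutex) field on the scaler — not part of the PR; readers would wrap their QueryRow calls in RLock/RUnlock:

```go
// Hypothetical sketch: connLock is an assumed sync.RWMutex field on the scaler.
s.connLock.Lock()
newConnection, _, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
if err != nil {
	s.connLock.Unlock()
	return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
old := s.connection
s.connection = newConnection
s.connLock.Unlock()
// Safe to close now: acquiring the write lock waits for in-flight readers,
// so no concurrent query still holds the old pool at this point.
if old != nil {
	old.Close()
}
```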
JorTurFer left a comment
Sorry for the really slow response :(
I like your approach to the implementation. I agree with the Copilot comments and I've left some extra comments too, but you're heading in the right direction!
```go
eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
eventingcontrollers "github.com/kedacore/keda/v2/controllers/eventing"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/certificates"
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/k8s"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/scalers/authentication"
"github.com/kedacore/keda/v2/pkg/scalers/connectionpool"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/spf13/pflag"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
```
please undo this change, dependencies must be grouped as (example below):
- stdlib
- external
- internal
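For reference, the requested ordering looks like this; the stdlib entries shown are illustrative, the rest are taken from the import block above:

```go
import (
	// stdlib
	"context"
	"fmt"

	// external
	"github.com/spf13/pflag"
	apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"

	// internal
	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
	"github.com/kedacore/keda/v2/pkg/scalers/connectionpool"
)
```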
```diff
@@ -0,0 +1,97 @@
+package connectionpool
```
I prefer not to read extra ConfigMaps, tbh. Are so many parameters needed? Can we pass them via cmd args during startup?
```diff
@@ -0,0 +1,129 @@
+package connectionpool
```
let's name this file after the file it tests
```go
func Release(poolKey string) {
	val, ok := poolMap.Load(poolKey)
	if !ok {
		logger.V(1).Info("Attempted to release non-existent pool", "poolKey", poolKey)
```
Isn't this a kind of error? If so, let's log it as an error at V(0).
```go
func getConnection(ctx context.Context, meta *postgreSQLMetadata, podIdentity kedav1alpha1.AuthPodIdentity, logger logr.Logger) (*pgxpool.Pool, string, error) {
	connectionString := meta.Connection

	poolKey := fmt.Sprintf("%s:%s/%s", meta.Host, meta.Port, meta.DBName)
```
I think we also have to include user info and credentials as part of the key (by hashing them or similar). I can imagine different scalers accessing the same database with different schemas and different users (and there are also secretless flows). I think that hashing the connection string could be a good key, WDYT?
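A minimal sketch of the hashed-key idea; poolKeyFor is a hypothetical helper, not code from the PR:

```go
package connectionpool

import (
	"crypto/sha256"
	"encoding/hex"
)

// poolKeyFor (hypothetical helper) derives a pool key by hashing the full
// connection string, so different users/credentials against the same host/db
// get distinct pools and no secrets end up in log lines or map keys.
func poolKeyFor(connectionString string) string {
	sum := sha256.Sum256([]byte(connectionString))
	return hex.EncodeToString(sum[:])
}
```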
WHAT
Introduces connection pooling capability in KEDA. This allows scalers (starting with PostgreSQL) to reuse existing DB connections rather than opening new ones for every trigger evaluation. The design is generic and can be extended to other targets (e.g., Redis, SQS, MySQL) in the future, if needed.
WHY
The current PostgreSQL scaler establishes a new DB connection for every trigger query, which can lead to connection exhaustion under high ScaledObject counts. Connection pooling solves this by sharing a finite set of connections across multiple scalers, reducing DB load and improving operator stability.
HOW
- Library used: pgxpool
- New package: pkg/scalers/connectionpool/
- Created a pooling manager keyed per database target (<scaler>.<host>.<db>), ensuring isolation per database target.
- Max connections or any other property can be supplied from a ConfigMap and live reloaded.
- At line #214 in file pkg/scalers/postgresql_scaler.go, added a small if s.connection != nil { guard, as it was creating error logs: if the secret passed is wrong, we don't create the connection object, since getConnection is called once.
- Old behavior (a new DB connection per trigger) replaced with pool-based access (via a kind of factory, singleton, and strategy pattern):
TEST CASES (on EKS)
PENDING
Checklist
Fixes #6955
Relates to #7206