
Conversation

@smartaquarius10 commented Nov 17, 2025

WHAT

Introduces connection pooling capability in KEDA. This allows scalers (starting with PostgreSQL) to reuse existing DB connections rather than opening new ones for every trigger evaluation. The design is generic and can be extended to other targets (e.g., Redis, SQS, MySQL) in the future if needed.

WHY

The current PostgreSQL scaler establishes a new DB connection for every trigger query, which can lead to connection exhaustion under high ScaledObject counts. Connection pooling solves this by sharing a finite set of connections across multiple scalers, reducing DB load and improving operator stability.

HOW

  1. Library used: pgxpool

  2. New package: pkg/scalers/connectionpool/

    • A generic framework to manage connection pools across all future DB-based scalers (Postgres now, Redis or SQS later).
  3. Created a pooling manager

    • Maintains a global map of connection pools keyed by target identifier.
    • Uses a thread-safe sync.Map plus reference counting so many scalers can fetch and release pools concurrently without races.
    • Pools are keyed by unique DB identifiers (<scaler>.<host>.<db>), ensuring isolation per database target. A minimal sketch of the manager follows this list.
  4. Max connections and other pool properties can be supplied via a ConfigMap and live reloaded

    • Updated values apply to newly created pools; existing pools keep their original configuration.
    • Graceful fallback to pgxpool defaults when:
      • the ConfigMap is missing,
      • invalid values are provided, or
      • the ConfigMap is deleted (the operator continues using defaults).
    • Discussion point for reviewers: runtime live reload only benefits new pool creation; existing pools retain their prior configuration. Given that, should we keep live reload or simplify to "restart on config change"?

  5. At line #214 of pkg/scalers/postgresql_scaler.go, added a small if s.connection != nil { guard. With a wrong secret the connection object is never created (getConnection is called only once), and without the guard this produced repeated error logs.

    • When a wrong password is passed via the secret, KEDA does not re-trigger ScaledObject reconciliation. The added condition prevents repeated connection errors by skipping pool creation until the credentials are corrected. Once AWS IAM support is added, connection creation can be retried there as well, as is already done for Azure.
  6. Old behavior (a new DB connection per trigger) is replaced with pooled access (a mix of the factory, singleton, and strategy patterns):

db, err := connectionpool.GetOrCreate(poolKey, func() (ResourcePool, error) {
    return connectionpool.NewPostgresPool(ctx, connectionString, maxConns)
})
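
For context, a minimal sketch of what the manager in pkg/scalers/connectionpool/manager.go does, as described in point 3 above (simplified; the real field names, logging, and error handling in the PR may differ):

package connectionpool

import "sync"

// ResourcePool is the minimal contract a pooled target must satisfy.
type ResourcePool interface {
	Close()
}

type poolEntry struct {
	pool     ResourcePool
	refCount int
}

var (
	poolMap sync.Map   // poolKey -> *poolEntry
	poolMu  sync.Mutex // serializes create/release so refCount stays consistent
)

// GetOrCreate returns the pool registered under poolKey, creating it with createFn
// on first use and incrementing its reference count on every call.
func GetOrCreate(poolKey string, createFn func() (ResourcePool, error)) (ResourcePool, error) {
	poolMu.Lock()
	defer poolMu.Unlock()

	if val, ok := poolMap.Load(poolKey); ok {
		entry := val.(*poolEntry)
		entry.refCount++
		return entry.pool, nil
	}

	pool, err := createFn()
	if err != nil {
		return nil, err
	}
	poolMap.Store(poolKey, &poolEntry{pool: pool, refCount: 1})
	return pool, nil
}

// Release decrements the reference count and closes the pool once the last scaler
// using it is gone.
func Release(poolKey string) {
	poolMu.Lock()
	defer poolMu.Unlock()

	val, ok := poolMap.Load(poolKey)
	if !ok {
		return
	}
	entry := val.(*poolEntry)
	entry.refCount--
	if entry.refCount <= 0 {
		entry.pool.Close()
		poolMap.Delete(poolKey)
	}
}

Each scaler calls GetOrCreate when it is constructed and Release in its Close(), so the underlying pgxpool is shared while at least one ScaledObject targets the same database and torn down when the last one is removed.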

TEST CASES (on EKS)

  1. Pool creation with defaults.
  2. Created two ScaledObjects targeting the same DB to confirm they reuse the same pool.
  3. Verified independent pools per DB.
  4. Removing the last ScaledObject closed the pool's DB connections.
  5. Used a ConfigMap to set pool properties.
  6. Put an invalid value in the ConfigMap to confirm the ScaledObject falls back to the default value.
  7. Applied a wrong secret and then removed it, verifying the connection is released from the pool.

PENDING

  • ConfigMap support in the KEDA Helm chart

Fallback is already handled, so nothing will break without it. Can we take this up separately?

Checklist

Fixes #6955

Relates to #7206

@smartaquarius10 requested a review from a team as a code owner November 17, 2025 18:02
@keda-automation requested a review from a team November 17, 2025 18:02
snyk-io bot commented Nov 17, 2025

Snyk checks have passed. No issues have been found so far.

Open Source Security: 0 critical, 0 high, 0 medium, 0 low (0 total issues)


@github-actions

Thank you for your contribution! 🙏

Please understand that we will do our best to review your PR and give you feedback as soon as possible, but please bear with us if it takes a little longer than expected.

While you are waiting, make sure to:

  • Add an entry in our changelog in alphabetical order and link related issue
  • Update the documentation, if needed
  • Add unit & e2e tests for your changes
  • GitHub checks are passing
  • Is the DCO check failing? Here is how you can fix DCO issues

Once the initial tests are successful, a KEDA member will ensure that the e2e tests are run. Once the e2e tests have been successfully completed, the PR may be merged at a later date. Please be patient.

Learn more about our contribution guide.

@smartaquarius10 (Author) commented Nov 17, 2025

Hey @JorTurFer 👋
I’ve raised the PR — could you please take a look and let me know if I’ve missed anything or made any mistakes?

This is my first contribution, so I’d really appreciate your guidance and any feedback you can share.

Thanks a lot for all your help so far!

@smartaquarius10 (Author) commented

Hey @JorTurFer ,
Hope you’re doing great!
Wanted to check if you’ve had a chance to review the PR. No rush. Just making sure it’s on your radar. Really appreciate your time and help with this! 🙏

@rickbrouwer (Member) commented

Regardless of the content of the PR (which I didn't look at yet), there are definitely a number of jobs that have failed. You might want to look into those.

@smartaquarius10 force-pushed the main branch 2 times, most recently from fa88420 to d938d5b on November 24, 2025 14:40
bhasin tanul and others added 2 commits November 24, 2025 17:00
@smartaquarius10 (Author) commented Nov 25, 2025

Hello @rickbrouwer,
I think it's resolved. The jobs are not failing anymore, correct?

@smartaquarius10 (Author) commented

Hello @rickbrouwer @JorTurFer
Just a friendly check-in to see if there are any updates on this PR. Please let me know if anything else is needed from my side. Thanks!

@wozniakjan (Member) commented Dec 3, 2025

/run-e2e postgresql
Update: You can check the progress here

Copilot finished reviewing on behalf of wozniakjan December 3, 2025 09:19
Copilot AI (Contributor) left a comment

Pull request overview

This PR introduces a generic connection pooling framework for KEDA and implements it for the PostgreSQL scaler to address connection exhaustion issues when dealing with high ScaledObject counts. The implementation uses pgxpool for connection management and provides a reusable pooling infrastructure that can be extended to other database-based scalers in the future.

Key Changes:

  • New connection pool management framework with reference counting and thread-safe access via sync.Map
  • PostgreSQL scaler migration from per-query connections to shared connection pools
  • Optional ConfigMap-based configuration for pool parameters with live reload support

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 18 comments.

Summary per file:
  • pkg/scalers/connectionpool/manager.go: Core pooling manager with reference counting for safe pool lifecycle management
  • pkg/scalers/connectionpool/postgresql_pool.go: PostgreSQL-specific pool implementation wrapping pgxpool
  • pkg/scalers/connectionpool/config.go: Configuration loader with file watching for live ConfigMap updates
  • pkg/scalers/connectionpool/connectionpool_test.go: Unit tests for pool creation, reuse, and concurrent access
  • pkg/scalers/postgresql_scaler.go: Updated PostgreSQL scaler to use connection pools instead of direct connections
  • pkg/scalers/postgresql_scaler_test.go: Updated test to match new scaler struct signature
  • cmd/operator/main.go: Added configuration path flag and pool initialization at operator startup


}
defer watcher.Close()

_ = watcher.Add(configPath)
Copilot AI Dec 3, 2025

Silent error on watcher registration: Line 68 ignores the error from watcher.Add(configPath) using _ =. If adding the watch fails (e.g., file doesn't exist yet), the watcher will silently do nothing and configuration updates will never be detected. Check and log this error to help diagnose configuration issues.

Suggested change
_ = watcher.Add(configPath)
if err := watcher.Add(configPath); err != nil {
    logger.Error(err, "Failed to add config file to watcher", "path", configPath)
    return
}

}

db, err := sql.Open("pgx", connectionString)
maxConns, err := strconv.ParseInt(connectionpool.LookupConfigValue("postgres", fmt.Sprintf("%s.%s", meta.Host, meta.DBName)), 10, 32)
Copilot AI Dec 3, 2025

The poolKey format is inconsistent: line 172 creates host:port/dbname while line 181 looks up postgres.host.dbname (missing port). This mismatch means configuration lookups will always fail. Consider standardizing the format or ensuring both use the same pattern.

Suggested change
maxConns, err := strconv.ParseInt(connectionpool.LookupConfigValue("postgres", fmt.Sprintf("%s.%s", meta.Host, meta.DBName)), 10, 32)
maxConns, err := strconv.ParseInt(connectionpool.LookupConfigValue("postgres", poolKey), 10, 32)

}
defer watcher.Close()

_ = watcher.Add(configPath)
Copilot AI Dec 3, 2025

ConfigMap watcher may miss updates: The fsnotify watcher on line 68 watches the file path directly, but Kubernetes ConfigMaps are typically mounted as symlinks that get replaced atomically. When a ConfigMap updates, Kubernetes creates a new directory and updates the symlink, which means the watcher on the old file path will stop receiving events. Consider watching the parent directory or the symlink target using filepath.EvalSymlinks() to handle ConfigMap updates correctly.
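
One way to handle this (a rough sketch of the directory-watching pattern, not this PR's code; fsnotify, logr, and path/filepath imports are assumed, and loadConfig is the loader already defined in config.go):

// watchConfig watches the directory that contains the mounted ConfigMap, because the
// mounted file is a symlink that Kubernetes swaps atomically on every update.
func watchConfig(configPath string, logger logr.Logger) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()

	if err := watcher.Add(filepath.Dir(configPath)); err != nil {
		logger.Error(err, "Failed to watch config directory", "path", configPath)
		return err
	}

	// Remember which real file the symlink currently resolves to.
	realPath, _ := filepath.EvalSymlinks(configPath)

	for event := range watcher.Events {
		if event.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove|fsnotify.Rename) == 0 {
			continue
		}
		// Reload only when the symlink target actually changed.
		if current, err := filepath.EvalSymlinks(configPath); err == nil && current != realPath {
			realPath = current
			loadConfig()
		}
	}
	return nil
}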

func loadConfig() {
data, err := os.ReadFile(configPath)
if err != nil {
logger.V(1).Info("No pool config found;", "path", configPath, "err", err)
Copilot AI Dec 3, 2025

Misleading error message: The log message "No pool config found;" on line 31 includes a semicolon and incomplete context. When the file doesn't exist (which is a valid scenario per the PR description), this reads as an error but should communicate that defaults will be used. Consider: "Pool config file not found, using defaults" or similar phrasing without the trailing semicolon.

Suggested change
logger.V(1).Info("No pool config found;", "path", configPath, "err", err)
logger.V(1).Info("Pool config file not found, using defaults", "path", configPath, "err", err)

}

// LookupConfigValue returns config for a scaler/resource identifier.
// Keys are structured for eg. as <scaler>.<identifier>, e.g., "postgres.dbserver.db".
Copilot AI Dec 3, 2025

Incomplete documentation: The comment on line 90 says keys are structured as <scaler>.<identifier>, with example postgres.dbserver.db, but this doesn't match how the code actually constructs keys. At line 172 in postgresql_scaler.go, the key is host:port/dbname (no "postgres" prefix), and at line 181 it's postgres.host.dbname (still doesn't match the documented format). Update the documentation to accurately reflect the actual key structure used throughout the code.

Suggested change
// Keys are structured for eg. as <scaler>.<identifier>, e.g., "postgres.dbserver.db".
// Keys are typically structured as "<scaler>.<identifier>" (e.g., "postgres.host.dbname") or as "host:port/dbname", depending on the caller.


_, err := GetOrCreate(key, createFn)
if err == nil {
t.Log("expected error during pool creation")
Copilot AI Dec 3, 2025

Inverted test logic: Line 124 checks if err == nil but logs "expected error during pool creation", meaning the test expects an error but doesn't fail when no error occurs. This should be if err == nil { t.Fatal("expected error during pool creation, but got none") } to properly validate the error case.

Suggested change
t.Log("expected error during pool creation")
t.Fatal("expected error during pool creation, but got none")

Comment on lines +197 to +199
if err := pgPool.Ping(ctx); err != nil {
logger.Error(err, "Failed to ping PostgreSQL. Releasing bad connection pool", "poolKey", poolKey)
connectionpool.Release(poolKey)
Copilot AI Dec 3, 2025

Error handling issue: If the pool creation fails initially (line 187-193), and then Release() is called with the poolKey (line 199), the release attempts to decrement a reference count that was never incremented. This happens because GetOrCreate returns an error but doesn't add the entry to poolMap, yet the error path still calls Release(poolKey). This can lead to incorrect reference counting or silent failures. Consider only calling Release() when GetOrCreate succeeds.

Comment on lines +214 to 233
if s.connection != nil {
if s.podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
if s.metadata.azureAuthContext.token.ExpiresOn.Before(time.Now()) {
s.logger.Info("The Azure Access Token expired, retrieving a new Azure Access Token and instantiating a new Postgres connection object.")
s.connection.Close()
newConnection, _, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
if err != nil {
return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
s.connection = newConnection
}
s.connection = newConnection
}
}

err := s.connection.QueryRowContext(ctx, s.metadata.Query).Scan(&id)
if err != nil {
s.logger.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
return 0, fmt.Errorf("could not query postgreSQL: %w", err)
err := s.connection.QueryRow(ctx, s.metadata.Query).Scan(&id)
if err != nil {
s.logger.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
return 0, fmt.Errorf("could not query postgreSQL: %w", err)
}
}
return id, nil
Copilot AI Dec 3, 2025

The nil check for s.connection at line 214 creates a critical issue: if connection is nil (e.g., due to wrong credentials), getActiveNumber() will skip the query entirely and return id=0, nil, making KEDA think the metric is zero. This could trigger incorrect scaling decisions. Instead, return an error when connection is nil to prevent misleading metrics.
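
A minimal sketch of the guard this suggests (same field names as the diff above; the exact message is illustrative):

if s.connection == nil {
	// Without an established connection we cannot compute a trustworthy metric;
	// returning an error keeps KEDA from treating "no connection" as "zero activity".
	return 0, fmt.Errorf("postgreSQL connection is not established; check the trigger authentication credentials")
}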

Comment on lines +218 to +223
s.connection.Close()
newConnection, _, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
if err != nil {
return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
s.connection = newConnection
Copilot AI Dec 3, 2025

Race condition in Azure token refresh: Lines 218-223 close the old connection and create a new one, updating s.connection. However, concurrent calls to getActiveNumber() could be using the old connection while it's being closed. Consider using a mutex to protect the connection replacement, or use atomic pointers to ensure thread-safe connection swapping.
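
A minimal sketch of one option (assuming a new connLock sync.Mutex field on the scaler struct; this is not the PR's code):

s.connLock.Lock()
defer s.connLock.Unlock()

if s.metadata.azureAuthContext.token.ExpiresOn.Before(time.Now()) {
	s.logger.Info("The Azure Access Token expired, refreshing the Postgres connection.")
	s.connection.Close()
	newConnection, _, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
	if err != nil {
		return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
	}
	s.connection = newConnection
}

Every other read of s.connection in getActiveNumber would then need to happen under the same lock, or the pool pointer could be swapped via atomic.Pointer instead of a mutex.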

@JorTurFer (Member) left a comment

Sorry for the really slow response :(

I like your approach for the implementation. I agree with the Copilot comments and I've left some extra comments too, but you're heading in the right direction!

Comment on lines 23 to 39

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
eventingcontrollers "github.com/kedacore/keda/v2/controllers/eventing"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/certificates"
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/k8s"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/scalers/authentication"
"github.com/kedacore/keda/v2/pkg/scalers/connectionpool"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/spf13/pflag"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@JorTurFer (Member)

please undo this change, dependencies must be grouped as:

  • stdlib
  • external
  • internal

@@ -0,0 +1,97 @@
package connectionpool
@JorTurFer (Member)

I'd prefer not to read extra ConfigMaps, to be honest. Are so many parameters needed? Can we pass them via cmd args during startup?
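
For illustration, cmd/operator/main.go already uses pflag, so a startup flag could look roughly like this (hypothetical flag name, just to sketch the alternative):

var postgresPoolMaxConns int32
pflag.Int32Var(&postgresPoolMaxConns, "postgres-pool-max-conns", 0,
	"Maximum connections per PostgreSQL pool (0 falls back to the pgxpool default)")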

@@ -0,0 +1,129 @@
package connectionpool
@JorTurFer (Member)

let's name this file after the one we are testing

func Release(poolKey string) {
val, ok := poolMap.Load(poolKey)
if !ok {
logger.V(1).Info("Attempted to release non-existent pool", "poolKey", poolKey)
@JorTurFer (Member)

Isn't this a kind of error? If yes, let's log it as an error at V(0)
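
For example (a sketch; logr's Error is emitted regardless of the verbosity level, i.e. effectively V(0)):

logger.Error(fmt.Errorf("pool %q not found", poolKey), "Attempted to release non-existent pool")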

func getConnection(ctx context.Context, meta *postgreSQLMetadata, podIdentity kedav1alpha1.AuthPodIdentity, logger logr.Logger) (*pgxpool.Pool, string, error) {
connectionString := meta.Connection

poolKey := fmt.Sprintf("%s:%s/%s", meta.Host, meta.Port, meta.DBName)
@JorTurFer (Member)

I think we also have to include user info and credentials as part of the key (hashing them or so). I can imagine different scalers accessing the same database with different schemas and different users (and there are also secretless flows). I think that hashing the connection string could be a good key, WDYT?
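
For reference, deriving the key from the resolved connection string could look roughly like this (a sketch; the function name is hypothetical, and the PR currently keys on host:port/dbname):

import (
	"crypto/sha256"
	"encoding/hex"
)

// poolKeyFromConnString hashes the full connection string so user, password, and any
// other parameters contribute to pool identity without appearing in logs or keys.
func poolKeyFromConnString(connectionString string) string {
	sum := sha256.Sum256([]byte(connectionString))
	return "postgres." + hex.EncodeToString(sum[:])
}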



Development

Successfully merging this pull request may close these issues.

Does KEDA Reuse DB Connections Across ScaledObjects

4 participants