diff --git a/Cargo.toml b/Cargo.toml index b8fc7cd..2e0aec2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,17 @@ config = "0.15.11" aws-arn = "0.3.1" # Redis -redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] } +redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] } # Dev dependencies -criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } \ No newline at end of file +criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } + +# Rustls +# +# NOTE: rustls 0.23 requires selecting exactly one process-wide crypto provider +# (features: `ring` or `aws_lc_rs` / `aws-lc-rs`). Some dependency graphs (e.g. via +# redis-rs' rustls integration) can end up with *no* provider enabled, which causes a +# runtime panic when building TLS client/server configs. +# +# We explicitly enable the `ring` provider here to make TLS work reliably. +rustls = { version = "0.23.32", default-features = false, features = ["std", "ring"] } \ No newline at end of file diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index 0ac7797..92333ef 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -4,7 +4,8 @@ use alloy::{ consensus::{Signed, TypedTransaction}, primitives::Address, }; -use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; +use twmq::redis::{AsyncCommands, Pipeline}; +use twmq::redis::cluster_async::ClusterConnection; use crate::{ eoa::{ @@ -43,7 +44,7 @@ pub trait SafeRedisTransaction: Send + Sync { ) -> Self::OperationResult; fn validation( &self, - conn: &mut ConnectionManager, + conn: &mut ClusterConnection, store: &EoaExecutorStore, ) -> impl Future> + Send; fn watch_keys(&self) -> Vec; @@ -815,7 +816,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> { async fn validation( &self, - _conn: &mut ConnectionManager, + _conn: &mut ClusterConnection, store: &EoaExecutorStore, ) -> Result { let now = chrono::Utc::now().timestamp_millis().max(0) as u64; diff --git a/executors/src/eoa/store/borrowed.rs b/executors/src/eoa/store/borrowed.rs index 8797a78..3c1260a 100644 --- a/executors/src/eoa/store/borrowed.rs +++ b/executors/src/eoa/store/borrowed.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use twmq::Queue; -use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; +use twmq::redis::{AsyncCommands, Pipeline}; +use twmq::redis::cluster_async::ClusterConnection; use crate::eoa::EoaExecutorStore; use crate::eoa::{ @@ -71,7 +72,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> { async fn validation( &self, - conn: &mut ConnectionManager, + conn: &mut ClusterConnection, _store: &EoaExecutorStore, ) -> Result { // Get all borrowed transaction IDs diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index 2cb59b4..5ec9749 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -9,7 +9,8 @@ use engine_core::transaction::TransactionTypeData; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ops::Deref; -use twmq::redis::{AsyncCommands, aio::ConnectionManager}; +use twmq::redis::AsyncCommands; +use twmq::redis::cluster_async::ClusterConnection; mod atomic; mod borrowed; @@ -98,7 +99,7 @@ pub struct TransactionData { /// Transaction store focused on transaction_id operations and nonce indexing pub struct EoaExecutorStore { - pub redis: ConnectionManager, + pub redis: ClusterConnection, pub keys: EoaExecutorStoreKeys, pub completed_transaction_ttl_seconds: u64, } @@ -121,8 +122,14 @@ impl EoaExecutorStoreKeys { /// Lock key name for EOA processing pub fn eoa_lock_key_name(&self) -> String { match &self.namespace { - Some(ns) => format!("{ns}:eoa_executor:lock:{}:{}", self.chain_id, self.eoa), - None => format!("eoa_executor:lock:{}:{}", self.chain_id, self.eoa), + Some(ns) => format!( + "{ns}:{}:eoa_executor:lock:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), + None => format!( + "{}:eoa_executor:lock:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), } } @@ -137,8 +144,14 @@ impl EoaExecutorStoreKeys { /// - "failure_reason": String failure reason (optional) pub fn transaction_data_key_name(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!("{ns}:eoa_executor:tx_data:{transaction_id}"), - None => format!("eoa_executor:tx_data:{transaction_id}"), + Some(ns) => format!( + "{ns}:{}:eoa_executor:tx_data:{transaction_id}", + twmq::ENGINE_HASH_TAG + ), + None => format!( + "{}:eoa_executor:tx_data:{transaction_id}", + twmq::ENGINE_HASH_TAG + ), } } @@ -148,8 +161,14 @@ impl EoaExecutorStoreKeys { /// of a TransactionAttempt. This allows efficient append operations. pub fn transaction_attempts_list_name(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!("{ns}:eoa_executor:tx_attempts:{transaction_id}"), - None => format!("eoa_executor:tx_attempts:{transaction_id}"), + Some(ns) => format!( + "{ns}:{}:eoa_executor:tx_attempts:{transaction_id}", + twmq::ENGINE_HASH_TAG + ), + None => format!( + "{}:eoa_executor:tx_attempts:{transaction_id}", + twmq::ENGINE_HASH_TAG + ), } } @@ -159,10 +178,13 @@ impl EoaExecutorStoreKeys { pub fn pending_transactions_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:eoa_executor:pending_txs:{}:{}", - self.chain_id, self.eoa + "{ns}:{}:eoa_executor:pending_txs:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), + None => format!( + "{}:eoa_executor:pending_txs:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), - None => format!("eoa_executor:pending_txs:{}:{}", self.chain_id, self.eoa), } } @@ -172,18 +194,27 @@ impl EoaExecutorStoreKeys { pub fn submitted_transactions_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:eoa_executor:submitted_txs:{}:{}", - self.chain_id, self.eoa + "{ns}:{}:eoa_executor:submitted_txs:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), + None => format!( + "{}:eoa_executor:submitted_txs:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), - None => format!("eoa_executor:submitted_txs:{}:{}", self.chain_id, self.eoa), } } /// Name of the key that maps transaction hash to transaction id pub fn transaction_hash_to_id_key_name(&self, hash: &str) -> String { match &self.namespace { - Some(ns) => format!("{ns}:eoa_executor:tx_hash_to_id:{hash}"), - None => format!("eoa_executor:tx_hash_to_id:{hash}"), + Some(ns) => format!( + "{ns}:{}:eoa_executor:tx_hash_to_id:{hash}", + twmq::ENGINE_HASH_TAG + ), + None => format!( + "{}:eoa_executor:tx_hash_to_id:{hash}", + twmq::ENGINE_HASH_TAG + ), } } @@ -197,10 +228,13 @@ impl EoaExecutorStoreKeys { pub fn borrowed_transactions_hashmap_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:eoa_executor:borrowed_txs:{}:{}", - self.chain_id, self.eoa + "{ns}:{}:eoa_executor:borrowed_txs:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), + None => format!( + "{}:eoa_executor:borrowed_txs:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), - None => format!("eoa_executor:borrowed_txs:{}:{}", self.chain_id, self.eoa), } } @@ -214,12 +248,12 @@ impl EoaExecutorStoreKeys { pub fn recycled_nonces_zset_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:eoa_executor:recycled_nonces:{}:{}", - self.chain_id, self.eoa + "{ns}:{}:eoa_executor:recycled_nonces:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), None => format!( - "eoa_executor:recycled_nonces:{}:{}", - self.chain_id, self.eoa + "{}:eoa_executor:recycled_nonces:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), } } @@ -236,12 +270,12 @@ impl EoaExecutorStoreKeys { pub fn optimistic_transaction_count_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:eoa_executor:optimistic_nonce:{}:{}", - self.chain_id, self.eoa + "{ns}:{}:eoa_executor:optimistic_nonce:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), None => format!( - "eoa_executor:optimistic_nonce:{}:{}", - self.chain_id, self.eoa + "{}:eoa_executor:optimistic_nonce:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), } } @@ -256,10 +290,13 @@ impl EoaExecutorStoreKeys { pub fn last_transaction_count_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:eoa_executor:last_tx_nonce:{}:{}", - self.chain_id, self.eoa + "{ns}:{}:eoa_executor:last_tx_nonce:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), + None => format!( + "{}:eoa_executor:last_tx_nonce:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), - None => format!("eoa_executor:last_tx_nonce:{}:{}", self.chain_id, self.eoa), } } @@ -271,8 +308,14 @@ impl EoaExecutorStoreKeys { /// - timestamp of the last 5 nonce resets pub fn eoa_health_key_name(&self) -> String { match &self.namespace { - Some(ns) => format!("{ns}:eoa_executor:health:{}:{}", self.chain_id, self.eoa), - None => format!("eoa_executor:health:{}:{}", self.chain_id, self.eoa), + Some(ns) => format!( + "{ns}:{}:eoa_executor:health:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), + None => format!( + "{}:eoa_executor:health:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa + ), } } @@ -282,12 +325,12 @@ impl EoaExecutorStoreKeys { pub fn manual_reset_key_name(&self) -> String { match &self.namespace { Some(ns) => format!( - "{ns}:eoa_executor:pending_manual_reset:{}:{}", - self.chain_id, self.eoa + "{ns}:{}:eoa_executor:pending_manual_reset:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), None => format!( - "eoa_executor:pending_manual_reset:{}:{}", - self.chain_id, self.eoa + "{}:eoa_executor:pending_manual_reset:{}:{}", + twmq::ENGINE_HASH_TAG, self.chain_id, self.eoa ), } } @@ -295,7 +338,7 @@ impl EoaExecutorStoreKeys { impl EoaExecutorStore { pub fn new( - redis: ConnectionManager, + redis: ClusterConnection, namespace: Option, eoa: Address, chain_id: u64, diff --git a/executors/src/eoa/store/pending.rs b/executors/src/eoa/store/pending.rs index 637e010..d3be479 100644 --- a/executors/src/eoa/store/pending.rs +++ b/executors/src/eoa/store/pending.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use alloy::{consensus::Transaction, primitives::Address}; -use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; +use twmq::redis::{AsyncCommands, Pipeline}; +use twmq::redis::cluster_async::ClusterConnection; use crate::eoa::{ EoaExecutorStore, @@ -46,7 +47,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithIncrementedNonces<'_> { async fn validation( &self, - conn: &mut ConnectionManager, + conn: &mut ClusterConnection, _store: &EoaExecutorStore, ) -> Result { if self.transactions.is_empty() { @@ -181,7 +182,7 @@ impl SafeRedisTransaction for MovePendingToBorrowedWithRecycledNonces<'_> { async fn validation( &self, - conn: &mut ConnectionManager, + conn: &mut ClusterConnection, _store: &EoaExecutorStore, ) -> Result { if self.transactions.is_empty() { diff --git a/executors/src/eoa/store/submitted.rs b/executors/src/eoa/store/submitted.rs index fb16739..5da4c2b 100644 --- a/executors/src/eoa/store/submitted.rs +++ b/executors/src/eoa/store/submitted.rs @@ -5,7 +5,8 @@ use std::{ }; use serde::{Deserialize, Serialize}; -use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; +use twmq::redis::{AsyncCommands, Pipeline}; +use twmq::redis::cluster_async::ClusterConnection; use crate::{ TransactionCounts, @@ -279,7 +280,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> { async fn validation( &self, - conn: &mut ConnectionManager, + conn: &mut ClusterConnection, store: &EoaExecutorStore, ) -> Result { // Fetch transactions up to the latest confirmed nonce for replacements @@ -592,7 +593,7 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> { async fn validation( &self, - conn: &mut ConnectionManager, + conn: &mut ClusterConnection, _store: &EoaExecutorStore, ) -> Result { // get the highest submitted nonce diff --git a/executors/src/eoa/worker/mod.rs b/executors/src/eoa/worker/mod.rs index 33e47bf..c235453 100644 --- a/executors/src/eoa/worker/mod.rs +++ b/executors/src/eoa/worker/mod.rs @@ -11,7 +11,7 @@ use engine_eip7702_core::delegated_account::DelegatedAccount; use serde::{Deserialize, Serialize}; use std::{sync::Arc, time::Duration}; use twmq::Queue; -use twmq::redis::aio::ConnectionManager; +use twmq::redis::cluster_async::ClusterConnection; use twmq::{ DurableExecution, FailHookData, NackHookData, SuccessHookData, hooks::TransactionContext, @@ -114,7 +114,7 @@ where pub webhook_queue: Arc>, pub authorization_cache: EoaAuthorizationCache, - pub redis: ConnectionManager, + pub redis: ClusterConnection, pub namespace: Option, pub eoa_signer: Arc, diff --git a/executors/src/external_bundler/deployment.rs b/executors/src/external_bundler/deployment.rs index ac8a969..ac19502 100644 --- a/executors/src/external_bundler/deployment.rs +++ b/executors/src/external_bundler/deployment.rs @@ -7,8 +7,10 @@ use serde::{Deserialize, Serialize}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use twmq::{ error::TwmqError, - redis::{AsyncCommands, Pipeline, aio::ConnectionManager}, + redis::{AsyncCommands, Pipeline}, }; +use twmq::redis::cluster_async::ClusterConnection; +use twmq::redis::cluster::ClusterClient; use uuid::Uuid; const CACHE_PREFIX: &str = "deployment_cache"; @@ -16,12 +18,12 @@ const LOCK_PREFIX: &str = "deployment_lock"; #[derive(Clone)] pub struct RedisDeploymentCache { - connection_manager: twmq::redis::aio::ConnectionManager, + connection: ClusterConnection, } #[derive(Clone)] pub struct RedisDeploymentLock { - connection_manager: twmq::redis::aio::ConnectionManager, + connection: ClusterConnection, } #[derive(Serialize, Deserialize)] @@ -31,18 +33,21 @@ struct LockData { } impl RedisDeploymentCache { - pub async fn new(client: twmq::redis::Client) -> Result { + pub async fn new(client: ClusterClient) -> Result { Ok(Self { - connection_manager: ConnectionManager::new(client).await?, + connection: client.get_async_connection().await?, }) } - pub fn conn(&self) -> &ConnectionManager { - &self.connection_manager + pub fn conn(&self) -> &ClusterConnection { + &self.connection } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!("{CACHE_PREFIX}:{chain_id}:{account_address}") + format!( + "{}:{CACHE_PREFIX}:{chain_id}:{account_address}", + twmq::ENGINE_HASH_TAG + ) } } @@ -60,22 +65,28 @@ impl DeploymentCache for RedisDeploymentCache { } impl RedisDeploymentLock { - pub async fn new(client: twmq::redis::Client) -> Result { + pub async fn new(client: ClusterClient) -> Result { Ok(Self { - connection_manager: ConnectionManager::new(client).await?, + connection: client.get_async_connection().await?, }) } - pub fn conn(&self) -> &ConnectionManager { - &self.connection_manager + pub fn conn(&self) -> &ClusterConnection { + &self.connection } fn lock_key(&self, chain_id: u64, account_address: &Address) -> String { - format!("{LOCK_PREFIX}:{chain_id}:{account_address}") + format!( + "{}:{LOCK_PREFIX}:{chain_id}:{account_address}", + twmq::ENGINE_HASH_TAG + ) } fn cache_key(&self, chain_id: u64, account_address: &Address) -> String { - format!("{CACHE_PREFIX}:{chain_id}:{account_address}") + format!( + "{}:{CACHE_PREFIX}:{chain_id}:{account_address}", + twmq::ENGINE_HASH_TAG + ) } /// Release a deployment lock using the provided pipeline diff --git a/executors/src/solana_executor/storage.rs b/executors/src/solana_executor/storage.rs index 8a88302..510e22d 100644 --- a/executors/src/solana_executor/storage.rs +++ b/executors/src/solana_executor/storage.rs @@ -3,8 +3,9 @@ use serde_with::{DisplayFromStr, serde_as}; use solana_sdk::{hash::Hash, signature::Signature}; use twmq::{ redis, - redis::{AsyncCommands, aio::ConnectionManager}, + redis::AsyncCommands, }; +use twmq::redis::cluster_async::ClusterConnection; /// Represents a single attempt to send a Solana transaction /// This is stored in Redis BEFORE sending to prevent duplicate transactions @@ -45,7 +46,7 @@ impl SolanaTransactionAttempt { /// Represents a lock held on a transaction /// When dropped, the lock is automatically released pub struct TransactionLock { - redis: ConnectionManager, + redis: ClusterConnection, lock_key: String, lock_value: String, } @@ -121,28 +122,30 @@ impl From for LockError { /// Storage for Solana transaction attempts /// Provides atomic operations to prevent duplicate transactions pub struct SolanaTransactionStorage { - redis: ConnectionManager, + redis: ClusterConnection, namespace: Option, } impl SolanaTransactionStorage { - pub fn new(redis: ConnectionManager, namespace: Option) -> Self { + pub fn new(redis: ClusterConnection, namespace: Option) -> Self { Self { redis, namespace } } /// Get the Redis key for a transaction's attempt fn attempt_key(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!("{ns}:solana_tx_attempt:{transaction_id}"), - None => format!("solana_tx_attempt:{transaction_id}"), + Some(ns) => { + format!("{ns}:{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG) + } + None => format!("{}:solana_tx_attempt:{transaction_id}", twmq::ENGINE_HASH_TAG), } } /// Get the Redis key for a transaction's lock fn lock_key(&self, transaction_id: &str) -> String { match &self.namespace { - Some(ns) => format!("{ns}:solana_tx_lock:{transaction_id}"), - None => format!("solana_tx_lock:{transaction_id}"), + Some(ns) => format!("{ns}:{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG), + None => format!("{}:solana_tx_lock:{transaction_id}", twmq::ENGINE_HASH_TAG), } } diff --git a/executors/src/transaction_registry.rs b/executors/src/transaction_registry.rs index a63584c..9754495 100644 --- a/executors/src/transaction_registry.rs +++ b/executors/src/transaction_registry.rs @@ -1,6 +1,7 @@ use engine_core::error::EngineError; use thiserror::Error; -use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager}; +use twmq::redis::{AsyncCommands, Pipeline}; +use twmq::redis::cluster_async::ClusterConnection; #[derive(Debug, Error)] pub enum TransactionRegistryError { @@ -20,19 +21,19 @@ impl From for EngineError { } pub struct TransactionRegistry { - redis: ConnectionManager, + redis: ClusterConnection, namespace: Option, } impl TransactionRegistry { - pub fn new(redis: ConnectionManager, namespace: Option) -> Self { + pub fn new(redis: ClusterConnection, namespace: Option) -> Self { Self { redis, namespace } } fn registry_key(&self) -> String { match &self.namespace { - Some(ns) => format!("{ns}:tx_registry"), - None => "tx_registry".to_string(), + Some(ns) => format!("{ns}:{}:tx_registry", twmq::ENGINE_HASH_TAG), + None => format!("{}:tx_registry", twmq::ENGINE_HASH_TAG), } } diff --git a/integration-tests/tests/setup.rs b/integration-tests/tests/setup.rs index d105cff..e8bdffa 100644 --- a/integration-tests/tests/setup.rs +++ b/integration-tests/tests/setup.rs @@ -168,8 +168,15 @@ impl TestEnvironment { let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); // Setup Redis - let redis_client = twmq::redis::Client::open(config.redis.url.as_str()) - .context("Failed to connect to Redis")?; + let initial_nodes: Vec<&str> = config + .redis + .url + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect(); + let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes) + .context("Failed to connect to Valkey Cluster")?; let authorization_cache = EoaAuthorizationCache::new( moka::future::Cache::builder() @@ -253,7 +260,7 @@ impl TestEnvironment { let execution_router = thirdweb_engine::ExecutionRouter { namespace: queue_config.execution_namespace.clone(), - redis: redis_client.get_connection_manager().await?, + redis: redis_client.get_async_connection().await?, authorization_cache, webhook_queue: queue_manager.webhook_queue.clone(), external_bundler_send_queue: queue_manager.external_bundler_send_queue.clone(), diff --git a/server/Cargo.toml b/server/Cargo.toml index d568ffb..0b411b9 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -27,6 +27,7 @@ futures = { workspace = true } serde-bool = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } +rustls = { workspace = true } solana-sdk = { workspace = true } solana-client = { workspace = true } aide = { workspace = true, features = [ diff --git a/server/src/execution_router/mod.rs b/server/src/execution_router/mod.rs index ba41c46..66d0cc0 100644 --- a/server/src/execution_router/mod.rs +++ b/server/src/execution_router/mod.rs @@ -31,7 +31,8 @@ use engine_executors::{ transaction_registry::TransactionRegistry, webhook::WebhookJobHandler, }; -use twmq::{Queue, error::TwmqError, redis::aio::ConnectionManager}; +use twmq::{Queue, error::TwmqError}; +use twmq::redis::cluster_async::ClusterConnection; use vault_sdk::VaultClient; use vault_types::{ RegexRule, Rule, @@ -42,7 +43,7 @@ use vault_types::{ use crate::chains::ThirdwebChainService; pub struct ExecutionRouter { - pub redis: ConnectionManager, + pub redis: ClusterConnection, pub namespace: Option, pub webhook_queue: Arc>, pub external_bundler_send_queue: Arc>>, diff --git a/server/src/main.rs b/server/src/main.rs index 4376aec..758b26a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,6 +22,11 @@ use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, util::Subscrib #[tokio::main] async fn main() -> anyhow::Result<()> { + // rustls 0.23 requires selecting a process-wide crypto provider (ring or aws-lc-rs). + // Some dependency graphs do not enable either by default, which causes a runtime panic + // when a TLS client config is constructed (e.g. when connecting to `rediss://`). + let _ = rustls::crypto::ring::default_provider().install_default(); + let config = config::get_config(); let subscriber = tracing_subscriber::registry() @@ -70,7 +75,14 @@ async fn main() -> anyhow::Result<()> { }); let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone())); let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); - let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?; + let initial_nodes: Vec<&str> = config + .redis + .url + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect(); + let redis_client = twmq::redis::cluster::ClusterClient::new(initial_nodes)?; let authorization_cache = EoaAuthorizationCache::new( moka::future::Cache::builder() @@ -118,7 +130,7 @@ async fn main() -> anyhow::Result<()> { let execution_router = ExecutionRouter { namespace: config.queue.execution_namespace.clone(), - redis: redis_client.get_connection_manager().await?, + redis: redis_client.get_async_connection().await?, authorization_cache, webhook_queue: queue_manager.webhook_queue.clone(), external_bundler_send_queue: queue_manager.external_bundler_send_queue.clone(), diff --git a/server/src/queue/manager.rs b/server/src/queue/manager.rs index 3e61575..deb042f 100644 --- a/server/src/queue/manager.rs +++ b/server/src/queue/manager.rs @@ -50,7 +50,7 @@ const EOA_EXECUTOR_QUEUE_NAME: &str = "eoa_executor"; impl QueueManager { pub async fn new( - redis_client: twmq::redis::Client, + redis_client: twmq::redis::cluster::ClusterClient, queue_config: &QueueConfig, solana_config: &crate::config::SolanaConfig, chain_service: Arc, @@ -61,7 +61,7 @@ impl QueueManager { ) -> Result { // Create transaction registry let transaction_registry = Arc::new(TransactionRegistry::new( - redis_client.get_connection_manager().await?, + redis_client.get_async_connection().await?, queue_config.execution_namespace.clone(), )); @@ -255,7 +255,7 @@ impl QueueManager { eoa_signer: eoa_signer.clone(), webhook_queue: webhook_queue.clone(), namespace: queue_config.execution_namespace.clone(), - redis: redis_client.get_connection_manager().await?, + redis: redis_client.get_async_connection().await?, authorization_cache, max_inflight: 50, max_recycled_nonces: 50, @@ -286,7 +286,7 @@ impl QueueManager { }; let solana_rpc_cache = Arc::new(SolanaRpcCache::new(solana_rpc_urls)); let solana_storage = SolanaTransactionStorage::new( - redis_client.get_connection_manager().await?, + redis_client.get_async_connection().await?, queue_config.execution_namespace.clone(), ); let solana_executor_handler = SolanaExecutorJobHandler { diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 164d234..24dc446 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -18,7 +18,8 @@ use job::{ pub use multilane::{MultilanePushableJob, MultilaneQueue}; use queue::QueueOptions; use redis::Pipeline; -use redis::{AsyncCommands, RedisResult, aio::ConnectionManager}; +use redis::{AsyncCommands, RedisResult}; +use redis::cluster_async::ClusterConnection; use serde::{Serialize, de::DeserializeOwned}; use shutdown::WorkerHandle; use tokio::sync::Semaphore; @@ -28,6 +29,10 @@ pub use queue::IdempotencyMode; pub use redis; use tracing::Instrument; +/// Global hash tag used to force all keys into the same cluster hash slot. +/// This is the "easiest" Valkey Cluster strategy: correctness over sharding. +pub const ENGINE_HASH_TAG: &str = "{engine}"; + // Trait for error types to implement user cancellation pub trait UserCancellable { fn user_cancelled() -> Self; @@ -121,7 +126,7 @@ pub struct Queue where H: DurableExecution, { - pub redis: ConnectionManager, + pub redis: ClusterConnection, pub handler: Arc, pub options: QueueOptions, // concurrency: usize, @@ -136,8 +141,13 @@ impl Queue { options: Option, handler: H, ) -> Result { - let client = redis::Client::open(redis_url)?; - let redis = client.get_connection_manager().await?; + let initial_nodes: Vec<&str> = redis_url + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect(); + let client = redis::cluster::ClusterClient::new(initial_nodes)?; + let redis = client.get_async_connection().await?; let queue = Self { redis, @@ -176,51 +186,60 @@ impl Queue { } pub fn pending_list_name(&self) -> String { - format!("twmq:{}:pending", self.name()) + format!("twmq:{}:{}:pending", ENGINE_HASH_TAG, self.name()) } pub fn active_hash_name(&self) -> String { - format!("twmq:{}:active", self.name) + format!("twmq:{}:{}:active", ENGINE_HASH_TAG, self.name) } pub fn delayed_zset_name(&self) -> String { - format!("twmq:{}:delayed", self.name) + format!("twmq:{}:{}:delayed", ENGINE_HASH_TAG, self.name) } pub fn success_list_name(&self) -> String { - format!("twmq:{}:success", self.name) + format!("twmq:{}:{}:success", ENGINE_HASH_TAG, self.name) } pub fn failed_list_name(&self) -> String { - format!("twmq:{}:failed", self.name) + format!("twmq:{}:{}:failed", ENGINE_HASH_TAG, self.name) } pub fn job_data_hash_name(&self) -> String { - format!("twmq:{}:jobs:data", self.name) + format!("twmq:{}:{}:jobs:data", ENGINE_HASH_TAG, self.name) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:meta", self.name, job_id) + format!("twmq:{}:{}:job:{}:meta", ENGINE_HASH_TAG, self.name, job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:errors", self.name, job_id) + format!( + "twmq:{}:{}:job:{}:errors", + ENGINE_HASH_TAG, self.name, job_id + ) } pub fn job_result_hash_name(&self) -> String { - format!("twmq:{}:jobs:result", self.name) + format!("twmq:{}:{}:jobs:result", ENGINE_HASH_TAG, self.name) } pub fn dedupe_set_name(&self) -> String { - format!("twmq:{}:dedup", self.name) + format!("twmq:{}:{}:dedup", ENGINE_HASH_TAG, self.name) } pub fn pending_cancellation_set_name(&self) -> String { - format!("twmq:{}:pending_cancellations", self.name) + format!( + "twmq:{}:{}:pending_cancellations", + ENGINE_HASH_TAG, self.name + ) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { - format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token) + format!( + "twmq:{}:{}:job:{}:lease:{}", + ENGINE_HASH_TAG, self.name, job_id, lease_token + ) } pub async fn push( @@ -230,20 +249,20 @@ impl Queue { // Check for duplicates and handle job creation with deduplication let script = redis::Script::new( r#" - local job_id = ARGV[1] - local job_data = ARGV[2] - local now = ARGV[3] - local delay = ARGV[4] - local reentry_position = ARGV[5] -- "first" or "last" + local queue_id = ARGV[1] + local job_id = ARGV[2] + local job_data = ARGV[3] + local now = ARGV[4] + local delay = ARGV[5] + local reentry_position = ARGV[6] -- "first" or "last" - local queue_id = KEYS[1] - local delayed_zset_name = KEYS[2] - local pending_list_name = KEYS[3] + local delayed_zset_name = KEYS[1] + local pending_list_name = KEYS[2] - local job_data_hash_name = KEYS[4] - local job_meta_hash_name = KEYS[5] + local job_data_hash_name = KEYS[3] + local job_meta_hash_name = KEYS[4] - local dedupe_set_name = KEYS[6] + local dedupe_set_name = KEYS[5] -- Check if job already exists in any queue if redis.call('SISMEMBER', dedupe_set_name, job_id) == 1 then @@ -301,12 +320,12 @@ impl Queue { let position_string = delay.position.to_string(); let _result: (i32, String) = script - .key(&self.name) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.job_data_hash_name()) .key(self.job_meta_hash_name(&job.id)) .key(self.dedupe_set_name()) + .arg(self.name()) .arg(job_options.id) .arg(job_data) .arg(now) @@ -588,14 +607,15 @@ impl Queue { local batch_size = tonumber(ARGV[3]) local lease_seconds = tonumber(ARGV[4]) - local queue_id = KEYS[1] - local delayed_zset_name = KEYS[2] - local pending_list_name = KEYS[3] - local active_hash_name = KEYS[4] - local job_data_hash_name = KEYS[5] - local pending_cancellation_set = KEYS[6] - local failed_list_name = KEYS[7] - local success_list_name = KEYS[8] + local queue_id = ARGV[5] + + local delayed_zset_name = KEYS[1] + local pending_list_name = KEYS[2] + local active_hash_name = KEYS[3] + local job_data_hash_name = KEYS[4] + local pending_cancellation_set = KEYS[5] + local failed_list_name = KEYS[6] + local success_list_name = KEYS[7] local result_jobs = {} local timed_out_jobs = {} @@ -610,14 +630,14 @@ impl Queue { for i = 1, #active_jobs, 2 do local job_id = active_jobs[i] local attempts = active_jobs[i + 1] - local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' -- Get the current lease token from job metadata local current_lease_token = redis.call('HGET', job_meta_hash_name, 'lease_token') if current_lease_token then -- Build the lease key and check if it exists (Redis auto-expires) - local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token + local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token local lease_exists = redis.call('EXISTS', lease_key) -- If lease doesn't exist (expired), move job back to pending @@ -657,7 +677,7 @@ impl Queue { -- Job not successful, cancel it now redis.call('LPUSH', failed_list_name, job_id) -- Add cancellation timestamp - local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash_name, 'finished_at', now) table.insert(cancelled_jobs, job_id) end @@ -669,7 +689,7 @@ impl Queue { -- Step 3: Move expired delayed jobs to pending local delayed_jobs = redis.call('ZRANGEBYSCORE', delayed_zset_name, 0, now) for i, job_id in ipairs(delayed_jobs) do - local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' local reentry_position = redis.call('HGET', job_meta_hash_name, 'reentry_position') or 'last' -- Remove from delayed @@ -704,7 +724,7 @@ impl Queue { -- Only process if we have data if job_data then -- Update metadata - local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash_name, 'processed_at', now) local created_at = redis.call('HGET', job_meta_hash_name, 'created_at') or now local attempts = redis.call('HINCRBY', job_meta_hash_name, 'attempts', 1) @@ -713,7 +733,7 @@ impl Queue { local lease_token = now .. '_' .. job_id .. '_' .. attempts .. '_' .. pop_id -- Create separate lease key with TTL - local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token + local lease_key = 'twmq:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token redis.call('SET', lease_key, '1') redis.call('EXPIRE', lease_key, lease_seconds) @@ -742,7 +762,6 @@ impl Queue { Vec, Vec, ) = script - .key(self.name()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.active_hash_name()) @@ -754,6 +773,7 @@ impl Queue { .arg(pop_id) .arg(batch_size) .arg(self.options.lease_duration.as_secs()) + .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; @@ -942,14 +962,14 @@ impl Queue { // Separate call for pruning with data deletion using Lua let trim_script = redis::Script::new( r#" - local queue_id = KEYS[1] - local list_name = KEYS[2] - local job_data_hash = KEYS[3] - local results_hash = KEYS[4] -- e.g., "myqueue:results" - local dedupe_set_name = KEYS[5] - local active_hash = KEYS[6] - local pending_list = KEYS[7] - local delayed_zset = KEYS[8] + local queue_id = ARGV[2] + local list_name = KEYS[1] + local job_data_hash = KEYS[2] + local results_hash = KEYS[3] -- e.g., "myqueue:results" + local dedupe_set_name = KEYS[4] + local active_hash = KEYS[5] + local pending_list = KEYS[6] + local delayed_zset = KEYS[7] local max_len = tonumber(ARGV[1]) @@ -971,8 +991,8 @@ impl Queue { -- Only delete if the job is NOT currently in the system if not is_active and not is_pending and not is_delayed then - local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta' - local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors' + local job_meta_hash = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' + local errors_list_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -990,7 +1010,6 @@ impl Queue { ); let trimmed_count: usize = trim_script - .key(self.name()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) // results_hash @@ -999,6 +1018,7 @@ impl Queue { .key(self.pending_list_name()) // Check if job is pending .key(self.delayed_zset_name()) // Check if job is delayed .arg(self.options.max_success) // max_len (LTRIM is 0 to max_success-1) + .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; @@ -1123,13 +1143,13 @@ impl Queue { // Separate call for pruning with data deletion using Lua let trim_script = redis::Script::new( r#" - local queue_id = KEYS[1] - local list_name = KEYS[2] - local job_data_hash = KEYS[3] - local dedupe_set_name = KEYS[4] - local active_hash = KEYS[5] - local pending_list = KEYS[6] - local delayed_zset = KEYS[7] + local queue_id = ARGV[2] + local list_name = KEYS[1] + local job_data_hash = KEYS[2] + local dedupe_set_name = KEYS[3] + local active_hash = KEYS[4] + local pending_list = KEYS[5] + local delayed_zset = KEYS[6] local max_len = tonumber(ARGV[1]) @@ -1151,8 +1171,8 @@ impl Queue { -- Only delete if the job is NOT currently in the system if not is_active and not is_pending and not is_delayed then - local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors' - local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta' + local errors_list_name = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' + local job_meta_hash = 'twmq:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1168,7 +1188,6 @@ impl Queue { ); let trimmed_count: usize = trim_script - .key(self.name()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) @@ -1176,6 +1195,7 @@ impl Queue { .key(self.pending_list_name()) // Check if job is pending .key(self.delayed_zset_name()) // Check if job is delayed .arg(self.options.max_failed) + .arg(self.name()) .invoke_async(&mut self.redis.clone()) .await?; diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index 5921e0a..f79c414 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -2,7 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use redis::{AsyncCommands, Pipeline, RedisResult, aio::ConnectionManager}; +use redis::{AsyncCommands, Pipeline, RedisResult}; +use redis::cluster_async::ClusterConnection; use tokio::sync::Semaphore; use tokio::time::sleep; use tracing::Instrument; @@ -10,6 +11,7 @@ use tracing::Instrument; use crate::{ CancelResult, DurableExecution, FailHookData, NackHookData, QueueInternalErrorHookData, SuccessHookData, UserCancellable, + ENGINE_HASH_TAG, error::TwmqError, hooks::TransactionContext, job::{ @@ -26,7 +28,7 @@ pub struct MultilaneQueue where H: DurableExecution, { - pub redis: ConnectionManager, + pub redis: ClusterConnection, handler: Arc, options: QueueOptions, /// Unique identifier for this multilane queue instance @@ -50,8 +52,13 @@ impl MultilaneQueue { options: Option, handler: H, ) -> Result { - let client = redis::Client::open(redis_url)?; - let redis = client.get_connection_manager().await?; + let initial_nodes: Vec<&str> = redis_url + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect(); + let client = redis::cluster::ClusterClient::new(initial_nodes)?; + let redis = client.get_async_connection().await?; let queue = Self { redis, @@ -86,57 +93,81 @@ impl MultilaneQueue { // Redis key naming methods with proper multilane namespacing pub fn lanes_zset_name(&self) -> String { - format!("twmq_multilane:{}:lanes", self.queue_id) + format!("twmq_multilane:{}:{}:lanes", ENGINE_HASH_TAG, self.queue_id) } pub fn lane_pending_list_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:{}:lane:{}:pending", + ENGINE_HASH_TAG, self.queue_id, lane_id + ) } pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:{}:lane:{}:delayed", + ENGINE_HASH_TAG, self.queue_id, lane_id + ) } pub fn lane_active_hash_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:{}:lane:{}:active", + ENGINE_HASH_TAG, self.queue_id, lane_id + ) } pub fn success_list_name(&self) -> String { - format!("twmq_multilane:{}:success", self.queue_id) + format!("twmq_multilane:{}:{}:success", ENGINE_HASH_TAG, self.queue_id) } pub fn failed_list_name(&self) -> String { - format!("twmq_multilane:{}:failed", self.queue_id) + format!("twmq_multilane:{}:{}:failed", ENGINE_HASH_TAG, self.queue_id) } pub fn job_data_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:data", self.queue_id) + format!( + "twmq_multilane:{}:{}:jobs:data", + ENGINE_HASH_TAG, self.queue_id + ) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id) + format!( + "twmq_multilane:{}:{}:job:{}:meta", + ENGINE_HASH_TAG, self.queue_id, job_id + ) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id) + format!( + "twmq_multilane:{}:{}:job:{}:errors", + ENGINE_HASH_TAG, self.queue_id, job_id + ) } pub fn job_result_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:result", self.queue_id) + format!( + "twmq_multilane:{}:{}:jobs:result", + ENGINE_HASH_TAG, self.queue_id + ) } pub fn dedupe_set_name(&self) -> String { - format!("twmq_multilane:{}:dedup", self.queue_id) + format!("twmq_multilane:{}:{}:dedup", ENGINE_HASH_TAG, self.queue_id) } pub fn pending_cancellation_set_name(&self) -> String { - format!("twmq_multilane:{}:pending_cancellations", self.queue_id) + format!( + "twmq_multilane:{}:{}:pending_cancellations", + ENGINE_HASH_TAG, self.queue_id + ) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { format!( - "twmq_multilane:{}:job:{}:lease:{}", - self.queue_id, job_id, lease_token + "twmq_multilane:{}:{}:job:{}:lease:{}", + ENGINE_HASH_TAG, self.queue_id, job_id, lease_token ) } @@ -372,9 +403,9 @@ impl MultilaneQueue { return "not_found" end - local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' -- Try to remove from pending queue if redis.call('LREM', lane_pending_list, 0, job_id) > 0 then @@ -563,20 +594,20 @@ impl MultilaneQueue { -- Helper function to cleanup expired leases for a specific lane local function cleanup_lane_leases(lane_id) - local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' local active_jobs = redis.call('HGETALL', lane_active_hash) for i = 1, #active_jobs, 2 do local job_id = active_jobs[i] local attempts = active_jobs[i + 1] - local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' local current_lease_token = redis.call('HGET', job_meta_hash, 'lease_token') if current_lease_token then - local lease_key = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token + local lease_key = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. current_lease_token local lease_exists = redis.call('EXISTS', lease_key) if lease_exists == 0 then @@ -597,12 +628,12 @@ impl MultilaneQueue { -- Helper function to move delayed jobs to pending for a specific lane local function process_delayed_jobs(lane_id) - local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' local delayed_jobs = redis.call('ZRANGEBYSCORE', lane_delayed_zset, 0, now) for i, job_id in ipairs(delayed_jobs) do - local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' local reentry_position = redis.call('HGET', job_meta_hash, 'reentry_position') or 'last' redis.call('ZREM', lane_delayed_zset, job_id) @@ -618,8 +649,8 @@ impl MultilaneQueue { -- Helper function to pop one job from a lane local function pop_job_from_lane(lane_id) - local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' local job_id = redis.call('RPOP', lane_pending_list) if not job_id then @@ -631,13 +662,13 @@ impl MultilaneQueue { return nil end - local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' redis.call('HSET', job_meta_hash, 'processed_at', now) local created_at = redis.call('HGET', job_meta_hash, 'created_at') or now local attempts = redis.call('HINCRBY', job_meta_hash, 'attempts', 1) local lease_token = now .. '_' .. job_id .. '_' .. attempts - local lease_key = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token + local lease_key = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token redis.call('SET', lease_key, '1') redis.call('EXPIRE', lease_key, lease_seconds) @@ -651,11 +682,11 @@ impl MultilaneQueue { local cancel_requests = redis.call('SMEMBERS', pending_cancellation_set) for i, job_id in ipairs(cancel_requests) do - local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. job_id .. ':meta' + local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. job_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') if lane_id then - local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' if redis.call('HEXISTS', lane_active_hash, job_id) == 1 then -- Still processing, keep in cancellation set @@ -720,9 +751,9 @@ impl MultilaneQueue { empty_lanes_count = empty_lanes_count + 1 -- Check if lane should be removed from Redis - local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' - local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' local pending_count = redis.call('LLEN', lane_pending_list) local delayed_count = redis.call('ZCARD', lane_delayed_zset) @@ -956,12 +987,12 @@ impl MultilaneQueue { async fn post_success_completion(&self) -> Result<(), TwmqError> { let trim_script = redis::Script::new( r#" - local queue_id = KEYS[1] - local list_name = KEYS[2] - local job_data_hash = KEYS[3] - local results_hash = KEYS[4] - local dedupe_set_name = KEYS[5] - local lanes_zset = KEYS[6] + local queue_id = ARGV[2] + local list_name = KEYS[1] + local job_data_hash = KEYS[2] + local results_hash = KEYS[3] + local dedupe_set_name = KEYS[4] + local lanes_zset = KEYS[5] local max_len = tonumber(ARGV[1]) @@ -971,16 +1002,16 @@ impl MultilaneQueue { if #job_ids_to_delete > 0 then for _, j_id in ipairs(job_ids_to_delete) do -- Get the lane_id for this job to check if it's active/pending/delayed - local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta' + local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') local should_delete = true if lane_id then -- Check if job is in any active state for this lane - local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1 local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil @@ -993,7 +1024,7 @@ impl MultilaneQueue { end if should_delete then - local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors' + local errors_list_name = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1010,13 +1041,13 @@ impl MultilaneQueue { ); let trimmed_count: usize = trim_script - .key(self.queue_id()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) .key(self.dedupe_set_name()) .key(self.lanes_zset_name()) // Need to check lanes .arg(self.options.max_success) + .arg(self.queue_id()) .invoke_async(&mut self.redis.clone()) .await?; @@ -1106,10 +1137,10 @@ impl MultilaneQueue { async fn post_fail_completion(&self) -> Result<(), TwmqError> { let trim_script = redis::Script::new( r#" - local queue_id = KEYS[1] - local list_name = KEYS[2] - local job_data_hash = KEYS[3] - local dedupe_set_name = KEYS[4] + local queue_id = ARGV[2] + local list_name = KEYS[1] + local job_data_hash = KEYS[2] + local dedupe_set_name = KEYS[3] local max_len = tonumber(ARGV[1]) @@ -1119,16 +1150,16 @@ impl MultilaneQueue { if #job_ids_to_delete > 0 then for _, j_id in ipairs(job_ids_to_delete) do -- Get the lane_id for this job to check if it's active/pending/delayed - local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta' + local job_meta_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':meta' local lane_id = redis.call('HGET', job_meta_hash, 'lane_id') local should_delete = true if lane_id then -- Check if job is in any active state for this lane - local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active' - local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending' - local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' + local lane_active_hash = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':active' + local lane_pending_list = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':pending' + local lane_delayed_zset = 'twmq_multilane:{engine}:' .. queue_id .. ':lane:' .. lane_id .. ':delayed' local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1 local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil @@ -1141,7 +1172,7 @@ impl MultilaneQueue { end if should_delete then - local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors' + local errors_list_name = 'twmq_multilane:{engine}:' .. queue_id .. ':job:' .. j_id .. ':errors' redis.call('SREM', dedupe_set_name, j_id) redis.call('HDEL', job_data_hash, j_id) @@ -1157,11 +1188,11 @@ impl MultilaneQueue { ); let trimmed_count: usize = trim_script - .key(self.queue_id()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) .arg(self.options.max_failed) + .arg(self.queue_id()) .invoke_async(&mut self.redis.clone()) .await?; diff --git a/twmq/src/queue.rs b/twmq/src/queue.rs index 288b89f..b4571e8 100644 --- a/twmq/src/queue.rs +++ b/twmq/src/queue.rs @@ -1,6 +1,7 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; -use redis::{Client, aio::ConnectionManager}; +use redis::cluster::ClusterClient; +use redis::cluster_async::ClusterConnection; use serde::{Deserialize, Serialize}; use crate::{DurableExecution, Queue, error::TwmqError}; @@ -65,8 +66,8 @@ pub struct HasHandler; enum RedisSource { Url(String), - Client(Client), - ConnectionManager(ConnectionManager), + ClusterClient(ClusterClient), + ClusterConnection(ClusterConnection), } // Builder with typestate pattern @@ -114,9 +115,9 @@ impl QueueBuilder { } /// Set Redis connection from existing client - pub fn redis_client(self, client: Client) -> QueueBuilder { + pub fn redis_client(self, client: ClusterClient) -> QueueBuilder { QueueBuilder { - redis_source: Some(RedisSource::Client(client)), + redis_source: Some(RedisSource::ClusterClient(client)), name: self.name, options: self.options, handler: self.handler, @@ -124,13 +125,10 @@ impl QueueBuilder { } } - /// Set Redis connection from existing connection manager - pub fn redis_connection_manager( - self, - manager: ConnectionManager, - ) -> QueueBuilder { + /// Set Redis connection from an existing cluster connection + pub fn redis_connection(self, conn: ClusterConnection) -> QueueBuilder { QueueBuilder { - redis_source: Some(RedisSource::ConnectionManager(manager)), + redis_source: Some(RedisSource::ClusterConnection(conn)), name: self.name, options: self.options, handler: self.handler, @@ -192,11 +190,16 @@ impl QueueBuilder { let redis = match redis_source { RedisSource::Url(url) => { - let client = Client::open(url)?; - client.get_connection_manager().await? + let initial_nodes: Vec<&str> = url + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect(); + let client = ClusterClient::new(initial_nodes)?; + client.get_async_connection().await? } - RedisSource::Client(client) => client.get_connection_manager().await?, - RedisSource::ConnectionManager(manager) => manager, + RedisSource::ClusterClient(client) => client.get_async_connection().await?, + RedisSource::ClusterConnection(conn) => conn, }; Ok(Queue { diff --git a/twmq/tests/basic.rs b/twmq/tests/basic.rs index 0c5d2f6..8a10a1f 100644 --- a/twmq/tests/basic.rs +++ b/twmq/tests/basic.rs @@ -11,14 +11,14 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use twmq::job::{JobOptions, JobStatus}; // Assuming JobStatus is in twmq::job -use twmq::redis::aio::ConnectionManager; // For cleanup utility +use twmq::redis::cluster_async::ClusterConnection; // For cleanup utility const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys for a given queue name pattern -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index 0306ac2..b4f17ef 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -1,7 +1,7 @@ // Add this to tests/basic.rs mod fixtures; use fixtures::{TestJobErrorData, TestJobOutput}; -use redis::aio::ConnectionManager; +use redis::cluster_async::ClusterConnection; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -21,9 +21,9 @@ use twmq::{ }; // Helper to clean up Redis keys for a given queue name pattern -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index 5245188..4f93078 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -15,15 +15,15 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, DelayOptions, JobResult, JobStatus, RequeuePosition}, queue::QueueOptions, - redis::aio::ConnectionManager, + redis::cluster_async::ClusterConnection, }; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) @@ -131,8 +131,8 @@ async fn test_job_delay_basic() { }; // Create Redis connection for the execution context - let redis_client = redis::Client::open(REDIS_URL).unwrap(); - let redis_conn = Arc::new(redis_client.get_connection_manager().await.unwrap()); + let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap(); + let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap()); let handler = DelayTestJobHandler; @@ -307,8 +307,8 @@ async fn test_delay_position_ordering() { }; // Create Redis connection for the execution context - let redis_client = redis::Client::open(REDIS_URL).unwrap(); - let redis_conn = Arc::new(redis_client.get_connection_manager().await.unwrap()); + let redis_client = redis::cluster::ClusterClient::new(vec![REDIS_URL]).unwrap(); + let redis_conn = Arc::new(redis_client.get_async_connection().await.unwrap()); let handler = DelayTestJobHandler; diff --git a/twmq/tests/idempotency_modes.rs b/twmq/tests/idempotency_modes.rs index ce1d0f3..7194207 100644 --- a/twmq/tests/idempotency_modes.rs +++ b/twmq/tests/idempotency_modes.rs @@ -8,7 +8,7 @@ use twmq::{ DurableExecution, Queue, job::{BorrowedJob, JobResult, JobStatus}, queue::{IdempotencyMode, QueueOptions}, - redis::aio::ConnectionManager, + redis::cluster_async::ClusterConnection, }; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; @@ -66,9 +66,9 @@ impl DurableExecution for TestJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/lease_expiry.rs b/twmq/tests/lease_expiry.rs index 773c100..6ed420b 100644 --- a/twmq/tests/lease_expiry.rs +++ b/twmq/tests/lease_expiry.rs @@ -12,7 +12,7 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, JobResult, JobStatus}, queue::QueueOptions, - redis::aio::ConnectionManager, + redis::cluster_async::ClusterConnection, }; mod fixtures; @@ -21,9 +21,9 @@ use fixtures::TestJobErrorData; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index 3f8c398..e167123 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -90,7 +90,7 @@ impl MultilaneTestHarness { /// Clean up all Redis keys for this test async fn cleanup(&self) { let mut conn = self.queue.redis.clone(); - let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id); + let keys_pattern = format!("twmq_multilane:{}:{}:*", twmq::ENGINE_HASH_TAG, self.queue_id); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -165,7 +165,8 @@ impl Drop for MultilaneTestHarness { tokio::spawn(async move { let mut conn = redis; - let keys_pattern = format!("twmq_multilane:{queue_id}:*"); + let keys_pattern = + format!("twmq_multilane:{}:{queue_id}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/nack.rs b/twmq/tests/nack.rs index f66cd9f..1c58eec 100644 --- a/twmq/tests/nack.rs +++ b/twmq/tests/nack.rs @@ -15,7 +15,7 @@ use twmq::{ hooks::TransactionContext, job::{BorrowedJob, JobError, JobResult, JobStatus, RequeuePosition}, queue::QueueOptions, - redis::aio::ConnectionManager, + redis::cluster_async::ClusterConnection, }; mod fixtures; @@ -24,9 +24,9 @@ use fixtures::TestJobErrorData; const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/prune_race_condition.rs b/twmq/tests/prune_race_condition.rs index c9898f9..bbbf7cb 100644 --- a/twmq/tests/prune_race_condition.rs +++ b/twmq/tests/prune_race_condition.rs @@ -6,7 +6,7 @@ mod fixtures; use fixtures::TestJobErrorData; -use redis::aio::ConnectionManager; +use redis::cluster_async::ClusterConnection; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -146,9 +146,9 @@ impl DurableExecution for EoaSimulatorJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs index fd5e9ea..143bba5 100644 --- a/twmq/tests/prune_race_random_ids.rs +++ b/twmq/tests/prune_race_random_ids.rs @@ -4,7 +4,7 @@ mod fixtures; use fixtures::TestJobErrorData; -use redis::{AsyncCommands, aio::ConnectionManager}; +use redis::{AsyncCommands, cluster_async::ClusterConnection}; use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; use std::{ @@ -121,9 +121,9 @@ impl DurableExecution for RandomJobHandler { } // Helper to clean up Redis keys -async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { +async fn cleanup_redis_keys(conn_manager: &ClusterConnection, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + let keys_pattern = format!("twmq:{}:{queue_name}:*", twmq::ENGINE_HASH_TAG); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -239,7 +239,7 @@ async fn test_prune_with_random_ids() { let success_job_ids: Vec = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap(); // Count how many job metadata hashes still exist (should match success list length if pruning works) - let meta_pattern = format!("twmq:{}:job:*:meta", queue.name()); + let meta_pattern = format!("twmq:{}:{}:job:*:meta", twmq::ENGINE_HASH_TAG, queue.name()); let meta_keys: Vec = redis::cmd("KEYS") .arg(&meta_pattern) .query_async(&mut conn)