diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 0ad00222b..9586f582a 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -11,8 +11,9 @@ Current package versions: - Support Redis 8.4 CAS/CAD operations (`DIGEST`, and the `IFEQ`, `IFNE`, `IFDEQ`, `IFDNE` modifiers on `SET` / `DEL`) via the new `ValueCondition` abstraction, and use CAS/CAD operations for `Lock*` APIs when possible ([#2978 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2978)) - **note**: overload resolution for `StringSet[Async]` may be impacted in niche cases, requiring trivial build changes (there are no runtime-breaking changes such as missing methods) -- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972)) +- Support `XREADGROUP CLAIM` ([#2972 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2972)) - Support `MSETEX` (Redis 8.4.0) for multi-key operations with expiration ([#2977 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2977)) +- Support `NOMKSTREAM` option in `StreamAdd` methods via `createStream` parameter (requires Redis 6.2.0+) ## 2.9.32 diff --git a/docs/Streams.md b/docs/Streams.md index 4378a528d..58d504655 100644 --- a/docs/Streams.md +++ b/docs/Streams.md @@ -37,6 +37,38 @@ You also have the option to override the auto-generated message ID by passing yo db.StreamAdd("events_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100); ``` +Conditional Stream Creation with NOMKSTREAM +--- + +By default, `StreamAdd` automatically creates the stream if it doesn't exist. Starting with Redis 6.2.0, you can prevent automatic stream creation using the `createStream` parameter: + +```csharp +// This will return null if the stream doesn't exist +var messageId = db.StreamAdd( + "mystream", + "field", + "value", + createStream: false); + +if (messageId.IsNull) +{ + Console.WriteLine("Stream does not exist, message was not added."); +} +else +{ + Console.WriteLine($"Message added with ID: {messageId}"); +} +``` + +**Use cases**: +- **Producer-consumer scenarios**: Only add messages if a consumer has registered (by creating the stream or consumer group) +- **Prevent typos**: Avoid accidentally creating streams with misspelled keys +- **Conditional writes**: Only write to pre-existing streams + +**Requirements**: +- Redis 6.2.0 or higher +- When `createStream: false` and the stream doesn't exist, the method returns `RedisValue.Null` + Reading from Streams === diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 3df162682..84ed041bc 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2655,13 +2655,14 @@ IEnumerable SortedSetScan( /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). /// The maximum length of the stream. /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// When false, the stream will not be created if it does not exist, and the command returns null. When true (default), the stream is created if it does not exist. Requires Redis 6.2.0+. /// Specifies the maximal count of entries that will be evicted. /// Determines how stream trimming should be performed. /// The flags to use for this operation. /// The ID of the newly created message. /// #pragma warning disable RS0026 // different shape - RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 /// @@ -2674,13 +2675,14 @@ IEnumerable SortedSetScan( /// The ID to assign to the stream entry, defaults to an auto-generated ID ("*"). /// The maximum length of the stream. /// If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages. + /// When false, the stream will not be created if it does not exist, and the command returns null. When true (default), the stream is created if it does not exist. Requires Redis 6.2.0+. /// Specifies the maximal count of entries that will be evicted. /// Determines how stream trimming should be performed. /// The flags to use for this operation. /// The ID of the newly created message. /// #pragma warning disable RS0026 // different shape - RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 /// diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 855ea6c8f..bdd106e50 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -650,11 +650,11 @@ IAsyncEnumerable SortedSetScanAsync( Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags); #pragma warning disable RS0026 // similar overloads - /// - Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + /// + Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); - /// - Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); + /// + Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None); #pragma warning restore RS0026 /// diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index fe23b73c1..603fd2a17 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -615,11 +615,11 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); - public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => - Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags); - public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => - Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags); public Task StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 69775c15d..587cf9d92 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -597,11 +597,11 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) => Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags); - public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => - Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags); - public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => - Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags); + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) => + Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, createStream, limit, mode, flags); public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) => Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 5eaa42b3f..90d087e85 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -719,9 +719,9 @@ StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long StackExchange.Redis.IDatabase.StreamAcknowledgeAndDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamTrimResult StackExchange.Redis.IDatabase.StreamAcknowledgeAndDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamTrimResult[]! -StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisValue -StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue +StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.RedisValue StackExchange.Redis.IDatabase.StreamAutoClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimResult StackExchange.Redis.IDatabase.StreamAutoClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimIdsOnlyResult @@ -963,9 +963,9 @@ StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.Re StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAndDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAndDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamTrimMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! -StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! +StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StackExchange.Redis.StreamTrimMode trimMode = StackExchange.Redis.StreamTrimMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAutoClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.IDatabaseAsync.StreamAutoClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index f13571c4e..959598719 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2776,9 +2776,9 @@ public Task StreamAcknowledgeAndDeleteAsync(RedisKey key, Re } public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) - => StreamAdd(key, streamField, streamValue, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + => StreamAdd(key, streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream: true, null, StreamTrimMode.KeepReferences, flags); - public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2786,6 +2786,7 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str maxLength, useApproximateMaxLength, new NameValueEntry(streamField, streamValue), + createStream, limit, mode, flags); @@ -2794,9 +2795,9 @@ public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue str } public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) - => StreamAddAsync(key, streamField, streamValue, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + => StreamAddAsync(key, streamField, streamValue, messageId, maxLength, useApproximateMaxLength, createStream: true, null, StreamTrimMode.KeepReferences, flags); - public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + public Task StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2804,6 +2805,7 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red maxLength, useApproximateMaxLength, new NameValueEntry(streamField, streamValue), + createStream, limit, mode, flags); @@ -2812,9 +2814,9 @@ public Task StreamAddAsync(RedisKey key, RedisValue streamField, Red } public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) - => StreamAdd(key, streamPairs, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + => StreamAdd(key, streamPairs, messageId, maxLength, useApproximateMaxLength, createStream: true, null, StreamTrimMode.KeepReferences, flags); - public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2822,6 +2824,7 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal maxLength, useApproximateMaxLength, streamPairs, + createStream, limit, mode, flags); @@ -2830,9 +2833,9 @@ public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisVal } public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) - => StreamAddAsync(key, streamPairs, messageId, maxLength, useApproximateMaxLength, null, StreamTrimMode.KeepReferences, flags); + => StreamAddAsync(key, streamPairs, messageId, maxLength, useApproximateMaxLength, createStream: true, null, StreamTrimMode.KeepReferences, flags); - public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) + public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, bool createStream = true, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None) { var msg = GetStreamAddMessage( key, @@ -2840,6 +2843,7 @@ public Task StreamAddAsync(RedisKey key, NameValueEntry[] streamPair maxLength, useApproximateMaxLength, streamPairs, + createStream, limit, mode, flags); @@ -4628,20 +4632,32 @@ private Message GetStreamAcknowledgeAndDeleteMessage(RedisKey key, RedisValue gr return Message.Create(Database, flags, RedisCommand.XACKDEL, key, values); } - private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, long? limit, StreamTrimMode mode, CommandFlags flags) + private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? maxLength, bool useApproximateMaxLength, NameValueEntry streamPair, bool createStream, long? limit, StreamTrimMode mode, CommandFlags flags) { // Calculate the correct number of arguments: - // 3 array elements for Entry ID & NameValueEntry.Name & NameValueEntry.Value. + // 1 element if using NOMKSTREAM (when createStream = false), otherwise 0. // 2 elements if using MAXLEN (keyword & value), otherwise 0. // 1 element if using Approximate Length (~), otherwise 0. - var totalLength = 3 + (maxLength.HasValue ? 2 : 0) + // 2 elements if using LIMIT (keyword & value), otherwise 0. + // 1 element for trim mode (if not KeepReferences), otherwise 0. + // 1 element for Entry ID. + // 2 elements for NameValueEntry.Name & NameValueEntry.Value. + var totalLength = (!createStream ? 1 : 0) + + (maxLength.HasValue ? 2 : 0) + (maxLength.HasValue && useApproximateMaxLength ? 1 : 0) + (limit.HasValue ? 2 : 0) - + (mode != StreamTrimMode.KeepReferences ? 1 : 0); + + (mode != StreamTrimMode.KeepReferences ? 1 : 0) + + 3; var values = new RedisValue[totalLength]; var offset = 0; + // NOMKSTREAM must come FIRST in the XADD command (before MAXLEN, LIMIT, etc.) + if (!createStream) + { + values[offset++] = StreamConstants.NoMkStream; + } + if (maxLength.HasValue) { values[offset++] = StreamConstants.MaxLen; @@ -4677,7 +4693,7 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue messageId, long? ma /// /// Gets message for . /// - private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, long? limit, StreamTrimMode mode, CommandFlags flags) + private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxLength, bool useApproximateMaxLength, NameValueEntry[] streamPairs, bool createStream, long? limit, StreamTrimMode mode, CommandFlags flags) { if (streamPairs == null) throw new ArgumentNullException(nameof(streamPairs)); if (streamPairs.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPairs), "streamPairs must contain at least one item."); @@ -4687,7 +4703,8 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxL throw new ArgumentOutOfRangeException(nameof(maxLength), "maxLength must be greater than 0."); } - var totalLength = (streamPairs.Length * 2) // Room for the name/value pairs + var totalLength = (!createStream ? 1 : 0) // NOMKSTREAM flag + + (streamPairs.Length * 2) // Room for the name/value pairs + 1 // The stream entry ID + (maxLength.HasValue ? 2 : 0) // MAXLEN N + (maxLength.HasValue && useApproximateMaxLength ? 1 : 0) // ~ @@ -4698,6 +4715,12 @@ private Message GetStreamAddMessage(RedisKey key, RedisValue entryId, long? maxL var offset = 0; + // NOMKSTREAM must come FIRST in the XADD command (before MAXLEN, LIMIT, etc.) + if (!createStream) + { + values[offset++] = StreamConstants.NoMkStream; + } + if (maxLength.HasValue) { values[offset++] = StreamConstants.MaxLen; diff --git a/src/StackExchange.Redis/StreamConstants.cs b/src/StackExchange.Redis/StreamConstants.cs index 92c37222a..6b8559287 100644 --- a/src/StackExchange.Redis/StreamConstants.cs +++ b/src/StackExchange.Redis/StreamConstants.cs @@ -63,6 +63,8 @@ internal static class StreamConstants internal static readonly RedisValue MkStream = "MKSTREAM"; + internal static readonly RedisValue NoMkStream = "NOMKSTREAM"; + internal static readonly RedisValue Stream = "STREAM"; private static readonly RedisValue KeepRef = "KEEPREF", DelRef = "DELREF", Acked = "ACKED"; diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs index 0b781123c..fb2516f79 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs @@ -1729,16 +1729,16 @@ public void IsConnected() [Fact] public void StreamAdd_WithTrimMode_1() { - prefixed.StreamAdd("key", "field", "value", "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); - mock.Received().StreamAdd("prefix:key", "field", "value", "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + prefixed.StreamAdd("key", "field", "value", "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + mock.Received().StreamAdd("prefix:key", "field", "value", "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); } [Fact] public void StreamAdd_WithTrimMode_2() { var fields = new NameValueEntry[] { new NameValueEntry("field", "value") }; - prefixed.StreamAdd("key", fields, "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); - mock.Received().StreamAdd("prefix:key", fields, "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + prefixed.StreamAdd("key", fields, "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + mock.Received().StreamAdd("prefix:key", fields, "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs index 94b54e112..7ddd24049 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs @@ -1650,16 +1650,16 @@ public async Task KeyIdleTimeAsync() [Fact] public async Task StreamAddAsync_WithTrimMode_1() { - await prefixed.StreamAddAsync("key", "field", "value", "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); - await mock.Received().StreamAddAsync("prefix:key", "field", "value", "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + await prefixed.StreamAddAsync("key", "field", "value", "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + await mock.Received().StreamAddAsync("prefix:key", "field", "value", "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); } [Fact] public async Task StreamAddAsync_WithTrimMode_2() { var fields = new NameValueEntry[] { new NameValueEntry("field", "value") }; - await prefixed.StreamAddAsync("key", fields, "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); - await mock.Received().StreamAddAsync("prefix:key", fields, "*", 1000, false, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + await prefixed.StreamAddAsync("key", fields, "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); + await mock.Received().StreamAddAsync("prefix:key", fields, "*", 1000, false, true, 100, StreamTrimMode.KeepReferences, CommandFlags.None); } [Fact] diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 2419f673a..1b6002a01 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -105,6 +105,140 @@ public async Task StreamAddMultipleValuePairsWithManualId() Assert.Equal(id, entries[0].Id); } + [Fact] + public async Task StreamAddWithNoMkStream_StreamExists_Success() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + + // First, create the stream with a normal StreamAdd + var firstId = db.StreamAdd(key, "field1", "value1"); + Assert.True(firstId != RedisValue.Null); + + // Now add entry with createStream: false (should succeed since stream exists) + var secondId = db.StreamAdd(key, "field2", "value2", createStream: false); + Assert.True(secondId != RedisValue.Null); + + // Verify both entries exist + var entries = db.StreamRange(key); + Assert.Equal(2, entries.Length); + } + + [Fact] + public async Task StreamAddWithNoMkStream_StreamDoesNotExist_ReturnsNull() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + + // Ensure stream doesn't exist + db.KeyDelete(key); + + // Try to add entry with createStream: false (should return null) + var messageId = db.StreamAdd(key, "field1", "value1", createStream: false); + Assert.True(messageId.IsNull); + + // Verify stream was not created + var keyExists = db.KeyExists(key); + Assert.False(keyExists); + } + + [Fact] + public async Task StreamAddAsyncWithNoMkStream_StreamDoesNotExist_ReturnsNull() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + + // Ensure stream doesn't exist + await db.KeyDeleteAsync(key); + + // Try to add entry with createStream: false (should return null) + var messageId = await db.StreamAddAsync(key, "field1", "value1", createStream: false); + Assert.True(messageId.IsNull); + + // Verify stream was not created + var keyExists = await db.KeyExistsAsync(key); + Assert.False(keyExists); + } + + [Fact] + public async Task StreamAddWithNoMkStream_MultipleFields_StreamExists_Success() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + + // First, create the stream + var firstId = db.StreamAdd(key, "field1", "value1"); + Assert.True(firstId != RedisValue.Null); + + // Add entry with multiple fields and createStream: false + var fields = new[] + { + new NameValueEntry("field2", "value2"), + new NameValueEntry("field3", "value3"), + }; + var secondId = db.StreamAdd(key, fields, createStream: false); + Assert.True(secondId != RedisValue.Null); + + // Verify entries + var entries = db.StreamRange(key); + Assert.Equal(2, entries.Length); + Assert.Equal(2, entries[1].Values.Length); + } + + [Fact] + public async Task StreamAddWithNoMkStream_MultipleFields_StreamDoesNotExist_ReturnsNull() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + + // Ensure stream doesn't exist + db.KeyDelete(key); + + // Try to add entry with multiple fields and createStream: false + var fields = new[] + { + new NameValueEntry("field1", "value1"), + new NameValueEntry("field2", "value2"), + }; + var messageId = db.StreamAdd(key, fields, createStream: false); + Assert.True(messageId.IsNull); + + // Verify stream was not created + var keyExists = db.KeyExists(key); + Assert.False(keyExists); + } + + [Fact] + public async Task StreamAddWithNoMkStream_WithMaxLen_StreamExists_Success() + { + await using var conn = Create(require: RedisFeatures.v6_2_0); + + var db = conn.GetDatabase(); + var key = Me(); + + // Create stream with initial entries + db.StreamAdd(key, "field1", "value1"); + db.StreamAdd(key, "field2", "value2"); + + // Add entry with NOMKSTREAM and MAXLEN + var messageId = db.StreamAdd(key, "field3", "value3", maxLength: 2, createStream: false); + Assert.True(messageId != RedisValue.Null); + + // Verify stream was trimmed to maxLength + var entries = db.StreamRange(key); + Assert.Equal(2, entries.Length); + } + [Fact] public async Task StreamAutoClaim_MissingKey() {