Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ZiggyCreatures.FusionCache.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ZiggyCreatures.FusionCache.
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AOTTester", "tests\AOTTester\AOTTester.csproj", "{A1321882-2C76-4105-A0BD-9500D2402A2B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ZiggyCreatures.FusionCache.Backplane.NATS", "src\ZiggyCreatures.FusionCache.Backplane.NATS\ZiggyCreatures.FusionCache.Backplane.NATS.csproj", "{970C789F-EEF1-08D6-6053-36CF2DCD8E68}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -141,6 +143,10 @@ Global
{A1321882-2C76-4105-A0BD-9500D2402A2B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A1321882-2C76-4105-A0BD-9500D2402A2B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A1321882-2C76-4105-A0BD-9500D2402A2B}.Release|Any CPU.Build.0 = Release|Any CPU
{970C789F-EEF1-08D6-6053-36CF2DCD8E68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{970C789F-EEF1-08D6-6053-36CF2DCD8E68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{970C789F-EEF1-08D6-6053-36CF2DCD8E68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{970C789F-EEF1-08D6-6053-36CF2DCD8E68}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -166,6 +172,7 @@ Global
{9BC9B26A-E73F-46D4-B788-06C3A8AE63EB} = {34B53F49-F5C5-4850-B79E-59AD130379C6}
{5F66F031-412F-43E3-946B-1034DBD3ED4D} = {34B53F49-F5C5-4850-B79E-59AD130379C6}
{A1321882-2C76-4105-A0BD-9500D2402A2B} = {C6F3C570-C68C-4A95-960E-82778306BDBA}
{970C789F-EEF1-08D6-6053-36CF2DCD8E68} = {34B53F49-F5C5-4850-B79E-59AD130379C6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {92916FA2-FCAC-406E-BF3F-0A2CE9512EF0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
{
EnsureConnection();

if (message is null)
throw new ArgumentNullException(nameof(message));

if (message.IsValid() == false)
throw new InvalidOperationException("The message is invalid");

Expand All @@ -60,8 +57,6 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] about to send a backplane notification to {BackplanesCount} backplanes (including self)", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, _subscribers.Count);

var payload = BackplaneMessage.ToByteArray(message);

foreach (var backplane in _subscribers)
{
token.ThrowIfCancellationRequested();
Expand All @@ -74,7 +69,7 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] before sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);

await backplane.OnMessageAsync(payload).ConfigureAwait(false);
await backplane.OnMessageAsync(message).ConfigureAwait(false);

if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] after sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);
Expand All @@ -87,10 +82,8 @@ public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOp
}
}

internal async ValueTask OnMessageAsync(byte[] payload)
internal async ValueTask OnMessageAsync(BackplaneMessage message)
{
var message = BackplaneMessage.FromByteArray(payload);

var handler = _incomingMessageHandlerAsync;

if (handler is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,10 @@ public void Unsubscribe()
}

/// <inheritdoc/>
public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
public void Publish(in BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
EnsureConnection();

if (message is null)
throw new ArgumentNullException(nameof(message));

if (message.IsValid() == false)
throw new InvalidOperationException("The message is invalid");

Expand All @@ -102,8 +99,6 @@ public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, C
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] about to send a backplane notification to {BackplanesCount} backplanes (including self)", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, _subscribers.Count);

var payload = BackplaneMessage.ToByteArray(message);

foreach (var backplane in _subscribers)
{
token.ThrowIfCancellationRequested();
Expand All @@ -116,7 +111,7 @@ public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, C
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] before sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);

backplane.OnMessage(payload);
backplane.OnMessage(message);

if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}] (K={CacheKey}): [BP] after sending a backplane notification to channel {BackplaneChannel}", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId, message.CacheKey, backplane._channelName);
Expand All @@ -129,10 +124,8 @@ public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, C
}
}

internal void OnMessage(byte[] payload)
internal void OnMessage(in BackplaneMessage message)
{
var message = BackplaneMessage.FromByteArray(payload);

var handler = _incomingMessageHandler;

if (handler is null)
Expand Down
144 changes: 144 additions & 0 deletions src/ZiggyCreatures.FusionCache.Backplane.NATS/NatsBackplane.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using System.Buffers;
using System.Text.Json;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;

using NATS.Client.Core;

using ZiggyCreatures.Caching.Fusion.Internals;

namespace ZiggyCreatures.Caching.Fusion.Backplane.NATS;

/// <summary>
/// A Redis based implementation of a FusionCache backplane.
/// </summary>
public partial class NatsBackplane
: IFusionCacheBackplane
{
private BackplaneSubscriptionOptions? _subscriptionOptions;
private readonly ILogger? _logger;
private INatsConnection _connection;
private string _channelName = "";
private Func<BackplaneMessage, ValueTask>? _incomingMessageHandlerAsync;
private INatsSub<NatsMemoryOwner<byte>>? _subscription;

/// <summary>
/// Initializes a new instance of the RedisBackplane class.
/// </summary>
/// <param name="natsConnection">The NATS connection instance to use.</param>
/// <param name="logger">The <see cref="ILogger{TCategoryName}"/> instance to use. If null, logging will be completely disabled.</param>
public NatsBackplane(INatsConnection? natsConnection, ILogger<NatsBackplane>? logger = null)
{
_connection = natsConnection ?? throw new ArgumentNullException(nameof(natsConnection));

// LOGGING
if (logger is NullLogger<NatsBackplane>)
{
// IGNORE NULL LOGGER (FOR BETTER PERF)
_logger = null;
}
else
{
_logger = logger;
}
}

/// <inheritdoc/>
public async ValueTask SubscribeAsync(BackplaneSubscriptionOptions subscriptionOptions)
{
if (subscriptionOptions is null)
throw new ArgumentNullException(nameof(subscriptionOptions));

if (subscriptionOptions.ChannelName is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.ChannelName cannot be null");

if (subscriptionOptions.IncomingMessageHandler is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.IncomingMessageHandler cannot be null");

if (subscriptionOptions.ConnectHandler is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.ConnectHandler cannot be null");

if (subscriptionOptions.IncomingMessageHandlerAsync is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.IncomingMessageHandlerAsync cannot be null");

if (subscriptionOptions.ConnectHandlerAsync is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.ConnectHandlerAsync cannot be null");

_subscriptionOptions = subscriptionOptions;

_channelName = _subscriptionOptions.ChannelName;
if (string.IsNullOrEmpty(_channelName))
throw new NullReferenceException("The backplane channel name must have a value");

_incomingMessageHandlerAsync = _subscriptionOptions.IncomingMessageHandlerAsync;
_subscription = await _connection.SubscribeCoreAsync<NatsMemoryOwner<byte>>(_channelName);
_ = Task.Run(async () =>
{
while (await _subscription.Msgs.WaitToReadAsync().ConfigureAwait(false))
{
while (_subscription.Msgs.TryRead(out var msg))
{
using (msg.Data)
{
if(BackplaneMessage.TryParse(msg.Data.Span, out BackplaneMessage message))
{
await OnMessageAsync(message).ConfigureAwait(false);
}
}
}
}
});
}


/// <inheritdoc/>
public void Subscribe(BackplaneSubscriptionOptions options)
{
SubscribeAsync(options).AsTask().Wait();
}

/// <inheritdoc/>
public async ValueTask UnsubscribeAsync()
{
if (_subscription is not null)
{
await _subscription.UnsubscribeAsync().ConfigureAwait(false);
await _subscription.Msgs.Completion;
}
}

/// <inheritdoc/>
public void Unsubscribe()
{
UnsubscribeAsync().AsTask().Wait();
}

/// <inheritdoc/>
public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
var writer = new NatsBufferWriter<byte>();
message.WriteTo(writer);
await _connection.PublishAsync(_channelName, writer).ConfigureAwait(false);
}

/// <inheritdoc/>
public void Publish(in BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
PublishAsync(message, options, token).AsTask().Wait();
}

internal async ValueTask OnMessageAsync(BackplaneMessage message)
{
var tmp = _incomingMessageHandlerAsync;
if (tmp is null)
{
if (_logger?.IsEnabled(LogLevel.Trace) ?? false)
_logger.Log(LogLevel.Trace, "FUSION [N={CacheName} I={CacheInstanceId}]: [BP] incoming message handler was null", _subscriptionOptions?.CacheName, _subscriptionOptions?.CacheInstanceId);
return;
}

await tmp(message).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<Version>2.1.0</Version>
<PackageId>ZiggyCreatures.FusionCache.Backplane.NATS</PackageId>
<Description>FusionCache backplane for NATS based on the NATS.Net library</Description>
<PackageTags>backplane;nats;synadia;caching;cache;hybrid;hybrid-cache;hybridcache;multi-level;multilevel;fusion;fusioncache;fusion-cache;performance;async;ziggy</PackageTags>
<RootNamespace>ZiggyCreatures.Caching.Fusion.Backplane.NATS</RootNamespace>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageValidationBaselineVersion>1.0.0</PackageValidationBaselineVersion>
</PropertyGroup>

<ItemGroup>
<None Include="artwork\logo-128x128.png" Pack="true" PackagePath="\" />
<None Include="docs\README.md" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Client.Core" Version="2.6.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ZiggyCreatures.FusionCache\ZiggyCreatures.FusionCache.csproj" />
</ItemGroup>
</Project>
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
13 changes: 13 additions & 0 deletions src/ZiggyCreatures.FusionCache.Backplane.NATS/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# FusionCache

![FusionCache logo](https://raw.githubusercontent.com/ZiggyCreatures/FusionCache/main/docs/logo-256x256.png)

### FusionCache is an easy to use, fast and robust hybrid cache with advanced resiliency features.

It was born after years of dealing with all sorts of different types of caches: memory caching, distributed caching, http caching, CDNs, browser cache, offline cache, you name it. So I've tried to put together these experiences and came up with FusionCache.

Find out [more](https://github.com/ZiggyCreatures/FusionCache).

## 📦 This package

This package is a backplane implementation on [NATS](https://nats.io/) based on the awesome [StackExchange.Redis](https://github.com/StackExchange/StackExchange.Redis) library.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using Microsoft.Extensions.Options;
using StackExchange.Redis;

using ZiggyCreatures.Caching.Fusion.Internals;

namespace ZiggyCreatures.Caching.Fusion.Backplane.StackExchangeRedis;

/// <summary>
Expand Down Expand Up @@ -93,26 +95,33 @@ private void Disconnect()
_connection = null;
}

private static BackplaneMessage? GetMessageFromRedisValue(RedisValue value, ILogger? logger, BackplaneSubscriptionOptions? subscriptionOptions)
private static bool TryGetMessageFromRedisValue(RedisValue value, ILogger? logger, BackplaneSubscriptionOptions? subscriptionOptions, out BackplaneMessage backplaneMessage)
{
try
{
return BackplaneMessage.FromByteArray(value);
byte[]? byteValue = value;
if (byteValue is not null && BackplaneMessage.TryParse(byteValue, out backplaneMessage))
{
return true;
}
}
catch (Exception exc)
{
if (logger?.IsEnabled(LogLevel.Warning) ?? false)
logger.Log(LogLevel.Warning, exc, "FUSION [N={CacheName} I={CacheInstanceId}]: [BP] an error occurred while converting a RedisValue into a BackplaneMessage", subscriptionOptions?.CacheName, subscriptionOptions?.CacheInstanceId);
}

return null;
backplaneMessage = default;
return false;
}

private static RedisValue GetRedisValueFromMessage(BackplaneMessage message, ILogger? logger, BackplaneSubscriptionOptions? subscriptionOptions)
{
try
{
return BackplaneMessage.ToByteArray(message);
using var arrayPoolBufferWriter = new ArrayPoolBufferWriter();
message.WriteTo(arrayPoolBufferWriter);
return arrayPoolBufferWriter.ToArray();
}
catch (Exception exc)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ public async ValueTask SubscribeAsync(BackplaneSubscriptionOptions subscriptionO

await _subscriber.SubscribeAsync(_channel, (rc, value) =>
{
var message = GetMessageFromRedisValue(value, _logger, _subscriptionOptions);
if (message is null)
return;

_ = Task.Run(async () =>
if (TryGetMessageFromRedisValue(value, _logger, _subscriptionOptions, out var message))
{
await OnMessageAsync(message).ConfigureAwait(false);
});
_ = Task.Run(async () =>
{
await OnMessageAsync(message).ConfigureAwait(false);
});
}

return;
}).ConfigureAwait(false);
}

Expand Down
Loading
Loading