Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/rust_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ jobs:
- run: sudo apt-get install -y musl musl-dev musl-tools cmake
if: matrix.target == 'x86_64-unknown-linux-musl'
# # Caching stuff
- uses: actions/cache@v2
- uses: actions/cache@v4
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
key: ${{ runner.os }}-cargo-deps-${{ hashFiles('**/Cargo.toml') }}
- uses: actions/cache@v2
- uses: actions/cache@v4
with:
path: |
target/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ http-0-2 = { version = "0.2", optional = true, package = "http"}
axum-lib = { version = "^0.8", optional = true, package="axum"}
http-body-util = {version = "^0.1", optional = true}
poem-lib = { version = "^3.1", optional = true, package = "poem" }
nats-lib = { version = "0.25.0", optional = true, package = "nats" }
nats-lib = { version = "0.42.0", optional = true, package = "async-nats" }

[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.4"
Expand Down
4 changes: 3 additions & 1 deletion example-projects/nats-example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ edition = "2021"
[dependencies]
cloudevents-sdk = { path = "../..", features = ["nats"] }
serde_json = "^1.0"
nats = "^0.25"
async-nats = "^0.42.0"
tokio = { version = "^1.0", features = ["full"] }
futures = "^0.3"
27 changes: 15 additions & 12 deletions example-projects/nats-example/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::{error::Error, thread};
use std::error::Error;

use cloudevents::binding::nats::{MessageExt, NatsCloudEvent};
use cloudevents::{Event, EventBuilder, EventBuilderV10};
use serde_json::json;
use futures::StreamExt;

/// First spin up a nats server i.e.
/// ```bash
/// docker run -p 4222:4222 -ti nats:latest
/// ```
fn main() -> Result<(), Box<dyn Error>> {
let nc = nats::connect("localhost:4222").unwrap();
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let client = async_nats::connect("localhost:4222").await?;

let event = EventBuilderV10::new()
.id("123".to_string())
Expand All @@ -21,26 +23,27 @@ fn main() -> Result<(), Box<dyn Error>> {

let n_msg = NatsCloudEvent::from_event(event).unwrap();

let sub = nc.subscribe("test").unwrap();
let mut sub = client.subscribe("test").await?;

let t = thread::spawn(move || -> Result<Event, String> {
match sub.next() {
Some(msg) => match msg.to_event() {
let receive_task = tokio::spawn(async move {
if let Some(msg) = sub.next().await {
match msg.to_event() {
Ok(evt) => Ok(evt),
Err(e) => Err(e.to_string()),
},
None => Err("Unsubed or disconnected".to_string()),
}
} else {
Err("No event received".to_string())
}
});

nc.publish("test", n_msg)?;
client.publish("test", n_msg.payload.into()).await?;

let maybe_event = t.join().unwrap();
let maybe_event = receive_task.await?;

if let Ok(evt) = maybe_event {
println!("{}", evt.to_string());
} else {
println!("{}", maybe_event.unwrap_err().to_string());
println!("{}", maybe_event.unwrap_err());
}

Ok(())
Expand Down
32 changes: 19 additions & 13 deletions src/binding/nats/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl StructuredDeserializer for nats::Message {
self,
serializer: V,
) -> crate::message::Result<R> {
serializer.set_structured_event(self.data.to_vec())
serializer.set_structured_event(self.payload.to_vec())
}
}

Expand Down Expand Up @@ -47,12 +47,15 @@ mod tests {
fn test_structured_deserialize_v10() {
let expected = fixtures::v10::full_json_data_string_extension();

let nats_message = nats::Message::new(
"not_relevant",
None,
json!(expected).to_string().as_bytes(),
None,
);
let nats_message = nats::Message {
subject: "not_relevant".into(),
payload: json!(expected).to_string().into_bytes().into(),
reply: None,
headers: None,
status: None,
description: None,
length: 0,
};

let actual = nats_message.to_event().unwrap();

Expand All @@ -63,12 +66,15 @@ mod tests {
fn test_structured_deserialize_v03() {
let expected = fixtures::v03::full_json_data();

let nats_message = nats::Message::new(
"not_relevant",
None,
json!(expected).to_string().as_bytes(),
None,
);
let nats_message = nats::Message {
subject: "not_relevant".into(),
payload: json!(expected).to_string().into_bytes().into(),
reply: None,
headers: None,
status: None,
description: None,
length: 0,
};

let actual = nats_message.to_event().unwrap();

Expand Down
30 changes: 18 additions & 12 deletions src/binding/nats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
//! This module provides bindings between [cloudevents-sdk](https://docs.rs/cloudevents-sdk) and [nats](https://docs.rs/nats)
//! ## Examples
//! Deserialize [nats::Message](https://docs.rs/nats/0.21.0/nats/struct.Message.html) into [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html)
//! ```
//! ```no_run
//! use nats_lib as nats;
//! use cloudevents::binding::nats::MessageExt;
//! use futures::StreamExt;
//!
//! fn consume() {
//! let nc = nats::connect("localhost:4222").unwrap();
//! let sub = nc.subscribe("test").unwrap();
//! let nats_message = sub.next().unwrap();
//! let cloud_event = nats_message.to_event().unwrap();
//!
//! println!("{}", cloud_event.to_string());
//! #[tokio::main]
//! async fn main() {
//! let nc = nats::connect("localhost:4222").await.unwrap();
//! let mut sub = nc.subscribe("test").await.unwrap();
//!
//! // Process messages one at a time
//! sub.for_each_concurrent(1, |nats_message| async move {
//! let cloud_event = nats_message.to_event().unwrap();
//! println!("{}", cloud_event.to_string());
//! }).await;
//! }
//! ```
//!
//! Serialize [Event](https://docs.rs/cloudevents-sdk/latest/cloudevents/event/struct.Event.html) into [NatsCloudEvent] and publish to nats subject
//! ```
//! ```no_run
//! use nats_lib as nats;
//! use cloudevents::binding::nats::NatsCloudEvent;
//! use cloudevents::{EventBuilder, EventBuilderV10, Event};
//! use serde_json::json;
//!
//! fn publish() {
//! let nc = nats::connect("localhost:4222").unwrap();
//! #[tokio::main]
//! async fn main() {
//! let nc = nats::connect("localhost:4222").await.unwrap();
//!
//! let event = EventBuilderV10::new()
//! .id("123".to_string())
Expand All @@ -33,7 +38,8 @@
//! .build()
//! .unwrap();
//!
//! nc.publish("whatever.subject.you.like", NatsCloudEvent::from_event(event).unwrap()).unwrap();
//! let nats_payload = NatsCloudEvent::from_event(event).unwrap();
//! nc.publish("whatever.subject.you.like", nats_payload.payload.into()).await.unwrap();
//! }
//! ```
mod deserializer;
Expand Down
Loading