Skip to content

Commit e4db945

Browse files
committed
feat: basic rivetkit impl
1 parent 9adad6e commit e4db945

52 files changed

Lines changed: 944 additions & 509 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/openapi.json

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/src/actors/create.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,39 @@ pub async fn create(
3838

3939
Ok(CreateResponse { actor: res.actor })
4040
}
41+
42+
#[tracing::instrument(skip_all)]
43+
pub async fn create2(
44+
ctx: ApiCtx,
45+
_path: (),
46+
query: CreateQuery,
47+
body: CreateRequest,
48+
) -> Result<CreateResponse> {
49+
let namespace = ctx
50+
.op(namespace::ops::resolve_for_name_global::Input {
51+
name: query.namespace.clone(),
52+
})
53+
.await?
54+
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
55+
56+
let actor_id = Id::new_v1(ctx.config().dc_label());
57+
58+
let res = ctx
59+
.op(pegboard::ops::actor::create::Input2 {
60+
actor_id,
61+
namespace_id: namespace.namespace_id,
62+
name: body.name.clone(),
63+
key: body.key,
64+
pool_name: body.runner_name_selector,
65+
input: body.input.clone(),
66+
crash_policy: body.crash_policy,
67+
// NOTE: This can forward if the user attempts to create an actor with a target dc and this dc
68+
// ends up forwarding to another.
69+
forward_request: true,
70+
// api-peer is always creating in its own datacenter
71+
datacenter_name: None,
72+
})
73+
.await?;
74+
75+
Ok(CreateResponse { actor: res.actor })
76+
}

engine/packages/api-peer/src/actors/get_or_create.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,97 @@ pub async fn get_or_create(
9797
}
9898
}
9999

100+
#[tracing::instrument(skip_all)]
101+
pub async fn get_or_create2(
102+
ctx: ApiCtx,
103+
_path: (),
104+
query: GetOrCreateQuery,
105+
body: GetOrCreateRequest,
106+
) -> Result<GetOrCreateResponse> {
107+
let namespace = ctx
108+
.op(namespace::ops::resolve_for_name_global::Input {
109+
name: query.namespace.clone(),
110+
})
111+
.await?
112+
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
113+
114+
// Check if actor already exists for the key
115+
let existing = ctx
116+
.op(pegboard::ops::actor::get_for_key::Input {
117+
namespace_id: namespace.namespace_id,
118+
name: body.name.clone(),
119+
key: body.key.clone(),
120+
fetch_error: true,
121+
})
122+
.await?;
123+
124+
if let Some(actor) = existing.actor {
125+
// Actor exists, return it
126+
return Ok(GetOrCreateResponse {
127+
actor,
128+
created: false,
129+
});
130+
}
131+
132+
// Actor doesn't exist, create it
133+
let actor_id = Id::new_v1(ctx.config().dc_label());
134+
135+
match ctx
136+
.op(pegboard::ops::actor::create::Input2 {
137+
actor_id,
138+
namespace_id: namespace.namespace_id,
139+
name: body.name.clone(),
140+
key: Some(body.key.clone()),
141+
pool_name: body.runner_name_selector,
142+
input: body.input.clone(),
143+
crash_policy: body.crash_policy,
144+
// NOTE: This can forward if the user attempts to create an actor with a target dc and this dc
145+
// ends up forwarding to another.
146+
forward_request: true,
147+
// api-peer is always creating in its own datacenter
148+
datacenter_name: None,
149+
})
150+
.await
151+
{
152+
Ok(res) => Ok(GetOrCreateResponse {
153+
actor: res.actor,
154+
created: true,
155+
}),
156+
Err(err) => {
157+
// Check if this is a DuplicateKey error and extract the existing actor ID
158+
if let Some(existing_actor_id) = extract_duplicate_key_error(&err) {
159+
tracing::info!(
160+
?existing_actor_id,
161+
"received duplicate key error, fetching existing actor"
162+
);
163+
164+
// Fetch the existing actor - it should be in this datacenter since
165+
// the duplicate key error came from this datacenter
166+
let res = ctx
167+
.op(pegboard::ops::actor::get::Input {
168+
actor_ids: vec![existing_actor_id],
169+
fetch_error: true,
170+
})
171+
.await?;
172+
173+
let actor = res
174+
.actors
175+
.into_iter()
176+
.next()
177+
.ok_or_else(|| pegboard::errors::Actor::NotFound.build())?;
178+
179+
return Ok(GetOrCreateResponse {
180+
actor,
181+
created: false,
182+
});
183+
}
184+
185+
// Re-throw the original error if it's not a DuplicateKey
186+
Err(err)
187+
}
188+
}
189+
}
190+
100191
/// Helper function to extract the existing actor ID from a duplicate key error
101192
///
102193
/// Returns Some(actor_id) if the error is a duplicate key error with metadata, None otherwise

engine/packages/api-peer/src/router.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub async fn router(
2323
// MARK: Actors
2424
.route("/actors", get(actors::list::list))
2525
.route("/actors", post(actors::create::create))
26+
.route("/actors2", post(actors::create::create2))
2627
.route("/actors", put(actors::get_or_create::get_or_create))
28+
.route("/actors2", put(actors::get_or_create::get_or_create2))
2729
.route("/actors/{actor_id}", delete(actors::delete::delete))
2830
.route("/actors/names", get(actors::list_names::list_names))
2931
.route(

engine/packages/api-public/src/actors/create.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,67 @@ async fn create_inner(
8888
.await
8989
}
9090
}
91+
92+
#[utoipa::path(
93+
post,
94+
operation_id = "actors2_create",
95+
path = "/actors2",
96+
params(CreateQuery),
97+
request_body(content = CreateRequest, content_type = "application/json"),
98+
responses(
99+
(status = 200, body = CreateResponse),
100+
),
101+
)]
102+
pub async fn create2(
103+
Extension(ctx): Extension<ApiCtx>,
104+
Query(query): Query<CreateQuery>,
105+
Json(body): Json<CreateRequest>,
106+
) -> Response {
107+
match create2_inner(ctx, query, body).await {
108+
Ok(response) => Json(response).into_response(),
109+
Err(err) => ApiError::from(err).into_response(),
110+
}
111+
}
112+
113+
#[tracing::instrument(skip_all)]
114+
async fn create2_inner(
115+
ctx: ApiCtx,
116+
query: CreateQuery,
117+
body: CreateRequest,
118+
) -> Result<CreateResponse> {
119+
ctx.skip_auth();
120+
121+
let namespace = ctx
122+
.op(namespace::ops::resolve_for_name_global::Input {
123+
name: query.namespace.clone(),
124+
})
125+
.await?
126+
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
127+
128+
let target_dc_label = super::utils::find_dc_for_actor_creation(
129+
&ctx,
130+
namespace.namespace_id,
131+
&query.namespace,
132+
&body.runner_name_selector,
133+
body.datacenter.as_ref().map(String::as_str),
134+
)
135+
.await?;
136+
137+
let query = rivet_api_types::actors::create::CreateQuery {
138+
namespace: query.namespace,
139+
};
140+
141+
if target_dc_label == ctx.config().dc_label() {
142+
rivet_api_peer::actors::create::create2(ctx.into(), (), query, body).await
143+
} else {
144+
request_remote_datacenter::<CreateResponse>(
145+
ctx.config(),
146+
target_dc_label,
147+
"/actors2",
148+
axum::http::Method::POST,
149+
Some(&query),
150+
Some(&body),
151+
)
152+
.await
153+
}
154+
}

engine/packages/api-public/src/actors/get_or_create.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,67 @@ async fn get_or_create_inner(
9898
.await
9999
}
100100
}
101+
102+
#[utoipa::path(
103+
put,
104+
operation_id = "actors2_get_or_create",
105+
path = "/actors2",
106+
params(GetOrCreateQuery),
107+
request_body(content = GetOrCreateRequest, content_type = "application/json"),
108+
responses(
109+
(status = 200, body = GetOrCreateResponse),
110+
),
111+
)]
112+
pub async fn get_or_create2(
113+
Extension(ctx): Extension<ApiCtx>,
114+
Query(query): Query<GetOrCreateQuery>,
115+
Json(body): Json<GetOrCreateRequest>,
116+
) -> Response {
117+
match get_or_create_inner2(ctx, query, body).await {
118+
Ok(response) => Json(response).into_response(),
119+
Err(err) => ApiError::from(err).into_response(),
120+
}
121+
}
122+
123+
#[tracing::instrument(skip_all)]
124+
async fn get_or_create_inner2(
125+
ctx: ApiCtx,
126+
query: GetOrCreateQuery,
127+
body: GetOrCreateRequest,
128+
) -> Result<GetOrCreateResponse> {
129+
ctx.skip_auth();
130+
131+
let namespace = ctx
132+
.op(namespace::ops::resolve_for_name_global::Input {
133+
name: query.namespace.clone(),
134+
})
135+
.await?
136+
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
137+
138+
let target_dc_label = super::utils::find_dc_for_actor_creation(
139+
&ctx,
140+
namespace.namespace_id,
141+
&query.namespace,
142+
&body.runner_name_selector,
143+
body.datacenter.as_ref().map(String::as_str),
144+
)
145+
.await?;
146+
147+
let query = GetOrCreateQuery {
148+
namespace: query.namespace,
149+
};
150+
151+
if target_dc_label == ctx.config().dc_label() {
152+
rivet_api_peer::actors::get_or_create::get_or_create2(ctx.into(), (), query, body).await
153+
} else {
154+
request_remote_datacenter::<GetOrCreateResponse>(
155+
ctx.config(),
156+
target_dc_label,
157+
"/actors2",
158+
axum::http::Method::PUT,
159+
Some(&query),
160+
Some(&body),
161+
)
162+
.await
163+
}
164+
}

0 commit comments

Comments
 (0)