Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions engine/packages/api-peer/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
}),
)?;

let namespace = namespace_res.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let actor = actors_res
.actors
.into_iter()
Expand All @@ -45,13 +47,12 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
return Err(pegboard::errors::Actor::NotFound.build());
}

let namespace = namespace_res.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Verify the actor belongs to the specified namespace
if actor.namespace_id != namespace.namespace_id {
return Err(pegboard::errors::Actor::NotFound.build());
}

// TODO: Actor v2
let res = ctx
.signal(pegboard::workflows::actor::Destroy {})
.to_workflow::<pegboard::workflows::actor::Workflow>()
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/config/src/config/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub struct Pegboard {
/// Max time since last seen before actor is considered stale, in milliseconds.
pub envoy_event_demuxer_max_last_seen_threshold: Option<u64>,
/// Max response payload size in bytes from actors.
pub envoy_max_response_payload_body_size: Option<usize>,
pub envoy_max_response_payload_size: Option<usize>,
/// Ping interval for envoy updates in milliseconds.
pub envoy_update_ping_interval: Option<u64>,
/// How long after last ping before considering a envoy ineligible for allocation.
Expand Down Expand Up @@ -282,8 +282,8 @@ impl Pegboard {
.unwrap_or(30_000)
}

pub fn envoy_max_response_payload_body_size(&self) -> usize {
self.envoy_max_response_payload_body_size
pub fn envoy_max_response_payload_size(&self) -> usize {
self.envoy_max_response_payload_size
.unwrap_or(20 * 1024 * 1024) // 20 MiB
}

Expand Down
2 changes: 2 additions & 0 deletions engine/packages/gasoline/src/builder/common/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ where
} else {
tracing::debug!(?tags, "unique workflow already exists");
}
} else {
tracing::debug!(?actual_workflow_id, "dispatched workflow");
}

if workflow_id == actual_workflow_id {
Expand Down
6 changes: 5 additions & 1 deletion engine/packages/guard/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ impl SharedState {
}

pub async fn start(&self) -> Result<()> {
self.pegboard_gateway.start().await?;
tokio::try_join!(
self.pegboard_gateway.start(),
self.pegboard_gateway2.start(),
)?;

Ok(())
}
}
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard-envoy/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ pub async fn handle_init(
serverless_drain_grace_period: conn
.is_serverless
.then(|| pb.serverless_drain_grace_period() as i64),
max_response_payload_size: pb.envoy_max_response_payload_size() as u64,
},
}));
let init_msg_serialized = init_msg.serialize(conn.protocol_version)?;
Expand Down
7 changes: 1 addition & 6 deletions engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,7 @@ async fn handle_tunnel_message(
let inner_data_len = tunnel_message_inner_data_len(&msg.message_kind);

// Enforce incoming payload size
if inner_data_len
> ctx
.config()
.pegboard()
.envoy_max_response_payload_body_size()
{
if inner_data_len > ctx.config().pegboard().envoy_max_response_payload_size() {
return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build());
}

Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl SharedState {
while let Ok(NextOutput::Message(msg)) = sub.next().await {
tracing::trace!(
payload_len = msg.payload.len(),
"received message from pubsub"
"received message from envoy"
);

match versioned::ToGateway::deserialize_with_embedded_version(&msg.payload) {
Expand Down
23 changes: 17 additions & 6 deletions engine/packages/pegboard/src/workflows/actor2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,17 @@ struct CheckEnvoyLivenessInput {
envoy_key: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct CheckEnvoyLivenessOutput {
expired: bool,
now: i64,
}

#[activity(CheckEnvoyLiveness)]
async fn check_envoy_liveness(ctx: &ActivityCtx, input: &CheckEnvoyLivenessInput) -> Result<bool> {
async fn check_envoy_liveness(
ctx: &ActivityCtx,
input: &CheckEnvoyLivenessInput,
) -> Result<CheckEnvoyLivenessOutput> {
let state = ctx.state::<State>()?;
let envoy_lost_threshold = ctx.config().pegboard().envoy_lost_threshold();

Expand All @@ -378,7 +387,7 @@ async fn check_envoy_liveness(ctx: &ActivityCtx, input: &CheckEnvoyLivenessInput
let now = util::timestamp::now();
let expired = last_ping_ts < now - envoy_lost_threshold;

Ok(expired)
Ok(CheckEnvoyLivenessOutput { expired, now })
})
.custom_instrument(tracing::info_span!("actor_check_envoy_liveness_tx"))
.await
Expand All @@ -392,7 +401,7 @@ async fn listen_for_signals(
metrics_workflow_id: Id,
) -> Result<Vec<Main>> {
// Listen for signals based on transition
let signals = match &state.transition {
let signals = match &mut state.transition {
Transition::Allocating {
lost_timeout_ts, ..
}
Expand Down Expand Up @@ -432,20 +441,22 @@ async fn listen_for_signals(
// Listen for signals with periodic liveness check timeout
let signals = ctx
.listen_n_until::<Main>(
last_liveness_check_ts + ctx.config().pegboard().envoy_lost_threshold(),
*last_liveness_check_ts + ctx.config().pegboard().envoy_lost_threshold(),
256,
)
.await?;

// Perform liveness check
if signals.is_empty() {
let expired = ctx
let res = ctx
.activity(CheckEnvoyLivenessInput {
envoy_key: envoy.envoy_key.clone(),
})
.await?;

if expired {
*last_liveness_check_ts = res.now;

if res.expired {
vec![Main::Lost(Lost {
generation: state.generation,
reason: LostReason::EnvoyConnectionLost,
Expand Down
1 change: 1 addition & 0 deletions engine/sdks/schemas/envoy-protocol/v1.bare
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ type ProtocolMetadata struct {
envoyLostThreshold: i64
actorStopThreshold: i64
serverlessDrainGracePeriod: optional<i64>
maxResponsePayloadSize: u64
}

type ToEnvoyInit struct {
Expand Down
4 changes: 2 additions & 2 deletions engine/sdks/typescript/envoy-client/package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions engine/sdks/typescript/envoy-client/src/context.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions engine/sdks/typescript/envoy-client/src/handle.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/sdks/typescript/envoy-client/src/index.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading