Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions yarn-project/end-to-end/src/spartan/n_tps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ const mempoolTxMinedDelayQuery = (perc: string) =>
const mempoolAttestationMinedDelayQuery = (perc: string) =>
`histogram_quantile(${perc}, sum(rate(aztec_mempool_attestations_mined_delay_milliseconds_bucket{k8s_namespace_name="${config.NAMESPACE}"}[1m])) by (le))`;

const peerCountQuery = () => `avg(aztec_peer_manager_peer_count{k8s_namespace_name="${config.NAMESPACE}"})`;
const peerCountQuery = () => `avg(aztec_peer_manager_peer_count_peers{k8s_namespace_name="${config.NAMESPACE}"})`;

const peerConnectionDurationQuery = (perc: string) =>
`histogram_quantile(${perc}, sum(rate(aztec_peer_manager_peer_connection_duration_milliseconds_bucket{k8s_namespace_name="${config.NAMESPACE}"}[1m])) by (le))`;
const peerConnectionDurationQuery = (perc: string, windowSeconds: number) =>
`histogram_quantile(${perc}, sum(rate(aztec_peer_manager_peer_connection_duration_milliseconds_bucket{k8s_namespace_name="${config.NAMESPACE}"}[${windowSeconds}s])) by (le))`;

describe('sustained N TPS test', () => {
jest.setTimeout(60 * 60 * 1000 * 10); // 10 hours
Expand Down Expand Up @@ -168,8 +168,8 @@ describe('sustained N TPS test', () => {
try {
const [avgCount, durationP50, durationP95] = await Promise.all([
prometheusClient.querySingleValue(peerCountQuery()),
prometheusClient.querySingleValue(peerConnectionDurationQuery('0.50')),
prometheusClient.querySingleValue(peerConnectionDurationQuery('0.95')),
prometheusClient.querySingleValue(peerConnectionDurationQuery('0.50', TEST_DURATION_SECONDS + 60)),
prometheusClient.querySingleValue(peerConnectionDurationQuery('0.95', TEST_DURATION_SECONDS + 60)),
]);
metrics.recordPeerStats(avgCount, durationP50, durationP95);
logger.debug('Scraped peer stats', { avgCount, durationP50, durationP95 });
Expand Down Expand Up @@ -384,7 +384,7 @@ describe('sustained N TPS test', () => {
const tx = await (config.REAL_VERIFIER ? submitProven(wallet, fee) : submitUnproven(wallet, fee));
const t1 = performance.now();

metrics.recordSentTx(tx, `high_value_${highValueTps}tps`);
metrics.recordSentTx(tx, 'tx_inclusion_time');

const txHash = await tx.send({ wait: NO_WAIT });
const t2 = performance.now();
Expand Down Expand Up @@ -461,8 +461,8 @@ describe('sustained N TPS test', () => {
logger.warn(`Failed transaction ${idx + 1}: ${result.error}`);
});

const highValueGroup = `high_value_${highValueTps}tps`;
const inclusionStats = metrics.inclusionTimeInSeconds(highValueGroup);
const txInclusionGroup = 'tx_inclusion_time';
const inclusionStats = metrics.inclusionTimeInSeconds(txInclusionGroup);
logger.info(`Transaction inclusion summary: ${successCount} succeeded, ${failureCount} failed`);
logger.info('Inclusion time stats', inclusionStats);
});
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/end-to-end/src/spartan/tx_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ export class TxInclusionMetrics {
value: stats.mean,
},
{
name: `${group}/median_inclusion`,
name: `${group}/p50_inclusion`,
unit: 's',
value: stats.median,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,10 @@ export class AttestationPool {
}

const address = sender.toString();
const ownKey = this.getAttestationKey(slotNumber, proposalId, address);

await this.checkpointAttestations.set(
this.getAttestationKey(slotNumber, proposalId, address),
attestation.toBuffer(),
);
await this.checkpointAttestations.set(ownKey, attestation.toBuffer());
this.metrics.trackMempoolItemAdded(ownKey);

this.log.debug(`Added own checkpoint attestation for slot ${slotNumber} from ${address}`, {
signature: attestation.signature.toString(),
Expand Down Expand Up @@ -429,6 +428,7 @@ export class AttestationPool {
const attestationEndKey = new Fr(oldestSlot).toString();
for await (const key of this.checkpointAttestations.keysAsync({ end: attestationEndKey })) {
await this.checkpointAttestations.delete(key);
this.metrics.trackMempoolItemRemoved(key);
numberOfAttestations++;
}

Expand Down Expand Up @@ -526,6 +526,7 @@ export class AttestationPool {

// Add the attestation
await this.checkpointAttestations.set(key, attestation.toBuffer());
this.metrics.trackMempoolItemAdded(key);

// Track this attestation in the per-signer-per-slot index for duplicate detection
const slotSignerKey = this.getSlotSignerKey(slotNumber, signerAddress);
Expand Down
30 changes: 17 additions & 13 deletions yarn-project/p2p/src/mem_pools/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {
private defaultAttributes;
private meter: Meter;

private txAddedTimestamp: Map<bigint, number> = new Map<bigint, number>();
private mempoolItemAddedTimestamp: Map<bigint | string, number> = new Map<bigint | string, number>();

constructor(
telemetry: TelemetryClient,
Expand Down Expand Up @@ -114,22 +114,26 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {
}

public transactionsAdded(transactions: Tx[]) {
const timestamp = Date.now();
for (const transaction of transactions) {
this.txAddedTimestamp.set(transaction.txHash.toBigInt(), timestamp);
}
transactions.forEach(tx => this.trackMempoolItemAdded(tx.txHash.toBigInt()));
}

public transactionsRemoved(hashes: Iterable<bigint> | Iterable<string>) {
const timestamp = Date.now();
for (const hash of hashes) {
const key = BigInt(hash);
const addedAt = this.txAddedTimestamp.get(key);
if (addedAt !== undefined) {
this.txAddedTimestamp.delete(key);
if (addedAt < timestamp) {
this.minedDelay.record(timestamp - addedAt);
}
this.trackMempoolItemRemoved(BigInt(hash));
}
}

public trackMempoolItemAdded(key: bigint | string): void {
this.mempoolItemAddedTimestamp.set(key, Date.now());
}

public trackMempoolItemRemoved(key: bigint | string): void {
const timestamp = Date.now();
const addedAt = this.mempoolItemAddedTimestamp.get(key);
if (addedAt !== undefined) {
this.mempoolItemAddedTimestamp.delete(key);
if (addedAt < timestamp) {
this.minedDelay.record(timestamp - addedAt);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ export class AztecKVTxPoolV2 extends (EventEmitter as new () => TypedEventEmitte
const hashes = txHashes.map(h => (typeof h === 'string' ? TxHash.fromString(h) : TxHash.fromBigInt(h)));
this.emit('txs-removed', { txHashes: hashes });
},
onTxsMined: (txHashes: string[]) => {
this.#metrics?.transactionsRemoved(txHashes);
},
};

// Create the implementation
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import { TxPoolIndices } from './tx_pool_indices.js';
export interface TxPoolV2Callbacks {
onTxsAdded: (txs: Tx[], opts: { source?: string }) => void;
onTxsRemoved: (txHashes: string[] | bigint[]) => void;
onTxsMined: (txHashes: string[]) => void;
}

/**
Expand Down Expand Up @@ -498,6 +499,10 @@ export class TxPoolV2Impl {
await this.#evictionManager.evictAfterNewBlock(block.header, nullifiers, feePayers);
});

if (found.length > 0) {
this.#callbacks.onTxsMined(found.map(m => m.txHash));
}

this.#log.info(`Marked ${found.length} txs as mined in block ${blockId.number}`);
}

Expand Down
7 changes: 7 additions & 0 deletions yarn-project/p2p/src/services/peer-manager/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class PeerManagerMetrics {
private sentGoodbyes: UpDownCounter;
private receivedGoodbyes: UpDownCounter;
private peerCount: Gauge;
private healthyPeerCount: Gauge;
private lowScoreDisconnects: UpDownCounter;
private peerConnectionDuration: Histogram;

Expand Down Expand Up @@ -49,6 +50,7 @@ export class PeerManagerMetrics {
goodbyeReasonAttrs,
);
this.peerCount = meter.createGauge(Metrics.PEER_MANAGER_PEER_COUNT);
this.healthyPeerCount = meter.createGauge(Metrics.PEER_MANAGER_HEALTHY_PEER_COUNT);
this.lowScoreDisconnects = createUpDownCounterWithDefault(meter, Metrics.PEER_MANAGER_LOW_SCORE_DISCONNECTS, {
[Attributes.P2P_PEER_SCORE_STATE]: ['Banned', 'Disconnect'],
});
Expand All @@ -67,6 +69,10 @@ export class PeerManagerMetrics {
this.peerCount.record(count);
}

public recordHealthyPeerCount(count: number) {
this.healthyPeerCount.record(count);
}

public recordLowScoreDisconnect(scoreState: 'Banned' | 'Disconnect') {
this.lowScoreDisconnects.add(1, { [Attributes.P2P_PEER_SCORE_STATE]: scoreState });
}
Expand All @@ -79,6 +85,7 @@ export class PeerManagerMetrics {
const connectedAt = this.peerConnectedAt.get(id.toString());
if (connectedAt) {
this.peerConnectionDuration.record(Date.now() - connectedAt);
this.peerConnectedAt.delete(id.toString());
}
}
}
3 changes: 2 additions & 1 deletion yarn-project/p2p/src/services/peer-manager/peer_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,8 @@ export class PeerManager implements PeerManagerInterface {
...this.peerScoring.getStats(),
});

this.metrics.recordPeerCount(healthyConnections.length);
this.metrics.recordPeerCount(connections.length);
this.metrics.recordHealthyPeerCount(healthyConnections.length);

// Exit if no peers to connect
if (peersToConnect <= 0) {
Expand Down
6 changes: 6 additions & 0 deletions yarn-project/telemetry-client/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,12 @@ export const PEER_MANAGER_PEER_COUNT: MetricDefinition = {
unit: 'peers',
valueType: ValueType.INT,
};
export const PEER_MANAGER_HEALTHY_PEER_COUNT: MetricDefinition = {
name: 'aztec.peer_manager.healthy_peer_count',
description: 'Number of healthy (non-protected, non-banned) peers',
unit: 'peers',
valueType: ValueType.INT,
};
export const PEER_MANAGER_LOW_SCORE_DISCONNECTS: MetricDefinition = {
name: 'aztec.peer_manager.low_score_disconnects',
description: 'Number of peers disconnected due to low score',
Expand Down
Loading