Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 180 additions & 1 deletion src/io/vss_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use bitcoin::Network;
use lightning::impl_writeable_tlv_based_enum;
use lightning::io::{self, Error, ErrorKind};
use lightning::sign::{EntropySource as LdkEntropySource, RandomBytes};
use lightning::util::persist::{KVStore, KVStoreSync};
use lightning::util::persist::{
KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse,
};
use lightning::util::ser::{Readable, Writeable};
use prost::Message;
use vss_client::client::VssClient;
Expand Down Expand Up @@ -377,6 +379,52 @@ impl KVStore for VssStore {
}
}

impl PaginatedKVStoreSync for VssStore {
fn list_paginated(
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
) -> io::Result<PaginatedListResponse> {
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
let msg = format!("Failed to access internal runtime");
Error::new(ErrorKind::Other, msg)
})?;
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let inner = Arc::clone(&self.inner);
let fut = async move {
inner
.list_paginated_internal(
&inner.blocking_client,
primary_namespace,
secondary_namespace,
page_token,
)
.await
};
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}
}

impl PaginatedKVStore for VssStore {
fn list_paginated(
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send {
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let inner = Arc::clone(&self.inner);
async move {
inner
.list_paginated_internal(
&inner.async_client,
primary_namespace,
secondary_namespace,
page_token,
)
.await
}
}
}

impl Drop for VssStore {
fn drop(&mut self) {
let internal_runtime = self.internal_runtime.take();
Expand Down Expand Up @@ -638,6 +686,49 @@ impl VssStoreInner {
Ok(keys)
}

async fn list_paginated_internal(
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
secondary_namespace: String, page_token: Option<PageToken>,
) -> io::Result<PaginatedListResponse> {
check_namespace_key_validity(
&primary_namespace,
&secondary_namespace,
None,
"list_paginated",
)?;

const PAGE_SIZE: i32 = 50;

let key_prefix = self.build_obfuscated_prefix(&primary_namespace, &secondary_namespace);
let vss_page_token = page_token.map(|t| t.as_str().to_string());

let request = ListKeyVersionsRequest {
store_id: self.store_id.clone(),
key_prefix: Some(key_prefix),
page_token: vss_page_token,
page_size: Some(PAGE_SIZE),
};

let response = client.list_key_versions(&request).await.map_err(|e| {
let msg = format!(
"Failed to list keys in {}/{}: {}",
primary_namespace, secondary_namespace, e
);
Error::new(ErrorKind::Other, msg)
})?;

let mut keys = Vec::with_capacity(response.key_versions.len());
for kv in response.key_versions {
keys.push(self.extract_key(&kv.key)?);
}

// VSS uses empty string to signal the last page
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the docs on the PaginatedKVStore to let people know they should stop asking for pages once they get a None for the page token ?

Maybe this is implied in the current API docs over there. I want to make sure people don't wait for the "empty list" condition to stop asking for more pages, as VSS API does not quite fit this. They should look at their page_token they got back and stop asking for pages once it is none.

let next_page_token =
response.next_page_token.filter(|t| !t.is_empty()).map(PageToken::new);
Comment on lines +725 to +727
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds to me like we should line-up the VSS API and the PaginatedKVStore API better, I raised a similar point in the VSS PR.

Right now a VSS could return a non-empty keys list, but signal "no more data to give" with a String::new() for the page token.

A consumer of PaginatedKVStore does another roundtrip, but the VSS did not expect any further roundtrips.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I feel like page_token: None should just be the signal that there is no more. That's what we do in the file system store and sqlite. Should fix that on the vss-server side but we can keep this is_empty check for safety.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in lightningdevkit/vss-server#96 to do this

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I feel like page_token: None should just be the signal that there is no more. That's what we do in the file system store and sqlite. Should fix that on the vss-server side but we can keep this is_empty check for safety.

I think we're following protobuf's/Google's best practices here. https://google.aip.dev/158 states:

  • Response messages for collections should define a string next_page_token field, providing the user with a page token that may be used to retrieve the next page.
    • The field containing pagination results should be the first field in the message and have a field number of 1. It should be a repeated field containing a list of resources constituting a single page of results.
    • If the end of the collection has been reached, the next_page_token field must be empty. This is the only way to communicate "end-of-collection" to users.
    • If the end of the collection has not been reached (or if the API can not determine in time), the API must provide a next_page_token.


Ok(PaginatedListResponse { keys, next_page_token })
}

async fn execute_locked_write<
F: Future<Output = Result<(), lightning::io::Error>>,
FN: FnOnce() -> F,
Expand Down Expand Up @@ -1051,4 +1142,92 @@ mod tests {
do_read_write_remove_list_persist(&vss_store);
drop(vss_store)
}

fn build_vss_store() -> VssStore {
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let mut rng = rng();
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let mut node_seed = [0u8; 64];
rng.fill_bytes(&mut node_seed);
let entropy = NodeEntropy::from_seed_bytes(node_seed);
VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet)
.build_with_sigs_auth(HashMap::new())
.unwrap()
}

#[test]
fn vss_paginated_listing() {
let store = build_vss_store();
let ns = "test_paginated";
let sub = "listing";
let num_entries = 5;

for i in 0..num_entries {
let key = format!("key_{:04}", i);
let data = vec![i as u8; 32];
KVStoreSync::write(&store, ns, sub, &key, data).unwrap();
}

let mut all_keys = Vec::new();
let mut page_token = None;

loop {
let response =
PaginatedKVStoreSync::list_paginated(&store, ns, sub, page_token).unwrap();
all_keys.extend(response.keys);
match response.next_page_token {
Some(token) if !token.as_str().is_empty() => page_token = Some(token),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds to me here we should break only if the page token field is None, and not if the token is Some(""), otherwise, some details of the VSS API are leaking past the PaginatedKVStore API ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah true, good catch

_ => break,
}
Comment on lines +1178 to +1181
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm here we determine whether we are done with pagination based on whether the page token is None, and not whether the list is empty. I guess just looking for clarification on the PaginatedKVStore API :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to handle empty string

}

assert_eq!(all_keys.len(), num_entries);

// Verify no duplicates
let mut unique = all_keys.clone();
unique.sort();
unique.dedup();
assert_eq!(unique.len(), num_entries);
}

#[test]
fn vss_paginated_empty_namespace() {
let store = build_vss_store();
let response =
PaginatedKVStoreSync::list_paginated(&store, "nonexistent", "ns", None).unwrap();
assert!(response.keys.is_empty());
assert!(response.next_page_token.is_none());
}

#[test]
fn vss_paginated_removal() {
let store = build_vss_store();
let ns = "test_paginated";
let sub = "removal";

KVStoreSync::write(&store, ns, sub, "a", vec![1u8; 8]).unwrap();
KVStoreSync::write(&store, ns, sub, "b", vec![2u8; 8]).unwrap();
KVStoreSync::write(&store, ns, sub, "c", vec![3u8; 8]).unwrap();

KVStoreSync::remove(&store, ns, sub, "b", false).unwrap();

let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap();
assert_eq!(response.keys.len(), 2);
assert!(!response.keys.contains(&"b".to_string()));
}

#[test]
fn vss_paginated_namespace_isolation() {
let store = build_vss_store();

KVStoreSync::write(&store, "ns_a", "sub", "key_1", vec![1u8; 8]).unwrap();
KVStoreSync::write(&store, "ns_a", "sub", "key_2", vec![2u8; 8]).unwrap();
KVStoreSync::write(&store, "ns_b", "sub", "key_3", vec![3u8; 8]).unwrap();

let response = PaginatedKVStoreSync::list_paginated(&store, "ns_a", "sub", None).unwrap();
assert_eq!(response.keys.len(), 2);

let response = PaginatedKVStoreSync::list_paginated(&store, "ns_b", "sub", None).unwrap();
assert_eq!(response.keys.len(), 1);
}
}