-
Notifications
You must be signed in to change notification settings - Fork 129
Add PaginatedKVStore support to VssStore #864
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,9 @@ use bitcoin::Network; | |
| use lightning::impl_writeable_tlv_based_enum; | ||
| use lightning::io::{self, Error, ErrorKind}; | ||
| use lightning::sign::{EntropySource as LdkEntropySource, RandomBytes}; | ||
| use lightning::util::persist::{KVStore, KVStoreSync}; | ||
| use lightning::util::persist::{ | ||
| KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse, | ||
| }; | ||
| use lightning::util::ser::{Readable, Writeable}; | ||
| use prost::Message; | ||
| use vss_client::client::VssClient; | ||
|
|
@@ -377,6 +379,52 @@ impl KVStore for VssStore { | |
| } | ||
| } | ||
|
|
||
| impl PaginatedKVStoreSync for VssStore { | ||
| fn list_paginated( | ||
| &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>, | ||
| ) -> io::Result<PaginatedListResponse> { | ||
| let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { | ||
| debug_assert!(false, "Failed to access internal runtime"); | ||
| let msg = format!("Failed to access internal runtime"); | ||
| Error::new(ErrorKind::Other, msg) | ||
| })?; | ||
| let primary_namespace = primary_namespace.to_string(); | ||
| let secondary_namespace = secondary_namespace.to_string(); | ||
| let inner = Arc::clone(&self.inner); | ||
| let fut = async move { | ||
| inner | ||
| .list_paginated_internal( | ||
| &inner.blocking_client, | ||
| primary_namespace, | ||
| secondary_namespace, | ||
| page_token, | ||
| ) | ||
| .await | ||
| }; | ||
| tokio::task::block_in_place(move || internal_runtime.block_on(fut)) | ||
| } | ||
| } | ||
|
|
||
| impl PaginatedKVStore for VssStore { | ||
| fn list_paginated( | ||
| &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>, | ||
| ) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send { | ||
| let primary_namespace = primary_namespace.to_string(); | ||
| let secondary_namespace = secondary_namespace.to_string(); | ||
| let inner = Arc::clone(&self.inner); | ||
| async move { | ||
| inner | ||
| .list_paginated_internal( | ||
| &inner.async_client, | ||
| primary_namespace, | ||
| secondary_namespace, | ||
| page_token, | ||
| ) | ||
| .await | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Drop for VssStore { | ||
| fn drop(&mut self) { | ||
| let internal_runtime = self.internal_runtime.take(); | ||
|
|
@@ -638,6 +686,49 @@ impl VssStoreInner { | |
| Ok(keys) | ||
| } | ||
|
|
||
| async fn list_paginated_internal( | ||
| &self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String, | ||
| secondary_namespace: String, page_token: Option<PageToken>, | ||
| ) -> io::Result<PaginatedListResponse> { | ||
| check_namespace_key_validity( | ||
| &primary_namespace, | ||
| &secondary_namespace, | ||
| None, | ||
| "list_paginated", | ||
| )?; | ||
|
|
||
| const PAGE_SIZE: i32 = 50; | ||
|
|
||
| let key_prefix = self.build_obfuscated_prefix(&primary_namespace, &secondary_namespace); | ||
| let vss_page_token = page_token.map(|t| t.as_str().to_string()); | ||
|
|
||
| let request = ListKeyVersionsRequest { | ||
| store_id: self.store_id.clone(), | ||
| key_prefix: Some(key_prefix), | ||
| page_token: vss_page_token, | ||
| page_size: Some(PAGE_SIZE), | ||
| }; | ||
|
|
||
| let response = client.list_key_versions(&request).await.map_err(|e| { | ||
| let msg = format!( | ||
| "Failed to list keys in {}/{}: {}", | ||
| primary_namespace, secondary_namespace, e | ||
| ); | ||
| Error::new(ErrorKind::Other, msg) | ||
| })?; | ||
|
|
||
| let mut keys = Vec::with_capacity(response.key_versions.len()); | ||
| for kv in response.key_versions { | ||
| keys.push(self.extract_key(&kv.key)?); | ||
| } | ||
|
|
||
| // VSS uses empty string to signal the last page | ||
| let next_page_token = | ||
| response.next_page_token.filter(|t| !t.is_empty()).map(PageToken::new); | ||
|
Comment on lines
+725
to
+727
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds to me like we should line-up the VSS API and the Right now a VSS could return a non-empty keys list, but signal "no more data to give" with a A consumer of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I feel like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed in lightningdevkit/vss-server#96 to do this
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think we're following protobuf's/Google's best practices here. https://google.aip.dev/158 states:
|
||
|
|
||
| Ok(PaginatedListResponse { keys, next_page_token }) | ||
| } | ||
|
|
||
| async fn execute_locked_write< | ||
| F: Future<Output = Result<(), lightning::io::Error>>, | ||
| FN: FnOnce() -> F, | ||
|
|
@@ -1051,4 +1142,92 @@ mod tests { | |
| do_read_write_remove_list_persist(&vss_store); | ||
| drop(vss_store) | ||
| } | ||
|
|
||
| fn build_vss_store() -> VssStore { | ||
| let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); | ||
| let mut rng = rng(); | ||
| let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); | ||
| let mut node_seed = [0u8; 64]; | ||
| rng.fill_bytes(&mut node_seed); | ||
| let entropy = NodeEntropy::from_seed_bytes(node_seed); | ||
| VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet) | ||
| .build_with_sigs_auth(HashMap::new()) | ||
| .unwrap() | ||
| } | ||
|
|
||
| #[test] | ||
| fn vss_paginated_listing() { | ||
| let store = build_vss_store(); | ||
| let ns = "test_paginated"; | ||
| let sub = "listing"; | ||
| let num_entries = 5; | ||
|
|
||
| for i in 0..num_entries { | ||
| let key = format!("key_{:04}", i); | ||
| let data = vec![i as u8; 32]; | ||
| KVStoreSync::write(&store, ns, sub, &key, data).unwrap(); | ||
| } | ||
|
|
||
| let mut all_keys = Vec::new(); | ||
| let mut page_token = None; | ||
|
|
||
| loop { | ||
| let response = | ||
| PaginatedKVStoreSync::list_paginated(&store, ns, sub, page_token).unwrap(); | ||
| all_keys.extend(response.keys); | ||
| match response.next_page_token { | ||
| Some(token) if !token.as_str().is_empty() => page_token = Some(token), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds to me here we should break only if the page token field is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah true, good catch |
||
| _ => break, | ||
| } | ||
|
Comment on lines
+1178
to
+1181
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm here we determine whether we are done with pagination based on whether the page token is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update to handle empty string |
||
| } | ||
|
|
||
| assert_eq!(all_keys.len(), num_entries); | ||
|
|
||
| // Verify no duplicates | ||
| let mut unique = all_keys.clone(); | ||
| unique.sort(); | ||
| unique.dedup(); | ||
| assert_eq!(unique.len(), num_entries); | ||
| } | ||
|
|
||
| #[test] | ||
| fn vss_paginated_empty_namespace() { | ||
| let store = build_vss_store(); | ||
| let response = | ||
| PaginatedKVStoreSync::list_paginated(&store, "nonexistent", "ns", None).unwrap(); | ||
| assert!(response.keys.is_empty()); | ||
| assert!(response.next_page_token.is_none()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn vss_paginated_removal() { | ||
| let store = build_vss_store(); | ||
| let ns = "test_paginated"; | ||
| let sub = "removal"; | ||
|
|
||
| KVStoreSync::write(&store, ns, sub, "a", vec![1u8; 8]).unwrap(); | ||
| KVStoreSync::write(&store, ns, sub, "b", vec![2u8; 8]).unwrap(); | ||
| KVStoreSync::write(&store, ns, sub, "c", vec![3u8; 8]).unwrap(); | ||
|
|
||
| KVStoreSync::remove(&store, ns, sub, "b", false).unwrap(); | ||
|
|
||
| let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap(); | ||
| assert_eq!(response.keys.len(), 2); | ||
| assert!(!response.keys.contains(&"b".to_string())); | ||
benthecarman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| #[test] | ||
| fn vss_paginated_namespace_isolation() { | ||
| let store = build_vss_store(); | ||
|
|
||
| KVStoreSync::write(&store, "ns_a", "sub", "key_1", vec![1u8; 8]).unwrap(); | ||
| KVStoreSync::write(&store, "ns_a", "sub", "key_2", vec![2u8; 8]).unwrap(); | ||
| KVStoreSync::write(&store, "ns_b", "sub", "key_3", vec![3u8; 8]).unwrap(); | ||
|
|
||
| let response = PaginatedKVStoreSync::list_paginated(&store, "ns_a", "sub", None).unwrap(); | ||
| assert_eq!(response.keys.len(), 2); | ||
benthecarman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| let response = PaginatedKVStoreSync::list_paginated(&store, "ns_b", "sub", None).unwrap(); | ||
| assert_eq!(response.keys.len(), 1); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we update the docs on the
PaginatedKVStoreto let people know they should stop asking for pages once they get aNonefor the page token ?Maybe this is implied in the current API docs over there. I want to make sure people don't wait for the "empty list" condition to stop asking for more pages, as VSS API does not quite fit this. They should look at their page_token they got back and stop asking for pages once it is none.