Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions crates/core/src/schema/table_info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use alloc::{format, string::String, vec, vec::Vec};
use alloc::string::ToString;
use alloc::vec;
use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec};
use serde::{Deserialize, de::Visitor};

#[derive(Deserialize)]
Expand Down Expand Up @@ -252,16 +254,77 @@ impl<'de> Deserialize<'de> for TableInfoFlags {
}
}

#[derive(Deserialize)]
pub struct PendingStatement {
pub sql: String,
/// This vec should contain an entry for each parameter in [sql].
pub params: Vec<PendingStatementValue>,

/// Present if this statement has a [PendingStatementValue::Rest] parameter.
pub named_parameters_index: Option<RestColumnIndex>,
}

pub struct RestColumnIndex {
/// All column names referenced by this statement.
pub named_parameters: BTreeSet<String>,
/// Parameter indices that should be bound to a JSON object containing those values from the
/// source row that haven't been referenced by [PendingStatementValue::Column].
pub rest_parameter_positions: Vec<usize>,
}

impl<'de> Deserialize<'de> for PendingStatement {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct PendingStatementSource {
pub sql: String,
/// This vec should contain an entry for each parameter in [sql].
pub params: Vec<PendingStatementValue>,
}

let source = PendingStatementSource::deserialize(deserializer)?;
let mut named_parameters_index = None;
if source
.params
.iter()
.any(|s| matches!(s, PendingStatementValue::Rest))
{
let mut set = BTreeSet::new();
let mut rest_parameter_positions = vec![];
for (i, column) in source.params.iter().enumerate() {
set.insert(match column {
PendingStatementValue::Id => "id".to_string(),
PendingStatementValue::Column(name) => name.clone(),
PendingStatementValue::Rest => {
rest_parameter_positions.push(i);
continue;
}
});
}

named_parameters_index = Some(RestColumnIndex {
named_parameters: set,
rest_parameter_positions,
});
}

return Ok(Self {
sql: source.sql,
params: source.params,
named_parameters_index,
});
}
}

#[derive(Deserialize)]
pub enum PendingStatementValue {
/// Bind to the PowerSync row id of the affected row.
Id,
/// Bind to the value of column in the synced row.
Column(String),
/// Bind to a JSON object containing all columns from the synced row that haven't been matched
/// by other statement values.
Rest,
// TODO: Stuff like a raw object of put data?
}
89 changes: 77 additions & 12 deletions crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use alloc::collections::btree_map::BTreeMap;
use alloc::format;
use alloc::string::String;
use alloc::vec::Vec;
use serde::Deserialize;
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};

use crate::error::{PSResult, PowerSyncError};
use crate::schema::inspection::ExistingTable;
Expand Down Expand Up @@ -153,7 +154,14 @@ impl<'a> SyncOperation<'a> {
let stmt = raw.put_statement(self.db)?;
let parsed: serde_json::Value = serde_json::from_str(data)
.map_err(PowerSyncError::json_local_error)?;
stmt.bind_for_put(id, &parsed)?;
let json_object = parsed.as_object().ok_or_else(|| {
PowerSyncError::argument_error(
"expected oplog data to be an object",
)
})?;

let rest = stmt.render_rest_object(json_object)?;
stmt.bind_for_put(id, &json_object, &rest)?;
stmt.exec(self.db, type_name, id, Some(&parsed))?;
}
Err(_) => {
Expand Down Expand Up @@ -510,7 +518,7 @@ impl<'a> ParsedSchemaTable<'a> {

struct PreparedPendingStatement<'a> {
stmt: ManagedStmt,
params: &'a [PendingStatementValue],
definition: &'a PendingStatement,
}

impl<'a> PreparedPendingStatement<'a> {
Expand All @@ -532,29 +540,72 @@ impl<'a> PreparedPendingStatement<'a> {

Ok(Self {
stmt,
params: &pending.params,
definition: pending,
})
}

pub fn render_rest_object(
&self,
json_data: &serde_json::Map<String, serde_json::Value>,
) -> Result<Option<String>, PowerSyncError> {
use serde_json::Value;

let Some(ref index) = self.definition.named_parameters_index else {
return Ok(None);
};

struct UnmatchedValues<'a>(BTreeMap<&'a String, &'a Value>);

impl<'a> Serialize for UnmatchedValues<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(Some(self.0.len()))?;

for (k, v) in &self.0 {
map.serialize_entry(k, v)?;
}

map.end()
}
}

let mut unmatched_values: Option<UnmatchedValues> = None;
for (key, value) in json_data {
if !index.named_parameters.contains(key) {
unmatched_values
.get_or_insert_with(|| UnmatchedValues(BTreeMap::new()))
.0
.insert(key, value);
}
}

Ok(match unmatched_values {
None => None,
Some(unmatched) => {
Some(serde_json::to_string(&unmatched).map_err(|e| PowerSyncError::internal(e))?)
}
})
}

pub fn bind_for_put(
&self,
id: &str,
json_data: &serde_json::Value,
json_data: &serde_json::Map<String, serde_json::Value>,
rest: &Option<String>,
) -> Result<(), PowerSyncError> {
use serde_json::Value;
for (i, source) in self.params.iter().enumerate() {

for (i, source) in self.definition.params.iter().enumerate() {
let i = (i + 1) as i32;

match source {
PendingStatementValue::Id => {
self.stmt.bind_text(i, id, Destructor::STATIC)?;
}
PendingStatementValue::Column(column) => {
let parsed = json_data.as_object().ok_or_else(|| {
PowerSyncError::argument_error("expected oplog data to be an object")
})?;

match parsed.get(column) {
match json_data.get(column) {
Some(Value::Bool(value)) => {
self.stmt.bind_int(i, if *value { 1 } else { 0 })
}
Expand All @@ -573,14 +624,28 @@ impl<'a> PreparedPendingStatement<'a> {
_ => self.stmt.bind_null(i),
}?;
}
PendingStatementValue::Rest => {
// These are bound later.
debug_assert!(self.definition.named_parameters_index.is_some());
}
}
}

if let Some(index) = &self.definition.named_parameters_index {
for target in &index.rest_parameter_positions {
let index = (*target + 1) as i32;
match rest {
None => self.stmt.bind_null(index),
Some(value) => self.stmt.bind_text(index, &*value, Destructor::STATIC),
}?;
}
}

Ok(())
}

pub fn bind_for_delete(&self, id: &str) -> Result<(), PowerSyncError> {
for (i, source) in self.params.iter().enumerate() {
for (i, source) in self.definition.params.iter().enumerate() {
if let PendingStatementValue::Id = source {
self.stmt
.bind_text((i + 1) as i32, id, Destructor::STATIC)?;
Expand Down
61 changes: 61 additions & 0 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,67 @@ CREATE TRIGGER users_delete
);
});

test('rest column', () {
db.execute(
'CREATE TABLE users (id TEXT NOT NULL, name TEXT, _rest TEXT)');
invokeControl(
'start',
json.encode({
'schema': {
'tables': [],
'raw_tables': [
{
'name': 'users',
'put': {
'sql':
'INSERT OR REPLACE INTO users (id, name, _rest) VALUES (?, ?, ?);',
'params': [
'Id',
{'Column': 'name'},
'Rest'
],
},
'delete': {
'sql': 'DELETE FROM users WHERE id = ?',
'params': ['Id'],
},
'clear': 'DELETE FROM users;',
}
]
}
}),
);

pushCheckpoint(buckets: [bucketDescription('a')]);
pushSyncData(
'a',
'1',
'user1',
'PUT',
{'name': 'First user'},
objectType: 'users',
);
pushSyncData(
'a',
'2',
'user2',
'PUT',
{'name': 'Second user', 'foo': 'bar', 'another': 3},
objectType: 'users',
);
pushCheckpointComplete();

final users = db.select('SELECT * FROM users;');
expect(users, [
{'id': 'user1', 'name': 'First user', '_rest': null},
{
'id': 'user2',
'name': 'Second user',
'_rest': json.encode({'another': 3, 'foo': 'bar'})
},
]);
});

test('crud vtab', () {
// This is mostly a test for the triggers, validating the suggestions we
// give on https://docs.powersync.com/usage/use-case-examples/raw-tables#capture-local-writes-with-triggers
Expand Down