Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dist/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export { ChangeDataCapturePlugin } from '../plugins/cdc'
export { QueryLogPlugin } from '../plugins/query-log'
export { ResendPlugin } from '../plugins/resend'
export { ClerkPlugin } from '../plugins/clerk'
export { DataSyncPlugin } from '../plugins/data-sync'
Binary file added docs/demo/pr98-data-sync-demo.mp4
Binary file not shown.
12 changes: 12 additions & 0 deletions plugins/cron/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ export class CronPlugin extends StarbasePlugin {
await this.scheduleNextAlarm()
}

public async removeEvent(name: string): Promise<void> {
if (!this.dataSource)
throw new Error('CronPlugin not properly initialized')

await this.dataSource.rpc.executeQuery({
sql: SQL_QUERIES.DELETE_TASK,
params: [name],
})

await this.scheduleNextAlarm()
}

public onEvent(
callback: (payload: CronEventPayload) => void | Promise<void>,
ctx?: ExecutionContext
Expand Down
50 changes: 50 additions & 0 deletions plugins/data-sync/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Data Sync Plugin

Replicate rows from an external SQL source into the internal Durable Object SQLite database.

## Endpoints

- `GET /data-sync` List configured sync tasks.
- `POST /data-sync` Create or update a task.
- `DELETE /data-sync/:name` Delete a task.
- `POST /data-sync/run/:name` Run a task immediately.

## Create a task

```bash
curl --location 'https://starbasedb.YOUR-ID.workers.dev/data-sync' \
--header 'Authorization: Bearer ADMIN_TOKEN' \
--header 'Content-Type: application/json' \
--data '{
"name": "users_sync",
"sourceTable": "users",
"sourceSchema": "public",
"targetTable": "users_cache",
"cursorColumn": "id",
"intervalCron": "*/5 * * * *",
"batchSize": 250
}'
```

## Task fields

- `name` Unique task identifier (letters, numbers, underscores).
- `sourceTable` External table to pull from.
- `sourceSchema` Optional external schema for PostgreSQL/MySQL.
- `targetTable` Internal SQLite table to replicate into.
- `cursorColumn` Incremental checkpoint column (must be monotonic).
- `intervalCron` Cron expression used by the cron plugin.
- `batchSize` Optional pull size (defaults to 250, min 10, max 2000).

## Behavior

- First run pulls earliest rows ordered by `cursorColumn`.
- Later runs pull rows where `cursorColumn > last_cursor_value`.
- Target table is created automatically when missing.
- Missing target columns are added automatically.
- Upserts are conflict-safe using a unique index on `cursorColumn`.

## Requirements

- External SQL datasource must be configured in StarbaseDB.
- Cron plugin must be enabled to run scheduled pulls.
122 changes: 122 additions & 0 deletions plugins/data-sync/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import type { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler'
import { StarbasePlugin } from '../../src/plugin'
import type { DataSource } from '../../src/types'
import { createResponse } from '../../src/utils'
import { CronPlugin } from '../cron'
import {
deleteDataSyncTask,
ensureDataSyncTables,
listDataSyncTasks,
runDataSyncTask,
upsertDataSyncTask,
} from './service'

export class DataSyncPlugin extends StarbasePlugin {
public pathPrefix = '/data-sync'
private cronPlugin?: CronPlugin
private dataSource?: DataSource
private config?: StarbaseDBConfiguration

constructor(opts?: { cronPlugin?: CronPlugin }) {
super('starbasedb:data-sync', { requiresAuth: true })
this.cronPlugin = opts?.cronPlugin
}

override async register(app: StarbaseApp): Promise<void> {
app.use(async (c, next) => {
const dataSource = c.get('dataSource') as DataSource
this.dataSource = dataSource
this.config = c.get('config') as StarbaseDBConfiguration
await ensureDataSyncTables(dataSource)
await next()
})

if (this.cronPlugin) {
this.cronPlugin.onEvent(async ({ name, payload }) => {
const eventTaskName =
typeof payload?.taskName === 'string'
? payload.taskName
: undefined

const taskName =
eventTaskName ||
(name.startsWith('data-sync:')
? name.replace('data-sync:', '')
: undefined)

if (!taskName || !this.dataSource || !this.config) {
return
}

try {
await runDataSyncTask(this.dataSource, taskName, this.config)
} catch (error) {
console.error(`Data sync cron run failed for ${taskName}:`, error)
}
})
}

app.get(this.pathPrefix, async (c) => {
const dataSource = c.get('dataSource') as DataSource
const tasks = await listDataSyncTasks(dataSource)
return createResponse({ tasks }, undefined, 200)
})

app.post(this.pathPrefix, async (c) => {
const dataSource = c.get('dataSource') as DataSource

let body: any = {}
try {
body = await c.req.json()
} catch {
return createResponse(undefined, 'Invalid JSON payload', 400)
}

const task = await upsertDataSyncTask(dataSource, {
name: body.name,
sourceTable: body.sourceTable,
targetTable: body.targetTable,
cursorColumn: body.cursorColumn,
sourceSchema: body.sourceSchema,
intervalCron: body.intervalCron,
batchSize: body.batchSize,
})

if (this.cronPlugin) {
await this.cronPlugin.addEvent(
task.cronTab,
`data-sync:${task.name}`,
{ taskName: task.name },
new URL(c.req.url).origin
)
}

return createResponse({ task }, undefined, 201)
})

app.delete(`${this.pathPrefix}/:name`, async (c) => {
const dataSource = c.get('dataSource') as DataSource
const name = c.req.param('name')
const deleted = await deleteDataSyncTask(dataSource, name)

if (!deleted) {
return createResponse(undefined, 'Task not found', 404)
}

if (this.cronPlugin) {
await this.cronPlugin.removeEvent(`data-sync:${name}`)
}

return createResponse({ deleted: true, name }, undefined, 200)
})

app.post(`${this.pathPrefix}/run/:name`, async (c) => {
const dataSource = c.get('dataSource') as DataSource
const config = c.get('config') as StarbaseDBConfiguration
const name = c.req.param('name')

const summary = await runDataSyncTask(dataSource, name, config)
return createResponse({ summary }, undefined, 200)
})
}
}
Loading