diff --git a/src/do.ts b/src/do.ts index b6bb2b6..936d3a0 100644 --- a/src/do.ts +++ b/src/do.ts @@ -1,4 +1,5 @@ import { DurableObject } from 'cloudflare:workers' +import type { ExportState } from './export/chunked-dump' export class StarbaseDBDurableObject extends DurableObject { // Durable storage for the SQL database @@ -72,9 +73,46 @@ export class StarbaseDBDurableObject extends DurableObject { deleteAlarm: this.deleteAlarm.bind(this), getStatistics: this.getStatistics.bind(this), executeQuery: this.executeQuery.bind(this), + saveExportState: this.saveExportState.bind(this), + getExportState: this.getExportState.bind(this), + getActiveExportState: this.getActiveExportState.bind(this), } } + /** + * Save export state to DO storage for resumption after alarm. + */ + public async saveExportState(state: ExportState): Promise<void> { + await this.storage.put( + `export:${state.exportId}`, + JSON.stringify(state) + ) + // Also track the active export (only one at a time) + if (state.status === 'processing') { + await this.storage.put('export:active', state.exportId) + } else { + await this.storage.delete('export:active') + } + } + + /** + * Get export state by export ID. + */ + public async getExportState(exportId: string): Promise<ExportState | null> { + const raw = await this.storage.get<string>(`export:${exportId}`) + if (!raw) return null + return JSON.parse(raw) as ExportState + } + + /** + * Get the currently active (in-progress) export state, if any. 
+ */ + public async getActiveExportState(): Promise<ExportState | null> { + const activeId = await this.storage.get<string>('export:active') + if (!activeId) return null + return this.getExportState(activeId) + } + public async getAlarm(): Promise<number | null> { return await this.storage.getAlarm() } @@ -106,6 +144,33 @@ export class StarbaseDBDurableObject extends DurableObject { async alarm() { try { + // Check if there's an active export to continue + const activeExport = await this.getActiveExportState() + if (activeExport && activeExport.status === 'processing') { + // The actual export continuation is handled by the Worker + // via a fetch to a special internal path + try { + await fetch(`https://localhost/internal/export/continue`, { + method: 'POST', + headers: { + Authorization: `Bearer ${this.clientAuthToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + exportId: activeExport.exportId, + }), + }) + } catch (error) { + console.error('Failed to continue export via alarm:', error) + // Mark export as failed if we can't continue + activeExport.status = 'failed' + activeExport.error = 'Alarm continuation failed' + activeExport.updatedAt = Date.now() + await this.saveExportState(activeExport) + } + return + } + // Fetch all the tasks that are marked to emit an event for this cycle. const task = (await this.executeQuery({ sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;', diff --git a/src/export/chunked-dump.test.ts b/src/export/chunked-dump.test.ts new file mode 100644 index 0000000..a2fbe2f --- /dev/null +++ b/src/export/chunked-dump.test.ts @@ -0,0 +1,384 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { + generateDumpFilename, + generateExportId, + initializeExport, + processExportChunk, + exportStatusRoute, + exportDownloadRoute, + ExportState, +} from './chunked-dump' +import { executeOperation } from '.' 
+import type { DataSource } from '../types' +import type { StarbaseDBConfiguration } from '../handler' + +vi.mock('.', () => ({ + executeOperation: vi.fn(), +})) + +vi.mock('../utils', () => ({ + createResponse: vi.fn( + (data, message, status) => + new Response(JSON.stringify({ result: data, error: message }), { + status, + headers: { 'Content-Type': 'application/json' }, + }) + ), +})) + +let mockDataSource: DataSource +let mockConfig: StarbaseDBConfiguration +let mockR2Bucket: any + +function createMockR2Bucket(content: string = ''): any { + return { + put: vi.fn().mockResolvedValue(undefined), + get: vi.fn().mockResolvedValue({ + text: vi.fn().mockResolvedValue(content), + body: new ReadableStream(), + size: content.length, + }), + delete: vi.fn().mockResolvedValue(undefined), + } +} + +beforeEach(() => { + vi.clearAllMocks() + + mockDataSource = { + source: 'internal', + rpc: { executeQuery: vi.fn() }, + } as any + + mockConfig = { + outerbaseApiKey: 'mock-api-key', + role: 'admin', + features: { allowlist: true, rls: true, rest: true }, + } + + mockR2Bucket = createMockR2Bucket() +}) + +describe('Chunked Export Utilities', () => { + it('should generate a valid dump filename', () => { + const filename = generateDumpFilename() + expect(filename).toMatch(/^dump_\d{8}-\d{6}\.sql$/) + }) + + it('should generate a unique export ID', () => { + const id1 = generateExportId() + const id2 = generateExportId() + expect(id1).toMatch(/^export_\d+_[a-z0-9]+$/) + expect(id1).not.toBe(id2) + }) +}) + +describe('initializeExport', () => { + it('should create an export state with all tables', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([ + { name: 'users' }, + { name: 'orders' }, + ]) + + const state = await initializeExport( + mockDataSource, + mockConfig, + mockR2Bucket + ) + + expect(state.status).toBe('processing') + expect(state.tables).toEqual(['users', 'orders']) + expect(state.currentTableIndex).toBe(0) + expect(state.currentRowOffset).toBe(0) + 
expect(state.r2Key).toMatch(/^dump_\d{8}-\d{6}\.sql$/) + expect(mockR2Bucket.put).toHaveBeenCalledOnce() + }) + + it('should exclude tmp_ tables from export', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([{ name: 'users' }]) + + const state = await initializeExport( + mockDataSource, + mockConfig, + mockR2Bucket + ) + + // The SQL query itself filters tmp_ tables + expect(executeOperation).toHaveBeenCalledWith( + [ + { + sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'tmp_%';", + }, + ], + mockDataSource, + mockConfig + ) + expect(state.tables).toEqual(['users']) + }) + + it('should handle empty database', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([]) + + const state = await initializeExport( + mockDataSource, + mockConfig, + mockR2Bucket + ) + + expect(state.tables).toEqual([]) + expect(state.status).toBe('processing') + }) + + it('should store callbackUrl when provided', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([]) + + const state = await initializeExport( + mockDataSource, + mockConfig, + mockR2Bucket, + 'https://example.com/callback' + ) + + expect(state.callbackUrl).toBe('https://example.com/callback') + }) +}) + +describe('processExportChunk', () => { + it('should process all tables and mark as completed for small DB', async () => { + const state: ExportState = { + exportId: 'test-export', + status: 'processing', + tables: ['users'], + currentTableIndex: 0, + currentRowOffset: 0, + currentTableSchemaWritten: false, + r2Key: 'dump_test.sql', + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 0, + } + + // Schema query + vi.mocked(executeOperation) + .mockResolvedValueOnce([ + { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, + ]) + // Data batch (less than BATCH_SIZE means table is exhausted) + .mockResolvedValueOnce([ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + ]) + + const updatedState = await processExportChunk( + state, + 
mockDataSource, + mockConfig, + mockR2Bucket + ) + + expect(updatedState.status).toBe('completed') + expect(updatedState.totalRowsExported).toBe(2) + expect(mockR2Bucket.put).toHaveBeenCalled() + + // Verify the content written to R2 includes the data + const putCall = mockR2Bucket.put.mock.calls[0] + expect(putCall[1]).toContain('CREATE TABLE users') + expect(putCall[1]).toContain( + 'INSERT INTO "users" VALUES (1, \'Alice\');' + ) + expect(putCall[1]).toContain('INSERT INTO "users" VALUES (2, \'Bob\');') + }) + + it('should handle NULL values in rows', async () => { + const state: ExportState = { + exportId: 'test-export', + status: 'processing', + tables: ['users'], + currentTableIndex: 0, + currentRowOffset: 0, + currentTableSchemaWritten: false, + r2Key: 'dump_test.sql', + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 0, + } + + vi.mocked(executeOperation) + .mockResolvedValueOnce([ + { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, + ]) + .mockResolvedValueOnce([{ id: 1, name: null }]) + + const updatedState = await processExportChunk( + state, + mockDataSource, + mockConfig, + mockR2Bucket + ) + + expect(updatedState.status).toBe('completed') + const putCall = mockR2Bucket.put.mock.calls[0] + expect(putCall[1]).toContain('INSERT INTO "users" VALUES (1, NULL);') + }) + + it('should mark as failed on error', async () => { + const state: ExportState = { + exportId: 'test-export', + status: 'processing', + tables: ['users'], + currentTableIndex: 0, + currentRowOffset: 0, + currentTableSchemaWritten: false, + r2Key: 'dump_test.sql', + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 0, + } + + mockR2Bucket.get = vi + .fn() + .mockRejectedValue(new Error('R2 unavailable')) + + const updatedState = await processExportChunk( + state, + mockDataSource, + mockConfig, + mockR2Bucket + ) + + expect(updatedState.status).toBe('failed') + expect(updatedState.error).toContain('R2 unavailable') + }) + + it('should fire 
callback on completion', async () => { + const fetchSpy = vi + .spyOn(global, 'fetch') + .mockResolvedValue(new Response('ok')) + + const state: ExportState = { + exportId: 'test-export', + status: 'processing', + tables: [], + currentTableIndex: 0, + currentRowOffset: 0, + currentTableSchemaWritten: false, + r2Key: 'dump_test.sql', + callbackUrl: 'https://example.com/callback', + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 0, + } + + const updatedState = await processExportChunk( + state, + mockDataSource, + mockConfig, + mockR2Bucket + ) + + expect(updatedState.status).toBe('completed') + expect(fetchSpy).toHaveBeenCalledWith( + 'https://example.com/callback', + expect.objectContaining({ + method: 'POST', + body: expect.stringContaining('"status":"completed"'), + }) + ) + + fetchSpy.mockRestore() + }) +}) + +describe('exportStatusRoute', () => { + it('should return 404 for unknown export', async () => { + const response = await exportStatusRoute('unknown-id', async () => null) + expect(response.status).toBe(404) + }) + + it('should return export state for known export', async () => { + const state: ExportState = { + exportId: 'test-export', + status: 'processing', + tables: ['users', 'orders'], + currentTableIndex: 1, + currentRowOffset: 500, + currentTableSchemaWritten: true, + r2Key: 'dump_test.sql', + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 1500, + } + + const response = await exportStatusRoute( + 'test-export', + async () => state + ) + expect(response.status).toBe(200) + + const body = await response.json() + expect(body.result.exportId).toBe('test-export') + expect(body.result.status).toBe('processing') + expect(body.result.totalRowsExported).toBe(1500) + expect(body.result.tablesProcessed).toBe(1) + expect(body.result.tablesTotal).toBe(2) + }) +}) + +describe('exportDownloadRoute', () => { + it('should return 404 for unknown export', async () => { + const response = await exportDownloadRoute( + 
'unknown-id', + mockR2Bucket, + async () => null + ) + expect(response.status).toBe(404) + }) + + it('should return 202 for in-progress export', async () => { + const state: ExportState = { + exportId: 'test-export', + status: 'processing', + tables: ['users'], + currentTableIndex: 0, + currentRowOffset: 0, + currentTableSchemaWritten: false, + r2Key: 'dump_test.sql', + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 0, + } + + const response = await exportDownloadRoute( + 'test-export', + mockR2Bucket, + async () => state + ) + expect(response.status).toBe(202) + }) + + it('should return file for completed export', async () => { + const state: ExportState = { + exportId: 'test-export', + status: 'completed', + tables: ['users'], + currentTableIndex: 1, + currentRowOffset: 0, + currentTableSchemaWritten: false, + r2Key: 'dump_test.sql', + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 10, + } + + const response = await exportDownloadRoute( + 'test-export', + mockR2Bucket, + async () => state + ) + + expect(response.headers.get('Content-Type')).toBe('application/sql') + expect(response.headers.get('Content-Disposition')).toContain( + 'dump_test.sql' + ) + }) +}) diff --git a/src/export/chunked-dump.ts b/src/export/chunked-dump.ts new file mode 100644 index 0000000..48ec80e --- /dev/null +++ b/src/export/chunked-dump.ts @@ -0,0 +1,428 @@ +import { executeOperation } from '.' +import { StarbaseDBConfiguration } from '../handler' +import { DataSource } from '../types' +import { createResponse } from '../utils' + +/** + * Configuration constants for chunked export + */ +const BATCH_SIZE = 1000 // Rows per batch +const TIME_LIMIT_MS = 20_000 // 20 seconds before yielding (leaving buffer before 30s timeout) + +/** + * Represents the state of an ongoing export operation. + * Stored in DO storage so export can be resumed after alarm. 
+ */ +export interface ExportState { + exportId: string + status: 'pending' | 'processing' | 'completed' | 'failed' + tables: string[] + currentTableIndex: number + currentRowOffset: number + currentTableSchemaWritten: boolean + r2Key: string + callbackUrl?: string + createdAt: number + updatedAt: number + error?: string + totalRowsExported: number +} + +/** + * Generate a timestamped filename for the dump. + * e.g. dump_20240101-170000.sql + */ +export function generateDumpFilename(): string { + const now = new Date() + const pad = (n: number) => n.toString().padStart(2, '0') + const date = `${now.getFullYear()}${pad(now.getMonth() + 1)}${pad(now.getDate())}` + const time = `${pad(now.getHours())}${pad(now.getMinutes())}${pad(now.getSeconds())}` + return `dump_${date}-${time}.sql` +} + +/** + * Generate a unique export ID. + */ +export function generateExportId(): string { + return `export_${Date.now()}_${Math.random().toString(36).substring(2, 10)}` +} + +/** + * Initialize a new export: get all table names, create initial R2 object, + * and return the ExportState. 
+ */ +export async function initializeExport( + dataSource: DataSource, + config: StarbaseDBConfiguration, + r2Bucket: R2Bucket, + callbackUrl?: string +): Promise<ExportState> { + // Get all table names (excluding internal tmp_ tables) + const tablesResult = await executeOperation( + [ + { + sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'tmp_%';", + }, + ], + dataSource, + config + ) + + const tables = tablesResult.map((row: any) => row.name) + const exportId = generateExportId() + const r2Key = generateDumpFilename() + + // Write the SQL header to R2 + const header = + '-- StarbaseDB Database Dump\n-- Generated at: ' + + new Date().toISOString() + + '\n\n' + await r2Bucket.put(r2Key, header) + + const state: ExportState = { + exportId, + status: 'processing', + tables, + currentTableIndex: 0, + currentRowOffset: 0, + currentTableSchemaWritten: false, + r2Key, + callbackUrl, + createdAt: Date.now(), + updatedAt: Date.now(), + totalRowsExported: 0, + } + + return state +} + +/** + * Continue processing an export from its current state. + * Processes rows in batches and appends SQL to the R2 object. + * Returns the updated state — if status is still 'processing', + * the caller should schedule an alarm to continue. + */ +export async function processExportChunk( + state: ExportState, + dataSource: DataSource, + config: StarbaseDBConfiguration, + r2Bucket: R2Bucket +): Promise<ExportState> { + const startTime = Date.now() + + try { + // Get existing content from R2 so we can append + const existingObj = await r2Bucket.get(state.r2Key) + let existingContent = existingObj ? 
await existingObj.text() : '' + + let newContent = '' + + while (state.currentTableIndex < state.tables.length) { + const table = state.tables[state.currentTableIndex] + + // Write table schema if not yet written for this table + if (!state.currentTableSchemaWritten) { + const schemaResult = await executeOperation( + [ + { + sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`, + params: [table], + }, + ], + dataSource, + config + ) + + if (schemaResult.length) { + const schema = schemaResult[0].sql + newContent += `\n-- Table: ${table}\n${schema};\n\n` + } + + state.currentTableSchemaWritten = true + } + + // Fetch rows in batches using LIMIT/OFFSET + while (true) { + // Check if we're approaching the time limit + if (Date.now() - startTime > TIME_LIMIT_MS) { + // Save progress and yield + state.updatedAt = Date.now() + await r2Bucket.put( + state.r2Key, + existingContent + newContent + ) + return state // Still 'processing' — caller should schedule alarm + } + + const batchResult = await executeOperation( + [ + { + sql: `SELECT * FROM "${table}" LIMIT ${BATCH_SIZE} OFFSET ${state.currentRowOffset};`, + }, + ], + dataSource, + config + ) + + if (!batchResult.length) { + // No more rows in this table — move to next + break + } + + for (const row of batchResult) { + const values = Object.values(row).map((value) => + value === null + ? 'NULL' + : typeof value === 'string' + ? 
`'${value.replace(/'/g, "''")}'` + : value + ) + newContent += `INSERT INTO "${table}" VALUES (${values.join(', ')});\n` + } + + state.currentRowOffset += batchResult.length + state.totalRowsExported += batchResult.length + + // If we got fewer rows than BATCH_SIZE, we've exhausted this table + if (batchResult.length < BATCH_SIZE) { + break + } + } + + // Move to next table + state.currentTableIndex++ + state.currentRowOffset = 0 + state.currentTableSchemaWritten = false + newContent += '\n' + } + + // All tables processed — export is complete + state.status = 'completed' + state.updatedAt = Date.now() + await r2Bucket.put(state.r2Key, existingContent + newContent) + + // Fire callback if provided + if (state.callbackUrl) { + try { + await fetch(state.callbackUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + exportId: state.exportId, + status: 'completed', + r2Key: state.r2Key, + totalRowsExported: state.totalRowsExported, + completedAt: new Date().toISOString(), + }), + }) + } catch (callbackError) { + console.error('Failed to send export callback:', callbackError) + } + } + + return state + } catch (error: any) { + state.status = 'failed' + state.error = error?.message || 'Unknown export error' + state.updatedAt = Date.now() + + // Try to notify via callback on failure too + if (state.callbackUrl) { + try { + await fetch(state.callbackUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + exportId: state.exportId, + status: 'failed', + error: state.error, + failedAt: new Date().toISOString(), + }), + }) + } catch (_) { + // Best effort + } + } + + return state + } +} + +/** + * Synchronous fast-path: For small databases that complete within the time limit, + * return the dump directly in the response. Falls back to async R2 export + * if the operation would exceed the time limit. 
+ */ +export async function dumpDatabaseChunkedRoute( + dataSource: DataSource, + config: StarbaseDBConfiguration, + r2Bucket: R2Bucket | undefined, + request: Request, + scheduleAlarm: (exportState: ExportState) => Promise<void>, + getExportState: () => Promise<ExportState | null>, + saveExportState: (state: ExportState) => Promise<void> +): Promise<Response> { + try { + let callbackUrl: string | undefined + + // Parse callbackUrl from query params or body + const url = new URL(request.url) + callbackUrl = url.searchParams.get('callbackUrl') ?? undefined + + // If R2 is not configured, fall back to the legacy in-memory dump + if (!r2Bucket) { + // Import and delegate to the original dump function + const { dumpDatabaseRoute } = await import('./dump') + return dumpDatabaseRoute(dataSource, config) + } + + // Initialize the export + const state = await initializeExport( + dataSource, + config, + r2Bucket, + callbackUrl + ) + await saveExportState(state) + + // Start processing + const updatedState = await processExportChunk( + state, + dataSource, + config, + r2Bucket + ) + await saveExportState(updatedState) + + if (updatedState.status === 'completed') { + // Fast path: export completed within time limit — return the file directly + const r2Object = await r2Bucket.get(updatedState.r2Key) + + if (!r2Object) { + return createResponse( + undefined, + 'Export file not found in R2', + 500 + ) + } + + const content = await r2Object.text() + const headers = new Headers({ + 'Content-Type': 'application/sql', + 'Content-Disposition': `attachment; filename="${updatedState.r2Key}"`, + }) + + return new Response(content, { headers }) + } + + // Async path: export still processing — schedule alarm to continue + await scheduleAlarm(updatedState) + + return createResponse( + { + exportId: updatedState.exportId, + status: updatedState.status, + message: + 'Export is in progress. 
It will continue in the background.', + r2Key: updatedState.r2Key, + totalRowsExported: updatedState.totalRowsExported, + }, + undefined, + 202 + ) + } catch (error: any) { + console.error('Chunked Database Dump Error:', error) + return createResponse( + undefined, + 'Failed to create database dump: ' + + (error?.message || 'Unknown error'), + 500 + ) + } +} + +/** + * GET /export/status/:exportId — Check the status of an async export. + */ +export async function exportStatusRoute( + exportId: string, + getExportState: (id: string) => Promise<ExportState | null> +): Promise<Response> { + try { + const state = await getExportState(exportId) + + if (!state) { + return createResponse(undefined, 'Export not found', 404) + } + + return createResponse( + { + exportId: state.exportId, + status: state.status, + r2Key: state.r2Key, + totalRowsExported: state.totalRowsExported, + tablesTotal: state.tables.length, + tablesProcessed: state.currentTableIndex, + createdAt: new Date(state.createdAt).toISOString(), + updatedAt: new Date(state.updatedAt).toISOString(), + error: state.error, + }, + undefined, + 200 + ) + } catch (error: any) { + return createResponse(undefined, 'Failed to get export status', 500) + } +} + +/** + * GET /export/download/:exportId — Download a completed export from R2. + */ +export async function exportDownloadRoute( + exportId: string, + r2Bucket: R2Bucket, + getExportState: (id: string) => Promise<ExportState | null> +): Promise<Response> { + try { + const state = await getExportState(exportId) + + if (!state) { + return createResponse(undefined, 'Export not found', 404) + } + + if (state.status !== 'completed') { + return createResponse( + { + exportId: state.exportId, + status: state.status, + message: + state.status === 'processing' + ? 'Export is still in progress. Please try again later.' + : `Export failed: ${state.error}`, + }, + state.status === 'failed' ? state.error : undefined, + state.status === 'processing' ? 
202 : 500 + ) + } + + const r2Object = await r2Bucket.get(state.r2Key) + + if (!r2Object) { + return createResponse( + undefined, + 'Export file not found in storage', + 404 + ) + } + + const headers = new Headers({ + 'Content-Type': 'application/sql', + 'Content-Disposition': `attachment; filename="${state.r2Key}"`, + 'Content-Length': r2Object.size.toString(), + }) + + return new Response(r2Object.body, { headers }) + } catch (error: any) { + return createResponse(undefined, 'Failed to download export', 500) + } +} diff --git a/src/handler.ts b/src/handler.ts index 3fa0085..d55021c 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -7,6 +7,13 @@ import { LiteREST } from './literest' import { executeQuery, executeTransaction } from './operation' import { createResponse, QueryRequest, QueryTransactionRequest } from './utils' import { dumpDatabaseRoute } from './export/dump' +import { + dumpDatabaseChunkedRoute, + exportStatusRoute, + exportDownloadRoute, + processExportChunk, + ExportState, +} from './export/chunked-dump' import { exportTableToJsonRoute } from './export/json' import { exportTableToCsvRoute } from './export/csv' import { importDumpRoute } from './import/dump' @@ -47,17 +54,20 @@ export class StarbaseDB { private plugins: StarbasePlugin[] private initialized: boolean = false private app: StarbaseApp + private r2Bucket?: R2Bucket constructor(options: { dataSource: DataSource config: StarbaseDBConfiguration plugins?: StarbasePlugin[] + r2Bucket?: R2Bucket }) { this.dataSource = options.dataSource this.config = options.config this.liteREST = new LiteREST(this.dataSource, this.config) this.plugins = options.plugins || [] this.app = new Hono() + this.r2Bucket = options.r2Bucket if ( this.dataSource.source === 'external' && @@ -120,10 +130,131 @@ export class StarbaseDB { } if (this.getFeature('export')) { - this.app.get('/export/dump', this.isInternalSource, async () => { - return dumpDatabaseRoute(this.dataSource, this.config) + // Legacy sync dump 
(kept for backwards compatibility when R2 is not configured) + this.app.get('/export/dump', this.isInternalSource, async (c) => { + return dumpDatabaseChunkedRoute( + this.dataSource, + this.config, + this.r2Bucket, + c.req.raw, + async (exportState: ExportState) => { + await this.dataSource.rpc.saveExportState(exportState) + // Schedule alarm 2 seconds from now to allow breathing room + await this.dataSource.rpc.setAlarm(Date.now() + 2000) + }, + async () => { + return await this.dataSource.rpc.getActiveExportState() + }, + async (state: ExportState) => { + await this.dataSource.rpc.saveExportState(state) + } + ) }) + // POST /export/dump — Start an async export with optional callbackUrl + this.app.post('/export/dump', this.isInternalSource, async (c) => { + return dumpDatabaseChunkedRoute( + this.dataSource, + this.config, + this.r2Bucket, + c.req.raw, + async (exportState: ExportState) => { + await this.dataSource.rpc.saveExportState(exportState) + await this.dataSource.rpc.setAlarm(Date.now() + 2000) + }, + async () => { + return await this.dataSource.rpc.getActiveExportState() + }, + async (state: ExportState) => { + await this.dataSource.rpc.saveExportState(state) + } + ) + }) + + // GET /export/status/:exportId — Check export progress + this.app.get( + '/export/status/:exportId', + this.isInternalSource, + async (c) => { + const exportId = c.req.param('exportId') + return exportStatusRoute(exportId, async (id: string) => { + return await this.dataSource.rpc.getExportState(id) + }) + } + ) + + // GET /export/download/:exportId — Download completed export + this.app.get( + '/export/download/:exportId', + this.isInternalSource, + async (c) => { + const exportId = c.req.param('exportId') + if (!this.r2Bucket) { + return createResponse( + undefined, + 'R2 bucket not configured. 
Cannot download async exports.', + 400 + ) + } + return exportDownloadRoute( + exportId, + this.r2Bucket, + async (id: string) => { + return await this.dataSource.rpc.getExportState(id) + } + ) + } + ) + + // Internal route: continue export from DO alarm + this.app.post( + '/internal/export/continue', + this.isInternalSource, + async (c) => { + if (!this.r2Bucket) { + return createResponse( + undefined, + 'R2 bucket not configured', + 400 + ) + } + + const activeState = + await this.dataSource.rpc.getActiveExportState() + + if (!activeState || activeState.status !== 'processing') { + return createResponse( + { message: 'No active export to continue' }, + undefined, + 200 + ) + } + + const updatedState = await processExportChunk( + activeState, + this.dataSource, + this.config, + this.r2Bucket + ) + await this.dataSource.rpc.saveExportState(updatedState) + + if (updatedState.status === 'processing') { + // Still more work — schedule another alarm + await this.dataSource.rpc.setAlarm(Date.now() + 2000) + } + + return createResponse( + { + exportId: updatedState.exportId, + status: updatedState.status, + totalRowsExported: updatedState.totalRowsExported, + }, + undefined, + 200 + ) + } + ) + this.app.get( '/export/json/:tableName', this.isInternalSource, diff --git a/src/index.ts b/src/index.ts index 4d08932..d69f5ca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -56,6 +56,9 @@ export interface Env { HYPERDRIVE: Hyperdrive + // R2 Bucket for database export storage (optional) + EXPORT_BUCKET?: R2Bucket + // ## DO NOT REMOVE: TEMPLATE INTERFACE ## } @@ -232,6 +235,7 @@ export default { dataSource, config, plugins, + r2Bucket: env.EXPORT_BUCKET, }) const preAuthRequest = await starbase.handlePreAuth(request, ctx) diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index 6c35c6f..9981254 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -13,4 +13,5 @@ interface Env { DATABASE_DURABLE_OBJECT: DurableObjectNamespace< 
import('./src/index').StarbaseDBDurableObject > + EXPORT_BUCKET?: R2Bucket } diff --git a/wrangler.toml b/wrangler.toml index 395c4ac..6e5279c 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -78,3 +78,10 @@ AUTH_JWKS_ENDPOINT = "" # [[hyperdrive]] # binding = "HYPERDRIVE" # id = "" + +# R2 Bucket for database export storage +# Required for large database exports that exceed the 30-second timeout +# Create a bucket: wrangler r2 bucket create starbasedb-exports +[[r2_buckets]] +binding = "EXPORT_BUCKET" +bucket_name = "starbasedb-exports"