Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/do.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DurableObject } from 'cloudflare:workers'
import type { ExportState } from './export/chunked-dump'

export class StarbaseDBDurableObject extends DurableObject {
// Durable storage for the SQL database
Expand Down Expand Up @@ -72,9 +73,46 @@ export class StarbaseDBDurableObject extends DurableObject {
deleteAlarm: this.deleteAlarm.bind(this),
getStatistics: this.getStatistics.bind(this),
executeQuery: this.executeQuery.bind(this),
saveExportState: this.saveExportState.bind(this),
getExportState: this.getExportState.bind(this),
getActiveExportState: this.getActiveExportState.bind(this),
}
}

/**
* Save export state to DO storage for resumption after alarm.
*/
public async saveExportState(state: ExportState): Promise<void> {
await this.storage.put(
`export:${state.exportId}`,
JSON.stringify(state)
)
// Also track the active export (only one at a time)
if (state.status === 'processing') {
await this.storage.put('export:active', state.exportId)
} else {
await this.storage.delete('export:active')
}
}

/**
* Get export state by export ID.
*/
public async getExportState(exportId: string): Promise<ExportState | null> {
const raw = await this.storage.get<string>(`export:${exportId}`)
if (!raw) return null
return JSON.parse(raw) as ExportState
}

/**
* Get the currently active (in-progress) export state, if any.
*/
public async getActiveExportState(): Promise<ExportState | null> {
const activeId = await this.storage.get<string>('export:active')
if (!activeId) return null
return this.getExportState(activeId)
}

public async getAlarm(): Promise<number | null> {
return await this.storage.getAlarm()
}
Expand Down Expand Up @@ -106,6 +144,33 @@ export class StarbaseDBDurableObject extends DurableObject {

async alarm() {
try {
// Check if there's an active export to continue
const activeExport = await this.getActiveExportState()
if (activeExport && activeExport.status === 'processing') {
// The actual export continuation is handled by the Worker
// via a fetch to a special internal path
try {
await fetch(`https://localhost/internal/export/continue`, {
method: 'POST',
headers: {
Authorization: `Bearer ${this.clientAuthToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
exportId: activeExport.exportId,
}),
})
} catch (error) {
console.error('Failed to continue export via alarm:', error)
// Mark export as failed if we can't continue
activeExport.status = 'failed'
activeExport.error = 'Alarm continuation failed'
activeExport.updatedAt = Date.now()
await this.saveExportState(activeExport)
}
return
}

// Fetch all the tasks that are marked to emit an event for this cycle.
const task = (await this.executeQuery({
sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
Expand Down
Loading