Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.40.0 contains a new database migration, version 7, but running it is **only necessary if you're using SQLite**. It converts all uses of `json` to `jsonb`. If using Postgres, version 7 is a no-op. You can run it now or wait for another migration in the future and run the two together.

See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

If not using River's internal migration system, the raw SQL can alternatively be dumped with:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-get --database-url sqlite:// --version 6 --up > river7.up.sql
river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql
```

### Changed

- Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224).
- Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276)
- Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281)

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
--
-- Notification outbox rollback.
--

DROP TABLE /* TEMPLATE: schema */river_notification;

--
-- SQLite JSONB conversion rollback.
--
-- No-op. PostgreSQL already stores River JSON columns as jsonb.
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
--
-- Notification outbox.
--

CREATE TABLE /* TEMPLATE: schema */river_notification (
id bigserial PRIMARY KEY,
created_at timestamptz NOT NULL DEFAULT now(),
Expand All @@ -8,3 +12,8 @@ CREATE TABLE /* TEMPLATE: schema */river_notification (

CREATE INDEX river_notification_created_at_idx ON /* TEMPLATE: schema */river_notification (created_at);
CREATE INDEX river_notification_topic_id_idx ON /* TEMPLATE: schema */river_notification (topic, id);

--
-- SQLite JSONB conversion.
--
-- No-op. PostgreSQL already stores River JSON columns as jsonb.
129 changes: 129 additions & 0 deletions riverdriver/riverdrivertest/job_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,4 +814,133 @@ func exerciseJobInsert[TTx any](ctx context.Context, t *testing.T,
assertJobEqualsInput(t, results[0], jobParams1)
assertJobEqualsInput(t, results[1], jobParams2)
})

t.Run("SQLiteStoresJobJSONAsJSONB", func(t *testing.T) {
t.Parallel()

exec, bundle := setup(ctx, t)
if bundle.driver.DatabaseName() != riverdriver.DatabaseNameSQLite {
t.Skip("SQLite-specific JSONB storage assertion")
}

requireJSONBValues := func(t *testing.T, id int64, expectedByColumn map[string]string) {
t.Helper()

for column, expectedJSON := range expectedByColumn {
var matchesJSONB bool
require.NoError(t, exec.QueryRow(ctx,
fmt.Sprintf("SELECT %s = jsonb(?) FROM river_job WHERE id = ?", column),
expectedJSON,
id,
).Scan(&matchesJSONB))
require.True(t, matchesJSONB, "column %s should be stored as JSONB", column)
}
}

var (
attemptError1 = `{"at":"2026-01-02T03:04:05Z","attempt":1,"error":"error 1","trace":""}`
attemptError2 = `{"at":"2026-01-02T03:04:06Z","attempt":2,"error":"error 2","trace":""}`
attemptError3 = `{"at":"2026-01-02T03:04:07Z","attempt":3,"error":"error 3","trace":""}`
idStart = rand.Int64()
)

fastManyResults, err := exec.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{
Jobs: []*riverdriver.JobInsertFastParams{
{
EncodedArgs: []byte(`{"encoded": "fast-many"}`),
ID: ptrutil.Ptr(idStart),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "fast-many"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
Tags: []string{"fast-many"},
},
},
})
require.NoError(t, err)
require.Len(t, fastManyResults, 1)
requireJSONBValues(t, fastManyResults[0].Job.ID, map[string]string{
"args": `{"encoded":"fast-many"}`,
"tags": `["fast-many"]`,
})

fastManyNoReturningID := idStart + 1
rowsAffected, err := exec.JobInsertFastManyNoReturning(ctx, &riverdriver.JobInsertFastManyParams{
Jobs: []*riverdriver.JobInsertFastParams{
{
EncodedArgs: []byte(`{"encoded": "fast-many-no-returning"}`),
ID: &fastManyNoReturningID,
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Metadata: []byte(`{"meta": "fast-many-no-returning"}`),
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
Tags: []string{"fast-many-no-returning"},
},
},
})
require.NoError(t, err)
require.Equal(t, 1, rowsAffected)
requireJSONBValues(t, fastManyNoReturningID, map[string]string{
"args": `{"encoded":"fast-many-no-returning"}`,
"metadata": `{"meta":"fast-many-no-returning"}`,
"tags": `["fast-many-no-returning"]`,
})

fullResult, err := exec.JobInsertFull(ctx, testfactory.Job_Build(t, &testfactory.JobOpts{
AttemptedBy: []string{"full"},
EncodedArgs: []byte(`{"encoded": "full"}`),
Errors: [][]byte{[]byte(attemptError1)},
Metadata: []byte(`{"meta": "full"}`),
Tags: []string{"full"},
}))
require.NoError(t, err)
requireJSONBValues(t, fullResult.ID, map[string]string{
"args": `{"encoded":"full"}`,
"attempted_by": `["full"]`,
"errors": `[` + attemptError1 + `]`,
"metadata": `{"meta":"full"}`,
"tags": `["full"]`,
})

fullManyResults, err := exec.JobInsertFullMany(ctx, &riverdriver.JobInsertFullManyParams{
Jobs: []*riverdriver.JobInsertFullParams{
testfactory.Job_Build(t, &testfactory.JobOpts{
AttemptedBy: []string{"full-many"},
EncodedArgs: []byte(`{"encoded": "full-many"}`),
Errors: [][]byte{[]byte(attemptError2)},
Metadata: []byte(`{"meta": "full-many"}`),
Tags: []string{"full-many"},
}),
},
})
require.NoError(t, err)
require.Len(t, fullManyResults, 1)
requireJSONBValues(t, fullManyResults[0].ID, map[string]string{
"args": `{"encoded":"full-many"}`,
"attempted_by": `["full-many"]`,
"errors": `[` + attemptError2 + `]`,
"metadata": `{"meta":"full-many"}`,
"tags": `["full-many"]`,
})

updatedJob, err := exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
ID: fullManyResults[0].ID,
AttemptedBy: []string{"updated"},
AttemptedByDoUpdate: true,
Errors: [][]byte{[]byte(attemptError3)},
ErrorsDoUpdate: true,
Metadata: []byte(`{"meta": "updated"}`),
MetadataDoUpdate: true,
})
require.NoError(t, err)
requireJSONBValues(t, updatedJob.ID, map[string]string{
"attempted_by": `["updated"]`,
"errors": `[` + attemptError3 + `]`,
"metadata": `{"meta":"updated"}`,
})
})
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
--
-- Notification outbox rollback.
--

DROP TABLE /* TEMPLATE: schema */river_notification;

--
-- SQLite JSONB conversion rollback.
--
-- No-op. PostgreSQL already stores River JSON columns as jsonb.
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
--
-- Notification outbox.
--

CREATE TABLE /* TEMPLATE: schema */river_notification (
id bigserial PRIMARY KEY,
created_at timestamptz NOT NULL DEFAULT now(),
Expand All @@ -8,3 +12,8 @@ CREATE TABLE /* TEMPLATE: schema */river_notification (

CREATE INDEX river_notification_created_at_idx ON /* TEMPLATE: schema */river_notification (created_at);
CREATE INDEX river_notification_topic_id_idx ON /* TEMPLATE: schema */river_notification (topic, id);

--
-- SQLite JSONB conversion.
--
-- No-op. PostgreSQL already stores River JSON columns as jsonb.
2 changes: 1 addition & 1 deletion riverdriver/riversqlite/internal/dbsqlc/river_client.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE river_client (
id text PRIMARY KEY NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
metadata blob NOT NULL DEFAULT (json('{}')),
metadata jsonb NOT NULL DEFAULT (jsonb('{}')),
paused_at timestamp,
updated_at timestamp NOT NULL,
CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CREATE TABLE river_client_queue (
name text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
max_workers integer NOT NULL DEFAULT 0,
metadata blob NOT NULL DEFAULT (json('{}')),
metadata jsonb NOT NULL DEFAULT (jsonb('{}')),
num_jobs_completed integer NOT NULL DEFAULT 0,
num_jobs_running integer NOT NULL DEFAULT 0,
updated_at timestamp NOT NULL,
Expand Down
Loading
Loading