Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/src/subgraph/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ where
),
entity_lfu_cache: LfuCache::new(),
cached_head_ptr: None,
postponed_indexes_created: false,
},
logger,
metrics,
Expand Down Expand Up @@ -731,6 +732,21 @@ where

let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?;

if !self.state.postponed_indexes_created
&& close_to_chain_head(
&block_ptr,
&self.state.cached_head_ptr,
ENV_VARS.postpone_indexes_creation_threshold,
)
{
self.state.postponed_indexes_created = true;
self.inputs
.store
.create_postponed_indexes()
.await
.non_deterministic()?;
}

self.inputs
.store
.transact_block_operations(
Expand Down
3 changes: 3 additions & 0 deletions core/src/subgraph/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ pub struct IndexingState {
pub skip_ptr_updates_timer: Instant,
pub entity_lfu_cache: EntityLfuCache,
pub cached_head_ptr: Option<BlockPtr>,
/// Set to `true` once postponed indexes have been created. This
/// ensures we only trigger index creation once per subgraph run.
pub postponed_indexes_created: bool,
}
6 changes: 6 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,12 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {

async fn health(&self) -> Result<SubgraphHealth, StoreError>;

/// Create indexes whose creation was postponed at deployment time.
/// This should be called when a subgraph gets close to the chain
/// head. Calling it when all postponed indexes already exist is safe
/// and a no-op.
async fn create_postponed_indexes(&self) -> Result<(), StoreError>;

/// Wait for the background writer to finish processing its queue
async fn flush(&self) -> Result<(), StoreError>;

Expand Down
11 changes: 11 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ pub struct EnvVars {
///
/// Set the flag `GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION`. Off by default.
pub postpone_attribute_index_creation: bool,
/// When a subgraph gets within this many blocks of the chain head,
/// create any indexes whose creation was postponed. Only has an effect
/// when `postpone_attribute_index_creation` is set.
///
/// Set by the environment variable
/// `GRAPH_POSTPONE_INDEXES_CREATION_THRESHOLD`. The default value is
/// 10000.
pub postpone_indexes_creation_threshold: BlockNumber,
/// Verbose logging of mapping inputs.
///
/// Set by the flag `GRAPH_LOG_TRIGGER_DATA`. Off by
Expand Down Expand Up @@ -345,6 +353,7 @@ impl EnvVars {
enable_select_by_specific_attributes: inner.enable_select_by_specific_attributes.0,
postpone_attribute_index_creation: inner.postpone_attribute_index_creation.0
|| cfg!(debug_assertions),
postpone_indexes_creation_threshold: inner.postpone_indexes_creation_threshold,
log_trigger_data: inner.log_trigger_data.0,
explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs),
explorer_lock_threshold: Duration::from_millis(inner.explorer_lock_threshold_in_msec),
Expand Down Expand Up @@ -549,6 +558,8 @@ struct Inner {
enable_select_by_specific_attributes: EnvVarBoolean,
#[envconfig(from = "GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", default = "false")]
postpone_attribute_index_creation: EnvVarBoolean,
#[envconfig(from = "GRAPH_POSTPONE_INDEXES_CREATION_THRESHOLD", default = "10000")]
postpone_indexes_creation_threshold: i32,
#[envconfig(from = "GRAPH_LOG_TRIGGER_DATA", default = "false")]
log_trigger_data: EnvVarBoolean,
#[envconfig(from = "GRAPH_EXPLORER_TTL", default = "10")]
Expand Down
5 changes: 3 additions & 2 deletions node/src/manager/commands/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use graph::{
prelude::{anyhow, StoreError},
};
use graph_store_postgres::{
command_support::index::{CreateIndex, Method},
command_support::index::{CreateIndex, IndexCreator, Method},
ConnectionPool, SubgraphStore,
};
use std::io::Write as _;
Expand Down Expand Up @@ -183,8 +183,9 @@ pub async fn list(
let mut term = Terminal::new();

if to_sql {
let creat = IndexCreator::new(concurrent, if_not_exists, true);
for index in indexes {
writeln!(term, "{};", index.to_sql(concurrent, if_not_exists)?)?;
writeln!(term, "{};", creat.to_sql(&index)?)?;
}
} else {
let mut first = true;
Expand Down
74 changes: 74 additions & 0 deletions store/postgres/examples/create_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::{collections::HashSet, env, fs, time::Instant};

use graph::anyhow;
use graph_store_postgres::command_support::index::{CreateIndex, Expr};

/// Parse index definitions from a file and print information about any that
/// we could not parse.
///
/// The easiest way to create a file with index definitions is to run this
/// query in psql:
/// ```sql
/// select indexdef from pg_indexes where schemaname like 'sgd%' \g /tmp/idxs.txt
/// ```
pub fn main() -> anyhow::Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() != 2 {
return Err(anyhow::anyhow!("usage: create_index <index_file>"));
}
let idxs = fs::read_to_string(&args[1])?;

let mut parsed: usize = 0;
let mut failed: usize = 0;
let mut skipped: usize = 0;
let mut unknown_cols = HashSet::new();
let start = Instant::now();
for idxdef in idxs.lines() {
let idxdef = idxdef.trim();
if idxdef.is_empty() || !idxdef.starts_with("CREATE") && !idxdef.starts_with("create") {
skipped += 1;
continue;
}

let idx = CreateIndex::parse(idxdef.to_string());

match &idx {
CreateIndex::Parsed { columns, .. } => {
let mut failed_col = false;
for column in columns {
match column {
Expr::Unknown(expr) => {
unknown_cols.insert(expr.clone());
failed_col = true;
break;
}
_ => { /* ok */ }
}
}
if failed_col {
failed += 1;
} else {
parsed += 1
}
}
CreateIndex::Unknown { defn } => {
println!("Can not parse index definition: {}", defn);
failed += 1;
}
}
}

if !unknown_cols.is_empty() {
println!("Unknown columns:");
for col in unknown_cols {
println!(" {}", col);
}
}

println!(
"total: {}, parsed: {parsed}, failed: {failed}, skipped: {skipped}, elapsed: {}s",
parsed + failed + skipped,
start.elapsed().as_secs()
);
Ok(())
}
2 changes: 1 addition & 1 deletion store/postgres/examples/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn print_delete_all(layout: &Layout) {
}

fn print_ddl(layout: &Layout) {
let ddl = ensure(layout.as_ddl(None), "Failed to generate DDL");
let ddl = ensure(layout.as_ddl(), "Failed to generate DDL");
println!("{}", ddl);
}

Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub struct Catalog {

/// Whether the database supports `int4_minmax_multi_ops` etc.
/// See the [Postgres docs](https://www.postgresql.org/docs/15/brin-builtin-opclasses.html)
has_minmax_multi_ops: bool,
pub has_minmax_multi_ops: bool,

/// Whether the column `pg_stats.range_bounds_histogram` introduced in
/// Postgres 17 exists. See the [Postgres
Expand Down
97 changes: 45 additions & 52 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! operation can resume after an interruption, for example, because
//! `graph-node` was restarted while the copy was running.
use std::{
collections::HashSet,
convert::TryFrom,
future::Future,
pin::Pin,
Expand All @@ -23,15 +24,23 @@ use std::{
};

use diesel::{
dsl::sql, insert_into, select, sql_query, update, ExpressionMethods, OptionalExtension,
QueryDsl,
dsl::sql, insert_into, select, update, ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::{
scoped_futures::{ScopedBoxFuture, ScopedFutureExt},
AsyncConnection,
};
use diesel_async::{RunQueryDsl, SimpleAsyncConnection};

use crate::{
advisory_lock, catalog, deployment,
dynds::DataSourcesTable,
primary::{DeploymentId, Primary, Site},
relational::{index::IndexList, Layout, Table},
relational_queries as rq,
vid_batcher::{VidBatcher, VidRange},
AsyncPgConnection, ConnectionPool,
};
use graph::{
futures03::{
future::{select_all, BoxFuture},
Expand All @@ -44,17 +53,6 @@ use graph::{
schema::EntityType,
slog::error,
};
use itertools::Itertools;

use crate::{
advisory_lock, catalog, deployment,
dynds::DataSourcesTable,
primary::{DeploymentId, Primary, Site},
relational::{index::IndexList, Layout, Table},
relational_queries as rq,
vid_batcher::{VidBatcher, VidRange},
AsyncPgConnection, ConnectionPool,
};

const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);

Expand Down Expand Up @@ -1001,6 +999,18 @@ impl Connection {
})
}

/// Run `callback` in a transaction using the connection in `self.conn`.
/// This will return an error if `self.conn` is `None`, which happens
/// while a background task is copying a table.
fn get_conn(&mut self) -> Result<&mut AsyncPgConnection, StoreError> {
let Some(conn) = self.conn.as_mut() else {
return Err(internal_error!(
"copy connection has been handed to background task but not returned yet (get_conn)"
));
};
Ok(&mut conn.inner)
}

/// Run `callback` in a transaction using the connection in `self.conn`.
/// This will return an error if `self.conn` is `None`, which happens
/// while a background task is copying a table.
Expand All @@ -1017,12 +1027,7 @@ impl Connection {
R: Send + 'a,
'a: 'conn,
{
let Some(conn) = self.conn.as_mut() else {
return Err(internal_error!(
"copy connection has been handed to background task but not returned yet (transaction)"
));
};
let conn = &mut conn.inner;
let conn = self.get_conn()?;
Ok(conn.transaction(|conn| callback(conn).scope_boxed()))
}

Expand Down Expand Up @@ -1232,45 +1237,33 @@ impl Connection {
// Create indexes for all the attributes that were postponed at the start of
// the copy/graft operations.
// First recreate the indexes that existed in the original subgraph.
let creat = self.dst.index_creator(false, true);
for table in state.all_tables() {
let arr = index_list.indexes_for_table(
&self.dst.site.namespace,
&table.src.name.to_string(),
&table.dst,
true,
false,
true,
)?;

for (_, sql) in arr {
let query = sql_query(format!("{};", sql));
self.transaction(|conn| {
async { query.execute(conn).await.map_err(StoreError::from) }.scope_boxed()
})?
.await?;
}
let dst_nsp = self.dst.site.namespace.to_string();
let idxs = index_list
.indexes_for_table(&table.dst)
.filter(|idx| idx.to_postpone())
.map(|idx| idx.with_nsp(dst_nsp.clone()))
.collect::<Result<Vec<_>, _>>()?;

let conn = self.get_conn()?;
creat.execute_many(conn, &idxs).await?;
}

// Second create the indexes for the new fields.
// Here we need to skip those created in the first step for the old fields.
// Second create the indexes for the new fields that don't exist in
// the source.
for table in state.all_tables() {
let orig_colums = table
.src
.columns
.iter()
.map(|c| c.name.to_string())
.collect_vec();
for sql in table
let src_columns: HashSet<&str> =
table.src.columns.iter().map(|c| c.name.as_str()).collect();
let new_idxs: Vec<_> = table
.dst
.create_postponed_indexes(orig_colums, false)
.indexes(&self.dst.input_schema)
.map_err(|_| internal_error!("failed to generate indexes for copy"))?
.into_iter()
{
let query = sql_query(sql);
self.transaction(|conn| {
async { query.execute(conn).await.map_err(StoreError::from) }.scope_boxed()
})?
.await?;
}
.filter(|idx| idx.to_postpone() && idx.references_column_not_in(&src_columns))
.collect();
let conn = self.get_conn()?;
creat.execute_many(conn, &new_idxs).await?;
}

self.copy_private_data_sources(&state).await?;
Expand Down
Loading