Skip to content

Commit 60dfab9

Browse files
author
Eric (SPG) Fang
committed
first try at remote mcap reading
1 parent deb489e commit 60dfab9

File tree

16 files changed

+1239
-144
lines changed

16 files changed

+1239
-144
lines changed

Cargo.lock

Lines changed: 933 additions & 35 deletions
Large diffs are not rendered by default.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ jsonwebtoken = { version = "9.3", default-features = false }
269269
lance = { version = "0.38.2", default-features = false } # When you update this, also update the list of features enabled for `datafusion` (~50 lines up)
270270
lance-index = { version = "0.38.2", default-features = false }
271271
lance-linalg = { version = "0.38.2", default-features = false }
272+
lance-io = { version = "0.38.2", default-features = false }
272273
libc = "0.2.176"
273274
linked-hash-map = { version = "0.5.6", default-features = false }
274275
log = "0.4.28"
@@ -289,6 +290,7 @@ notify = { version = "8.2", features = ["macos_kqueue"] }
289290
num-derive = "0.4.2"
290291
num-traits = "0.2.19"
291292
numpy = "0.25.0"
293+
object_store = "0.12.4"
292294
objc2-app-kit = "0.3.2"
293295
opentelemetry = { version = "0.31.0", features = ["metrics"] }
294296
opentelemetry-appender-tracing = "0.31.0"

crates/store/re_data_loader/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,16 @@ indexmap.workspace = true
4646
itertools.workspace = true
4747
notify = { workspace = true, features = ["crossbeam-channel"] }
4848
mcap.workspace = true
49+
lance-io = { workspace = true, features = ["gcp", "aws"] }
50+
object_store = { workspace = true }
4951
parking_lot.workspace = true
5052
rayon.workspace = true
5153
serde.workspace = true
5254
serde_json.workspace = true
5355
thiserror.workspace = true
56+
tokio = { workspace = true, features = ["rt-multi-thread"] }
5457
urdf-rs.workspace = true
58+
url.workspace = true
5559
walkdir.workspace = true
5660

5761
[target.'cfg(not(any(target_arch = "wasm32")))'.dependencies]

crates/store/re_data_loader/src/load_file.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ pub fn load_from_path(
2727

2828
re_tracing::profile_function!(path.to_string_lossy());
2929

30-
if !path.exists() {
31-
return Err(std::io::Error::new(
32-
std::io::ErrorKind::NotFound,
33-
format!("path does not exist: {path:?}"),
34-
)
35-
.into());
36-
}
30+
// if !path.exists() {
31+
// return Err(std::io::Error::new(
32+
// std::io::ErrorKind::NotFound,
33+
// format!("path does not exist: {path:?}"),
34+
// )
35+
// .into());
36+
// }
3737

3838
re_log::info!("Loading {path:?}…");
3939

crates/store/re_data_loader/src/loader_mcap.rs

Lines changed: 118 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
//! Rerun dataloader for MCAP files.
22
3-
use std::io::Cursor;
4-
use std::path::Path;
5-
use std::sync::mpsc::Sender;
6-
3+
use crate::{DataLoader, DataLoaderError, DataLoaderSettings, LoadedData};
4+
use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry};
5+
use mcap::sans_io::IndexedReaderOptions;
76
use re_chunk::RowId;
87
use re_log_types::{SetStoreInfo, StoreId, StoreInfo};
9-
use re_mcap::{LayerRegistry, SelectedLayers};
8+
use re_mcap::{AsyncSeekRead, LayerRegistry, SelectedLayers};
9+
use std::io::Cursor;
10+
use std::path::Path;
11+
use std::str::FromStr as _;
1012

11-
use crate::{DataLoader, DataLoaderError, DataLoaderSettings, LoadedData};
13+
use std::sync::mpsc::Sender;
14+
use url::Url;
1215

1316
const MCAP_LOADER_NAME: &str = "McapLoader";
1417

@@ -56,6 +59,19 @@ impl McapLoader {
5659
}
5760
}
5861

62+
fn mcap_option_from_url(url: &Url) -> anyhow::Result<IndexedReaderOptions> {
63+
let mut mcap_options = IndexedReaderOptions::default();
64+
for (name, val) in url.query_pairs() {
65+
if name == "start" {
66+
mcap_options = mcap_options.log_time_on_or_after(u64::from_str(&val)?);
67+
} else if name == "end" {
68+
mcap_options = mcap_options.log_time_before(u64::from_str(&val)?);
69+
}
70+
}
71+
72+
Ok(mcap_options)
73+
}
74+
5975
impl DataLoader for McapLoader {
6076
fn name(&self) -> crate::DataLoaderName {
6177
MCAP_LOADER_NAME.into()
@@ -68,11 +84,31 @@ impl DataLoader for McapLoader {
6884
path: std::path::PathBuf,
6985
tx: Sender<crate::LoadedData>,
7086
) -> std::result::Result<(), DataLoaderError> {
71-
if !is_mcap_file(&path) {
72-
return Err(DataLoaderError::Incompatible(path)); // simply not interested
87+
re_tracing::profile_function!();
88+
let url = path.to_string_lossy();
89+
let mcap_url = match Url::parse(&url) {
90+
Ok(mcap_url) => mcap_url,
91+
Err(err) => match err {
92+
url::ParseError::RelativeUrlWithoutBase => {
93+
match Url::parse(&format!("file://{url}")) {
94+
Ok(mcap_url) => mcap_url,
95+
Err(err) => return Err(DataLoaderError::Other(err.into())),
96+
}
97+
}
98+
_ => return Err(DataLoaderError::Other(err.into())),
99+
},
100+
};
101+
102+
let path = std::path::PathBuf::from_str(mcap_url.path()).unwrap();
103+
104+
if mcap_url.scheme() == "file" && !is_mcap_file(&path) {
105+
return Err(DataLoaderError::Incompatible(path.clone())); // simply not interested
73106
}
74107

75-
re_tracing::profile_function!();
108+
let mcap_options = match mcap_option_from_url(&mcap_url) {
109+
Ok(mcap_options) => mcap_options,
110+
Err(err) => return Err(DataLoaderError::Other(err)),
111+
};
76112

77113
// NOTE(1): `spawn` is fine, this whole function is native-only.
78114
// NOTE(2): this must spawned on a dedicated thread to avoid a deadlock!
@@ -85,8 +121,23 @@ impl DataLoader for McapLoader {
85121
std::thread::Builder::new()
86122
.name(format!("load_mcap({path:?}"))
87123
.spawn(move || {
88-
if let Err(err) = load_mcap_mmap(
89-
&path,
124+
// Load from local disk.
125+
if mcap_url.scheme() == "file" {
126+
if let Err(err) = load_mcap_mmap(
127+
&path,
128+
&mcap_options,
129+
&settings,
130+
&tx,
131+
&selected_layers,
132+
raw_fallback_enabled,
133+
) {
134+
re_log::error!("Failed to load MCAP file: {err}");
135+
}
136+
}
137+
// Load from cloud.
138+
else if let Err(err) = load_mcap_cloud(
139+
&mcap_url,
140+
&mcap_options,
90141
&settings,
91142
&tx,
92143
&selected_layers,
@@ -128,6 +179,7 @@ impl DataLoader for McapLoader {
128179
.spawn(move || {
129180
if let Err(err) = load_mcap_mmap(
130181
&filepath,
182+
&IndexedReaderOptions::default(),
131183
&settings,
132184
&tx,
133185
&selected_layers,
@@ -151,12 +203,11 @@ impl DataLoader for McapLoader {
151203
) -> std::result::Result<(), DataLoaderError> {
152204
if !is_mcap_file(&filepath) {
153205
return Err(DataLoaderError::Incompatible(filepath)); // simply not interested
154-
}
155-
206+
};
156207
let contents = contents.into_owned();
157-
158208
load_mcap(
159-
&contents,
209+
Box::new(Cursor::new(contents)),
210+
&IndexedReaderOptions::default(),
160211
settings,
161212
&tx,
162213
&self.selected_layers,
@@ -165,9 +216,48 @@ impl DataLoader for McapLoader {
165216
}
166217
}
167218

219+
#[cfg(not(target_arch = "wasm32"))]
220+
fn load_mcap_cloud(
221+
url: &Url,
222+
mcap_options: &IndexedReaderOptions,
223+
settings: &DataLoaderSettings,
224+
tx: &Sender<LoadedData>,
225+
selected_layers: &SelectedLayers,
226+
raw_fallback_enabled: bool,
227+
) -> anyhow::Result<()> {
228+
let mcap_reader = tokio::runtime::Builder::new_current_thread()
229+
.enable_all()
230+
.build()?
231+
.block_on(async {
232+
let store = ObjectStoreRegistry::default()
233+
.get_store(url.clone(), &ObjectStoreParams::default())
234+
.await?;
235+
let meta = store
236+
.inner
237+
.head(&object_store::path::Path::parse(url.path())?)
238+
.await?;
239+
let mcap_reader = Box::from(object_store::buffered::BufReader::new(
240+
store.inner.clone(),
241+
&meta,
242+
));
243+
244+
Ok::<Box<object_store::buffered::BufReader>, anyhow::Error>(mcap_reader)
245+
})?;
246+
247+
Ok(load_mcap(
248+
mcap_reader,
249+
mcap_options,
250+
settings,
251+
tx,
252+
selected_layers,
253+
raw_fallback_enabled,
254+
)?)
255+
}
256+
168257
#[cfg(not(target_arch = "wasm32"))]
169258
fn load_mcap_mmap(
170259
filepath: &std::path::PathBuf,
260+
mcap_options: &IndexedReaderOptions,
171261
settings: &DataLoaderSettings,
172262
tx: &Sender<LoadedData>,
173263
selected_layers: &SelectedLayers,
@@ -180,17 +270,26 @@ fn load_mcap_mmap(
180270
#[expect(unsafe_code)]
181271
let mmap = unsafe { memmap2::Mmap::map(&file)? };
182272

183-
load_mcap(&mmap, settings, tx, selected_layers, raw_fallback_enabled)
273+
load_mcap(
274+
Box::new(Cursor::new(mmap)),
275+
mcap_options,
276+
settings,
277+
tx,
278+
selected_layers,
279+
raw_fallback_enabled,
280+
)
184281
}
185282

186283
pub fn load_mcap(
187-
mcap: &[u8],
284+
mut mcap: Box<dyn AsyncSeekRead>,
285+
mcap_options: &IndexedReaderOptions,
188286
settings: &DataLoaderSettings,
189287
tx: &Sender<LoadedData>,
190288
selected_layers: &SelectedLayers,
191289
raw_fallback_enabled: bool,
192290
) -> Result<(), DataLoaderError> {
193291
re_tracing::profile_function!();
292+
194293
let store_id = settings.recommended_store_id();
195294

196295
if tx
@@ -223,16 +322,14 @@ pub fn load_mcap(
223322
}
224323
};
225324

226-
let reader = Cursor::new(&mcap);
227-
228-
let summary = re_mcap::read_summary(reader)?
325+
let summary = re_mcap::read_summary(&mut mcap)?
229326
.ok_or_else(|| anyhow::anyhow!("MCAP file does not contain a summary"))?;
230327

231328
// TODO(#10862): Add warning for channel that miss semantic information.
232329
LayerRegistry::all_builtin(raw_fallback_enabled)
233330
.select(selected_layers)
234331
.plan(&summary)?
235-
.run(mcap, &summary, &mut send_chunk)?;
332+
.run(&mut mcap, &summary, mcap_options, &mut send_chunk)?;
236333

237334
Ok(())
238335
}

crates/store/re_data_loader/src/loader_rrd.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl crate::DataLoader for RrdLoader {
2929

3030
let mut extension = crate::extension(&filepath);
3131
if !matches!(extension.as_str(), "rbl" | "rrd") {
32-
if filepath.is_file() || filepath.is_dir() {
32+
if filepath.is_file() || filepath.is_dir() || !extension.is_empty() {
3333
// NOTE: blueprints and recordings have the same file format
3434
return Err(crate::DataLoaderError::Incompatible(filepath.clone()));
3535
} else {

crates/store/re_data_loader/tests/test_mcap_loader.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,33 @@
22
mod tests {
33
use std::sync::Arc;
44

5+
use mcap::sans_io::IndexedReaderOptions;
56
use re_chunk::Chunk;
67
use re_chunk_store::{ChunkStore, ChunkStoreConfig, ChunkStoreHandle};
78
use re_data_loader::loader_mcap::load_mcap;
89
use re_data_loader::{DataLoaderSettings, LoadedData};
910
use re_log_types::StoreId;
1011
use re_mcap::layers::SelectedLayers;
12+
use std::io::Cursor;
1113

1214
// Load an MCAP file into a list of chunks.
1315
fn load_mcap_chunks(path: impl AsRef<std::path::Path>) -> Vec<Chunk> {
1416
let path = path.as_ref();
1517
println!("Loading MCAP file: {}", path.display());
16-
let mcap_data = std::fs::read(path).unwrap();
18+
let mcap_data = Box::new(Cursor::new(std::fs::read(path).unwrap()));
19+
1720
let (tx, rx) = std::sync::mpsc::channel();
1821
let settings = DataLoaderSettings::recommended("test");
19-
load_mcap(&mcap_data, &settings, &tx, &SelectedLayers::All, false).unwrap();
22+
let mcap_options = IndexedReaderOptions::default();
23+
load_mcap(
24+
mcap_data,
25+
&mcap_options,
26+
&settings,
27+
&tx,
28+
&SelectedLayers::All,
29+
false,
30+
)
31+
.unwrap();
2032
drop(tx);
2133

2234
// Collect chunks

crates/store/re_data_source/src/data_source.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ impl LogDataSource {
5757
pub fn from_uri(_file_source: re_log_types::FileSource, url: &str) -> Option<Self> {
5858
#[cfg(not(target_arch = "wasm32"))]
5959
{
60-
use itertools::Itertools as _;
61-
6260
fn looks_like_windows_abs_path(path: &str) -> bool {
6361
let path = path.as_bytes();
6462
// "C:/" etc
@@ -79,20 +77,10 @@ impl LogDataSource {
7977
true // Unix absolute path
8078
} else if looks_like_windows_abs_path(uri) {
8179
true
82-
} else if uri.starts_with("http:") || uri.starts_with("https:") {
83-
false
8480
} else {
85-
// We use a simple heuristic here: if there are multiple dots, it is likely an url,
86-
// like "example.com/foo.zip".
87-
// If there is only one dot, we treat it as an extension and look it up in a list of common
88-
// file extensions:
89-
90-
let parts = uri.split('.').collect_vec();
91-
if parts.len() == 2 {
92-
true
93-
} else {
94-
false // Too many dots; assume an url
95-
}
81+
!(uri.starts_with("http:")
82+
|| uri.starts_with("https:")
83+
|| uri.starts_with("rerun:"))
9684
}
9785
}
9886

crates/store/re_mcap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ saturating_cast.workspace = true
3333
serde.workspace = true
3434
serde_bytes.workspace = true
3535
thiserror.workspace = true
36+
tokio = { workspace = true, features = ["rt-multi-thread"] }
3637

3738
[dev-dependencies]
3839
insta = { workspace = true, features = ["filters", "redactions"] }

0 commit comments

Comments
 (0)