diff --git a/Cargo.lock b/Cargo.lock index 64b55ffedf..7d75f4dea4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1045,7 +1045,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1170,7 +1170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2233,7 +2233,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3136,7 +3136,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3591,7 +3591,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3601,7 +3601,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4301,6 +4301,7 @@ dependencies = [ "crc32c", "criterion", "croaring", + "crossbeam-queue", "dashmap", "futures-util", "governor", @@ -4562,8 +4563,7 @@ dependencies = [ [[package]] name = "tycho-types" version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "104f8eaefe8d4c12e480906a876f0c6dd6f68c1e7eaeb8fd2c236e8f729019fd" +source = "git+https://github.com/broxus/tycho-types.git?rev=273ce77#273ce77279202d89cefa585ff4820de0241012e9" dependencies = [ "ahash", "anyhow", @@ -4596,8 +4596,7 @@ dependencies = [ [[package]] name = "tycho-types-abi-proc" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c813c08a03554252747f9e5e88485d9af4c30077394a1c3bb6d774ddca56b07" +source = "git+https://github.com/broxus/tycho-types.git?rev=273ce77#273ce77279202d89cefa585ff4820de0241012e9" dependencies = [ "anyhow", "proc-macro2", @@ -4608,8 +4607,7 @@ dependencies = [ [[package]] name = "tycho-types-proc" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad05cf4ab89631f8c11d85c3aa80f781502440f75361d251f866e0d76ae9d31" +source = "git+https://github.com/broxus/tycho-types.git?rev=273ce77#273ce77279202d89cefa585ff4820de0241012e9" dependencies = [ "proc-macro2", "quote", @@ -5035,7 +5033,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9dd44e4dd7..f114b0f63b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -182,6 +182,7 @@ tycho-wu-tuner = { path = "./wu-tuner", version = "0.3.10" } [patch.crates-io] # patches here +tycho-types = { git = "https://github.com/broxus/tycho-types.git", rev = "273ce77" } [workspace.lints.rust] future_incompatible = "warn" diff --git a/block-util/src/dict.rs b/block-util/src/dict.rs index 4eb42f73ae..dbd32a6cfe 100644 --- a/block-util/src/dict.rs +++ b/block-util/src/dict.rs @@ -211,6 +211,15 @@ where K: DictKey, A: Default, { + let (dict_root, _) = dict.into_parts(); + split_dict_raw(dict_root.into_root(), K::BITS, depth) +} + +pub fn split_dict_raw( + dict: Option, + key_bit_len: u16, + depth: u8, +) -> Result, Error> { fn split_dict_impl( dict: Option, key_bit_len: u16, @@ -241,8 +250,72 @@ where let mut shards = FastHashMap::with_capacity_and_hasher(2usize.pow(depth as _), Default::default()); + split_dict_impl(dict, key_bit_len, depth, &mut shards)?; + + Ok(shards) +} + +/// Splits aug dict by shards, preserving empty shards. +/// E.g. if `depth == 1` and all entries are in the left shard, +/// then will return `None` cell for the right shard. +pub fn split_aug_dict_raw_by_shards( + workchain: i32, + dict: AugDict, + depth: u8, +) -> Result)>, Error> +where + K: DictKey, + A: Default, +{ + fn split_dict_impl( + shard: &ShardIdent, + dict: Option, + key_bit_len: u16, + depth: u8, + shards: &mut Vec<(ShardIdent, Option)>, + ) -> Result<(), Error> { + if depth == 0 { + shards.push((*shard, dict)); + return Ok(()); + } + + let Some((left_shard, right_shard)) = shard.split() else { + shards.push((*shard, dict)); + return Ok(()); + }; + + let PartialSplitDict { + remaining_bit_len, + left_branch, + right_branch, + } = dict_split_raw(dict.as_ref(), key_bit_len, Cell::empty_context())?; + + split_dict_impl( + &left_shard, + left_branch, + remaining_bit_len, + depth - 1, + shards, + )?; + split_dict_impl( + &right_shard, + right_branch, + remaining_bit_len, + depth - 1, + shards, + ) + } + + let mut shards = Vec::with_capacity(2usize.pow(depth as _)); + let (dict_root, _) = dict.into_parts(); - split_dict_impl(dict_root.into_root(), K::BITS, depth, &mut shards)?; + split_dict_impl( + &ShardIdent::new_full(workchain), + dict_root.into_root(), + K::BITS, + depth, + &mut shards, + )?; Ok(shards) } diff --git a/cli/src/cmd/tools/dump_state.rs b/cli/src/cmd/tools/dump_state.rs index 1c5c0667f5..6ae65e6cac 100644 --- a/cli/src/cmd/tools/dump_state.rs +++ b/cli/src/cmd/tools/dump_state.rs @@ -250,7 +250,7 @@ impl Dumper { let dir = Dir::new(self.output_dir.path().join("persistents"))?; self.storage .shard_state_storage() - .write_persistent_shard_state(dir, *block_id, *state.root_cell().repr_hash(), None) + .write_persistent_shard_state(dir, *block_id, *state.root_cell().repr_hash(), 0, None) .await .context(format!("Failed to write state for {}", block_id))?; println!(" - Persistent state saved"); diff --git a/cli/src/cmd/tools/hardfork.rs b/cli/src/cmd/tools/hardfork.rs index ae9932a6e1..2d9e9e1125 100644 --- a/cli/src/cmd/tools/hardfork.rs +++ b/cli/src/cmd/tools/hardfork.rs @@ -65,7 +65,12 @@ impl Cmd { }) .await?; - let storage = CoreStorage::open(ctx, CoreStorageConfig::default().without_gc()).await?; + let storage_config = CoreStorageConfig::default().without_gc(); + anyhow::ensure!( + storage_config.persistent_state_split_depth == 0, + "hardfork creation supports only persistent_state_split_depth = 0" + ); + let storage = CoreStorage::open(ctx, storage_config).await?; let Some(mc_block_id) = storage .shard_state_storage() diff --git a/collator/src/test_utils.rs b/collator/src/test_utils.rs index 358062293d..2de2315629 100644 --- a/collator/src/test_utils.rs +++ b/collator/src/test_utils.rs @@ -19,6 +19,7 @@ use tycho_storage::StorageContext; use tycho_types::boc::{Boc, BocRepr}; use tycho_types::cell::CellBuilder; use tycho_types::models::{Block, BlockId, ShardStateUnsplit}; +#[cfg(any(test, feature = "test"))] use tycho_util::compression::zstd_decompress_simple; use crate::internal_queue::queue::{QueueConfig, QueueFactory, QueueFactoryStdImpl}; diff --git a/core/Cargo.toml b/core/Cargo.toml index 9332ff04d6..0bff8aa9bb 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -45,6 +45,7 @@ castaway = { workspace = true } clap = { workspace = true, optional = true } crc32c = { workspace = true } croaring = { workspace = true } +crossbeam-queue = { workspace = true } dashmap = { workspace = true } futures-util = { workspace = true } governor = { workspace = true, optional = true } diff --git a/core/src/block_strider/starter/cold_boot.rs b/core/src/block_strider/starter/cold_boot.rs index 867e4f8b46..d659174d8a 100644 --- a/core/src/block_strider/starter/cold_boot.rs +++ b/core/src/block_strider/starter/cold_boot.rs @@ -1,4 +1,5 @@ use std::fs::File; +use std::path::Path; use std::pin::pin; use std::sync::Arc; use std::time::Duration; @@ -29,7 +30,7 @@ use crate::overlay_client::PunishReason; use crate::proto::blockchain::{KeyBlockProof, ZerostateProof}; use crate::storage::{ BlockHandle, CoreStorage, KeyBlocksDirection, MaybeExistingHandle, NewBlockMeta, - PersistentStateKind, + PersistentStateKind, PersistentStateMeta, }; impl StarterInner { @@ -587,6 +588,8 @@ impl StarterInner { handle_storage.set_has_shard_state(&handle); handle_storage.set_block_committed(&handle); + state_storage.finish_raw_import()?; + let _mc_handle = mc_zerostate.ref_mc_state_handle().clone(); // TODO: Somehow save the original file. @@ -596,8 +599,6 @@ impl StarterInner { .store_shard_state(mc_block_id.seqno, &handle) .await?; - state_storage.finish_raw_import()?; - tracing::info!("imported zerostates"); Ok((handle, mc_zerostate)) @@ -631,6 +632,8 @@ impl StarterInner { mc_block_id: &BlockId, block_id: &BlockId, ) -> Result<(BlockHandle, BlockStuff)> { + tracing::info!(%block_id, "download and import queue and shard state: started"); + // First download the block itself, with all its parts (proof and queue diff). let (handle, block) = self.download_block_data(mc_block_id, block_id).await?; self.storage @@ -659,6 +662,8 @@ impl StarterInner { self.download_queue_state(&handle, top_update).await?; } + tracing::info!(%block_id, "download and import queue and shard state: finished"); + Ok((handle, block)) } @@ -776,7 +781,7 @@ impl StarterInner { expected_state_root: &HashBytes, ) -> Result<(BlockHandle, ShardStateStuff)> { enum StoreZeroStateFrom { - File(FileBuilder), + File(DownloadedPersistentStateBundle), State(RefMcStateHandle), } @@ -788,15 +793,22 @@ impl StarterInner { let state_file = temp.file(format!("state_{block_id}")); let state_file_path = state_file.path().to_owned(); + let state_meta_file = state_file.with_extension("meta.json"); + let state_meta_file_path = state_meta_file.path().to_owned(); // NOTE: Intentionally dont spawn yet - let remove_state_file = async move || { - if let Err(e) = tokio::fs::remove_file(&state_file_path).await { - tracing::warn!( - path = %state_file_path.display(), - "failed to remove downloaded shard state: {e:?}", - ); - } + let remove_downloaded_state_files = async move || { + let prefixes = PersistentStateMeta::read_from_file(&state_meta_file_path) + .ok() + .flatten() + .map(|meta| meta.parts) + .unwrap_or_default(); + remove_downloaded_persistent_state_files( + &state_file_path, + &state_meta_file_path, + &prefixes, + ) + .await; }; let mc_seqno = mc_block_id.seqno; @@ -805,11 +817,24 @@ impl StarterInner { async move { match from { // Fast reuse the downloaded file if possible - StoreZeroStateFrom::File(mut state_file) => { + StoreZeroStateFrom::File(mut bundle) => { // Reuse downloaded (and validated) file as is. - let state_file = state_file.read(true).open()?; + let persistent_parts = bundle + .parts + .iter_mut() + .map(|(prefix, file)| Ok((*prefix, file.read(true).open()?))) + .collect::>>()?; persistent_states - .store_shard_state_file(mc_seqno, &block_handle, state_file) + .store_shard_state_files( + mc_seqno, + &block_handle, + bundle.main.read(true).open()?, + persistent_parts, + bundle + .split_depth + .try_into() + .context("invalid persistent split depth")?, + ) .await } // Possibly slow full state traversal @@ -843,8 +868,17 @@ impl StarterInner { ); if !handle.has_persistent_shard_state() { - let from = if state_file.exists() { - StoreZeroStateFrom::File(state_file) + let meta = match PersistentStateMeta::read_from_file(state_meta_file.path()) { + Ok(meta) => meta, + Err(e) => { + tracing::warn!("failed to read downloaded persistent state meta: {e:?}"); + None + } + }; + let from = if let Some(meta) = &meta + && is_downloaded_persistent_state_ready(&state_file, meta) + { + StoreZeroStateFrom::File(downloaded_persistent_state_bundle(&state_file, meta)) } else { // FIXME: Ensure that `state` is stored as direct. StoreZeroStateFrom::State(state.ref_mc_state_handle().clone()) @@ -854,7 +888,7 @@ impl StarterInner { .context("failed to store persistent shard state")?; } - remove_state_file().await; + remove_downloaded_state_files().await; tracing::info!("using the stored shard state"); return Ok((handle.clone(), state)); @@ -862,14 +896,14 @@ impl StarterInner { // Try download the state for attempt in 0..MAX_PERSISTENT_STATE_RETRIES { - let file = match self - .download_persistent_state_file(block_id, PersistentStateKind::Shard, &state_file) + let mut bundle = match self + .download_persistent_state_bundle(block_id, PersistentStateKind::Shard, &state_file) .await { - Ok(file) => file, + Ok(bundle) => bundle, Err(e) => { tracing::error!(attempt, "failed to download persistent shard state: {e:?}"); - remove_state_file().await; + remove_downloaded_state_files().await; continue; } }; @@ -877,9 +911,19 @@ impl StarterInner { // NOTE: `store_state_file` error is mostly unrecoverable since the operation // context is too large to be atomic. This downloaded-state path is // rebuild-only on interruption, unlike bootstrap raw import marker cleanup. + let part_files = bundle + .parts + .iter_mut() + .map(|(_, file)| file.read(true).open()) + .collect::, _>>()?; // TODO: mark all errors before `apply_temp_cell` as recoverable. match shard_states - .store_state_file(block_id, file, Some(expected_state_root)) + .store_state_split_files( + block_id, + bundle.main.read(true).open()?, + part_files, + Some(expected_state_root), + ) .await { Ok(root_hash) => { @@ -890,10 +934,10 @@ impl StarterInner { } Err(e) => { tracing::error!("failed to store shard state file: {e:?}"); - remove_state_file().await; + remove_downloaded_state_files().await; continue; } - } + }; let state = shard_states .load_state(mc_seqno, block_id) @@ -918,12 +962,35 @@ impl StarterInner { block_handles.set_is_zerostate(&block_handle); } - let from = StoreZeroStateFrom::File(state_file); - try_save_persistent(&block_handle, from) + let persistent_parts = bundle + .parts + .iter_mut() + .map(|(prefix, file)| Ok((*prefix, file.read(true).open()?))) + .collect::>>()?; + persistent_states + .store_shard_state_files( + mc_seqno, + &block_handle, + bundle.main.read(true).open()?, + persistent_parts, + bundle + .split_depth + .try_into() + .context("invalid persistent split depth")?, + ) .await - .context("failed to store persistent shard state")?; - - remove_state_file().await; + .context("failed to store persistent shard state bundle")?; + + remove_downloaded_persistent_state_files( + bundle.main.path(), + state_meta_file.path(), + &bundle + .parts + .iter() + .map(|(prefix, _)| *prefix) + .collect::>(), + ) + .await; tracing::info!("using the downloaded shard state"); return Ok((block_handle, state)); @@ -1089,31 +1156,191 @@ impl StarterInner { kind: PersistentStateKind, state_file: &FileBuilder, ) -> Result { + let mut bundle = self + .download_persistent_state_bundle(block_id, kind, state_file) + .await?; + anyhow::ensure!( + bundle.parts.is_empty(), + "single-file persistent state download received split bundle" + ); + bundle.main.read(true).open() + } + + async fn download_persistent_state_bundle( + &self, + block_id: &BlockId, + kind: PersistentStateKind, + state_file: &FileBuilder, + ) -> Result { let mut temp_file = state_file.with_extension("temp"); let temp_file_path = temp_file.path().to_owned(); scopeguard::defer! { std::fs::remove_file(temp_file_path).ok(); }; + let meta_file = state_file.with_extension("meta.json"); + let client = &self.starter_client; loop { - if state_file.exists() { - // Use the downloaded state file if it exists - return state_file.clone().read(true).open(); + if kind == PersistentStateKind::Queue && state_file.exists() { + return Ok(DownloadedPersistentStateBundle { + main: state_file.clone(), + parts: Vec::new(), + split_depth: 0, + }); + } + + let local_meta = match kind { + PersistentStateKind::Shard => { + PersistentStateMeta::read_from_file(meta_file.path())? + } + PersistentStateKind::Queue => None, + }; + if let Some(meta) = &local_meta + && is_downloaded_persistent_state_ready(state_file, meta) + { + return Ok(downloaded_persistent_state_bundle(state_file, meta)); + } + + let mut pending_state = client.find_persistent_state(block_id, kind).await?; + let remote_meta = PersistentStateMeta::new( + pending_state + .split_depth + .try_into() + .context("invalid persistent split depth")?, + pending_state.parts.iter().map(|part| part.prefix).collect(), + ); + + if local_meta.as_ref() != Some(&remote_meta) { + let old_prefixes = local_meta + .as_ref() + .into_iter() + .flat_map(|meta| meta.parts.iter().copied()); + let new_prefixes = remote_meta.parts.iter().copied(); + remove_downloaded_persistent_state_files( + state_file.path(), + meta_file.path(), + &old_prefixes.chain(new_prefixes).collect::>(), + ) + .await; + if kind == PersistentStateKind::Shard { + remote_meta.write_to_file(meta_file.path())?; + } + } + + if is_downloaded_persistent_state_ready(state_file, &remote_meta) { + return Ok(downloaded_persistent_state_bundle(state_file, &remote_meta)); } - let pending_state = client.find_persistent_state(block_id, kind).await?; + let parts_to_download = pending_state.parts.clone(); + let download_part = pending_state.download_part.take(); + if !state_file.exists() { + let output = temp_file.write(true).create(true).truncate(true).open()?; + _ = (pending_state.download)(output).await?; + + tokio::fs::rename(temp_file.path(), state_file.path()).await?; + } - let output = temp_file.write(true).create(true).truncate(true).open()?; - _ = (pending_state.download)(output).await?; + if !parts_to_download.is_empty() { + let Some(download_part) = download_part else { + anyhow::bail!("split persistent state download missing part downloader"); + }; + for part in &parts_to_download { + let part_file = state_file.with_extension(format!("part_{:016x}", part.prefix)); + let mut temp_part_file = + state_file.with_extension(format!("part_{:016x}.temp", part.prefix)); + let temp_part_file_path = temp_part_file.path().to_owned(); + scopeguard::defer! { + std::fs::remove_file(temp_part_file_path).ok(); + }; + if part_file.exists() { + continue; + } - tokio::fs::rename(temp_file.path(), state_file.path()).await?; + let output = temp_part_file + .write(true) + .create(true) + .truncate(true) + .open()?; + _ = download_part(part.clone(), output).await?; - // NOTE: File will be loaded on the next iteration of the loop + tokio::fs::rename(temp_part_file.path(), part_file.path()).await?; + } + } + + if is_downloaded_persistent_state_ready(state_file, &remote_meta) { + return Ok(downloaded_persistent_state_bundle(state_file, &remote_meta)); + } + + // NOTE: File will be loaded on the next iteration of the loop. } } } +fn is_downloaded_persistent_state_ready( + state_file: &FileBuilder, + meta: &PersistentStateMeta, +) -> bool { + state_file.exists() + && downloaded_persistent_state_part_files(state_file, meta) + .iter() + .all(|(_, file)| file.exists()) +} + +fn downloaded_persistent_state_bundle( + state_file: &FileBuilder, + meta: &PersistentStateMeta, +) -> DownloadedPersistentStateBundle { + DownloadedPersistentStateBundle { + main: state_file.clone(), + parts: downloaded_persistent_state_part_files(state_file, meta), + split_depth: meta.split_depth.into(), + } +} + +fn downloaded_persistent_state_part_files( + state_file: &FileBuilder, + meta: &PersistentStateMeta, +) -> Vec<(u64, FileBuilder)> { + meta.parts + .iter() + .map(|prefix| { + ( + *prefix, + state_file.with_extension(format!("part_{prefix:016x}")), + ) + }) + .collect() +} + +async fn remove_downloaded_persistent_state_files( + state_file: &Path, + meta_file: &Path, + prefixes: &[u64], +) { + let remove_file = async |file_path: &Path| { + if let Err(e) = tokio::fs::remove_file(file_path).await { + tracing::warn!( + file_path = %file_path.display(), + "failed to remove downloaded shard state: {e:?}", + ); + } + }; + + remove_file(state_file).await; + remove_file(meta_file).await; + for prefix in prefixes { + let file = state_file.with_extension(format!("part_{prefix:016x}")); + remove_file(file.as_path()).await; + } +} + +struct DownloadedPersistentStateBundle { + main: FileBuilder, + parts: Vec<(u64, FileBuilder)>, + split_depth: u32, +} + async fn download_block_proof_task( storage: CoreStorage, rpc_client: BlockchainRpcClient, diff --git a/core/src/block_strider/starter/mod.rs b/core/src/block_strider/starter/mod.rs index 999d206ba5..6a83deb385 100644 --- a/core/src/block_strider/starter/mod.rs +++ b/core/src/block_strider/starter/mod.rs @@ -1,5 +1,5 @@ use std::fs::File; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -245,12 +245,22 @@ impl ZerostateProvider for FileZerostateProvider { fn load_zerostate(path: &PathBuf) -> Result { tracing::info!(path = %path.display(), "loading zerostate"); + anyhow::ensure!( + !is_split_persistent_bundle_path(path), + "zerostate import supports only single persistent files without split parts" + ); #[allow(clippy::disallowed_methods)] let mf = MappedFile::from_existing_file(File::open(path)?)?; let bytes = Bytes::from_owner(mf); Ok(bytes) } +fn is_split_persistent_bundle_path(path: &Path) -> bool { + path.file_name() + .and_then(|file_name| file_name.to_str()) + .is_some_and(|file_name| file_name.ends_with(".meta.json") || file_name.contains("_part_")) +} + #[async_trait::async_trait] pub trait QueueStateHandler: Send + Sync + 'static { async fn import_from_file( @@ -285,6 +295,24 @@ impl QueueStateHandler for Box { } } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn split_persistent_bundle_paths_are_rejected_for_zerostate_import() { + assert!(!is_split_persistent_bundle_path(&PathBuf::from( + "state.boc" + ))); + assert!(is_split_persistent_bundle_path(&PathBuf::from( + "state.meta.json" + ))); + assert!(is_split_persistent_bundle_path(&PathBuf::from( + "state_part_8000000000000000.boc" + ))); + } +} + /// Does some basic validation of the provided queue state. #[derive(Debug, Clone, Copy)] pub struct ValidateQueueState; diff --git a/core/src/block_strider/starter/starter_client.rs b/core/src/block_strider/starter/starter_client.rs index 574cdca853..b065481999 100644 --- a/core/src/block_strider/starter/starter_client.rs +++ b/core/src/block_strider/starter/starter_client.rs @@ -31,17 +31,37 @@ impl StarterClient for BlockchainRpcClient { kind: PersistentStateKind, ) -> Result> { let pending_state = self.find_persistent_state(block_id, kind).await?; - let this = self.clone(); + + let split_depth = pending_state.split_depth; + let parts = pending_state + .parts + .iter() + .map(|part| FoundStatePart { + prefix: part.prefix, + }) + .collect::>(); + let pending_state_for_part = pending_state.clone(); Ok(FoundState { + split_depth, + parts, download: Box::new(move |output| { Box::pin(async move { - let output = this - .download_persistent_state(pending_state, output) + let output = self + .download_persistent_state(pending_state, None, output) .await?; Ok(output) }) }), + download_part: Some(Box::new(move |part, output| { + let pending_state = pending_state_for_part.clone(); + Box::pin(async move { + let output = self + .download_persistent_state(pending_state, Some(part.prefix), output) + .await?; + Ok(output) + }) + })), }) } @@ -70,10 +90,20 @@ pub struct FoundBlockDataFull { } pub struct FoundState<'a> { + pub split_depth: u32, + pub parts: Vec, pub download: Box>, + pub download_part: Option>>, +} + +#[derive(Debug, Clone)] +pub struct FoundStatePart { + pub prefix: u64, } type DownloadFn<'a> = dyn FnOnce(File) -> BoxFuture<'a, Result> + Send + 'a; +type DownloadPartFn<'a> = + dyn Fn(FoundStatePart, File) -> BoxFuture<'a, Result> + Send + Sync + 'a; type PunishFn = dyn FnOnce(PunishReason) + Send + 'static; #[cfg(feature = "s3")] @@ -122,6 +152,8 @@ mod s3 { }; Ok(FoundState { + split_depth: 0, + parts: Vec::new(), download: Box::new(move |output| { Box::pin(async move { let output = self @@ -131,6 +163,7 @@ mod s3 { Ok(output) }) }), + download_part: None, }) } diff --git a/core/src/blockchain_rpc/client.rs b/core/src/blockchain_rpc/client.rs index e76040e14f..9c8582891b 100644 --- a/core/src/blockchain_rpc/client.rs +++ b/core/src/blockchain_rpc/client.rs @@ -421,6 +421,41 @@ impl BlockchainRpcClient { kind, size, chunk_size, + split_depth: 0, + parts: Vec::new(), + neighbour, + }); + } + PersistentStateInfo::FoundWithParts { + size, + chunk_size, + split_depth, + parts, + } => { + let neighbour = handle.accept(); + tracing::debug!( + peer_id = %neighbour.peer_id(), + state_size = size.get(), + state_chunk_size = chunk_size.get(), + state_split_depth = split_depth, + state_parts_count = parts.len(), + ?kind, + "found split persistent state", + ); + + return Ok(PendingPersistentState { + block_id: *block_id, + kind, + size, + chunk_size, + split_depth, + parts: parts + .into_iter() + .map(|part| PendingPersistentStatePart { + prefix: part.prefix, + size: part.size, + }) + .collect(), neighbour, }); } @@ -442,6 +477,7 @@ impl BlockchainRpcClient { pub async fn download_persistent_state( &self, state: PendingPersistentState, + part_shard_prefix: Option, output: W, ) -> Result where @@ -454,9 +490,16 @@ impl BlockchainRpcClient { let block_id = state.block_id; let max_retries = self.inner.config.download_retries; + let size = match part_shard_prefix { + Some(prefix) => state + .part(prefix) + .map(|part| part.size) + .ok_or(Error::NotFound)?, + None => state.size, + }; download_and_decompress( - state.size, + size, state.chunk_size, PARALLEL_REQUESTS, output, @@ -465,7 +508,15 @@ impl BlockchainRpcClient { let req = match state.kind { PersistentStateKind::Shard => { - Request::from_tl(rpc::GetPersistentShardStateChunk { block_id, offset }) + if let Some(prefix) = part_shard_prefix { + Request::from_tl(rpc::GetPersistentShardStatePartChunk { + block_id, + prefix, + offset, + }) + } else { + Request::from_tl(rpc::GetPersistentShardStateChunk { block_id, offset }) + } } PersistentStateKind::Queue => { Request::from_tl(rpc::GetPersistentQueueStateChunk { block_id, offset }) @@ -657,9 +708,23 @@ pub struct PendingPersistentState { pub kind: PersistentStateKind, pub size: NonZeroU64, pub chunk_size: NonZeroU32, + pub split_depth: u32, + pub parts: Vec, pub neighbour: Neighbour, } +impl PendingPersistentState { + pub fn part(&self, prefix: u64) -> Option<&PendingPersistentStatePart> { + self.parts.iter().find(|part| part.prefix == prefix) + } +} + +#[derive(Clone)] +pub struct PendingPersistentStatePart { + pub prefix: u64, + pub size: NonZeroU64, +} + pub struct BlockDataFull { pub block_id: BlockId, pub block_data: Bytes, diff --git a/core/src/blockchain_rpc/mod.rs b/core/src/blockchain_rpc/mod.rs index 1eb10e586f..58552376fd 100644 --- a/core/src/blockchain_rpc/mod.rs +++ b/core/src/blockchain_rpc/mod.rs @@ -4,7 +4,7 @@ pub use self::broadcast_listener::{ pub use self::client::{ BlockDataFull, BlockDataFullWithNeighbour, BlockchainRpcClient, BlockchainRpcClientBuilder, BlockchainRpcClientConfig, DataRequirement, PendingArchive, PendingArchiveResponse, - PendingPersistentState, SelfBroadcastListener, + PendingPersistentState, PendingPersistentStatePart, SelfBroadcastListener, }; #[cfg(feature = "s3")] pub use self::providers::S3RpcDataProvider; diff --git a/core/src/blockchain_rpc/providers.rs b/core/src/blockchain_rpc/providers.rs index 1ed1a5232e..fcb95b0cb8 100644 --- a/core/src/blockchain_rpc/providers.rs +++ b/core/src/blockchain_rpc/providers.rs @@ -38,6 +38,7 @@ pub trait RpcDataProvider: Send + Sync + 'static { block_id: &BlockId, offset: u64, kind: PersistentStateKind, + part_shard_prefix: Option, ) -> Result>; } @@ -109,10 +110,11 @@ impl RpcDataProvider for StorageRpcDataProvider { block_id: &BlockId, offset: u64, kind: PersistentStateKind, + part_shard_prefix: Option, ) -> Result> { let persistent_state_storage = self.storage.persistent_state_storage(); Ok(persistent_state_storage - .read_state_part(block_id, offset, kind) + .read_state_chunk(block_id, offset, kind, part_shard_prefix) .await .map(Bytes::from)) } @@ -227,6 +229,8 @@ mod s3_impl { .map(|info| PersistentStateInfo { size: info.size, chunk_size: self.chunk_size, + split_depth: 0, + parts: Vec::new(), })) } @@ -235,7 +239,12 @@ mod s3_impl { block_id: &BlockId, offset: u64, kind: PersistentStateKind, + part_shard_prefix: Option, ) -> Result> { + if part_shard_prefix.is_some() { + return Ok(None); + } + self.check_rate_limit()?; self.check_bandwidth_limit()?; @@ -354,11 +363,12 @@ where block_id: &BlockId, offset: u64, kind: PersistentStateKind, + part_shard_prefix: Option, ) -> Result> { // Try primary first match self .primary - .get_persistent_state_chunk(block_id, offset, kind) + .get_persistent_state_chunk(block_id, offset, kind, part_shard_prefix) .await { Ok(Some(chunk)) => return Ok(Some(chunk)), @@ -368,6 +378,7 @@ where ?block_id, offset, ?kind, + ?part_shard_prefix, "primary state provider error: {e:?}" ); } @@ -375,7 +386,7 @@ where // Fallback self.fallback - .get_persistent_state_chunk(block_id, offset, kind) + .get_persistent_state_chunk(block_id, offset, kind, part_shard_prefix) .await } } diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service.rs index d0f26cc3cf..93711bd6e5 100644 --- a/core/src/blockchain_rpc/service.rs +++ b/core/src/blockchain_rpc/service.rs @@ -343,6 +343,16 @@ impl Service for BlockchainRpcService { inner.handle_get_persistent_shard_state_chunk(&req).await }, + #[meta( + "getPersistentShardStatePartChunk", + block_id = %req.block_id, + prefix = %req.prefix, + offset = %req.offset, + )] + rpc::GetPersistentShardStatePartChunk as req => { + inner.handle_get_persistent_shard_state_part_chunk(&req).await + }, + #[meta( "getPersistentQueueStateChunk", block_id = %req.block_id, @@ -669,16 +679,39 @@ impl Inner { &self, req: &rpc::GetPersistentShardStateChunk, ) -> overlay::Response { - self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard) - .await + self.read_persistent_state_chunk( + &req.block_id, + req.offset, + PersistentStateKind::Shard, + None, + ) + .await + } + + async fn handle_get_persistent_shard_state_part_chunk( + &self, + req: &rpc::GetPersistentShardStatePartChunk, + ) -> overlay::Response { + self.read_persistent_state_chunk( + &req.block_id, + req.offset, + PersistentStateKind::Shard, + Some(req.prefix), + ) + .await } async fn handle_get_persistent_queue_state_chunk( &self, req: &rpc::GetPersistentQueueStateChunk, ) -> overlay::Response { - self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue) - .await + self.read_persistent_state_chunk( + &req.block_id, + req.offset, + PersistentStateKind::Queue, + None, + ) + .await } } @@ -745,10 +778,26 @@ impl Inner { .get_persistent_state_info(block_id, state_kind) .await? { - return Ok(PersistentStateInfo::Found { - size: info.size, - chunk_size: info.chunk_size, - }); + return if state_kind == PersistentStateKind::Shard && !info.parts.is_empty() { + Ok(PersistentStateInfo::FoundWithParts { + size: info.size, + chunk_size: info.chunk_size, + split_depth: u32::from(info.split_depth), + parts: info + .parts + .into_iter() + .map(|part| PersistentStatePartInfo { + prefix: part.prefix, + size: part.size, + }) + .collect(), + }) + } else { + Ok(PersistentStateInfo::Found { + size: info.size, + chunk_size: info.chunk_size, + }) + }; } Ok(PersistentStateInfo::NotFound) @@ -759,6 +808,7 @@ impl Inner { block_id: &BlockId, offset: u64, state_kind: PersistentStateKind, + part_shard_prefix: Option, ) -> overlay::Response { let persistent_state_request_validation = || { anyhow::ensure!( @@ -775,7 +825,7 @@ impl Inner { match self .rpc_data_provider - .get_persistent_state_chunk(block_id, offset, state_kind) + .get_persistent_state_chunk(block_id, offset, state_kind, part_shard_prefix) .await { Ok(Some(data)) => overlay::Response::Ok(Data { data }), @@ -785,6 +835,7 @@ impl Inner { ?block_id, offset, ?state_kind, + ?part_shard_prefix, "get_persistent_state_chunk failed: {e:?}" ); overlay::Response::Err(INTERNAL_ERROR_CODE) diff --git a/core/src/proto.tl b/core/src/proto.tl index f97a4cf34b..5622c773f2 100644 --- a/core/src/proto.tl +++ b/core/src/proto.tl @@ -136,6 +136,17 @@ blockchain.archiveInfo.tooNew = blockchain.ArchiveInfo; */ blockchain.archiveInfo.notFound = blockchain.ArchiveInfo; +/** +* Persistent state part metadata +* +* @param prefix shard prefix of the state part +* @param size state part file size in bytes +*/ +blockchain.persistentStatePartInfo + prefix:long + size:long + = blockchain.PersistentStatePartInfo; + /** * A successful response for the 'getPersistentStateInfo' query * @@ -147,6 +158,22 @@ blockchain.persistentStateInfo.found chunk_size:long = blockchain.PersistentStateInfo; +/** +* A successful response for the 'getPersistentStateInfo' query +* when persistent state stored with parts +* +* @param size state main file size in bytes +* @param chunk_size chunk size in bytes +* @param split_depth state split depth on parts +* @param parts list of state parts files +*/ +blockchain.persistentStateInfo.foundWithParts + size:long + chunk_size:long + split_depth:int + parts:(vector blockchain.persistentStatePartInfo) + = blockchain.PersistentStateInfo; + /** * An unsuccessful response for the 'getPersistentStateInfo' query */ @@ -261,6 +288,19 @@ blockchain.getPersistentShardStateChunk offset:long = overlay.Response blockchain.Data; +/** +* Get persistent shard state part chunk +* +* @param block_id requested block id +* @param prefix persistent shard state part prefix +* @param offset part offset in bytes +*/ +blockchain.getPersistentShardStatePartChunk + block_id:blockchain.blockId + prefix:long + offset:long + = overlay.Response blockchain.Data; + /** * Get persistent queue state info * diff --git a/core/src/proto/blockchain.rs b/core/src/proto/blockchain.rs index b143e74b07..1271e18b6e 100644 --- a/core/src/proto/blockchain.rs +++ b/core/src/proto/blockchain.rs @@ -67,10 +67,24 @@ pub enum PersistentStateInfo { size: NonZeroU64, chunk_size: NonZeroU32, }, + #[tl(id = "blockchain.persistentStateInfo.foundWithParts")] + FoundWithParts { + size: NonZeroU64, + chunk_size: NonZeroU32, + split_depth: u32, + parts: Vec, + }, #[tl(id = "blockchain.persistentStateInfo.notFound")] NotFound, } +#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, id = "blockchain.persistentStatePartInfo", scheme = "proto.tl")] +pub struct PersistentStatePartInfo { + pub prefix: u64, + pub size: NonZeroU64, +} + #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, scheme = "proto.tl")] pub enum ArchiveInfo { @@ -191,6 +205,19 @@ pub mod rpc { pub offset: u64, } + #[derive(Debug, Clone, TlRead, TlWrite)] + #[tl( + boxed, + id = "blockchain.getPersistentShardStatePartChunk", + scheme = "proto.tl" + )] + pub struct GetPersistentShardStatePartChunk { + #[tl(with = "tl_block_id")] + pub block_id: tycho_types::models::BlockId, + pub prefix: u64, + pub offset: u64, + } + #[derive(Debug, Clone, TlRead, TlWrite)] #[tl( boxed, diff --git a/core/src/storage/config.rs b/core/src/storage/config.rs index 39f53409cf..32dd809279 100644 --- a/core/src/storage/config.rs +++ b/core/src/storage/config.rs @@ -30,6 +30,11 @@ pub struct CoreStorageConfig { /// Default: `5` (= 32 virtual shards). pub shard_split_depth: u8, + /// Persistent shard state split depth. + /// + /// Default: `0` (= no persistent state file split). + pub persistent_state_split_depth: u8, + /// Store every Nth shard state to storage /// /// Default: 5 @@ -104,6 +109,7 @@ impl Default for CoreStorageConfig { drop_interval: 3, store_archives: true, shard_split_depth: 5, + persistent_state_split_depth: 0, store_shard_state_step: NonZeroU32::new(5).unwrap(), max_new_cells_threshold: 500_000, cell_storage_threads: NonZeroUsize::new(4).unwrap(), diff --git a/core/src/storage/mod.rs b/core/src/storage/mod.rs index 1104df4cf1..9ad76c1834 100644 --- a/core/src/storage/mod.rs +++ b/core/src/storage/mod.rs @@ -23,7 +23,7 @@ pub use self::db::{CellsDb, CoreDb, CoreDbExt}; pub use self::gc::ManualGcTrigger; pub use self::node_state::{NodeStateStorage, NodeSyncState}; pub use self::persistent_state::{ - BriefBocHeader, PersistentState, PersistentStateInfo, PersistentStateKind, + BriefBocHeader, PersistentState, PersistentStateInfo, PersistentStateKind, PersistentStateMeta, PersistentStateStorage, QueueDiffReader, QueueStateReader, QueueStateWriter, ShardStateReader, ShardStateWriter, }; @@ -158,6 +158,7 @@ impl CoreStorage { block_handle_storage.clone(), block_storage.clone(), shard_state_storage.clone(), + config.persistent_state_split_depth, )?; persistent_state_storage.preload().await?; diff --git a/core/src/storage/persistent_state/mod.rs b/core/src/storage/persistent_state/mod.rs index d712fd1306..b6cfbe251f 100644 --- a/core/src/storage/persistent_state/mod.rs +++ b/core/src/storage/persistent_state/mod.rs @@ -23,7 +23,7 @@ use tycho_util::{FastHashMap, FastHashSet}; pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader}; pub use self::queue_state::writer::QueueStateWriter; pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader}; -pub use self::shard_state::writer::ShardStateWriter; +pub use self::shard_state::writer::{PersistentStateMeta, ShardStateWriter}; use super::{ BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, NodeStateStorage, ShardStateStorage, @@ -64,6 +64,13 @@ impl PersistentStateKind { } } + pub fn make_part_file_name(&self, block_id: &BlockId, part_prefix: u64) -> Option { + match self { + Self::Shard => Some(ShardStateWriter::file_name_ext(block_id, Some(part_prefix))), + Self::Queue => None, + } + } + pub fn from_extension(extension: &str) -> Option { match extension { ShardStateWriter::FILE_EXTENSION => Some(Self::Shard), @@ -92,6 +99,7 @@ impl PersistentStateStorage { block_handle_storage: Arc, block_storage: Arc, shard_state_storage: Arc, + persistent_state_split_depth: u8, ) -> Result { const MAX_PARALLEL_CHUNK_READS: usize = 20; @@ -105,6 +113,7 @@ impl PersistentStateStorage { block_handles: block_handle_storage, blocks: block_storage, shard_states: shard_state_storage, + persistent_state_split_depth, descriptor_cache: Default::default(), mc_seqno_to_block_ids: Default::default(), chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)), @@ -173,6 +182,20 @@ impl PersistentStateStorage { continue; } + // skip persistent metadata + if path + .file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| name.ends_with(".meta.json")) + { + continue; + } + + // skip parts + if parse_shard_state_part_file_name(&path).is_some() { + continue; + } + 'file: { // Try to parse the file name as a block_id let Ok(block_id) = path @@ -308,15 +331,27 @@ impl PersistentStateStorage { Some(PersistentStateInfo { size, chunk_size: self.state_chunk_size(), + split_depth: cached.meta.as_ref().map_or(0, |meta| meta.split_depth), + parts: cached + .parts + .iter() + .filter_map(|part| { + Some(PersistentStatePartInfo { + prefix: part.prefix, + size: NonZeroU64::new(part.file.length() as u64)?, + }) + }) + .collect(), }) }) } - pub async fn read_state_part( + pub async fn read_state_chunk( &self, block_id: &BlockId, offset: u64, state_kind: PersistentStateKind, + part_shard_prefix: Option, ) -> Option> { // NOTE: Should be noop on x64 let offset = usize::try_from(offset).ok()?; @@ -335,7 +370,18 @@ impl PersistentStateStorage { kind: state_kind, }; let cached = self.inner.descriptor_cache.get(&key)?.clone(); - if offset > cached.file.length() { + let part_index = match (state_kind, part_shard_prefix) { + (PersistentStateKind::Shard, Some(prefix)) => { + Some(cached.parts.iter().position(|part| part.prefix == prefix)?) + } + _ => None, + }; + + let file = match part_index { + Some(index) => &cached.parts[index].file, + None => &cached.file, + }; + if offset > file.length() { return None; } @@ -346,11 +392,15 @@ impl PersistentStateStorage { // Ensure that permit is dropped only after cached state is used. let _permit = permit; - let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length()); - cached.file.as_slice()[offset..end].to_vec() + let file = match part_index { + Some(index) => &cached.parts[index].file, + None => &cached.file, + }; + let end = std::cmp::min(offset.saturating_add(chunk_size), file.length()); + Some(file.as_slice()[offset..end].to_vec()) }) .await - .ok() + .ok()? } #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))] @@ -392,6 +442,7 @@ impl PersistentStateStorage { states_dir, *handle.id(), root_hash, + this.persistent_state_split_depth, Some(cancelled.clone()), ) .await @@ -428,11 +479,13 @@ impl PersistentStateStorage { } #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))] - pub async fn store_shard_state_file( + pub async fn store_shard_state_files( &self, mc_seqno: u32, handle: &BlockHandle, - file: File, + main: File, + parts: Vec<(u64, File)>, + split_depth: u8, ) -> Result<()> { if self .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard) @@ -445,32 +498,102 @@ impl PersistentStateStorage { scopeguard::defer! { cancelled.cancel(); } + let guard = scopeguard::guard((), |_| { + tracing::warn!("cancelled"); + }); let handle = handle.clone(); let this = self.inner.clone(); + let span = tracing::Span::current(); + + // prepare persistent state dir + let states_dir = { + let this = this.clone(); + tokio::task::spawn_blocking(move || { + let _span = span.enter(); + + this.prepare_persistent_states_dir(mc_seqno) + }) + .await?? + }; + + // will use semaphore to limit parrallel io operations + const MAX_PARALLEL_STATE_FILE_WRITES: usize = 4; + let semaphore = Arc::new(Semaphore::new(MAX_PARALLEL_STATE_FILE_WRITES)); + + // write part files in parallel + let mut prefixes = Vec::with_capacity(parts.len()); + let mut parts_writes = tokio::task::JoinSet::new(); + for (prefix, file) in parts { + prefixes.push(prefix); + + let permit = semaphore + .clone() + .acquire_owned() + .await + .context("state file write semaphore closed")?; + let cells_db = this.cells_db.clone(); + let states_dir = states_dir.clone(); + let block_id = *handle.id(); + let cancelled = cancelled.clone(); + let span = tracing::Span::current(); + + parts_writes.spawn_blocking(move || { + let _permit = permit; + let _span = span.enter(); + + ShardStateWriter::new_part(&cells_db, &states_dir, &block_id, prefix) + .write_file(file, Some(&cancelled)) + .with_context(|| format!("failed to write persistent state part {prefix:016x}")) + }); + } + + // collect results + let mut write_result = Ok::<_, anyhow::Error>(()); + while let Some(result) = parts_writes.join_next().await { + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => { + cancelled.cancel(); + if write_result.is_ok() { + write_result = Err(e); + } + } + Err(e) => { + cancelled.cancel(); + if write_result.is_ok() { + write_result = Err(e.into()); + } + } + } + } + write_result?; + + // write metadata and main file let cancelled = cancelled.clone(); let span = tracing::Span::current(); let state = tokio::task::spawn_blocking(move || { let _span = span.enter(); - let guard = scopeguard::guard((), |_| { - tracing::warn!("cancelled"); - }); + // write persistent metadata even for single file + PersistentStateMeta::new(if prefixes.is_empty() { 0 } else { split_depth }, prefixes) + .write(&states_dir, handle.id())?; - let states_dir = this.prepare_persistent_states_dir(mc_seqno)?; + // write main + ShardStateWriter::new(&this.cells_db, &states_dir, handle.id()) + .write_file(main, Some(&cancelled))?; - let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id()); - cell_writer.write_file(file, Some(&cancelled))?; this.block_handles.set_has_persistent_shard_state(&handle); let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?; - scopeguard::ScopeGuard::into_inner(guard); Ok::<_, anyhow::Error>(state) }) .await??; self.notify_with_persistent_state(&state).await; + + scopeguard::ScopeGuard::into_inner(guard); Ok(()) } @@ -747,6 +870,24 @@ impl PersistentStateStorage { let states_dir = this.prepare_persistent_states_dir(mc_seqno)?; + if kind == PersistentStateKind::Shard { + // write parts + for part in &cached.parts { + let file_name = ShardStateWriter::file_name_ext(block_id, Some(part.prefix)); + let temp_file = states_dir.file(ShardStateWriter::temp_file_name_ext( + block_id, + Some(part.prefix), + )); + std::fs::write(temp_file.path(), part.file.as_slice())?; + temp_file.rename(file_name)?; + } + + // write persistent state metadata + let meta = cached.meta.clone().unwrap_or_default(); + meta.write(&states_dir, &block_id)?; + } + + // write main let temp_file = states_dir.file(kind.make_temp_file_name(&block_id)); std::fs::write(temp_file.path(), cached.file.as_slice())?; temp_file.rename(kind.make_file_name(&block_id))?; @@ -776,6 +917,7 @@ struct Inner { block_handles: Arc, blocks: Arc, shard_states: Arc, + persistent_state_split_depth: u8, descriptor_cache: DashMap>, mc_seqno_to_block_ids: Mutex>>, chunks_semaphore: Arc, @@ -862,12 +1004,10 @@ impl Inner { kind, }; - let load_mapped = || { - let mut file = self - .mc_states_dir(mc_seqno) - .file(kind.make_file_name(block_id)) - .read(true) - .open()?; + let states_dir = self.mc_states_dir(mc_seqno); + + let load_mapped = |file_name: PathBuf| { + let mut file = states_dir.file(file_name).read(true).open()?; // We create a copy of the original file here to make sure // that the underlying mapped file will not be changed outside @@ -885,10 +1025,48 @@ impl Inner { MappedFile::from_existing_file(temp_file).context("failed to map a temp file") }; - let file = - load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?; + // cache main file + let file = load_mapped(kind.make_file_name(block_id)) + .with_context(|| format!("failed to cache {kind:?} for {block_id}"))?; + + let (meta, parts) = if kind == PersistentStateKind::Shard { + // cache metadata, and parts if exist + let meta = match PersistentStateMeta::read(&states_dir, block_id)? { + Some(meta) => meta, + None if has_shard_state_part_files(&states_dir, block_id)? => { + anyhow::bail!( + "incomplete split persistent state bundle for {block_id}: missing metadata" + ) + } + None => PersistentStateMeta::default(), + }; + let mut parts = Vec::with_capacity(meta.parts.len()); + for &prefix in &meta.parts { + let file_name = ShardStateWriter::file_name_ext(block_id, Some(prefix)); + anyhow::ensure!( + states_dir.file(&file_name).path().is_file(), + "incomplete split persistent state bundle for {block_id}: missing part {prefix:016x}" + ); + parts.push(CachedStatePart { + prefix, + file: load_mapped(file_name).with_context(|| { + format!( + "failed to cache split persistent state part {prefix:016x} for {block_id}" + ) + })?, + }); + } + (Some(meta), parts) + } else { + (None, Vec::new()) + }; - let new_state = Arc::new(CachedState { mc_seqno, file }); + let new_state = Arc::new(CachedState { + mc_seqno, + file, + meta, + parts, + }); let prev_mc_seqno = match self.descriptor_cache.entry(key) { Entry::Vacant(entry) => { @@ -944,10 +1122,72 @@ impl Inner { } } -#[derive(Debug, Clone, Copy)] +fn has_shard_state_part_files(states_dir: &Dir, block_id: &BlockId) -> Result { + let block_id = block_id.to_string(); + for entry in std::fs::read_dir(states_dir.path())? { + let path = entry?.path(); + if path.is_file() + && parse_shard_state_part_file_name(&path) + .is_some_and(|(part_block_id, _)| part_block_id == block_id) + { + return Ok(true); + } + } + Ok(false) +} + +fn parse_shard_state_part_file_name(path: &std::path::Path) -> Option<(&str, u64)> { + if path.extension()?.to_str()? != ShardStateWriter::FILE_EXTENSION { + return None; + } + let (block_id, prefix) = path.file_stem()?.to_str()?.rsplit_once("_part_")?; + if prefix.len() != 16 { + return None; + } + let prefix = u64::from_str_radix(prefix, 16).ok()?; + block_id.parse::().ok()?; + Some((block_id, prefix)) +} + +pub(super) fn check_can_reuse_shard_state_part_files( + states_dir: &Dir, + block_id: &BlockId, + expected_meta: &PersistentStateMeta, +) -> Result { + // read metadata from the disk + let meta = match PersistentStateMeta::read(states_dir, block_id) { + Ok(Some(meta)) => meta, + Ok(None) => return Ok(false), + Err(e) if e.downcast_ref::().is_none() => return Ok(false), + Err(e) => return Err(e), + }; + + // can reuse existing parts only if metadata matches and all part files exist + if meta != *expected_meta { + return Ok(false); + } + for &prefix in &expected_meta.parts { + let file_name = ShardStateWriter::file_name_ext(block_id, Some(prefix)); + if !states_dir.file(&file_name).path().is_file() { + return Ok(false); + } + } + + Ok(true) +} + +#[derive(Debug, Clone)] pub struct PersistentStateInfo { pub size: NonZeroU64, pub chunk_size: NonZeroU32, + pub split_depth: u8, + pub parts: Vec, +} + +#[derive(Debug, Clone, Copy)] +pub struct PersistentStatePartInfo { + pub prefix: u64, + pub size: NonZeroU64, } #[derive(Clone)] @@ -1016,6 +1256,13 @@ static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0); struct CachedState { mc_seqno: u32, file: MappedFile, + meta: Option, + parts: Vec, +} + +struct CachedStatePart { + prefix: u64, + file: MappedFile, } #[derive(Default)] diff --git a/core/src/storage/persistent_state/shard_state/reader.rs b/core/src/storage/persistent_state/shard_state/reader.rs index 100267d563..5c75197542 100644 --- a/core/src/storage/persistent_state/shard_state/reader.rs +++ b/core/src/storage/persistent_state/shard_state/reader.rs @@ -66,7 +66,7 @@ impl ShardStateReader { let root_count = reader.read_be_uint(ref_size)?; total_size += ref_size as u64; - reader.read_be_uint(ref_size)?; // skip absent + let absent_count = reader.read_be_uint(ref_size)?; total_size += ref_size as u64; if root_count != 1 { @@ -131,6 +131,7 @@ impl ShardStateReader { ref_size, offset_size, cell_count, + absent_count, total_size, }; @@ -142,36 +143,60 @@ impl ShardStateReader { } pub fn read_next_cell(&mut self, buffer: &mut [u8; 256]) -> std::io::Result { - let descriptor = { - let mut bytes = [0u8; 2]; - self.reader.read_exact(&mut bytes)?; - CellDescriptor::new(bytes) + let (descriptor, descriptor_len) = { + let d1 = self.reader.read_byte()?; + buffer[0] = d1; + + let check = CellDescriptor::new([d1, 0]); + if check.is_absent() { + (check, 1) + } else { + let d2 = self.reader.read_byte()?; + buffer[1] = d2; + + let descriptor = CellDescriptor::new([d1, d2]); + (descriptor, 2) + } }; + let mut refs = descriptor.reference_count() as usize; + + // skip refs for absent cells because `reference_count = 7` is just a marker if descriptor.is_absent() { - return Err(parser_error("absent cell are not supported")); + refs = 0; } - - let refs = descriptor.reference_count() as usize; + // check refs count if refs > 4 { return Err(parser_error("invalid reference count")); } + // NOTE: in the boc hashes go first and then goes data + + let mut hashes_len = 0; let hash_count = descriptor.hash_count(); if descriptor.store_hashes() { - // NOTE: We must forward all skipped bytes to the CRC reader to get the correct checksum - std::io::copy( - &mut self.reader.by_ref().take(hash_count as u64 * (32 + 2)), - &mut std::io::sink(), - )?; + let len = hash_count as u64 * (32 + 2); + if descriptor.is_absent() { + // for absent cells we read all hashes and will use them + hashes_len = len as usize; + } else { + // NOTE: We must forward all skipped bytes to the CRC reader to get the correct checksum + std::io::copy(&mut self.reader.by_ref().take(len), &mut std::io::sink())?; + } } let byte_len = descriptor.byte_len() as usize; - let total_len = 2 + byte_len + refs * self.header.ref_size; - buffer[0] = descriptor.d1; - buffer[1] = descriptor.d2; - self.reader.read_exact(&mut buffer[2..total_len])?; + if descriptor.is_absent() && byte_len != 0 { + return Err(parser_error( + "absent cell should have no data except hashes/depths", + )); + } + + let total_len = descriptor_len + hashes_len + byte_len + refs * self.header.ref_size; + + self.reader + .read_exact(&mut buffer[descriptor_len..total_len])?; // TODO: Add full cell validation here? But what to do with indices @@ -204,6 +229,7 @@ pub struct BriefBocHeader { pub ref_size: usize, pub offset_size: u64, pub cell_count: u64, + pub absent_count: u64, pub total_size: u64, } diff --git a/core/src/storage/persistent_state/shard_state/writer.rs b/core/src/storage/persistent_state/shard_state/writer.rs index 169e129d51..683d2f0a8f 100644 --- a/core/src/storage/persistent_state/shard_state/writer.rs +++ b/core/src/storage/persistent_state/shard_state/writer.rs @@ -5,9 +5,10 @@ use std::io::{Read, Seek, SeekFrom, Write}; use std::path::PathBuf; use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use tycho_storage::fs::Dir; -use tycho_types::cell::{CellDescriptor, HashBytes}; +use tycho_types::cell::{Cell, CellDescriptor, HashBytes}; use tycho_types::models::*; use tycho_util::FastHashMap; use tycho_util::compression::ZstdCompressedFile; @@ -21,6 +22,7 @@ pub struct ShardStateWriter<'a> { db: &'a CellsDb, states_dir: &'a Dir, block_id: &'a BlockId, + part_prefix: Option, } impl<'a> ShardStateWriter<'a> { @@ -31,18 +33,50 @@ impl<'a> ShardStateWriter<'a> { // Partially written BOC file. const FILE_EXTENSION_TEMP: &'static str = "boc.temp"; + pub const META_FILE_EXTENSION: &'static str = "meta.json"; + + fn build_file_name_base(name: S, part_prefix: Option) -> String + where + S: Display, + { + match part_prefix { + Some(prefix) => format!("{name}_part_{prefix:016x}"), + None => name.to_string(), + } + } + pub fn file_name(name: S) -> PathBuf where S: Display, { - PathBuf::from(name.to_string()).with_extension(Self::FILE_EXTENSION) + Self::file_name_ext(name, None) + } + + pub fn file_name_ext(name: S, part_prefix: Option) -> PathBuf + where + S: Display, + { + PathBuf::from(Self::build_file_name_base(name, part_prefix)) + .with_extension(Self::FILE_EXTENSION) } pub fn temp_file_name(name: S) -> PathBuf where S: Display, { - PathBuf::from(name.to_string()).with_extension(Self::FILE_EXTENSION_TEMP) + Self::temp_file_name_ext(name, None) + } + + pub fn temp_file_name_ext(name: S, part_prefix: Option) -> PathBuf + where + S: Display, + { + PathBuf::from(Self::build_file_name_base(name, part_prefix)) + .with_extension(Self::FILE_EXTENSION_TEMP) + } + + pub fn meta_file_name(block_id: &BlockId) -> PathBuf { + PathBuf::from(block_id.to_string()).with_extension(Self::META_FILE_EXTENSION) } pub fn new(db: &'a CellsDb, states_dir: &'a Dir, block_id: &'a BlockId) -> Self { @@ -50,21 +84,43 @@ impl<'a> ShardStateWriter<'a> { db, states_dir, block_id, + part_prefix: None, + } + } + + pub fn new_part( + db: &'a CellsDb, + states_dir: &'a Dir, + block_id: &'a BlockId, + part_prefix: u64, + ) -> Self { + Self { + db, + states_dir, + block_id, + part_prefix: Some(part_prefix), } } pub fn write_file( &self, mut boc_file: File, + cancelled: Option<&CancellationFlag>, + ) -> Result<()> { + boc_file.seek(SeekFrom::Start(0))?; + self.write_uncompressed_boc(boc_file, cancelled) + } + + fn write_uncompressed_boc( + &self, + mut boc_file: R, _cancelled: Option<&CancellationFlag>, ) -> Result<()> { - let temp_file_name = Self::temp_file_name(self.block_id); + let temp_file_name = Self::temp_file_name_ext(self.block_id, self.part_prefix); scopeguard::defer! { self.states_dir.remove_file(&temp_file_name).ok(); } - boc_file.seek(SeekFrom::Start(0))?; - // Create states file let compressed_file = self .states_dir @@ -89,7 +145,7 @@ impl<'a> ShardStateWriter<'a> { // Atomically rename the file self.states_dir .file(&temp_file_name) - .rename(Self::file_name(self.block_id)) + .rename(Self::file_name_ext(self.block_id, self.part_prefix)) .map_err(Into::into) } @@ -98,7 +154,16 @@ impl<'a> ShardStateWriter<'a> { root_hash: &HashBytes, cancelled: Option<&CancellationFlag>, ) -> Result { - self.write_inner(root_hash, None, None, cancelled) + self.write_inner(root_hash, None, None, None, cancelled) + } + + pub fn write_with_absent( + &self, + root_hash: &HashBytes, + to_make_absent_cells: FastHashMap, + cancelled: Option<&CancellationFlag>, + ) -> Result { + self.write_inner(root_hash, None, Some(to_make_absent_cells), None, cancelled) } pub fn write_tracked( @@ -108,19 +173,26 @@ impl<'a> ShardStateWriter<'a> { progress_bar: &mut ProgressBar, cancelled: Option<&CancellationFlag>, ) -> Result { - self.write_inner(root_hash, Some(file_name), Some(progress_bar), cancelled) + self.write_inner( + root_hash, + Some(file_name), + None, + Some(progress_bar), + cancelled, + ) } fn write_inner( &self, root_hash: &HashBytes, file_name: Option<&str>, - mut progress_bar: Option<&mut ProgressBar>, + to_make_absent_cells: Option>, + progress_bar: Option<&mut ProgressBar>, cancelled: Option<&CancellationFlag>, ) -> Result { let temp_file_name = match file_name { - Some(name) => Self::temp_file_name(name), - None => Self::temp_file_name(self.block_id), + Some(name) => Self::temp_file_name_ext(name, self.part_prefix), + None => Self::temp_file_name_ext(self.block_id, self.part_prefix), }; scopeguard::defer! { @@ -129,10 +201,28 @@ impl<'a> ShardStateWriter<'a> { // Load cells from db in reverse order into the temp file tracing::info!("started loading cells"); - let mut intermediate = self - .write_rev(&root_hash.0, cancelled) + let intermediate = self + .write_rev(&root_hash.0, to_make_absent_cells, cancelled) .context("Failed to write reversed cells data")?; tracing::info!("finished loading cells"); + + self.write_intermediate( + temp_file_name.clone(), + file_name, + intermediate, + progress_bar, + cancelled, + ) + } + + fn write_intermediate( + &self, + temp_file_name: PathBuf, + file_name: Option<&str>, + mut intermediate: IntermediateState, + mut progress_bar: Option<&mut ProgressBar>, + cancelled: Option<&CancellationFlag>, + ) -> Result { let cell_count = intermediate.cell_sizes.len() as u32; // Compute offset type size (usually 4 bytes) @@ -174,7 +264,7 @@ impl<'a> ShardStateWriter<'a> { buffer.write_all(&1u32.to_be_bytes())?; // Absent cell count | current len: 14 - buffer.write_all(&[0, 0, 0, 0])?; + buffer.write_all(&intermediate.absent_cells_count.to_be_bytes())?; // Total cell size | current len: 18 buffer.write_all(&intermediate.total_size.to_be_bytes()[(8 - offset_size)..8])?; @@ -217,13 +307,21 @@ impl<'a> ShardStateWriter<'a> { d2: cell_buffer[1], }; - let ref_offset = 2 + descriptor.byte_len() as usize; - for r in 0..descriptor.reference_count() as usize { - let ref_offset = ref_offset + r * REF_SIZE; - let slice = &mut cell_buffer[ref_offset..ref_offset + REF_SIZE]; - - let index = u32::from_be_bytes(slice.try_into().unwrap()); - slice.copy_from_slice(&(cell_count - index - 1).to_be_bytes()); + // skip refs for absent cells + if !descriptor.is_absent() { + let hash_depth_len = if descriptor.store_hashes() { + descriptor.hash_count() * (32 + 2) + } else { + 0 + }; + let ref_offset = 2 + hash_depth_len as usize + descriptor.byte_len() as usize; + for r in 0..descriptor.reference_count() as usize { + let ref_offset = ref_offset + r * REF_SIZE; + let slice = &mut cell_buffer[ref_offset..ref_offset + REF_SIZE]; + + let index = u32::from_be_bytes(slice.try_into().unwrap()); + slice.copy_from_slice(&(cell_count - index - 1).to_be_bytes()); + } } buffer.write_all(&cell_buffer[..cell_size as usize])?; @@ -248,8 +346,8 @@ impl<'a> ShardStateWriter<'a> { }; let name = match file_name { - None => Self::file_name(self.block_id), - Some(name) => Self::file_name(name), + None => Self::file_name_ext(self.block_id, self.part_prefix), + Some(name) => Self::file_name_ext(name, self.part_prefix), }; self.states_dir.file(&temp_file_name).rename(name)?; @@ -260,6 +358,7 @@ impl<'a> ShardStateWriter<'a> { fn write_rev( &self, root_hash: &[u8; 32], + mut to_make_absent_cells: Option>, cancelled: Option<&CancellationFlag>, ) -> Result { enum StackItem { @@ -282,6 +381,11 @@ impl<'a> ShardStateWriter<'a> { let mut references_buffer = SmallVec::<[[u8; 32]; 4]>::with_capacity(4); + // data buffer for absent cell + let mut absent_data_buffer = Vec::new(); + + let mut absent_cells_count = 0u32; + let mut indices = FastHashMap::default(); let mut remap = FastHashMap::default(); let mut cell_sizes = Vec::::with_capacity(FILE_BUFFER_LEN); @@ -291,6 +395,18 @@ impl<'a> ShardStateWriter<'a> { let mut iteration = 0u32; let mut remap_index = 0u32; + // we put cells in the stack and traverse down one branch until the leaf is reached, + // then write this leaf cell, go back to the parent cell, and visit the sibling branch, + // so when both child branches are stored, we continue moving backward + + // the leaf will have index 0 and the root will have the maximum index + // parent cells refer to children by indexes + // e.g. for the original branch + // A -> B -> D + // -> C -> E + // the intermediate file will contain + // E[0; ] C[1; ref 0] D[2; ] B[3; ref 2] A[4; ref 1,3] + stack.push((iteration, StackItem::New(*root_hash))); indices.insert(*root_hash, (iteration, false)); @@ -306,18 +422,42 @@ impl<'a> ShardStateWriter<'a> { match data { StackItem::New(hash) => { - let value = raw - .get_pinned_cf_opt(&cf, hash, read_options)? - .ok_or(CellWriterError::CellNotFound)?; - - let (_, value) = - decode_indexed_value(value.as_ref()).ok_or(CellWriterError::InvalidCell)?; - if value.is_empty() { - return Err(CellWriterError::InvalidCell.into()); - } + let hash_bytes = HashBytes::from(hash); + + let to_make_absent_cell = to_make_absent_cells + .as_mut() + .and_then(|map| map.remove(&hash_bytes)); + + let (descriptor, data) = if let Some(to_make_absent_cell) = &to_make_absent_cell + { + // count absent cells + absent_cells_count += 1; + + let descriptor = + make_absent_cell_data(to_make_absent_cell, &mut absent_data_buffer); + + references_buffer.clear(); + ( + descriptor, + SmallVec::from_slice(absent_data_buffer.as_slice()), + ) + } else { + // read cell from db + let value = raw + .get_pinned_cf_opt(&cf, hash, read_options)? + .ok_or(CellWriterError::CellNotFound(hash_bytes))?; + + let (_, value) = decode_indexed_value(value.as_ref()) + .ok_or(CellWriterError::InvalidCell(hash_bytes))?; + if value.is_empty() { + return Err(CellWriterError::InvalidCell(hash_bytes).into()); + } - let (descriptor, data) = deserialize_cell(value, &mut references_buffer) - .ok_or(CellWriterError::InvalidCell)?; + let (descriptor, data) = deserialize_cell(value, &mut references_buffer) + .ok_or(CellWriterError::InvalidCell(hash_bytes))?; + + (descriptor, SmallVec::from_slice(data)) + }; let mut reference_indices = SmallVec::with_capacity(references_buffer.len()); @@ -357,7 +497,7 @@ impl<'a> ShardStateWriter<'a> { StackItem::Loaded(LoadedCell { hash, descriptor, - data: SmallVec::from_slice(data), + data, indices: reference_indices, }), )); @@ -392,11 +532,21 @@ impl<'a> ShardStateWriter<'a> { tracing::info!(iteration); } - let cell_size = 2 + loaded.data.len() + loaded.indices.len() * REF_SIZE; + // absent cell contain only 1 descriptor byte + let descriptor_len = 1 + !loaded.descriptor.is_absent() as usize; + + let cell_size = + descriptor_len + loaded.data.len() + loaded.indices.len() * REF_SIZE; cell_sizes.push(cell_size as u8); total_size += cell_size as u64; - temp_file_buffer.write_all(&[loaded.descriptor.d1, loaded.descriptor.d2])?; + if descriptor_len > 1 { + temp_file_buffer + .write_all(&[loaded.descriptor.d1, loaded.descriptor.d2])?; + } else { + temp_file_buffer.write_all(&[loaded.descriptor.d1])?; + } + temp_file_buffer.write_all(&loaded.data)?; for index in loaded.indices { let index = remap.get(&index).with_context(|| { @@ -408,6 +558,8 @@ impl<'a> ShardStateWriter<'a> { } } + ensure_absent_cells_consumed(&to_make_absent_cells)?; + drop(temp_file_buffer); file.flush()?; @@ -416,14 +568,100 @@ impl<'a> ShardStateWriter<'a> { file, cell_sizes, total_size, + absent_cells_count, }) } } +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct PersistentStateMeta { + pub split_depth: u8, + pub parts: Vec, +} + +impl PersistentStateMeta { + pub const VERSION: u8 = 1; + + pub fn new(split_depth: u8, mut parts: Vec) -> Self { + parts.sort_unstable(); + parts.dedup(); + Self { split_depth, parts } + } + + pub fn write(&self, states_dir: &Dir, block_id: &BlockId) -> Result<()> { + let file_path = states_dir + .path() + .join(ShardStateWriter::meta_file_name(block_id)); + self.write_to_file(file_path) + } + + pub fn write_to_file(&self, file_path: impl AsRef) -> Result<()> { + let raw = RawPersistentStateMeta { + version: Self::VERSION, + split_depth: self.split_depth, + parts: self + .parts + .iter() + .map(|prefix| format!("{prefix:016x}")) + .collect(), + }; + + let file_path = file_path.as_ref(); + let temp_file_path = file_path.with_extension("temp"); + scopeguard::defer! { + std::fs::remove_file(&temp_file_path).ok(); + } + + tycho_util::serde_helpers::save_json_to_file(&raw, &temp_file_path)?; + std::fs::rename(&temp_file_path, file_path)?; + + Ok(()) + } + + pub fn read(states_dir: &Dir, block_id: &BlockId) -> Result> { + let file_path = states_dir + .path() + .join(ShardStateWriter::meta_file_name(block_id)); + Self::read_from_file(file_path) + } + + pub fn read_from_file(file_path: impl AsRef) -> Result> { + if !file_path.as_ref().exists() { + return Ok(None); + } + + let raw: RawPersistentStateMeta = + tycho_util::serde_helpers::load_json_from_file(file_path)?; + anyhow::ensure!( + raw.version == Self::VERSION, + "unsupported persistent state meta version: {}", + raw.version + ); + let mut parts = Vec::with_capacity(raw.parts.len()); + for prefix in raw.parts { + if prefix.len() != 16 || !prefix.chars().all(|c| c.is_ascii_hexdigit()) { + anyhow::bail!("invalid persistent state part prefix: {prefix}"); + } + parts.push(u64::from_str_radix(&prefix, 16)?); + } + + Ok(Some(Self::new(raw.split_depth, parts))) + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct RawPersistentStateMeta { + version: u8, + split_depth: u8, + #[serde(default)] + parts: Vec, +} + struct IntermediateState { file: File, cell_sizes: Vec, total_size: u64, + absent_cells_count: u32, } fn deserialize_cell<'a>( @@ -463,6 +701,40 @@ fn deserialize_cell<'a>( Some((descriptor, data)) } +fn make_absent_cell_data(cell: &Cell, absent_data_buffer: &mut Vec) -> CellDescriptor { + let level_mask = cell.level_mask(); + let d1 = CellDescriptor::ABSENT_MASK | level_mask.to_byte() << 5; + let absent_descriptor = CellDescriptor::new([d1, 0]); + + absent_data_buffer.clear(); + absent_data_buffer.reserve(absent_descriptor.hash_count() as usize * (32 + 2)); + + for level in level_mask { + absent_data_buffer.extend_from_slice(cell.hash(level).as_slice()); + } + + for level in level_mask { + absent_data_buffer.extend_from_slice(&cell.depth(level).to_be_bytes()); + } + + absent_descriptor +} + +fn ensure_absent_cells_consumed( + to_make_absent_cells: &Option>, +) -> Result<()> { + if let Some(to_make_absent_cells) = to_make_absent_cells { + let remaining = to_make_absent_cells.len(); + anyhow::ensure!( + remaining == 0, + "not all requested absent cells were written: remaining={}, sample={:?}", + remaining, + to_make_absent_cells.keys().take(4).collect::>(), + ); + } + Ok(()) +} + fn number_of_bytes_to_fit(l: u64) -> u32 { 8 - l.leading_zeros() / 8 } @@ -502,10 +774,10 @@ const FILE_BUFFER_LEN: usize = 128 * 1024 * 1024; // 128 MB #[derive(thiserror::Error, Debug)] enum CellWriterError { - #[error("Cell not found in cell db")] - CellNotFound, - #[error("Invalid cell")] - InvalidCell, + #[error("Cell {0} not found in cell db")] + CellNotFound(HashBytes), + #[error("Invalid cell {0}")] + InvalidCell(HashBytes), } struct IntermediateHasher { diff --git a/core/src/storage/persistent_state/tests.rs b/core/src/storage/persistent_state/tests.rs index 0bc24eda28..9aa4e014f7 100644 --- a/core/src/storage/persistent_state/tests.rs +++ b/core/src/storage/persistent_state/tests.rs @@ -1,11 +1,14 @@ use std::collections::BTreeSet; +use std::io::{Seek, SeekFrom, Write}; use anyhow::Result; +use bytes::Bytes; use bytesize::ByteSize; use tycho_block_util::queue::{ QueueDiffStuff, QueueKey, QueueStateHeader, RouterAddr, RouterPartitions, }; use tycho_block_util::state::ShardStateStuff; +use tycho_storage::fs::Dir; use tycho_storage::{StorageConfig, StorageContext}; use tycho_types::boc::Boc; use tycho_types::cell::{Cell, CellBuilder, CellSlice, HashBytes, Lazy}; @@ -14,15 +17,204 @@ use tycho_types::models::{ MsgInfo, OutMsg, OutMsgDescr, OutMsgNew, OutMsgQueueUpdates, ShardIdent, StdAddr, }; use tycho_types::num::Tokens; -use tycho_util::FastHashSet; use tycho_util::compression::zstd_decompress_simple; +use tycho_util::{FastHashMap, FastHashSet}; +use weedb::rocksdb::IteratorMode; use crate::ZEROSTATE_BOC; use crate::storage::persistent_state::{ - CacheKey, PersistentStateKind, QueueStateReader, QueueStateWriter, + CacheKey, PersistentStateKind, PersistentStateMeta, QueueStateReader, QueueStateWriter, + ShardStateWriter, check_can_reuse_shard_state_part_files, parse_shard_state_part_file_name, }; use crate::storage::{CoreStorage, CoreStorageConfig, NewBlockMeta}; +#[test] +fn persistent_state_meta_roundtrip() -> Result<()> { + let temp_dir = tempfile::tempdir()?; + let dir = Dir::new(temp_dir.path())?; + let block_id = BlockId { + shard: ShardIdent::BASECHAIN, + seqno: 36, + root_hash: HashBytes::from([1; 32]), + file_hash: HashBytes::from([2; 32]), + }; + let meta = PersistentStateMeta::new(2, vec![0xa000000000000000, 0x2000000000000000]); + + meta.write(&dir, &block_id)?; + let loaded = PersistentStateMeta::read(&dir, &block_id)?.unwrap(); + + assert_eq!(loaded, meta); + Ok(()) +} + +#[test] +fn shard_state_part_file_name_parser_recognizes_only_parts() -> Result<()> { + let block_id = BlockId { + shard: ShardIdent::BASECHAIN, + seqno: 36, + root_hash: HashBytes::from([1; 32]), + file_hash: HashBytes::from([2; 32]), + }; + let part_file = ShardStateWriter::file_name_ext(block_id, Some(0xa000000000000000)); + let main_file = ShardStateWriter::file_name(block_id); + let block_id_string = block_id.to_string(); + assert_eq!( + parse_shard_state_part_file_name(&part_file), + Some((block_id_string.as_str(), 0xa000000000000000)) + ); + assert!(parse_shard_state_part_file_name(&main_file).is_none()); + Ok(()) +} + +#[test] +fn test_check_can_reuse_shard_state_part_files() -> Result<()> { + let temp_dir = tempfile::tempdir()?; + let dir = Dir::new(temp_dir.path())?; + let block_id = BlockId { + shard: ShardIdent::BASECHAIN, + seqno: 36, + root_hash: HashBytes::from([1; 32]), + file_hash: HashBytes::from([2; 32]), + }; + let part_prefix = 0xa000000000000000; + let expected_meta = PersistentStateMeta::new(2, vec![part_prefix]); + + expected_meta.write(&dir, &block_id)?; + std::fs::write( + dir.file(ShardStateWriter::file_name_ext(block_id, Some(part_prefix))) + .path(), + [1], + )?; + assert!(check_can_reuse_shard_state_part_files( + &dir, + &block_id, + &expected_meta + )?); + + PersistentStateMeta::new(3, vec![part_prefix]).write(&dir, &block_id)?; + assert!(!check_can_reuse_shard_state_part_files( + &dir, + &block_id, + &expected_meta + )?); + + expected_meta.write(&dir, &block_id)?; + std::fs::remove_file( + dir.file(ShardStateWriter::file_name_ext(block_id, Some(part_prefix))) + .path(), + )?; + assert!(!check_can_reuse_shard_state_part_files( + &dir, + &block_id, + &expected_meta + )?); + + std::fs::write( + dir.file(ShardStateWriter::file_name_ext(block_id, Some(part_prefix))) + .path(), + [1], + )?; + std::fs::write( + dir.file(ShardStateWriter::meta_file_name(&block_id)).path(), + "{", + )?; + assert!(!check_can_reuse_shard_state_part_files( + &dir, + &block_id, + &expected_meta + )?); + Ok(()) +} + +#[tokio::test] +async fn cache_rejects_split_sidecars_without_metadata() -> Result<()> { + let (ctx, _tmp_dir) = StorageContext::new_temp().await?; + let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?; + let persistent_states = storage.persistent_state_storage(); + let block_id = BlockId { + shard: ShardIdent::BASECHAIN, + seqno: 36, + root_hash: HashBytes::from([1; 32]), + file_hash: HashBytes::from([2; 32]), + }; + let states_dir = persistent_states + .inner + .prepare_persistent_states_dir(block_id.seqno)?; + std::fs::write( + states_dir + .file(ShardStateWriter::file_name(block_id)) + .path(), + [1], + )?; + std::fs::write( + states_dir + .file(ShardStateWriter::file_name_ext( + block_id, + Some(0xa000000000000000), + )) + .path(), + [2], + )?; + let err = match persistent_states.inner.cache_state( + block_id.seqno, + &block_id, + PersistentStateKind::Shard, + ) { + Ok(_) => panic!("split sidecars without metadata must not be cached"), + Err(e) => e, + }; + assert!(err.to_string().contains("missing metadata")); + assert!(!persistent_states.state_exists(&block_id, PersistentStateKind::Shard)); + Ok(()) +} + +#[tokio::test] +async fn persistent_state_writer_rejects_missing_absent_cell() -> Result<()> { + // Store a real shard state first so the DB-backed writer can load the root by hash. + let (ctx, _tmp_dir) = StorageContext::new_temp().await?; + let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?; + let shard_states = storage.shard_state_storage(); + let persistent_states = storage.persistent_state_storage(); + let zerostate_root = Boc::decode(ZEROSTATE_BOC)?; + let zerostate_id = BlockId { + shard: ShardIdent::MASTERCHAIN, + seqno: 0, + root_hash: *zerostate_root.repr_hash(), + file_hash: Boc::file_hash_blake(ZEROSTATE_BOC), + }; + let zerostate = ShardStateStuff::from_root( + &zerostate_id, + zerostate_root.clone(), + shard_states.min_ref_mc_state().insert_untracked(), + )?; + let (handle, _) = storage.block_handle_storage().create_or_load_handle( + &zerostate_id, + NewBlockMeta::zero_state(zerostate.as_ref().gen_utime, true), + ); + shard_states + .store_state_ignore_cache(&handle, &zerostate, Default::default()) + .await?; + storage.block_handle_storage().set_skip_states_gc(&handle); + persistent_states.store_shard_state(0, &handle).await?; + + // Ask the writer to replace a hash that is not reachable from the root. + let temp_dir = tempfile::tempdir()?; + let dir = Dir::new(temp_dir.path())?; + let mut absent_cells = FastHashMap::default(); + absent_cells.insert(HashBytes::from([3; 32]), zerostate_root); + + let err = ShardStateWriter::new(&persistent_states.inner.cells_db, &dir, &zerostate_id) + .write_with_absent(&zerostate_id.root_hash, absent_cells, None) + .unwrap_err(); + + assert!(err.chain().any(|e| { + e.to_string() + .contains("not all requested absent cells were written") + })); + + Ok(()) +} + #[tokio::test] async fn persistent_shard_state() -> Result<()> { tycho_util::test::init_logger("persistent_shard_state", "debug"); @@ -80,13 +272,18 @@ async fn persistent_shard_state() -> Result<()> { storage.block_handle_storage().set_skip_states_gc(&handle); persistent_states.store_shard_state(0, &handle).await?; + // Read metadata + let meta = PersistentStateMeta::read(&persistent_states.inner.mc_states_dir(0), &zerostate_id)? + .unwrap(); + assert_eq!(meta, PersistentStateMeta::default()); + // Check if state exists let exist = persistent_states.state_exists(zerostate.block_id(), PersistentStateKind::Shard); assert!(exist); let read_verify_state = || async { let persistent_state_data = persistent_states - .read_state_part(zerostate.block_id(), 0, PersistentStateKind::Shard) + .read_state_chunk(zerostate.block_id(), 0, PersistentStateKind::Shard, None) .await .unwrap(); @@ -131,6 +328,14 @@ async fn persistent_shard_state() -> Result<()> { .store_shard_state(new_mc_seqno, &handle) .await?; + // Read metadata + let meta = PersistentStateMeta::read( + &persistent_states.inner.mc_states_dir(new_mc_seqno), + &zerostate_id, + )? + .unwrap(); + assert_eq!(meta, PersistentStateMeta::default()); + // Check if state exists let exist = persistent_states.state_exists(zerostate.block_id(), PersistentStateKind::Shard); assert!(exist); @@ -188,6 +393,197 @@ async fn persistent_shard_state() -> Result<()> { Ok(()) } +#[tokio::test] +async fn split_persistent_shard_state_import_from_dump() -> Result<()> { + tycho_util::test::init_logger("split_persistent_shard_state_import_from_dump", "debug"); + + const DUMP_PATH: &str = concat!( + env!("CARGO_MANIFEST_DIR"), + "/../test/data/dump/persistents/", + "0:8000000000000000:36:", + "78d7e559cf68d9d1821520d1b53b16ba5cf9cef139106efd388af2bfe2016817:", + "a875088e0557ab41241aaf07db46e9422e070d7d3842fe4f7269ae6a0bd69ae5.boc", + ); + + fn tempfile_from_bytes(bytes: &[u8]) -> Result { + let mut file = tempfile::tempfile()?; + file.write_all(bytes)?; + file.seek(SeekFrom::Start(0))?; + Ok(file) + } + + let collect_cell_counters = |storage: &CoreStorage| -> Result> { + let cell_storage = storage.shard_state_storage().cell_storage(); + let mut counters = FastHashMap::default(); + for item in cell_storage.db().cells.iterator(IteratorMode::Start) { + let (key, _) = item?; + let hash = HashBytes::from_slice(key.as_ref()); + let Some(counter) = cell_storage.get_counter_value(&hash)? else { + anyhow::bail!("missing counter for persisted cell {hash}"); + }; + counters.insert(hash, counter); + } + Ok(counters) + }; + + let dump_path = std::path::Path::new(DUMP_PATH); + let block_id: BlockId = dump_path + .file_stem() + .and_then(|stem| stem.to_str()) + .unwrap() + .parse()?; + let full_boc = zstd_decompress_simple(&std::fs::read(dump_path)?)?; + let expected_state_root_hash = *Boc::decode(&full_boc)?.repr_hash(); + + // Import the compressed dump as a regular single-file persistent state. + let mut config = CoreStorageConfig::new_potato(); + config.persistent_state_split_depth = 2; + let (ctx, tmp_dir) = StorageContext::new_temp().await?; + let storage = CoreStorage::open(ctx, config).await?; + let shard_states = storage.shard_state_storage(); + + shard_states.begin_raw_import()?; + let root_hash = shard_states + .store_state_bytes( + &block_id, + Bytes::from(full_boc.clone()), + Some(&expected_state_root_hash), + ) + .await?; + anyhow::ensure!( + root_hash == expected_state_root_hash, + "dump state root hash mismatch" + ); + shard_states.finish_raw_import()?; + let full_import_counters = collect_cell_counters(&storage)?; + + let loaded_state = shard_states.load_state(block_id.seqno, &block_id).await?; + let (handle, _) = + storage + .block_handle_storage() + .create_or_load_handle(&block_id, NewBlockMeta { + is_key_block: block_id.is_masterchain(), + gen_utime: loaded_state.as_ref().gen_utime, + ref_by_mc_seqno: block_id.seqno, + }); + storage.block_handle_storage().set_has_shard_state(&handle); + storage.block_handle_storage().set_skip_states_gc(&handle); + + // Save the imported state as a split persistent bundle. + let persistent_states = storage.persistent_state_storage(); + persistent_states + .store_shard_state(block_id.seqno, &handle) + .await?; + + let states_dir = persistent_states.inner.mc_states_dir(block_id.seqno); + let meta = PersistentStateMeta::read(&states_dir, &block_id)?.unwrap(); + assert_eq!(meta.split_depth, 2); + assert!(!meta.parts.is_empty()); + + let read_decompressed = |file_name| -> Result> { + Ok(zstd_decompress_simple(&std::fs::read( + states_dir.file(file_name).path(), + )?)?) + }; + + let main_boc = read_decompressed(ShardStateWriter::file_name(block_id))?; + let part_bocs = meta + .parts + .iter() + .map(|&prefix| read_decompressed(ShardStateWriter::file_name_ext(block_id, Some(prefix)))) + .collect::>>()?; + + let persistent_state_info = persistent_states + .get_state_info(&block_id, PersistentStateKind::Shard) + .unwrap(); + assert_eq!(persistent_state_info.split_depth, 2); + assert_eq!(persistent_state_info.parts.len(), meta.parts.len()); + + let part_chunk = persistent_states + .read_state_chunk( + &block_id, + 0, + PersistentStateKind::Shard, + Some(meta.parts[0]), + ) + .await + .unwrap(); + assert_eq!(zstd_decompress_simple(&part_chunk)?, part_bocs[0]); + + // Test preload flow + drop(loaded_state); + drop(storage); + let ctx = StorageContext::new(StorageConfig::new_potato(tmp_dir.path())).await?; + let reloaded_storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?; + let reloaded_persistent_states = reloaded_storage.persistent_state_storage(); + let reloaded_state_info = reloaded_persistent_states + .get_state_info(&block_id, PersistentStateKind::Shard) + .unwrap(); + assert_eq!(reloaded_state_info.split_depth, 2); + assert_eq!(reloaded_state_info.parts.len(), meta.parts.len()); + + // Import the split bundle into a fresh storage and verify it reconstructs the same state. + let (ctx, _tmp_dir) = StorageContext::new_temp().await?; + let imported_storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?; + let imported_shard_states = imported_storage.shard_state_storage(); + let part_files = part_bocs + .iter() + .map(|part| tempfile_from_bytes(part)) + .collect::>>()?; + + imported_shard_states.begin_raw_import()?; + let imported_root_hash = imported_shard_states + .store_state_split_files( + &block_id, + tempfile_from_bytes(&main_boc)?, + part_files, + Some(&expected_state_root_hash), + ) + .await?; + anyhow::ensure!( + imported_root_hash == expected_state_root_hash, + "split import root hash mismatch" + ); + imported_shard_states.finish_raw_import()?; + let split_import_counters = collect_cell_counters(&imported_storage)?; + assert_eq!(split_import_counters, full_import_counters); + + let imported_state = imported_shard_states + .load_state(block_id.seqno, &block_id) + .await?; + assert_eq!(imported_state.block_id(), &block_id); + assert_eq!( + imported_state.root_cell().repr_hash(), + &expected_state_root_hash + ); + + // Dropping a required part must fail before the split bundle is accepted. + let (ctx, _tmp_dir) = StorageContext::new_temp().await?; + let broken_storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?; + let broken_shard_states = broken_storage.shard_state_storage(); + broken_shard_states.begin_raw_import()?; + let missing_parts = part_bocs + .iter() + .skip(1) + .map(|part| tempfile_from_bytes(part)) + .collect::>>()?; + let err = broken_shard_states + .store_state_split_files( + &block_id, + tempfile_from_bytes(&main_boc)?, + missing_parts, + Some(&expected_state_root_hash), + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("split shard state parts do not match main file absent cells") + ); + + Ok(()) +} + #[tokio::test] async fn persistent_queue_state_read_write() -> Result<()> { tycho_util::test::init_logger("persistent_queue_state_read_write", "debug"); diff --git a/core/src/storage/shard_state/cell_storage.rs b/core/src/storage/shard_state/cell_storage/mod.rs similarity index 99% rename from core/src/storage/shard_state/cell_storage.rs rename to core/src/storage/shard_state/cell_storage/mod.rs index 38ebc4b235..00f47b524c 100644 --- a/core/src/storage/shard_state/cell_storage.rs +++ b/core/src/storage/shard_state/cell_storage/mod.rs @@ -35,6 +35,8 @@ use crate::storage::{CellsDb, CoreStorageConfig}; const PERSISTED_CELL_FILTER_MAX_CAPACITY: u64 = 2_000_000_000; const PERSISTED_CELL_FILTER_FP_RATE: f64 = 0.001; +pub mod raw; + pub struct CellStorage { cells_db: CellsDb, cell_counters: Mutex, @@ -99,6 +101,27 @@ impl CellStorage { &self.cells_db } + #[cfg(test)] + pub(crate) fn get_counter_value( + &self, + key: &HashBytes, + ) -> Result, CellStorageError> { + let Some(value) = self + .cells_db + .cells + .get(key) + .map_err(CellStorageError::Internal)? + else { + return Ok(None); + }; + let Some((idx, _)) = decode_indexed_value(value.as_ref()) else { + return Err(CellStorageError::InvalidCell); + }; + + let cell_counters = self.cell_counters.lock().unwrap(); + Ok(Some(cell_counters.counters.get(idx))) + } + pub(super) fn prepare_persistent_state_save(&self) { // Serialize with store/remove finalization so the nursery snapshot, // WAL remove record, and in-memory publish describe the same entries. @@ -1451,6 +1474,8 @@ fn owned_hash_key(hash: &HashBytes) -> HashBytesKey { pub enum CellStorageError { #[error("Cell not found in cell db")] CellNotFound, + #[error("Raw import writer stopped")] + RawImportWriterStopped, #[error("Invalid cell")] InvalidCell, #[error("Cell counter mismatch: expected refcount {expected}, got {actual} removes")] diff --git a/core/src/storage/shard_state/cell_storage/raw.rs b/core/src/storage/shard_state/cell_storage/raw.rs new file mode 100644 index 0000000000..2ca8168cb0 --- /dev/null +++ b/core/src/storage/shard_state/cell_storage/raw.rs @@ -0,0 +1,516 @@ +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}; +use std::sync::mpsc::{SyncSender, sync_channel}; + +use anyhow::Result; +use bytesize::ByteSize; +use crossbeam_queue::SegQueue; +use tycho_types::cell::{CellDescriptor, HashBytes}; +use tycho_util::fs::MappedFile; +use weedb::rocksdb; + +use super::{ + BuildTrustedCellHasher, CellDashMap, CellHashMap, CellStorage, CellStorageError, Counters, + CountersStore, HashBytesKey, Idx, NextIdx, PersistedCellFilter, StorageCell, hash_key, + owned_hash_key, +}; +use crate::storage::shard_state::db_state::CellsDbStateKey; +use crate::storage::shard_state::row_format::encode_indexed_value; + +impl CellStorage { + pub fn apply_indexed_temp_cell( + &self, + root: &HashBytes, + finalized_temp_cell_source: FinalizedTempCellSource<'_>, + ) -> Result<()> { + const MAX_NEW_CELLS_BATCH_SIZE: usize = 100_000; + const MAX_NEW_CELLS_BATCH_BYTES: usize = ByteSize::mib(16).0 as usize; + const LOCAL_SPLIT_DEPTH: usize = 8; + const MAX_LOCAL_STACK: usize = 4096; + + let limits = RawImportLimits { + max_new_cells_batch_size: MAX_NEW_CELLS_BATCH_SIZE, + max_new_cells_batch_bytes: MAX_NEW_CELLS_BATCH_BYTES, + local_split_depth: LOCAL_SPLIT_DEPTH, + max_local_stack: MAX_LOCAL_STACK, + }; + + let mut cell_counters = self.cell_counters.lock().unwrap(); + let mut persisted_filter = self.persisted_filter.lock().unwrap(); + let ctx = RawImportContext::new( + self, + finalized_temp_cell_source, + limits, + cell_counters.counters.next_idx, + ); + ctx.queue.push(WorkItem { + hash: *root, + depth: 0, + }); + + let worker_count = self.worker_pool.current_num_threads().max(1); + let (tx, rx) = sync_channel::(worker_count * 2); + std::thread::scope(|thread_scope| { + // Keep RocksDB writes outside the Rayon pool: with one storage + // worker, a blocking sync_channel send would otherwise deadlock. + let writer = thread_scope.spawn(move || { + while let Ok(sealed) = rx.recv() { + self.cells_db.rocksdb().write(sealed.batch)?; + } + Ok::<_, CellStorageError>(()) + }); + + let read = ReadContext { + counters: &cell_counters.counters, + persisted_filter: &persisted_filter, + }; + self.worker_pool.scope(|scope| { + for _ in 0..worker_count { + let tx = tx.clone(); + let read = &read; + scope.spawn(|_| { + // Rayon scoped tasks cannot return a Result to this + // caller, so store the first error and ask peers to stop. + if let Err(e) = ctx.worker_loop(read, tx) { + ctx.set_error(e); + } + }); + } + drop(tx); + }); + + match writer.join() { + Ok(Ok(())) => {} + Ok(Err(e)) => ctx.set_error(e), + Err(_) => ctx.set_error(CellStorageError::RawImportWriterStopped), + } + }); + + if let Some(e) = ctx.error.lock().unwrap().take() { + return Err(e.into()); + } + + ctx.finalize(&mut cell_counters, &mut persisted_filter)?; + + Ok(()) + } +} + +pub struct FinalizedTempCellsBuilder { + file: BufWriter, + index: CellHashMap, + bytes: u64, +} + +impl FinalizedTempCellsBuilder { + pub fn new(file: File, cell_count: usize) -> Self { + const BUFFER_CAPACITY: usize = ByteSize::mib(1).0 as usize; + + Self { + file: BufWriter::with_capacity(BUFFER_CAPACITY, file), + index: CellHashMap::with_capacity_and_hasher( + cell_count, + BuildTrustedCellHasher::default(), + ), + bytes: 0, + } + } + + pub fn append(&mut self, hash: &[u8; 32], data: &[u8]) -> Result<()> { + let offset = self.bytes; + self.file.write_all(data)?; + self.bytes += data.len() as u64; + self.index.insert(HashBytesKey(*hash), FinalizedTempCell { + offset, + len: data.len().try_into().unwrap(), + }); + Ok(()) + } + + pub fn finish(mut self) -> Result { + self.file.flush()?; + let file = MappedFile::from_existing_file(self.file.into_inner()?)?; + Ok(FinalizedTempCells { + file, + index: self.index, + }) + } +} + +pub struct FinalizedTempCells { + file: MappedFile, + index: CellHashMap, +} + +impl FinalizedTempCells { + fn get(&self, hash: &HashBytes) -> Option<&[u8]> { + let cell = self.index.get(hash_key(hash))?; + let offset = cell.offset as usize; + let len = cell.len as usize; + Some(&self.file.as_slice()[offset..offset + len]) + } + + fn len(&self) -> usize { + self.index.len() + } +} + +pub struct FinalizedTempCellSource<'a> { + main: &'a FinalizedTempCells, + parts: Vec<&'a FinalizedTempCells>, + len: usize, +} + +impl<'a> FinalizedTempCellSource<'a> { + pub fn new( + main: &'a FinalizedTempCells, + parts: impl IntoIterator, + ) -> Self { + let parts = parts.into_iter().collect::>(); + let len = main.len() + parts.iter().map(|part| part.len()).sum::(); + Self { main, parts, len } + } + + fn get(&self, hash: &HashBytes) -> Option<&[u8]> { + if let Some(cell) = self.main.get(hash) { + return Some(cell); + } + self.parts.iter().find_map(|part| part.get(hash)) + } + + fn len(&self) -> usize { + self.len + } +} + +struct FinalizedTempCell { + offset: u64, + len: u16, +} + +struct RawImportContext<'a> { + storage: &'a CellStorage, + finalized_temp_cell_source: FinalizedTempCellSource<'a>, + limits: RawImportLimits, + transaction: CellDashMap, + next_idx: AtomicU64, + queue: SegQueue, + in_flight: AtomicUsize, + cancelled: AtomicBool, + error: Mutex>, +} + +impl<'a> RawImportContext<'a> { + fn new( + storage: &'a CellStorage, + finalized_temp_cell_source: FinalizedTempCellSource<'a>, + limits: RawImportLimits, + next_idx: NextIdx, + ) -> Self { + let transaction_capacity = finalized_temp_cell_source.len(); + Self { + storage, + finalized_temp_cell_source, + limits, + transaction: CellDashMap::with_capacity_and_hasher_and_shard_amount( + transaction_capacity, + Default::default(), + 512, + ), + next_idx: AtomicU64::new(next_idx.get()), + queue: SegQueue::new(), + // Counts queued or currently processed cells. The root starts as + // one unfinished item before it is pushed to the queue. + in_flight: AtomicUsize::new(1), + cancelled: AtomicBool::new(false), + error: Mutex::new(None), + } + } + + fn worker_loop( + &self, + read: &ReadContext<'_>, + tx: SyncSender, + ) -> Result<(), CellStorageError> { + let mut worker = Worker::new(self.limits); + loop { + if self.cancelled.load(Ordering::Acquire) { + break; + } + + let Some(item) = worker.local_stack.pop().or_else(|| self.queue.pop()) else { + // Empty queues are not enough to stop: another worker may be + // processing a parent that can still publish children. + if self.in_flight.load(Ordering::Acquire) == 0 { + break; + } + std::thread::yield_now(); + continue; + }; + + let children = self.insert_cell(read, &mut worker, &item.hash, &tx)?; + if let Some(children) = children { + for child in children.into_iter() { + let child = WorkItem { + hash: child, + depth: item.depth + 1, + }; + // Publish each child before retiring the parent below, so + // other workers never observe zero unfinished work early. + self.in_flight.fetch_add(1, Ordering::Release); + if child.depth <= self.limits.local_split_depth + || worker.local_stack.len() > self.limits.max_local_stack + { + self.queue.push(child); + } else { + worker.local_stack.push(child); + } + } + } + self.in_flight.fetch_sub(1, Ordering::Release); + } + + worker.flush(&tx)?; + Ok(()) + } + + fn insert_cell( + &self, + read: &ReadContext<'_>, + worker: &mut Worker, + key: &HashBytes, + tx: &SyncSender, + ) -> Result, CellStorageError> { + use dashmap::mapref::entry::Entry; + + if let Some(value) = self.transaction.get(hash_key(key)) { + value.additions.fetch_add(1, Ordering::Relaxed); + return Ok(None); + } + + let existing = if read.persisted_filter.filter.contains(*key) { + self.storage.get_raw_idx_for_insert(key, usize::MAX)? + } else { + None + }; + + let loaded = if existing.is_none() { + let Some(data) = self.finalized_temp_cell_source.get(key) else { + return Err(CellStorageError::CellNotFound); + }; + Some((data, Self::read_references(data)?)) + } else { + None + }; + + match self.transaction.entry(owned_hash_key(key)) { + Entry::Occupied(value) => { + value.get().additions.fetch_add(1, Ordering::Relaxed); + Ok(None) + } + Entry::Vacant(entry) => { + if let Some(idx) = existing { + entry.insert(TempCell { + idx, + old_rc: read.counters.get(idx), + additions: AtomicU32::new(1), + is_new: false, + }); + Ok(None) + } else { + let (data, children) = loaded.unwrap(); + let idx = Idx::new(self.next_idx.fetch_add(1, Ordering::Relaxed)); + entry.insert(TempCell { + idx, + old_rc: 0, + additions: AtomicU32::new(1), + is_new: true, + }); + worker.stage_new(self.storage, key, idx, data, tx)?; + Ok(Some(children)) + } + } + } + } + + fn read_references(data: &[u8]) -> Result { + if data.len() < 6 { + return Err(CellStorageError::InvalidCell); + } + + let descriptor = CellDescriptor::new([data[0], data[1]]); + let byte_len = descriptor.byte_len() as usize; + let hash_count = descriptor.hash_count() as usize - 1; + let ref_count = descriptor.reference_count() as usize; + + let offset = 6usize + byte_len + StorageCell::HASHES_ITEM_LEN * hash_count; + let end_offset = offset + ref_count * 32; + if data.len() < end_offset { + return Err(CellStorageError::InvalidCell); + } + + let mut refs = ChildHashes::new(); + for chunk in data[offset..end_offset].chunks_exact(32) { + refs.push(HashBytes::from_slice(chunk)); + } + + Ok(refs) + } + + fn finalize( + self, + cell_counters: &mut CountersStore, + persisted_filter: &mut PersistedCellFilter, + ) -> Result<(), CellStorageError> { + cell_counters.counters.next_idx = NextIdx::new(self.next_idx.load(Ordering::Relaxed)); + + let mut batch = rocksdb::WriteBatch::default(); + let mut counter_batch = cell_counters.counters.begin(); + counter_batch.reserve(self.transaction.len()); + + for item in &self.transaction { + let additions = u64::from(item.additions.load(Ordering::Relaxed)); + counter_batch.update_raw(item.idx, item.old_rc, item.old_rc + additions); + } + counter_batch.apply(); + cell_counters.counters.shrink_if_needed(); + + cell_counters + .put_snapshot(&mut batch, CellsDbStateKey::CounterSnapshotLatest) + .map_err(CellStorageError::State)?; + + self.storage.cells_db.rocksdb().write(batch)?; + + for item in self.transaction { + if item.1.is_new { + persisted_filter.insert(&HashBytes(item.0.0)); + } + } + persisted_filter.record_metrics(); + + Ok(()) + } + + fn set_error(&self, e: CellStorageError) { + self.cancelled.store(true, Ordering::Release); + let mut slot = self.error.lock().unwrap(); + if slot.is_none() { + *slot = Some(e); + } + } +} + +struct Worker { + limits: RawImportLimits, + buffer: Vec, + batch: rocksdb::WriteBatch, + batch_cells: usize, + batch_bytes: usize, + local_stack: Vec, +} + +impl Worker { + fn new(limits: RawImportLimits) -> Self { + Self { + limits, + buffer: Vec::with_capacity(512), + batch: rocksdb::WriteBatch::default(), + batch_cells: 0, + batch_bytes: 0, + local_stack: Vec::with_capacity(1024), + } + } + + fn stage_new( + &mut self, + storage: &CellStorage, + key: &HashBytes, + idx: Idx, + data: &[u8], + tx: &SyncSender, + ) -> Result<(), CellStorageError> { + encode_indexed_value(idx, data, &mut self.buffer); + let row_len = self.buffer.len(); + self.batch + .put_cf(&storage.cells_db.cells.cf(), key, self.buffer.as_slice()); + self.batch_cells += 1; + self.batch_bytes += row_len; + + if self.batch_cells >= self.limits.max_new_cells_batch_size + || self.batch_bytes >= self.limits.max_new_cells_batch_bytes + { + self.flush(tx)?; + } + + Ok(()) + } + + fn flush(&mut self, tx: &SyncSender) -> Result<(), CellStorageError> { + if self.batch_cells == 0 { + return Ok(()); + } + + let sealed = SealedBatch { + batch: std::mem::take(&mut self.batch), + }; + self.batch_cells = 0; + self.batch_bytes = 0; + tx.send(sealed) + .map_err(|_err| CellStorageError::RawImportWriterStopped) + } +} + +#[derive(Clone, Copy)] +struct RawImportLimits { + max_new_cells_batch_size: usize, + max_new_cells_batch_bytes: usize, + local_split_depth: usize, + max_local_stack: usize, +} + +struct ReadContext<'a> { + counters: &'a Counters, + persisted_filter: &'a PersistedCellFilter, +} + +struct TempCell { + old_rc: u64, + idx: Idx, + additions: AtomicU32, + is_new: bool, +} + +#[derive(Clone, Copy)] +struct WorkItem { + hash: HashBytes, + depth: usize, +} + +struct SealedBatch { + batch: rocksdb::WriteBatch, +} + +struct ChildHashes { + hashes: [HashBytes; 4], + len: usize, +} + +impl ChildHashes { + fn new() -> Self { + Self { + hashes: [HashBytes([0; 32]); 4], + len: 0, + } + } + + fn push(&mut self, hash: HashBytes) { + self.hashes[self.len] = hash; + self.len += 1; + } + + fn into_iter(self) -> impl Iterator { + self.hashes.into_iter().take(self.len) + } +} diff --git a/core/src/storage/shard_state/entries_buffer.rs b/core/src/storage/shard_state/entries_buffer.rs index 5c46484d64..b6c4c1bf62 100644 --- a/core/src/storage/shard_state/entries_buffer.rs +++ b/core/src/storage/shard_state/entries_buffer.rs @@ -69,6 +69,10 @@ impl HashesEntryWriter<'_> { self.0[1] = cell_type.into(); } + pub fn set_is_absent(&mut self, is_absent: bool) { + self.0[2] = is_absent as u8; + } + pub fn set_hash(&mut self, i: u8, hash: &[u8]) { self.get_hash_slice(i).copy_from_slice(hash); } @@ -92,7 +96,7 @@ impl HashesEntryWriter<'_> { pub struct HashesEntry<'a>(&'a [u8; HashesEntry::LEN]); impl<'a> HashesEntry<'a> { - // 4 bytes - info (1 byte level mask, 1 byte cell type, 2 bytes padding) + // 4 bytes - info (1 byte level mask, 1 byte cell type, 1 byte is absent, 1 byte padding) // 32 * 4 bytes - hashes // 2 * 4 bytes - depths pub const LEN: usize = 4 + 32 * 4 + 2 * 4; @@ -114,6 +118,10 @@ impl<'a> HashesEntry<'a> { } } + pub fn is_absent(&self) -> bool { + self.0[2] != 0 + } + pub fn hash(&self, n: u8) -> &'a [u8; 32] { let offset = Self::HASHES_OFFSET + 32 * self.level_mask().hash_index(n) as usize; unsafe { &*self.0.as_ptr().add(offset).cast() } @@ -136,11 +144,7 @@ impl<'a> HashesEntry<'a> { let offset = Self::HASHES_OFFSET; unsafe { &*self.0.as_ptr().add(offset).cast() } } else { - let offset = 1 + 1 + index * 32; - if data.len() < offset + 32 { - return None; - } - unsafe { &*data.as_ptr().add(offset).cast() } + read_stored_hash_by_index(index, data, 1 + 1)? }) } @@ -149,15 +153,53 @@ impl<'a> HashesEntry<'a> { let index = level_mask.hash_index(n) as usize; let level = level_mask.level() as usize; - Some(if index == level { + if index == level { let offset = Self::DEPTHS_OFFSET; - u16::from_le_bytes([self.0[offset], self.0[offset + 1]]) + Some(u16::from_le_bytes([self.0[offset], self.0[offset + 1]])) } else { - let offset = 1 + 1 + level * 32 + index * 2; - if data.len() < offset + 2 { - return None; - } - u16::from_be_bytes([data[offset], data[offset + 1]]) - }) + read_stored_depth_by_index(level, index, data, 1 + 1) + } + } +} + +pub fn read_stored_hash( + level_mask: LevelMask, + n: u8, + data: &[u8], + hashes_offset: usize, +) -> Option<&[u8; 32]> { + let index = level_mask.hash_index(n) as usize; + read_stored_hash_by_index(index, data, hashes_offset) +} + +fn read_stored_hash_by_index(index: usize, data: &[u8], hashes_offset: usize) -> Option<&[u8; 32]> { + let offset = hashes_offset + index * 32; + if data.len() < offset + 32 { + return None; } + Some(unsafe { &*data.as_ptr().add(offset).cast() }) +} + +pub fn read_stored_depth( + level_mask: LevelMask, + n: u8, + data: &[u8], + hashes_offset: usize, +) -> Option { + let index = level_mask.hash_index(n) as usize; + let level = level_mask.level() as usize; + read_stored_depth_by_index(level, index, data, hashes_offset) +} + +fn read_stored_depth_by_index( + level: usize, + index: usize, + data: &[u8], + hashes_offset: usize, +) -> Option { + let offset = hashes_offset + (level + 1) * 32 + index * 2; + if data.len() < offset + 2 { + return None; + } + Some(u16::from_be_bytes([data[offset], data[offset + 1]])) } diff --git a/core/src/storage/shard_state/mod.rs b/core/src/storage/shard_state/mod.rs index a84f41cee1..0b6bff1e7e 100644 --- a/core/src/storage/shard_state/mod.rs +++ b/core/src/storage/shard_state/mod.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use futures_util::FutureExt; use futures_util::future::BoxFuture; use tycho_block_util::block::*; -use tycho_block_util::dict::split_aug_dict_raw; +use tycho_block_util::dict::{split_aug_dict_raw, split_aug_dict_raw_by_shards}; use tycho_block_util::state::*; use tycho_storage::fs::{Dir, TempFileStorage}; use tycho_storage::kv::StoredValue; @@ -35,7 +35,9 @@ use super::{ CoreStorageConfig, tables, }; use crate::storage::BlockConnection; -use crate::storage::persistent_state::ShardStateWriter; +use crate::storage::persistent_state::{ + PersistentStateMeta, ShardStateWriter, check_can_reuse_shard_state_part_files, +}; mod cell_nursery; mod cell_storage; @@ -141,24 +143,143 @@ impl ShardStateStorage { states_dir: Dir, block_id: BlockId, root_hash: HashBytes, + split_depth: u8, cancelled: Option, - ) -> Result { + ) -> Result { let cells_db = self.cells_db.clone(); let cell_storage = self.cell_storage.clone(); let gc_lock = self.gc_lock.clone().lock_owned().await; + + let split = if split_depth == 0 || block_id.is_masterchain() { + FastHashMap::default() + } else { + let root_cell = self.load_state(0, &block_id).await?.root_cell().clone(); + split_shard_accounts_for_persistent(&block_id.shard, &root_cell, split_depth)? + }; + + let mut parts: Vec = split.values().map(|part| part.prefix).collect(); + parts.sort_unstable(); + let meta = PersistentStateMeta::new(if parts.is_empty() { 0 } else { split_depth }, parts); + + // prepare, check for parts reuse + let parts_reused = tokio::task::spawn_blocking({ + let meta = meta.clone(); + let states_dir = states_dir.clone(); + let is_split = !split.is_empty(); + let span = tracing::Span::current(); + + move || { + let _span = span.enter(); + { + // Serialize with state commits just long enough to publish + // nursery-only cells into RocksDB before the raw writer starts. + let _gc_lock = gc_lock; + cell_storage.prepare_persistent_state_save(); + } + + if is_split + && check_can_reuse_shard_state_part_files(&states_dir, &block_id, &meta)? + { + tracing::debug!("reusing split persistent shard state parts"); + return Ok::<_, anyhow::Error>(true); + } + + Ok::<_, anyhow::Error>(false) + } + }) + .await??; + + // write parts in parallel if exist + if !split.is_empty() && !parts_reused { + // will use semaphore to limit parrallel io operations + const MAX_PARALLEL_PERSISTENT_STATE_PART_WRITES: usize = 4; + let semaphore = Arc::new(tokio::sync::Semaphore::new( + MAX_PARALLEL_PERSISTENT_STATE_PART_WRITES, + )); + + let mut parts_writes = tokio::task::JoinSet::new(); + for (part_root_hash, part) in &split { + let permit = semaphore + .clone() + .acquire_owned() + .await + .context("persistent state part write semaphore closed")?; + let cells_db = cells_db.clone(); + let states_dir = states_dir.clone(); + let part_root_hash = *part_root_hash; + let part_prefix = part.prefix; + let cancelled = cancelled.clone(); + let span = tracing::Span::current(); + + parts_writes.spawn_blocking(move || { + let _permit = permit; + let _span = span.enter(); + + ShardStateWriter::new_part(&cells_db, &states_dir, &block_id, part_prefix) + .write(&part_root_hash, cancelled.as_ref()) + .with_context(|| { + format!("failed to write persistent state part {part_prefix:016x}") + }) + }); + } + + // collect results + let mut write_result = Ok::<_, anyhow::Error>(()); + while let Some(result) = parts_writes.join_next().await { + match result { + Ok(Ok(_)) => {} + Ok(Err(e)) => { + if let Some(cancelled) = &cancelled { + cancelled.cancel(); + } + if write_result.is_ok() { + write_result = Err(e); + } + } + Err(e) => { + if let Some(cancelled) = &cancelled { + cancelled.cancel(); + } + if write_result.is_ok() { + write_result = Err(e.into()); + } + } + } + } + write_result?; + } + + // write metadata and main file let span = tracing::Span::current(); tokio::task::spawn_blocking(move || { let _span = span.enter(); - { - // Serialize with state commits just long enough to publish - // nursery-only cells into RocksDB before the raw writer starts. - let _gc_lock = gc_lock; - cell_storage.prepare_persistent_state_save(); + + if !split.is_empty() { + if !parts_reused { + // write persistent metadata + meta.write(&states_dir, &block_id)?; + } + + // write main + let absent_cells = split + .iter() + .map(|(root_hash, part)| (*root_hash, part.cell.clone())) + .collect(); + ShardStateWriter::new(&cells_db, &states_dir, &block_id).write_with_absent( + &root_hash, + absent_cells, + cancelled.as_ref(), + )?; + } else { + // write persistent metadata + meta.write(&states_dir, &block_id)?; + + ShardStateWriter::new(&cells_db, &states_dir, &block_id) + .write(&root_hash, cancelled.as_ref())?; } - ShardStateWriter::new(&cells_db, &states_dir, &block_id) - .write(&root_hash, cancelled.as_ref()) + Ok::<_, anyhow::Error>(meta) }) .await? } @@ -811,7 +932,19 @@ impl ShardStateStorage { boc: File, expected_root_hash: Option<&HashBytes>, ) -> Result { - self.store_state_raw_inner(block_id, boc, expected_root_hash) + self.store_state_raw_inner(block_id, boc, Vec::new(), expected_root_hash) + .await + } + + // Stores shard state from parts and returns the hash of its root cell. + pub async fn store_state_split_files( + &self, + block_id: &BlockId, + main: File, + parts: Vec, + expected_root_hash: Option<&HashBytes>, + ) -> Result { + self.store_state_raw_inner(block_id, main, parts, expected_root_hash) .await } @@ -822,14 +955,15 @@ impl ShardStateStorage { expected_root_hash: Option<&HashBytes>, ) -> Result { let cursor = Cursor::new(boc); - self.store_state_raw_inner(block_id, cursor, expected_root_hash) + self.store_state_raw_inner(block_id, cursor, Vec::new(), expected_root_hash) .await } async fn store_state_raw_inner( &self, block_id: &BlockId, - boc: R, + main: R, + parts: Vec, expected_root_hash: Option<&HashBytes>, ) -> Result where @@ -852,7 +986,7 @@ impl ShardStateStorage { // Raw import writes directly to RocksDB/counters; RawImportInProgress // is bootstrap-scoped, so publish nursery-only cells before storing. ctx.cell_storage.prepare_persistent_state_save(); - ctx.store(&block_id, boc) + Ok(ctx.store_split(&block_id, main, parts)?.root_hash) }) .await? } @@ -1814,6 +1948,45 @@ pub fn split_shard_accounts( split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts") } +/// Splits shard accounts dict by shards, skips empty shards, +/// and returns map of **not empty** shard branch root cells by hashes: +/// `{ hash: (cell, shard) }`. +pub fn split_shard_accounts_for_persistent( + state_shard: &ShardIdent, + root_cell: impl AsRef, + split_depth: u8, +) -> Result> { + // Cell#0 - processed_upto + // Cell#1 - accounts + let shard_accounts = root_cell + .as_ref() + .reference_cloned(1) + .context("invalid shard state")? + .parse::() + .context("failed to load shard accounts")?; + + let shards = split_aug_dict_raw_by_shards(state_shard.workchain(), shard_accounts, split_depth) + .context("failed to split shard accounts for persistent state")?; + + let mut result = FastHashMap::default(); + for (shard, dict) in shards { + if let Some(cell) = dict { + result.insert(*cell.repr_hash(), ShardAccountsSplitPart { + prefix: shard.prefix(), + cell, + }); + } + } + + Ok(result) +} + +#[derive(Debug, Clone)] +pub struct ShardAccountsSplitPart { + pub prefix: u64, + pub cell: Cell, +} + #[derive(Debug, Clone)] pub struct BlockInfoForApply { pub prev_block_id: BlockId, diff --git a/core/src/storage/shard_state/store_state_raw.rs b/core/src/storage/shard_state/store_state_raw.rs index 699c74e2b0..81378f5a36 100644 --- a/core/src/storage/shard_state/store_state_raw.rs +++ b/core/src/storage/shard_state/store_state_raw.rs @@ -8,15 +8,19 @@ use tycho_storage::kv::StoredValue; use tycho_types::cell::*; use tycho_types::models::BlockId; use tycho_types::util::ArrayVec; -use tycho_util::FastHashMap; use tycho_util::fs::MappedFile; use tycho_util::io::ByteOrderRead; use tycho_util::progress_bar::*; -use weedb::{BoundedCfHandle, rocksdb}; +use tycho_util::{FastHashMap, FastHashSet}; +use weedb::rocksdb; +use super::cell_storage::raw::{ + FinalizedTempCellSource, FinalizedTempCells, FinalizedTempCellsBuilder, +}; use super::cell_storage::*; use super::db_state::{CELL_HASH_RANGE_END, CELL_HASH_RANGE_START}; use super::entries_buffer::*; +use super::util::{CellHashMap, HashBytesKey}; use crate::storage::{BriefBocHeader, CellsDb, ShardStateReader}; pub const MAX_DEPTH: u16 = u16::MAX - 1; @@ -29,16 +33,93 @@ pub struct StoreStateContext { } impl StoreStateContext { - // Stores shard state and returns the hash of its root cell. - pub fn store(&self, block_id: &BlockId, reader: R) -> Result + pub fn store_split( + &self, + block_id: &BlockId, + main: R, + parts: Vec, + ) -> Result + where + R: std::io::Read + Send, + { + // TODO: remove after 1 release + self.clear_temp_cells()?; + + // import main and parts to temp in parallel + let (main, parts) = std::thread::scope(|scope| { + let main_handle = scope.spawn(move || self.import_to_temp(main, true)); + let mut part_handles = Vec::with_capacity(parts.len()); + for part in parts { + part_handles.push(scope.spawn(move || self.import_to_temp(part, false))); + } + let mut parts = Vec::with_capacity(part_handles.len()); + for part_handle in part_handles { + match part_handle.join() { + Ok(part) => parts.push(part?), + Err(_) => anyhow::bail!("persistent state part import thread failed"), + } + } + let main = match main_handle.join() { + Ok(main) => main?, + Err(_) => anyhow::bail!("persistent state main import thread failed"), + }; + Ok::<_, anyhow::Error>((main, parts)) + })?; + + let mut imported_part_roots = FastHashSet::default(); + for part in &parts { + anyhow::ensure!( + part.absent_cells_hashes.is_empty(), + "split shard state part must not contain absent cells" + ); + anyhow::ensure!( + imported_part_roots.insert(part.root_hash), + "duplicate split shard state part root: {}", + part.root_hash + ); + } + + // validate + anyhow::ensure!( + main.absent_cells_hashes == imported_part_roots, + "split shard state parts do not match main file absent cells: \ + absent_count={}, parts_count={}, missing_parts={:?}, unexpected_parts={:?}", + main.absent_cells_hashes.len(), + imported_part_roots.len(), + main.absent_cells_hashes + .difference(&imported_part_roots) + .take(4), + imported_part_roots + .difference(&main.absent_cells_hashes) + .take(4) + ); + + // NOTE: we make one atomic apply for all parts to avoid possible + // dangling part sub-trees (with a fake counter = 1), + // if a process crashed after applying parts but before applying main + + self.apply_temp(block_id, main, parts) + } + + fn clear_temp_cells(&self) -> Result<()> { + self.cells_db.rocksdb().delete_range_cf_opt( + &self.cells_db.temp_cells.cf(), + CELL_HASH_RANGE_START.as_slice(), + CELL_HASH_RANGE_END.as_slice(), + self.cells_db.temp_cells.write_config(), + )?; + Ok(()) + } + + fn import_to_temp(&self, reader: R, is_main_part: bool) -> Result where R: std::io::Read, { - let preprocessed = self.preprocess(reader)?; - self.finalize(block_id, preprocessed) + let preprocessed = self.preprocess(reader, is_main_part)?; + self.finalize_to_temp(preprocessed, is_main_part) } - fn preprocess(&self, reader: R) -> Result + fn preprocess(&self, reader: R, allow_absent: bool) -> Result where R: std::io::Read, { @@ -50,6 +131,10 @@ impl StoreStateContext { let header = *reader.header(); tracing::debug!(?header); + if header.absent_count > 0 && !allow_absent { + anyhow::bail!("absent cells are not supported in a single shard state file"); + } + pg.set_progress(header.cell_count); let temp_file = self.temp_file_storage.unnamed_file().open()?; @@ -97,16 +182,20 @@ impl StoreStateContext { } } - fn finalize(&self, block_id: &BlockId, preprocessed: PreprocessedState) -> Result { - // 2^7 bits + 1 bytes - const MAX_DATA_SIZE: usize = 128; + fn finalize_to_temp( + &self, + preprocessed: PreprocessedState, + is_main_part: bool, + ) -> Result { + // absent cell may contain 4 * (hash + depth) + const MAX_DATA_SIZE: usize = 4 * (32 + 2); const CELLS_PER_BATCH: u64 = 1_000_000; let PreprocessedState { header, file } = preprocessed; let mut pg = ProgressBar::builder() .with_mapper(|x| bytesize::to_string(x, false)) - .build(|msg| tracing::info!("processing state... {msg}")); + .build(|msg| tracing::info!("writing state to temp... {msg}")); let file = MappedFile::from_existing_file(file)?; @@ -116,16 +205,11 @@ impl StoreStateContext { .prealloc(header.cell_count as usize * HashesEntry::LEN) .open_as_mapped_mut()?; - let raw = self.cells_db.rocksdb().as_ref(); - let write_options = self.cells_db.temp_cells.write_config(); - - let mut ctx = FinalizationContext::new(&self.cells_db); - raw.delete_range_cf_opt( - &ctx.temp_cells_cf, - CELL_HASH_RANGE_START.as_slice(), - CELL_HASH_RANGE_END.as_slice(), - write_options, - )?; + let finalized_temp_cells = FinalizedTempCellsBuilder::new( + self.temp_file_storage.unnamed_file().open()?, + header.cell_count as usize, + ); + let mut ctx = FinalizationContext::new(finalized_temp_cells); // Allocate on heap to prevent big future size let mut chunk_buffer = Vec::with_capacity(1 << 20); @@ -137,6 +221,8 @@ impl StoreStateContext { let mut file_pos = total_size; let mut cell_index = header.cell_count; let mut batch_len = 0; + let mut absent_cells_hashes = FastHashSet::default(); + while file_pos >= 4 { file_pos -= 4; @@ -184,7 +270,12 @@ impl StoreStateContext { unsafe { hashes_file.read_exact_at(index as usize * HashesEntry::LEN, buffer) } } - ctx.finalize_cell(cell_index as u32, cell)?; + // collect absent cells hashes for futher validation + if let FinalizeCellResult::Absent { hash } = + ctx.finalize_cell(cell_index as u32, cell)? + { + absent_cells_hashes.insert(hash); + } // SAFETY: `entries_buffer` is guaranteed to be in separate memory area unsafe { @@ -199,7 +290,6 @@ impl StoreStateContext { if batch_len > CELLS_PER_BATCH { ctx.finalize_cell_usages(); - raw.write_opt(std::mem::take(&mut ctx.write_batch), write_options)?; batch_len = 0; } @@ -208,12 +298,11 @@ impl StoreStateContext { if batch_len > 0 { ctx.finalize_cell_usages(); - raw.write_opt(std::mem::take(&mut ctx.write_batch), write_options)?; } // Current entry contains root cell let root_hash = ctx.entries_buffer.repr_hash(); - if let Some(expected_root_hash) = &self.expected_root_hash { + if is_main_part && let Some(expected_root_hash) = &self.expected_root_hash { anyhow::ensure!( root_hash == expected_root_hash, "shard state root hash mismatch: expected={expected_root_hash}, got={}", @@ -222,8 +311,31 @@ impl StoreStateContext { } ctx.final_check(root_hash)?; + let finalized_temp_cells = ctx.finalized_temp_cells.finish()?; + + pg.complete(); + + Ok(TempState { + root_hash: HashBytes(*root_hash), + absent_cells_hashes, + finalized_temp_cells, + }) + } + + fn apply_temp( + &self, + block_id: &BlockId, + state: TempState, + parts: Vec, + ) -> Result { + tracing::info!("applying temp state {}: started", block_id.as_short_id()); + + let temp_cell_source = FinalizedTempCellSource::new( + &state.finalized_temp_cells, + parts.iter().map(|part| &part.finalized_temp_cells), + ); self.cell_storage - .apply_temp_cell(HashBytes::wrap(root_hash))?; + .apply_indexed_temp_cell(&state.root_hash, temp_cell_source)?; let shard_state_key = block_id.to_vec(); let mut finalize_batch = rocksdb::WriteBatch::default(); finalize_batch.delete_range_cf( @@ -234,43 +346,62 @@ impl StoreStateContext { finalize_batch.put_cf( &self.cells_db.shard_states.cf(), shard_state_key.as_slice(), - root_hash.as_slice(), + state.root_hash.as_slice(), ); self.cells_db.rocksdb().write(finalize_batch)?; - pg.complete(); - // Load stored shard state match self.cells_db.shard_states.get(shard_state_key)? { - Some(root) => Ok(HashBytes::from_slice(&root[..32])), - None => Err(StoreStateError::NotFound.into()), + Some(root) => { + tracing::info!("applying temp state {}: finished", block_id.as_short_id()); + + Ok(StoreStateResult { + root_hash: HashBytes::from_slice(&root[..32]), + }) + } + None => { + tracing::error!( + "applying temp state {}: failed - state not found", + block_id.as_short_id() + ); + + Err(StoreStateError::NotFound.into()) + } } } } -struct FinalizationContext<'a> { +pub struct StoreStateResult { + pub root_hash: HashBytes, +} + +struct TempState { + root_hash: HashBytes, + absent_cells_hashes: FastHashSet, + finalized_temp_cells: FinalizedTempCells, +} + +struct FinalizationContext { pruned_branches: FastHashMap>, - cell_usages: FastHashMap<[u8; 32], i32>, + cell_usages: CellHashMap, entries_buffer: EntriesBuffer, output_buffer: Vec, - temp_cells_cf: BoundedCfHandle<'a>, - write_batch: rocksdb::WriteBatch, + finalized_temp_cells: FinalizedTempCellsBuilder, } -impl<'a> FinalizationContext<'a> { - fn new(db: &'a CellsDb) -> Self { +impl FinalizationContext { + fn new(finalized_temp_cells: FinalizedTempCellsBuilder) -> Self { Self { pruned_branches: Default::default(), - cell_usages: FastHashMap::with_capacity_and_hasher(128, Default::default()), + cell_usages: CellHashMap::with_capacity_and_hasher(128, Default::default()), entries_buffer: EntriesBuffer::new(), output_buffer: Vec::with_capacity(1 << 10), - temp_cells_cf: db.temp_cells.cf(), - write_batch: rocksdb::WriteBatch::default(), + finalized_temp_cells, } } // TODO: Somehow reuse `tycho_types::cell::CellParts`. - fn finalize_cell(&mut self, cell_index: u32, cell: RawCell<'_>) -> Result<()> { + fn finalize_cell(&mut self, cell_index: u32, cell: RawCell<'_>) -> Result { use sha2::{Digest, Sha256}; let (mut current_entry, children) = self @@ -288,7 +419,13 @@ impl<'a> FinalizationContext<'a> { let mut is_merkle_cell = false; let mut is_pruned_cell = false; + let mut is_absent_cell = false; + let level_mask = match cell.descriptor.cell_type() { + CellType::Ordinary if cell.descriptor.is_absent() => { + is_absent_cell = true; + cell.descriptor.level_mask() + } CellType::Ordinary => children_mask, CellType::PrunedBranch => { is_pruned_cell = true; @@ -308,6 +445,7 @@ impl<'a> FinalizationContext<'a> { // Save mask and counters current_entry.set_level_mask(level_mask); current_entry.set_cell_type(cell.descriptor.cell_type()); + current_entry.set_is_absent(is_absent_cell); // Calculate hashes let hash_count = if is_pruned_cell { @@ -323,6 +461,21 @@ impl<'a> FinalizationContext<'a> { if level != 0 && (is_pruned_cell || !level_mask.contains(level)) { continue; } + + // for absent cell read depth and hash from data + if is_absent_cell { + let depth = read_stored_depth(level_mask, level, cell.data, 0) + .context("invalid absent cell")?; + current_entry.set_depth(hash_idx, depth); + + let hash = read_stored_hash(level_mask, level, cell.data, 0) + .context("invalid absent cell")?; + current_entry.set_hash(hash_idx, hash); + + hash_idx += 1; + continue; + } + let mut hasher = Sha256::new(); let level_mask = if is_pruned_cell { @@ -394,6 +547,23 @@ impl<'a> FinalizationContext<'a> { self.pruned_branches.insert(cell_index, cell.data.to_vec()); } + // get cell hash + let repr_hash = if is_pruned_cell { + *current_entry + .as_reader() + .pruned_branch_hash(LevelMask::MAX_LEVEL, cell.data) + .context("Invalid pruned branch")? + } else { + *current_entry.as_reader().hash(LevelMask::MAX_LEVEL) + }; + + // do not store absent cell to storage + if is_absent_cell { + return Ok(FinalizeCellResult::Absent { + hash: HashBytes(repr_hash), + }); + } + // Write cell data let output_buffer = &mut self.output_buffer; output_buffer.clear(); @@ -425,26 +595,25 @@ impl<'a> FinalizationContext<'a> { child.hash(LevelMask::MAX_LEVEL) }; - *self.cell_usages.entry(*child_hash).or_default() += 1; + // update cell usages only when child is not absent + // because we won't store absent cells + // this is for the local BOC check after import to temp + if !child.is_absent() { + *self + .cell_usages + .entry(HashBytesKey(*child_hash)) + .or_default() += 1; + } output_buffer.extend_from_slice(child_hash); } // Save serialized data - let repr_hash = if is_pruned_cell { - current_entry - .as_reader() - .pruned_branch_hash(LevelMask::MAX_LEVEL, cell.data) - .context("Invalid pruned branch")? - } else { - current_entry.as_reader().hash(LevelMask::MAX_LEVEL) - }; - - self.write_batch - .put_cf(&self.temp_cells_cf, repr_hash, output_buffer.as_slice()); - self.cell_usages.insert(*repr_hash, -1); + self.finalized_temp_cells + .append(&repr_hash, output_buffer)?; + self.cell_usages.insert(HashBytesKey(repr_hash), -1); // Done - Ok(()) + Ok(FinalizeCellResult::Other) } fn finalize_cell_usages(&mut self) { @@ -456,13 +625,19 @@ impl<'a> FinalizationContext<'a> { tracing::info!(len = ?self.cell_usages.len(), "Cell usages"); anyhow::ensure!( - self.cell_usages.len() == 1 && self.cell_usages.contains_key(root_hash), + self.cell_usages.len() == 1 + && self.cell_usages.contains_key(HashBytesKey::wrap(root_hash)), "Invalid shard state cell" ); Ok(()) } } +enum FinalizeCellResult { + Absent { hash: HashBytes }, + Other, +} + struct PreprocessedState { header: BriefBocHeader, file: File, @@ -486,27 +661,50 @@ impl<'a> RawCell<'a> { where R: Read, { - let mut descriptor = [0u8; 2]; - src.read_exact(&mut descriptor)?; - let descriptor = CellDescriptor::new(descriptor); + let descriptor = { + let d1 = src.read_byte()?; + let check = CellDescriptor::new([d1, 0]); + if check.is_absent() { + check + } else { + let d2 = src.read_byte()?; + CellDescriptor::new([d1, d2]) + } + }; let byte_len = descriptor.byte_len() as usize; let ref_count = descriptor.reference_count() as usize; - if descriptor.is_absent() || ref_count > 4 { + if ref_count > 4 && !descriptor.is_absent() { return Err(parser_error("invalid preprocessed cell descriptor")); } - let data = &mut data_buffer[0..byte_len]; + // for absent cells read stored hashes and depth into data + let data_len = if descriptor.is_absent() { + if byte_len != 0 { + return Err(parser_error( + "absent cell should have no data except hashes/depths", + )); + } + descriptor.hash_count() as usize * (32 + 2) + } else { + byte_len + }; + + let data = &mut data_buffer[0..data_len]; src.read_exact(data)?; let mut reference_indices = ArrayVec::new(); - for _ in 0..ref_count { - let index = src.read_be_uint(ref_size)? as usize; - if index >= cell_count || index <= cell_index { - return Err(parser_error("reference index out of range")); - } else { - // SAFETY: `ref_count` is in range 0..=4 - unsafe { reference_indices.push(index as u32) }; + + // skip refs for absent cells + if !descriptor.is_absent() { + for _ in 0..ref_count { + let index = src.read_be_uint(ref_size)? as usize; + if index >= cell_count || index <= cell_index { + return Err(parser_error("reference index out of range")); + } else { + // SAFETY: `ref_count` is in range 0..=4 + unsafe { reference_indices.push(index as u32) }; + } } } @@ -546,6 +744,7 @@ enum StoreStateError { #[cfg(test)] mod test { use std::collections::BTreeSet; + use std::num::NonZeroUsize; use bytes::Bytes; use bytesize::ByteSize; @@ -619,7 +818,7 @@ mod test { #[allow(clippy::disallowed_methods)] let file = File::open(file.path())?; - store_ctx.store(&block_id, file)?; + store_ctx.store_split(&block_id, file, Vec::new())?; } tracing::info!("Finished processing all states"); tracing::info!("Starting gc"); @@ -776,7 +975,9 @@ mod test { #[tokio::test] async fn raw_state_store_allows_existing_snapshot() -> Result<()> { let (ctx, _tempdir) = StorageContext::new_temp().await?; - let storage = CoreStorage::open(ctx.clone(), CoreStorageConfig::new_potato()).await?; + let mut config = CoreStorageConfig::new_potato(); + config.cell_storage_threads = NonZeroUsize::new(1).unwrap(); + let storage = CoreStorage::open(ctx.clone(), config).await?; let root = Boc::decode(ZEROSTATE_BOC)?; let state = root.parse::()?; diff --git a/core/tests/overlay_server.rs b/core/tests/overlay_server.rs index 09390800a8..c799b07cb6 100644 --- a/core/tests/overlay_server.rs +++ b/core/tests/overlay_server.rs @@ -11,11 +11,14 @@ use tycho_core::blockchain_rpc::{ }; use tycho_core::overlay_client::PublicOverlayClient; use tycho_core::proto::blockchain::{KeyBlockIds, PersistentStateInfo}; -use tycho_core::storage::{CoreStorage, CoreStorageConfig, NewBlockMeta, PersistentStateKind}; +use tycho_core::storage::{ + CoreStorage, CoreStorageConfig, NewBlockMeta, PersistentStateKind, PersistentStateStorage, +}; use tycho_network::{DhtClient, InboundRequestMeta, Network, OverlayId, PeerId, PublicOverlay}; use tycho_storage::StorageContext; use tycho_types::boc::{Boc, BocRepr}; use tycho_types::models::{BlockId, ExtInMsgInfo, OwnedMessage, ShardIdent}; +use tycho_util::compression::zstd_decompress_simple; use tycho_util::fs::MappedFile; use crate::network::TestNode; @@ -317,7 +320,7 @@ async fn overlay_server_persistent_state() -> Result<()> { )?; persistent_states - .store_shard_state_file(0, &zerostate_handle, zerostate_file) + .store_shard_state_files(0, &zerostate_handle, zerostate_file, Vec::new(), 0) .await?; } @@ -348,9 +351,13 @@ async fn overlay_server_persistent_state() -> Result<()> { .find_persistent_state(&zerostate_id, PersistentStateKind::Shard) .await?; + assert_eq!(pending_state.split_depth, 0); + assert!(pending_state.parts.is_empty()); + let temp_file = client .download_persistent_state( pending_state, + None, storage.context().temp_files().unnamed_file().open()?, ) .await?; @@ -361,3 +368,160 @@ async fn overlay_server_persistent_state() -> Result<()> { tracing::info!("done!"); Ok(()) } + +#[tokio::test] +async fn overlay_server_split_persistent_state() -> Result<()> { + tycho_util::test::init_logger("overlay_server_split_persistent_state", "info"); + + const DUMP_PATH: &str = concat!( + env!("CARGO_MANIFEST_DIR"), + "/../test/data/dump/persistents/", + "0:8000000000000000:36:", + "78d7e559cf68d9d1821520d1b53b16ba5cf9cef139106efd388af2bfe2016817:", + "a875088e0557ab41241aaf07db46e9422e070d7d3842fe4f7269ae6a0bd69ae5.boc", + ); + + // load a real shard state dump that produces split persistent parts + let dump_path = std::path::Path::new(DUMP_PATH); + let block_id: BlockId = dump_path + .file_stem() + .and_then(|stem| stem.to_str()) + .unwrap() + .parse()?; + let full_boc = zstd_decompress_simple(&std::fs::read(dump_path)?)?; + let expected_state_root_hash = *Boc::decode(&full_boc)?.repr_hash(); + + // open storage with split persistent states enabled + let mut config = CoreStorageConfig::new_potato(); + config.persistent_state_split_depth = 2; + let (ctx, _tmp_dir) = StorageContext::new_temp().await?; + let storage = CoreStorage::open(ctx, config).await?; + let shard_states = storage.shard_state_storage(); + + // import the full state into shard storage before writing the persistent bundle + shard_states.begin_raw_import()?; + let root_hash = shard_states + .store_state_bytes( + &block_id, + bytes::Bytes::from(full_boc), + Some(&expected_state_root_hash), + ) + .await?; + anyhow::ensure!( + root_hash == expected_state_root_hash, + "dump state root hash mismatch" + ); + shard_states.finish_raw_import()?; + + // create a block handle that can be used by the persistent-state writer + let loaded_state = shard_states.load_state(block_id.seqno, &block_id).await?; + let (handle, _) = + storage + .block_handle_storage() + .create_or_load_handle(&block_id, NewBlockMeta { + is_key_block: block_id.is_masterchain(), + gen_utime: loaded_state.as_ref().gen_utime, + ref_by_mc_seqno: block_id.seqno, + }); + storage.block_handle_storage().set_has_shard_state(&handle); + storage.block_handle_storage().set_skip_states_gc(&handle); + + // write the split persistent bundle using the existing storage flow + let persistent_states = storage.persistent_state_storage(); + persistent_states + .store_shard_state(block_id.seqno, &handle) + .await?; + + // capture local decompressed bytes to compare with overlay downloads + let state_info = persistent_states + .get_state_info(&block_id, PersistentStateKind::Shard) + .unwrap(); + assert_eq!(state_info.split_depth, 2); + assert!(!state_info.parts.is_empty()); + let first_part = state_info.parts[0]; + let expected_main = read_persistent_state_decompressed( + persistent_states, + &block_id, + PersistentStateKind::Shard, + None, + state_info.size, + state_info.chunk_size, + ) + .await?; + let expected_part = read_persistent_state_decompressed( + persistent_states, + &block_id, + PersistentStateKind::Shard, + Some(first_part.prefix), + first_part.size, + state_info.chunk_size, + ) + .await?; + + // expose the storage through the regular overlay test network + let nodes = network::make_network(storage.clone(), 10); + network::discover(&nodes).await?; + + tracing::info!("making split overlay requests..."); + + let node = nodes.first().unwrap(); + let client = BlockchainRpcClient::builder() + .with_public_overlay_client(PublicOverlayClient::new( + node.network().clone(), + node.public_overlay().clone(), + Default::default(), + )) + .build(); + + // verify that overlay discovery returns split metadata + let pending_state = client + .find_persistent_state(&block_id, PersistentStateKind::Shard) + .await?; + assert_eq!(pending_state.split_depth, 2); + assert_eq!(pending_state.parts.len(), state_info.parts.len()); + assert!(pending_state.part(first_part.prefix).is_some()); + + // download and verify the split main state + let main_file = client + .download_persistent_state( + pending_state.clone(), + None, + storage.context().temp_files().unnamed_file().open()?, + ) + .await?; + let main_file = MappedFile::from_existing_file(main_file)?; + assert_eq!(main_file.as_slice(), expected_main); + + // download and verify one advertised split part + let part_file = client + .download_persistent_state( + pending_state, + Some(first_part.prefix), + storage.context().temp_files().unnamed_file().open()?, + ) + .await?; + let part_file = MappedFile::from_existing_file(part_file)?; + assert_eq!(part_file.as_slice(), expected_part); + + tracing::info!("done!"); + Ok(()) +} + +async fn read_persistent_state_decompressed( + persistent_states: &PersistentStateStorage, + block_id: &BlockId, + state_kind: PersistentStateKind, + part_shard_prefix: Option, + size: std::num::NonZeroU64, + chunk_size: std::num::NonZeroU32, +) -> Result> { + let mut compressed = Vec::new(); + for offset in (0..size.get()).step_by(chunk_size.get() as usize) { + let chunk = persistent_states + .read_state_chunk(block_id, offset, state_kind, part_shard_prefix) + .await + .context("failed to read persistent state chunk")?; + compressed.extend_from_slice(&chunk); + } + Ok(zstd_decompress_simple(&compressed)?) +}