diff --git a/chain/chain/src/stateless_validation/chunk_validation.rs b/chain/chain/src/stateless_validation/chunk_validation.rs
index cf89a9efe87..0bf6c0f1603 100644
--- a/chain/chain/src/stateless_validation/chunk_validation.rs
+++ b/chain/chain/src/stateless_validation/chunk_validation.rs
@@ -178,11 +178,11 @@ fn get_state_witness_block_range(
         last_chunk_shard_id: ShardId,
     }
 
-    let initial_prev_hash = *state_witness.chunk_header.prev_block_hash();
+    let initial_prev_hash = *state_witness.inner.chunk_header.prev_block_hash();
     let initial_prev_block = store.get_block(&initial_prev_hash)?;
     let initial_shard_layout =
         epoch_manager.get_shard_layout_from_prev_block(&initial_prev_hash)?;
-    let initial_shard_id = state_witness.chunk_header.shard_id();
+    let initial_shard_id = state_witness.inner.chunk_header.shard_id();
     // Check that shard id is present in current epoch.
     // TODO: consider more proper way to validate this.
     let _ = initial_shard_layout.get_shard_index(initial_shard_id)?;
@@ -333,8 +333,8 @@ pub fn pre_validate_chunk_state_witness(
 
     // Ensure that the chunk header version is supported in this protocol version
     let protocol_version =
-        epoch_manager.get_epoch_info(&state_witness.epoch_id)?.protocol_version();
-    state_witness.chunk_header.validate_version(protocol_version)?;
+        epoch_manager.get_epoch_info(&state_witness.inner.epoch_id)?.protocol_version();
+    state_witness.inner.chunk_header.validate_version(protocol_version)?;
 
     // First, go back through the blockchain history to locate the last new chunk
     // and last last new chunk for the shard.
@@ -348,19 +348,19 @@ pub fn pre_validate_chunk_state_witness(
 
     let receipts_to_apply = validate_source_receipt_proofs(
         epoch_manager,
-        &state_witness.source_receipt_proofs,
+        &state_witness.inner.source_receipt_proofs,
        &blocks_after_last_last_chunk,
         last_chunk_shard_layout,
         last_chunk_shard_id,
     )?;
     let applied_receipts_hash = hash(&borsh::to_vec(receipts_to_apply.as_slice()).unwrap());
-    if applied_receipts_hash != state_witness.applied_receipts_hash {
+    if applied_receipts_hash != state_witness.inner.applied_receipts_hash {
         return Err(Error::InvalidChunkStateWitness(format!(
             "Receipts hash {:?} does not match expected receipts hash {:?}",
-            applied_receipts_hash, state_witness.applied_receipts_hash
+            applied_receipts_hash, state_witness.inner.applied_receipts_hash
         )));
     }
-    let (tx_root_from_state_witness, _) = merklize(&state_witness.transactions);
+    let (tx_root_from_state_witness, _) = merklize(&state_witness.inner.transactions);
     let last_chunk_block = blocks_after_last_last_chunk.first().ok_or_else(|| {
         Error::Other("blocks_after_last_last_chunk is empty, this should be impossible!".into())
     })?;
@@ -374,15 +374,15 @@ pub fn pre_validate_chunk_state_witness(
     }
 
     let current_protocol_version =
-        epoch_manager.get_epoch_protocol_version(&state_witness.epoch_id)?;
+        epoch_manager.get_epoch_protocol_version(&state_witness.inner.epoch_id)?;
     if !checked_feature!(
         "protocol_feature_relaxed_chunk_validation",
         RelaxedChunkValidation,
         current_protocol_version
     ) {
-        let new_transactions = &state_witness.new_transactions;
+        let new_transactions = &state_witness.inner.new_transactions;
         let (new_tx_root_from_state_witness, _) = merklize(&new_transactions);
-        let chunk_tx_root = state_witness.chunk_header.tx_root();
+        let chunk_tx_root = state_witness.inner.chunk_header.tx_root();
         if new_tx_root_from_state_witness != chunk_tx_root {
             return Err(Error::InvalidChunkStateWitness(format!(
                 "Witness new transactions root {:?} does not match chunk {:?}",
@@ -392,10 +392,10 @@ pub fn pre_validate_chunk_state_witness(
         // Verify that all proposed transactions are valid.
         if !new_transactions.is_empty() {
             let transactions_validation_storage_config = RuntimeStorageConfig {
-                state_root: state_witness.chunk_header.prev_state_root(),
+                state_root: state_witness.inner.chunk_header.prev_state_root(),
                 use_flat_storage: true,
                 source: StorageDataSource::Recorded(PartialStorage {
-                    nodes: state_witness.new_transactions_validation_state.clone(),
+                    nodes: state_witness.inner.new_transactions_validation_state.clone(),
                 }),
                 state_patch: Default::default(),
             };
@@ -403,10 +403,10 @@ pub fn pre_validate_chunk_state_witness(
             match validate_prepared_transactions(
                 chain,
                 runtime_adapter,
-                &state_witness.chunk_header,
+                &state_witness.inner.chunk_header,
                 transactions_validation_storage_config,
                 &new_transactions,
-                &state_witness.transactions,
+                &state_witness.inner.transactions,
             ) {
                 Ok(result) => {
                     if result.transactions.len() != new_transactions.len() {
@@ -450,7 +450,7 @@ pub fn pre_validate_chunk_state_witness(
     } else {
         MainTransition::NewChunk(NewChunkData {
             chunk_header: last_chunk_block.chunks().get(last_chunk_shard_index).unwrap().clone(),
-            transactions: state_witness.transactions.clone(),
+            transactions: state_witness.inner.transactions.clone(),
             receipts: receipts_to_apply,
             block: Chain::get_apply_chunk_block_context(
                 epoch_manager,
@@ -461,7 +461,7 @@ pub fn pre_validate_chunk_state_witness(
             is_first_block_with_chunk_of_version: false,
             storage_context: StorageContext {
                 storage_data_source: StorageDataSource::Recorded(PartialStorage {
-                    nodes: state_witness.main_state_transition.base_state.clone(),
+                    nodes: state_witness.inner.main_state_transition.base_state.clone(),
                 }),
                 state_patch: Default::default(),
             },
@@ -596,13 +596,13 @@ pub fn validate_chunk_state_witness(
     main_state_transition_cache: &MainStateTransitionCache,
 ) -> Result<(), Error> {
     let _timer = crate::stateless_validation::metrics::CHUNK_STATE_WITNESS_VALIDATION_TIME
-        .with_label_values(&[&state_witness.chunk_header.shard_id().to_string()])
+        .with_label_values(&[&state_witness.inner.chunk_header.shard_id().to_string()])
         .start_timer();
     let span = tracing::debug_span!(target: "client", "validate_chunk_state_witness").entered();
-    let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.epoch_id)?;
-    let witness_chunk_shard_id = state_witness.chunk_header.shard_id();
+    let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.inner.epoch_id)?;
+    let witness_chunk_shard_id = state_witness.inner.chunk_header.shard_id();
     let witness_chunk_shard_uid =
-        epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.epoch_id)?;
+        epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.inner.epoch_id)?;
     let block_hash = pre_validation_output.main_transition_params.block_hash();
     let epoch_id = epoch_manager.get_epoch_id(&block_hash)?;
     let shard_id = pre_validation_output.main_transition_params.shard_id();
@@ -634,14 +634,14 @@ pub fn validate_chunk_state_witness(
         }
         (_, Some(result)) => (result.chunk_extra, result.outgoing_receipts),
     };
-    if chunk_extra.state_root() != &state_witness.main_state_transition.post_state_root {
+    if chunk_extra.state_root() != &state_witness.inner.main_state_transition.post_state_root {
         // This is an early check, it's not for correctness, only for better
         // error reporting in case of an invalid state witness due to a bug.
         // Only the final state root check against the chunk header is required.
        return Err(Error::InvalidChunkStateWitness(format!(
            "Post state root {:?} for main transition does not match expected post state root {:?}",
            chunk_extra.state_root(),
-            state_witness.main_state_transition.post_state_root,
+            state_witness.inner.main_state_transition.post_state_root,
        )));
    }
 
@@ -654,7 +654,7 @@ pub fn validate_chunk_state_witness(
             &mut outgoing_receipts,
             protocol_version,
             &witness_shard_layout,
-            state_witness.chunk_header.shard_id(),
+            state_witness.inner.chunk_header.shard_id(),
             shard_id,
         )?;
     }
@@ -676,19 +676,19 @@ pub fn validate_chunk_state_witness(
     }
 
     if pre_validation_output.implicit_transition_params.len()
-        != state_witness.implicit_transitions.len()
+        != state_witness.inner.implicit_transitions.len()
     {
         return Err(Error::InvalidChunkStateWitness(format!(
             "Implicit transitions count mismatch. Expected {}, found {}",
             pre_validation_output.implicit_transition_params.len(),
-            state_witness.implicit_transitions.len(),
+            state_witness.inner.implicit_transitions.len(),
         )));
     }
     for (implicit_transition_params, transition) in pre_validation_output
         .implicit_transition_params
         .into_iter()
-        .zip(state_witness.implicit_transitions.into_iter())
+        .zip(state_witness.inner.implicit_transitions.into_iter())
     {
         let (shard_uid, new_state_root) = match implicit_transition_params {
             ImplicitTransitionParams::ApplyOldChunk(block, shard_uid) => {
@@ -744,7 +744,7 @@ pub fn validate_chunk_state_witness(
     let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes);
     validate_chunk_with_chunk_extra_and_receipts_root(
         &chunk_extra,
-        &state_witness.chunk_header,
+        &state_witness.inner.chunk_header,
         &outgoing_receipts_root,
     )?;
 
@@ -778,9 +778,9 @@ impl Chain {
         runtime_adapter: &dyn RuntimeAdapter,
         processing_done_tracker: Option<ProcessingDoneTracker>,
     ) -> Result<(), Error> {
-        let shard_id = witness.chunk_header.shard_id();
-        let height_created = witness.chunk_header.height_created();
-        let chunk_hash = witness.chunk_header.chunk_hash();
+        let shard_id = witness.inner.chunk_header.shard_id();
+        let height_created = witness.inner.chunk_header.height_created();
+        let chunk_hash = witness.inner.chunk_header.chunk_hash();
         let parent_span = tracing::debug_span!(
             target: "chain", "shadow_validate", ?shard_id, height_created);
         let (encoded_witness, raw_witness_size) = {
diff --git a/chain/chain/src/stateless_validation/metrics.rs b/chain/chain/src/stateless_validation/metrics.rs
index 7afb60db795..98264f5838b 100644
--- a/chain/chain/src/stateless_validation/metrics.rs
+++ b/chain/chain/src/stateless_validation/metrics.rs
@@ -179,7 +179,7 @@ fn record_witness_size_metrics_fallible(
     encoded_size: usize,
     witness: &ChunkStateWitness,
 ) -> Result<(), std::io::Error> {
-    let shard_id = witness.chunk_header.shard_id().to_string();
+    let shard_id = witness.inner.chunk_header.shard_id().to_string();
     CHUNK_STATE_WITNESS_RAW_SIZE
         .with_label_values(&[shard_id.as_str()])
         .observe(decoded_size as f64);
@@ -188,16 +188,16 @@ fn record_witness_size_metrics_fallible(
         .observe(encoded_size as f64);
     CHUNK_STATE_WITNESS_MAIN_STATE_TRANSISTION_SIZE
         .with_label_values(&[shard_id.as_str()])
-        .observe(borsh::to_vec(&witness.main_state_transition)?.len() as f64);
+        .observe(borsh::to_vec(&witness.inner.main_state_transition)?.len() as f64);
     CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_SIZE
         .with_label_values(&[&shard_id.as_str()])
-        .observe(borsh::to_vec(&witness.new_transactions)?.len() as f64);
+        .observe(borsh::to_vec(&witness.inner.new_transactions)?.len() as f64);
     CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_STATE_SIZE
        .with_label_values(&[&shard_id.as_str()])
-        .observe(borsh::to_vec(&witness.new_transactions_validation_state)?.len() as f64);
+        .observe(borsh::to_vec(&witness.inner.new_transactions_validation_state)?.len() as f64);
     CHUNK_STATE_WITNESS_SOURCE_RECEIPT_PROOFS_SIZE
         .with_label_values(&[&shard_id.as_str()])
-        .observe(borsh::to_vec(&witness.source_receipt_proofs)?.len() as f64);
+        .observe(borsh::to_vec(&witness.inner.source_receipt_proofs)?.len() as f64);
     Ok(())
 }
diff --git a/chain/chain/src/store/latest_witnesses.rs b/chain/chain/src/store/latest_witnesses.rs
index 2b61626b749..3757c7b70f0 100644
--- a/chain/chain/src/store/latest_witnesses.rs
+++ b/chain/chain/src/store/latest_witnesses.rs
@@ -112,8 +112,8 @@ impl ChainStore {
         let _span = tracing::info_span!(
             target: "client",
             "save_latest_chunk_state_witness",
-            witness_height = witness.chunk_header.height_created(),
-            witness_shard = ?witness.chunk_header.shard_id(),
+            witness_height = witness.inner.chunk_header.height_created(),
+            witness_shard = ?witness.inner.chunk_header.shard_id(),
         )
         .entered();
 
@@ -172,9 +172,9 @@ impl ChainStore {
         let mut random_uuid = [0u8; 16];
         OsRng.fill_bytes(&mut random_uuid);
         let key = LatestWitnessesKey {
-            height: witness.chunk_header.height_created(),
-            shard_id: witness.chunk_header.shard_id().into(),
-            epoch_id: witness.epoch_id,
+            height: witness.inner.chunk_header.height_created(),
+            shard_id: witness.inner.chunk_header.shard_id().into(),
+            epoch_id: witness.inner.epoch_id,
             witness_size: serialized_witness_size,
             random_uuid,
         };
@@ -195,7 +195,7 @@ impl ChainStore {
 
         let store_commit_time = start_time.elapsed().saturating_sub(store_update_time);
 
-        let shard_id_str = witness.chunk_header.shard_id().to_string();
+        let shard_id_str = witness.inner.chunk_header.shard_id().to_string();
         stateless_validation::metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME
             .with_label_values(&[shard_id_str.as_str()])
             .observe(store_update_time.as_secs_f64());
diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs
index 971225cc790..a9c1de5657b 100644
--- a/chain/client/src/metrics.rs
+++ b/chain/client/src/metrics.rs
@@ -648,6 +648,16 @@ pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
     .unwrap()
 });
 
+pub(crate) static PARTIAL_WITNESS_DECODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
+    try_create_histogram_vec(
+        "near_partial_witness_decode_time",
+        "State witness decoding time from the partial state witness parts in seconds",
+        &["shard_id"],
+        Some(linear_buckets(0.0, 0.005, 20).unwrap()),
+    )
+    .unwrap()
+});
+
 pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock<HistogramVec> = LazyLock::new(|| {
     try_create_histogram_vec(
diff --git a/chain/client/src/stateless_validation/chunk_validator/mod.rs b/chain/client/src/stateless_validation/chunk_validator/mod.rs
index 4cad6d7ef8a..4e860456946 100644
--- a/chain/client/src/stateless_validation/chunk_validator/mod.rs
+++ b/chain/client/src/stateless_validation/chunk_validator/mod.rs
@@ -75,13 +75,13 @@ impl ChunkValidator {
         processing_done_tracker: Option<ProcessingDoneTracker>,
         signer: &Arc<ValidatorSigner>,
     ) -> Result<(), Error> {
-        let prev_block_hash = state_witness.chunk_header.prev_block_hash();
-        let shard_id = state_witness.chunk_header.shard_id();
+        let prev_block_hash = state_witness.inner.chunk_header.prev_block_hash();
+        let shard_id = state_witness.inner.chunk_header.shard_id();
         let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?;
-        if epoch_id != state_witness.epoch_id {
+        if epoch_id != state_witness.inner.epoch_id {
             return Err(Error::InvalidChunkStateWitness(format!(
                 "Invalid EpochId {:?} for previous block {}, expected {:?}",
-                state_witness.epoch_id, prev_block_hash, epoch_id
+                state_witness.inner.epoch_id, prev_block_hash, epoch_id
             )));
         }
 
@@ -92,7 +92,7 @@ impl ChunkValidator {
             self.runtime_adapter.as_ref(),
         )?;
 
-        let chunk_header = state_witness.chunk_header.clone();
+        let chunk_header = state_witness.inner.chunk_header.clone();
         let network_sender = self.network_sender.clone();
         let epoch_manager = self.epoch_manager.clone();
@@ -230,8 +230,8 @@ impl Client {
     ) -> Result<(), Error> {
         tracing::debug!(
             target: "client",
-            chunk_hash=?witness.chunk_header.chunk_hash(),
-            shard_id=?witness.chunk_header.shard_id(),
+            chunk_hash=?witness.inner.chunk_header.chunk_hash(),
+            shard_id=?witness.inner.chunk_header.shard_id(),
             "process_chunk_state_witness",
         );
 
@@ -252,7 +252,7 @@ impl Client {
             self.chain.chain_store.save_latest_chunk_state_witness(&witness)?;
         }
 
-        match self.chain.get_block(witness.chunk_header.prev_block_hash()) {
+        match self.chain.get_block(witness.inner.chunk_header.prev_block_hash()) {
             Ok(block) => self.process_chunk_state_witness_with_prev_block(
                 witness,
                 &block,
@@ -273,7 +273,7 @@ impl Client {
         // produced the witness. However some tests bypass PartialWitnessActor, thus when a chunk producer
         // receives its own state witness, we log a warning instead of panicking.
         // TODO: Make sure all tests run with "test_features" and panic for non-test builds.
-        if signer.validator_id() == &witness.chunk_producer {
+        if signer.validator_id() == &witness.inner.chunk_producer {
             tracing::warn!(
                 "Validator {:?} received state witness from itself. Witness={:?}",
                 signer.validator_id(),
@@ -283,7 +283,7 @@ impl Client {
         }
         self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
             NetworkRequests::ChunkStateWitnessAck(
-                witness.chunk_producer.clone(),
+                witness.inner.chunk_producer.clone(),
                 ChunkStateWitnessAck::new(witness),
             ),
         ));
@@ -296,10 +296,10 @@ impl Client {
         processing_done_tracker: Option<ProcessingDoneTracker>,
         signer: &Arc<ValidatorSigner>,
     ) -> Result<(), Error> {
-        if witness.chunk_header.prev_block_hash() != prev_block.hash() {
+        if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() {
             return Err(Error::Other(format!(
                 "process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})",
-                witness.chunk_header.prev_block_hash(),
+                witness.inner.chunk_header.prev_block_hash(),
                 prev_block.hash()
             )));
         }
diff --git a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs
index 96a55131d2b..2ca4689f987 100644
--- a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs
+++ b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs
@@ -28,7 +28,7 @@ impl Client {
         witness: ChunkStateWitness,
         witness_size: usize,
     ) -> Result<HandleOrphanWitnessOutcome, Error> {
-        let chunk_header = &witness.chunk_header;
+        let chunk_header = &witness.inner.chunk_header;
         let witness_height = chunk_header.height_created();
         let witness_shard = chunk_header.shard_id();
 
@@ -83,7 +83,7 @@ impl Client {
             .orphan_witness_pool
             .take_state_witnesses_waiting_for_block(new_block.hash());
         for witness in ready_witnesses {
-            let header = &witness.chunk_header;
+            let header = &witness.inner.chunk_header;
             tracing::debug!(
                 target: "client",
                 witness_height = header.height_created(),
diff --git a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs
index 58cd381645f..dc77cc36794 100644
--- a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs
+++ b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs
@@ -52,7 +52,7 @@ impl OrphanStateWitnessPool {
         let cache_entry = CacheEntry { witness, _metrics_tracker: metrics_tracker };
         if let Some((_, ejected_entry)) = self.witness_cache.push(cache_key, cache_entry) {
             // Another witness has been ejected from the cache due to capacity limit
-            let header = &ejected_entry.witness.chunk_header;
+            let header = &ejected_entry.witness.inner.chunk_header;
             tracing::debug!(
                 target: "client",
                 ejected_witness_height = header.height_created(),
@@ -72,7 +72,7 @@ impl OrphanStateWitnessPool {
     ) -> Vec<ChunkStateWitness> {
         let mut to_remove: Vec<ChunkProductionKey> = Vec::new();
         for (cache_key, cache_entry) in self.witness_cache.iter() {
-            if cache_entry.witness.chunk_header.prev_block_hash() == prev_block {
+            if cache_entry.witness.inner.chunk_header.prev_block_hash() == prev_block {
                 to_remove.push(cache_key.clone());
             }
         }
@@ -96,7 +96,7 @@ impl OrphanStateWitnessPool {
             let witness_height = cache_key.height_created;
             if witness_height <= final_height {
                 to_remove.push(cache_key.clone());
-                let header = &cache_entry.witness.chunk_header;
+                let header = &cache_entry.witness.inner.chunk_header;
                 tracing::debug!(
                     target: "client",
                     final_height,
@@ -141,7 +141,7 @@ mod metrics_tracker {
            witness: &ChunkStateWitness,
            witness_size: usize,
        ) -> OrphanWitnessMetricsTracker {
-            let shard_id = witness.chunk_header.shard_id().to_string();
+            let shard_id = witness.inner.chunk_header.shard_id().to_string();
            metrics::ORPHAN_CHUNK_STATE_WITNESSES_TOTAL_COUNT
                .with_label_values(&[shard_id.as_str()])
                .inc();
@@ -193,7 +193,7 @@ mod tests {
        encoded_length: u64,
    ) -> ChunkStateWitness {
        let mut witness = ChunkStateWitness::new_dummy(height, shard_id, prev_block_hash);
-        match &mut witness.chunk_header {
+        match &mut witness.inner.chunk_header {
            ShardChunkHeader::V3(header) => match &mut header.inner {
                ShardChunkHeaderInner::V1(_) => unimplemented!(),
                ShardChunkHeaderInner::V2(inner) => inner.encoded_length = encoded_length,
@@ -221,7 +221,7 @@ mod tests {
        expected.sort_by(sort_comparator);
        if observed != expected {
            let print_witness_info = |witness: &ChunkStateWitness| {
-                let header = &witness.chunk_header;
+                let header = &witness.inner.chunk_header;
                eprintln!(
                    "- height = {}, shard_id = {}, encoded_length: {} prev_block: {}",
                    header.height_created(),
diff --git a/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs
index 3b2c748a487..dfa47bab959 100644
--- a/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs
+++ b/chain/client/src/stateless_validation/partial_witness/partial_deploys_tracker.rs
@@ -66,7 +66,7 @@ impl CacheEntry {
                );
                None
            }
-            InsertPartResult::Decoded(decode_result) => Some(decode_result),
+            InsertPartResult::Decoded(decode_result, _) => Some(decode_result),
        }
    }
}
diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
index 191607df2fc..86774649bc8 100644
--- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
+++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
@@ -39,6 +39,7 @@ use near_store::adapter::trie_store::TrieStoreAdapter;
 use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage};
 use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache};
 use rand::Rng;
+use rayon::{iter::ParallelIterator, prelude::*};
 
 use crate::client_actor::ClientSenderForPartialWitness;
 use crate::metrics;
@@ -65,7 +66,7 @@ pub struct PartialWitnessActor {
     epoch_manager: Arc<dyn EpochManagerAdapter>,
     runtime: Arc<dyn RuntimeAdapter>,
     /// Tracks the parts of the state witness sent from chunk producers to chunk validators.
-    partial_witness_tracker: PartialEncodedStateWitnessTracker,
+    partial_witness_tracker: Arc<PartialEncodedStateWitnessTracker>,
     partial_deploys_tracker: PartialEncodedContractDeploysTracker,
     /// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
     state_witness_tracker: ChunkStateWitnessTracker,
@@ -75,6 +76,7 @@ pub struct PartialWitnessActor {
     /// Same as above for contract deploys.
     contract_deploys_encoders: ReedSolomonEncoderCache,
     compile_contracts_spawner: Arc<dyn AsyncComputationSpawner>,
+    partial_witness_spawner: Arc<dyn AsyncComputationSpawner>,
     /// AccountId in the key corresponds to the requester (chunk validator).
     processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>,
 }
@@ -166,9 +168,10 @@ impl PartialWitnessActor {
         epoch_manager: Arc<dyn EpochManagerAdapter>,
         runtime: Arc<dyn RuntimeAdapter>,
         compile_contracts_spawner: Arc<dyn AsyncComputationSpawner>,
+        partial_witness_spawner: Arc<dyn AsyncComputationSpawner>,
     ) -> Self {
         let partial_witness_tracker =
-            PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone());
+            Arc::new(PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone()));
         Self {
             network_adapter,
             my_signer,
@@ -182,6 +185,7 @@ impl PartialWitnessActor {
                 CONTRACT_DEPLOYS_RATIO_DATA_PARTS,
             ),
             compile_contracts_spawner,
+            partial_witness_spawner,
             processed_contract_code_requests: LruCache::new(
                 NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(),
             ),
@@ -200,7 +204,7 @@ impl PartialWitnessActor {
 
         tracing::debug!(
             target: "client",
-            chunk_hash=?state_witness.chunk_header.chunk_hash(),
+            chunk_hash=?state_witness.inner.chunk_header.chunk_hash(),
             "distribute_chunk_state_witness",
         );
 
@@ -224,7 +228,7 @@ impl PartialWitnessActor {
             key.clone(),
             contract_accesses,
             MainTransitionKey {
-                block_hash: state_witness.main_state_transition.block_hash,
+                block_hash: state_witness.inner.main_state_transition.block_hash,
                 shard_id: main_transition_shard_id,
             },
             &chunk_validators,
@@ -235,7 +239,7 @@ impl PartialWitnessActor {
         let witness_bytes = compress_witness(&state_witness)?;
         self.send_state_witness_parts(
             key.epoch_id,
-            &state_witness.chunk_header,
+            &state_witness.inner.chunk_header,
             witness_bytes,
             &chunk_validators,
             &signer,
@@ -267,9 +271,10 @@ impl PartialWitnessActor {
         let encoder = self.witness_encoders.entry(chunk_validators.len());
         let (parts, encoded_length) = encoder.encode(&witness_bytes);
 
+        let mut generated_parts = vec![];
         chunk_validators
-            .iter()
-            .zip_eq(parts)
+            .par_iter()
+            .zip_eq(parts.into_par_iter())
             .enumerate()
             .map(|(part_ord, (chunk_validator, part))| {
                 // It's fine to unwrap part here as we just constructed the parts above and we expect
@@ -278,13 +283,15 @@ impl PartialWitnessActor {
                     epoch_id,
                     chunk_header.clone(),
                     part_ord,
-                    part.unwrap().to_vec(),
+                    part.unwrap().into_vec(),
                     encoded_length,
                     signer,
                 );
                 (chunk_validator.clone(), partial_witness)
             })
-            .collect_vec()
+            .collect_into_vec(&mut generated_parts);
+
+        generated_parts
     }
 
     fn generate_contract_deploys_parts(
@@ -365,13 +372,19 @@ impl PartialWitnessActor {
         ));
     }
 
-    /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part.
-    fn forward_state_witness_part(
-        &self,
+    /// Function to handle receiving partial_encoded_state_witness message from chunk producer.
+    fn handle_partial_encoded_state_witness(
+        &mut self,
         partial_witness: PartialEncodedStateWitness,
     ) -> Result<(), Error> {
+        tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage");
+        let signer = self.my_validator_signer()?;
+        let epoch_manager = self.epoch_manager.clone();
+        let runtime_adapter = self.runtime.clone();
+
         let ChunkProductionKey { shard_id, epoch_id, height_created } =
             partial_witness.chunk_production_key();
+
         let chunk_producer = self
             .epoch_manager
             .get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })?
@@ -386,32 +399,39 @@ impl PartialWitnessActor {
             .filter(|validator| validator != &chunk_producer)
             .collect();
 
-        self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
-            NetworkRequests::PartialEncodedStateWitnessForward(
-                target_chunk_validators,
-                partial_witness,
-            ),
-        ));
-        Ok(())
-    }
-
-    /// Function to handle receiving partial_encoded_state_witness message from chunk producer.
-    fn handle_partial_encoded_state_witness(
-        &mut self,
-        partial_witness: PartialEncodedStateWitness,
-    ) -> Result<(), Error> {
-        tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage");
+        let network_adapter = self.network_adapter.clone();
 
-        let signer = self.my_validator_signer()?;
-        // Validate the partial encoded state witness and forward the part to all the chunk validators.
-        if validate_partial_encoded_state_witness(
-            self.epoch_manager.as_ref(),
-            &partial_witness,
-            &signer,
-            self.runtime.store(),
-        )? {
-            self.forward_state_witness_part(partial_witness)?;
-        }
+        self.partial_witness_spawner.spawn("handle_partial_encoded_state_witness", move || {
+            // Validate the partial encoded state witness and forward the part to all the chunk validators.
+            match validate_partial_encoded_state_witness(
+                epoch_manager.as_ref(),
+                &partial_witness,
+                &signer,
+                runtime_adapter.store(),
+            ) {
+                Ok(true) => {
+                    network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
+                        NetworkRequests::PartialEncodedStateWitnessForward(
+                            target_chunk_validators,
+                            partial_witness,
+                        ),
+                    ));
+                }
+                Ok(false) => {
+                    tracing::warn!(
+                        target: "client",
+                        "Received invalid partial encoded state witness"
+                    );
+                }
+                Err(err) => {
+                    tracing::warn!(
+                        target: "client",
+                        "Encountered error during validation: {}",
+                        err
+                    );
+                }
+            }
+        });
         Ok(())
     }
 
@@ -424,15 +444,40 @@ impl PartialWitnessActor {
         tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage");
 
         let signer = self.my_validator_signer()?;
-        // Validate the partial encoded state witness and store the partial encoded state witness.
-        if validate_partial_encoded_state_witness(
-            self.epoch_manager.as_ref(),
-            &partial_witness,
-            &signer,
-            self.runtime.store(),
-        )? {
-            self.partial_witness_tracker.store_partial_encoded_state_witness(partial_witness)?;
-        }
+        let partial_witness_tracker = self.partial_witness_tracker.clone();
+        let epoch_manager = self.epoch_manager.clone();
+        let runtime_adapter = self.runtime.clone();
+        self.partial_witness_spawner.spawn(
+            "handle_partial_encoded_state_witness_forward",
+            move || {
+                // Validate the partial encoded state witness and store the partial encoded state witness.
+                match validate_partial_encoded_state_witness(
+                    epoch_manager.as_ref(),
+                    &partial_witness,
+                    &signer,
+                    runtime_adapter.store(),
+                ) {
+                    Ok(true) => {
+                        if let Err(err) = partial_witness_tracker.store_partial_encoded_state_witness(partial_witness) {
+                            tracing::error!(target: "client", "Failed to store partial encoded state witness: {}", err);
+                        }
+                    }
+                    Ok(false) => {
+                        tracing::warn!(
+                            target: "client",
+                            "Received invalid partial encoded state witness"
+                        );
+                    }
+                    Err(err) => {
+                        tracing::warn!(
+                            target: "client",
+                            "Encountered error during validation: {}",
+                            err
+                        );
+                    }
+                }
+            },
+        );
         Ok(())
     }
 
@@ -596,7 +641,7 @@ impl PartialWitnessActor {
 
     /// Sends the contract accesses to the same chunk validators
     /// (except for the chunk producers that track the same shard),
-    /// which will receive the state witness for the new chunk. 
+    /// which will receive the state witness for the new chunk.
     fn send_contract_accesses_to_chunk_validators(
         &self,
         key: ChunkProductionKey,
@@ -774,7 +819,7 @@ impl PartialWitnessActor {
 }
 
 fn compress_witness(witness: &ChunkStateWitness) -> Result<EncodedChunkStateWitness, Error> {
-    let shard_id_label = witness.chunk_header.shard_id().to_string();
+    let shard_id_label = witness.inner.chunk_header.shard_id().to_string();
     let encode_timer = near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
         .with_label_values(&[shard_id_label.as_str()])
         .start_timer();
diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs
index 4beb1b1ff3d..bf1e118c4f2 100644
--- a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs
+++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs
@@ -1,10 +1,11 @@
 use std::collections::HashSet;
 use std::num::NonZeroUsize;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use lru::LruCache;
 use near_async::messaging::CanSend;
 use near_async::time::Instant;
+use near_cache::SyncLruCache;
 use near_chain::chain::ChunkStateWitnessMessage;
 use near_chain::Error;
 use near_epoch_manager::EpochManagerAdapter;
@@ -188,7 +189,10 @@ impl CacheEntry {
                     "Received invalid partial witness part ord"
                 );
             }
-            InsertPartResult::Decoded(decode_result) => {
+            InsertPartResult::Decoded(decode_result, decoding_time) => {
+                metrics::PARTIAL_WITNESS_DECODE_TIME
+                    .with_label_values(&[&self.shard_id.to_string()])
+                    .observe(decoding_time);
                 self.witness_parts =
                     WitnessPartsState::Decoded { decode_result, decoded_at: Instant::now() };
                 metrics::DECODE_PARTIAL_WITNESS_ACCESSED_CONTRACTS_STATE_COUNT
@@ -308,13 +312,13 @@ pub struct PartialEncodedStateWitnessTracker {
     /// Epoch manager to get the set of chunk validators
     epoch_manager: Arc<dyn EpochManagerAdapter>,
     /// Keeps track of state witness parts received from chunk producers.
-    parts_cache: LruCache<ChunkProductionKey, CacheEntry>,
+    parts_cache: Arc<Mutex<LruCache<ChunkProductionKey, CacheEntry>>>,
     /// Keeps track of the already decoded witnesses. This is needed
     /// to protect chunk validator from processing the same witness multiple
     /// times.
-    processed_witnesses: LruCache<ChunkProductionKey, ()>,
+    processed_witnesses: Arc<SyncLruCache<ChunkProductionKey, ()>>,
     /// Reed Solomon encoder for decoding state witness parts.
-    encoders: ReedSolomonEncoderCache,
+    encoders: Arc<Mutex<ReedSolomonEncoderCache>>,
 }
 
 impl PartialEncodedStateWitnessTracker {
@@ -325,16 +329,16 @@ impl PartialEncodedStateWitnessTracker {
         Self {
             client_sender,
             epoch_manager,
-            parts_cache: LruCache::new(NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap()),
-            processed_witnesses: LruCache::new(
-                NonZeroUsize::new(PROCESSED_WITNESSES_CACHE_SIZE).unwrap(),
-            ),
-            encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS),
+            parts_cache: Arc::new(Mutex::new(LruCache::new(
+                NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap(),
+            ))),
+            processed_witnesses: Arc::new(SyncLruCache::new(PROCESSED_WITNESSES_CACHE_SIZE)),
+            encoders: Arc::new(Mutex::new(ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS))),
         }
     }
 
     pub fn store_partial_encoded_state_witness(
-        &mut self,
+        &self,
         partial_witness: PartialEncodedStateWitness,
     ) -> Result<(), Error> {
         tracing::debug!(target: "client", ?partial_witness, "store_partial_encoded_state_witness");
@@ -345,7 +349,7 @@ impl PartialEncodedStateWitnessTracker {
     }
 
     pub fn store_accessed_contract_hashes(
-        &mut self,
+        &self,
         key: ChunkProductionKey,
         hashes: HashSet<CodeHash>,
     ) -> Result<(), Error> {
@@ -355,7 +359,7 @@ impl PartialEncodedStateWitnessTracker {
     }
 
     pub fn store_accessed_contract_codes(
-        &mut self,
+        &self,
         key: ChunkProductionKey,
         codes: Vec<CodeBytes>,
     ) -> Result<(), Error> {
@@ -365,7 +369,7 @@ impl PartialEncodedStateWitnessTracker {
     }
 
     fn process_update(
-        &mut self,
+        &self,
         key: ChunkProductionKey,
         create_if_not_exists: bool,
         update: CacheUpdate,
@@ -382,72 +386,80 @@ impl PartialEncodedStateWitnessTracker {
         if create_if_not_exists {
             self.maybe_insert_new_entry_in_parts_cache(&key);
         }
-        let Some(entry) = self.parts_cache.get_mut(&key) else {
-            return Ok(());
-        };
-        if let Some((decode_result, accessed_contracts)) = entry.update(update) {
-            // Record the time taken from receiving first part to decoding partial witness.
-            let time_to_last_part = Instant::now().signed_duration_since(entry.created_at);
-            metrics::PARTIAL_WITNESS_TIME_TO_LAST_PART
-                .with_label_values(&[key.shard_id.to_string().as_str()])
-                .observe(time_to_last_part.as_seconds_f64());
-
-            self.parts_cache.pop(&key);
-            self.processed_witnesses.push(key.clone(), ());
-
-            let encoded_witness = match decode_result {
-                Ok(encoded_chunk_state_witness) => encoded_chunk_state_witness,
-                Err(err) => {
-                    // We ideally never expect the decoding to fail. In case it does, we received a bad part
-                    // from the chunk producer.
-                    tracing::error!(
-                        target: "client",
-                        ?err,
-                        shard_id = ?key.shard_id,
-                        height_created = key.height_created,
-                        "Failed to reed solomon decode witness parts. Maybe malicious or corrupt data."
-                    );
+        {
+            let mut parts_cache = self.parts_cache.lock().unwrap();
+            let Some(entry) = parts_cache.get_mut(&key) else {
+                return Ok(());
+            };
+            if let Some((decode_result, accessed_contracts)) = entry.update(update) {
+                // Record the time taken from receiving first part to decoding partial witness.
+                let time_to_last_part = Instant::now().signed_duration_since(entry.created_at);
+                metrics::PARTIAL_WITNESS_TIME_TO_LAST_PART
+                    .with_label_values(&[key.shard_id.to_string().as_str()])
+                    .observe(time_to_last_part.as_seconds_f64());
+
+                parts_cache.pop(&key);
+                drop(parts_cache);
+                self.processed_witnesses.push(key.clone(), ());
+
+                let encoded_witness = match decode_result {
+                    Ok(encoded_chunk_state_witness) => encoded_chunk_state_witness,
+                    Err(err) => {
+                        // We ideally never expect the decoding to fail. In case it does, we received a bad part
+                        // from the chunk producer.
+                        tracing::error!(
+                            target: "client",
+                            ?err,
+                            shard_id = ?key.shard_id,
+                            height_created = key.height_created,
+                            "Failed to reed solomon decode witness parts. Maybe malicious or corrupt data."
+                        );
+                        return Err(Error::InvalidPartialChunkStateWitness(format!(
+                            "Failed to reed solomon decode witness parts: {err}",
+                        )));
+                    }
+                };
+
+                let (mut witness, raw_witness_size) =
+                    self.decode_state_witness(&encoded_witness)?;
+                if witness.chunk_production_key() != key {
                     return Err(Error::InvalidPartialChunkStateWitness(format!(
-                        "Failed to reed solomon decode witness parts: {err}",
+                        "Decoded witness key {:?} doesn't match partial witness {:?}",
+                        witness.chunk_production_key(),
+                        key,
                     )));
                 }
-            };
-
-            let (mut witness, raw_witness_size) = self.decode_state_witness(&encoded_witness)?;
-            if witness.chunk_production_key() != key {
-                return Err(Error::InvalidPartialChunkStateWitness(format!(
-                    "Decoded witness key {:?} doesn't match partial witness {:?}",
-                    witness.chunk_production_key(),
-                    key,
-                )));
-            }
 
-            // Merge accessed contracts into the main transition's partial state.
-            let PartialState::TrieValues(values) = &mut witness.main_state_transition.base_state;
-            values.extend(accessed_contracts.into_iter().map(|code| code.0.into()));
+                // Merge accessed contracts into the main transition's partial state.
+                let PartialState::TrieValues(values) =
+                    &mut witness.inner.main_state_transition.base_state;
+                values.extend(accessed_contracts.into_iter().map(|code| code.0.into()));
 
-            tracing::debug!(target: "client", ?key, "Sending encoded witness to client.");
-            self.client_sender.send(ChunkStateWitnessMessage { witness, raw_witness_size });
+                tracing::debug!(target: "client", ?key, "Sending encoded witness to client.");
+                self.client_sender.send(ChunkStateWitnessMessage { witness, raw_witness_size });
+            }
         }
         self.record_total_parts_cache_size_metric();
         Ok(())
     }
 
-    fn get_encoder(&mut self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
+    fn get_encoder(&self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
         // The expected number of parts for the Reed Solomon encoding is the number of chunk validators.
         let num_parts = self
             .epoch_manager
             .get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created)?
             .len();
-        Ok(self.encoders.entry(num_parts))
+        let mut encoders = self.encoders.lock().unwrap();
+        Ok(encoders.entry(num_parts))
     }
 
     // Function to insert a new entry into the cache for the chunk hash if it does not already exist
     // We additionally check if an evicted entry has been fully decoded and processed.
-    fn maybe_insert_new_entry_in_parts_cache(&mut self, key: &ChunkProductionKey) {
-        if !self.parts_cache.contains(key) {
+    fn maybe_insert_new_entry_in_parts_cache(&self, key: &ChunkProductionKey) {
+        let mut parts_cache = self.parts_cache.lock().unwrap();
+        if !parts_cache.contains(key) {
             if let Some((evicted_key, evicted_entry)) =
-                self.parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
+                parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
             {
                 tracing::warn!(
                     target: "client",
@@ -461,7 +473,8 @@ impl PartialEncodedStateWitnessTracker {
     }
 
     fn record_total_parts_cache_size_metric(&self) {
-        let total_size: usize = self.parts_cache.iter().map(|(_, entry)| entry.total_size()).sum();
+        let parts_cache = self.parts_cache.lock().unwrap();
+        let total_size: usize = parts_cache.iter().map(|(_, entry)| entry.total_size()).sum();
         metrics::PARTIAL_WITNESS_CACHE_SIZE.set(total_size as f64);
     }
 
@@ -472,7 +485,7 @@ impl PartialEncodedStateWitnessTracker {
         let decode_start = std::time::Instant::now();
         let (witness, raw_witness_size) = encoded_witness.decode()?;
         let decode_elapsed_seconds = decode_start.elapsed().as_secs_f64();
-        let witness_shard = witness.chunk_header.shard_id();
+        let witness_shard = witness.inner.chunk_header.shard_id();
 
         // Record metrics after validating the witness
         near_chain::stateless_validation::metrics::CHUNK_STATE_WITNESS_DECODE_TIME
diff --git a/chain/client/src/stateless_validation/state_witness_tracker.rs b/chain/client/src/stateless_validation/state_witness_tracker.rs
index 8ad0ed414d6..420e25793ef 100644
--- a/chain/client/src/stateless_validation/state_witness_tracker.rs
+++ b/chain/client/src/stateless_validation/state_witness_tracker.rs
@@ -118,7 +118,7 @@ impl ChunkStateWitnessTracker {
         &mut self,
         witness: &near_primitives::stateless_validation::state_witness::ChunkStateWitness,
     ) -> Option<&ChunkStateWitnessRecord> {
-        let key = ChunkStateWitnessKey::new(witness.chunk_header.chunk_hash());
+        let key = ChunkStateWitnessKey::new(witness.inner.chunk_header.chunk_hash());
         self.witnesses.get(&key)
     }
 }
@@ -163,7 +163,11 @@ mod state_witness_tracker_tests {
         let clock = dummy_clock();
         let mut tracker = ChunkStateWitnessTracker::new(clock.clock());
 
-        tracker.record_witness_sent(witness.chunk_header.compute_hash(), 4321, NUM_VALIDATORS);
+        tracker.record_witness_sent(
+            witness.inner.chunk_header.compute_hash(),
+            4321,
+            NUM_VALIDATORS,
+        );
         clock.advance(Duration::milliseconds(3444));
 
         // Ack received from all "except for one".
@@ -182,7 +186,11 @@ mod state_witness_tracker_tests {
         let clock = dummy_clock();
         let mut tracker = ChunkStateWitnessTracker::new(clock.clock());
 
-        tracker.record_witness_sent(witness.chunk_header.compute_hash(), 4321, NUM_VALIDATORS);
+        tracker.record_witness_sent(
+            witness.inner.chunk_header.compute_hash(),
+            4321,
+            NUM_VALIDATORS,
+        );
         clock.advance(Duration::milliseconds(3444));
 
         // Ack received from all.
diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs
index 22ca09bde99..37422fd7597 100644
--- a/chain/client/src/test_utils/setup.rs
+++ b/chain/client/src/test_utils/setup.rs
@@ -164,6 +164,7 @@ pub fn setup(
         epoch_manager.clone(),
         runtime.clone(),
         Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
     ));
     let partial_witness_adapter = partial_witness_addr.with_auto_span_context();
diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs
index 5dcf2d55449..ba3522da943 100644
--- a/chain/client/src/test_utils/test_env.rs
+++ b/chain/client/src/test_utils/test_env.rs
@@ -360,8 +360,10 @@ impl TestEnv {
     fn found_differing_post_state_root_due_to_state_transitions(
         witness: &ChunkStateWitness,
     ) -> bool {
-        let mut post_state_roots = HashSet::from([witness.main_state_transition.post_state_root]);
-        post_state_roots.extend(witness.implicit_transitions.iter().map(|t| t.post_state_root));
+        let mut post_state_roots =
+            HashSet::from([witness.inner.main_state_transition.post_state_root]);
+        post_state_roots
+            .extend(witness.inner.implicit_transitions.iter().map(|t| t.post_state_root));
         post_state_roots.len() >= 2
     }
 
diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs
index 90370ff2816..cda45d4d4b9 100644
--- a/chain/network/src/peer_manager/network_state/mod.rs
+++ b/chain/network/src/peer_manager/network_state/mod.rs
@@ -35,6 +35,7 @@ use crate::types::{
     ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody,
     Tier3Request, Tier3RequestBody,
 };
+use actix::ArbiterHandle;
 use anyhow::Context;
 use arc_swap::ArcSwap;
 use near_async::messaging::{CanSend, SendAsync, Sender};
@@ -698,6 +699,97 @@ impl NetworkState {
         success
     }
 
+    /// Send message to specific account.
+    /// Return whether the message is sent or not.
+    /// The message might be sent over TIER1 or TIER2 connection depending on the message type.
+    pub fn send_message_to_account_with_arbiter(
+        self: &Arc<Self>,
+        clock: &time::Clock,
+        account_id: &AccountId,
+        msg: RoutedMessageBody,
+        arbiter: ArbiterHandle,
+    ) -> bool {
+        // If the message is allowed to be sent to self, we handle it directly.
+        if self.config.validator.account_id().is_some_and(|id| &id == account_id) {
+            // For now, we don't allow some types of messages to be sent to self.
+            debug_assert!(msg.allow_sending_to_self());
+            let this = self.clone();
+            let clock = clock.clone();
+            let peer_id = self.config.node_id();
+            let msg = self.sign_message(
+                &clock,
+                RawRoutedMessage { target: PeerIdOrHash::PeerId(peer_id.clone()), body: msg },
+            );
+            arbiter.spawn(async move {
+                this.receive_routed_message(&clock, peer_id, msg.hash(), msg.msg.body).await;
+            });
+            return true;
+        }
+
+        let accounts_data = self.accounts_data.load();
+        if tcp::Tier::T1.is_allowed_routed(&msg) {
+            for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter())
+            {
+                let data = match accounts_data.data.get(key) {
+                    Some(data) => data,
+                    None => continue,
+                };
+                let conn = match self.get_tier1_proxy(data) {
+                    Some(conn) => conn,
+                    None => continue,
+                };
+                // TODO(gprusak): in case of PartialEncodedChunk, consider stripping everything
+                // but the header. This will bound the message size
+                conn.send_message(Arc::new(PeerMessage::Routed(self.sign_message(
+                    clock,
+                    RawRoutedMessage {
+                        target: PeerIdOrHash::PeerId(data.peer_id.clone()),
+                        body: msg,
+                    },
+                ))));
+                return true;
+            }
+        }
+
+        let peer_id_from_account_data = accounts_data
+            .keys_by_id
+            .get(account_id)
+            .iter()
+            .flat_map(|keys| keys.iter())
+            .flat_map(|key| accounts_data.data.get(key))
+            .next()
+            .map(|data| data.peer_id.clone());
+        // Find the target peer_id:
+        // - first look it up in self.accounts_data
+        // - if missing, fall back to lookup in self.graph.routing_table
+        // We want to deprecate self.graph.routing_table.account_owner in the next release.
+        let target = if let Some(peer_id) = peer_id_from_account_data {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc();
+            peer_id
+        } else if let Some(peer_id) = self.account_announcements.get_account_owner(account_id) {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc();
+            peer_id
+        } else {
+            // TODO(MarX, #1369): Message is dropped here. Define policy for this case.
+            metrics::MessageDropped::UnknownAccount.inc(&msg);
+            tracing::debug!(target: "network",
+                   account_id = ?self.config.validator.account_id(),
+                   to = ?account_id,
+                   ?msg,"Drop message: unknown account",
+            );
+            tracing::trace!(target: "network", known_peers = ?self.account_announcements.get_accounts_keys(), "Known peers");
+            return false;
+        };
+
+        let mut success = false;
+        let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg };
+        let msg = self.sign_message(clock, msg);
+        for _ in 0..msg.body.message_resend_count() {
+            success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg.clone());
+        }
+        success
+    }
+
     pub async fn receive_routed_message(
         self: &Arc<Self>,
         clock: &time::Clock,
diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs
index 1f4a00cf8d8..09ad567abae 100644
--- a/chain/network/src/peer_manager/peer_manager_actor.rs
+++ b/chain/network/src/peer_manager/peer_manager_actor.rs
@@ -40,6 +40,7 @@ use network_protocol::MAX_SHARDS_PER_SNAPSHOT_HOST_INFO;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::thread_rng;
 use rand::Rng;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use std::cmp::min;
 use std::collections::HashSet;
 use std::sync::atomic::Ordering;
@@ -1070,28 +1071,34 @@ impl PeerManagerActor {
                 NetworkResponses::NoResponse
             }
             NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
-                for (chunk_validator, partial_witness) in validator_witness_tuple {
-                    self.state.send_message_to_account(
-                        &self.clock,
-                        &chunk_validator,
-                        RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
-                    );
-                }
+                let arbiter = actix::Arbiter::current();
+                validator_witness_tuple.into_par_iter().for_each(
+                    |(chunk_validator, partial_witness)| {
+                        self.state.send_message_to_account_with_arbiter(
+                            &self.clock,
+                            &chunk_validator,
+                            RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
+                            arbiter.clone(),
+                        );
+                    },
+                );
                 NetworkResponses::NoResponse
             }
             NetworkRequests::PartialEncodedStateWitnessForward(
                 chunk_validators,
                 partial_witness,
             ) => {
-                for chunk_validator in chunk_validators {
-                    self.state.send_message_to_account(
+                let arbiter = actix::Arbiter::current();
+                chunk_validators.into_par_iter().for_each(|chunk_validator| {
+                    self.state.send_message_to_account_with_arbiter(
                         &self.clock,
                         &chunk_validator,
                         RoutedMessageBody::PartialEncodedStateWitnessForward(
                            partial_witness.clone(),
                         ),
+                        arbiter.clone(),
                     );
-                }
+                });
                 NetworkResponses::NoResponse
             }
             NetworkRequests::EpochSyncRequest { peer_id } => {
diff --git a/core/primitives/src/reed_solomon.rs b/core/primitives/src/reed_solomon.rs
index f6830a4f1d3..f960fdba809 100644
--- a/core/primitives/src/reed_solomon.rs
+++ b/core/primitives/src/reed_solomon.rs
@@ -187,7 +187,7 @@ pub enum InsertPartResult<T> {
     Accepted,
     PartAlreadyAvailable,
     InvalidPartOrd,
-    Decoded(std::io::Result<T>),
+    Decoded(std::io::Result<T>, f64),
 }
 
 impl<T> ReedSolomonPartsTracker<T> {
@@ -239,7 +239,10 @@ impl<T> ReedSolomonPartsTracker<T> {
         self.parts[part_ord] = Some(part);
 
         if self.has_enough_parts() {
-            InsertPartResult::Decoded(self.encoder.decode(&mut self.parts, self.encoded_length))
+            let decode_start = std::time::Instant::now();
+            let decoded = self.encoder.decode(&mut self.parts, self.encoded_length);
+            let decode_elapsed_seconds = decode_start.elapsed().as_secs_f64();
+            InsertPartResult::Decoded(decoded, decode_elapsed_seconds)
         } else {
             InsertPartResult::Accepted
         }
diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs
index f6c1aff510b..71f56b79521 100644
--- a/core/primitives/src/stateless_validation/state_witness.rs
+++ b/core/primitives/src/stateless_validation/state_witness.rs
@@ -81,14 +81,62 @@ pub struct ChunkStateWitnessAck {
 
 impl ChunkStateWitnessAck {
     pub fn new(witness: &ChunkStateWitness) -> Self {
-        Self { chunk_hash: witness.chunk_header.chunk_hash() }
+        Self { chunk_hash: witness.inner.chunk_header.chunk_hash() }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, ProtocolSchema)]
+pub struct ChunkStateWitness {
+    pub inner: ChunkStateWitnessInner,
+    padding: Vec<u8>,
+}
+
+impl ChunkStateWitness {
+    pub fn new(
+        chunk_producer: AccountId,
+        epoch_id: EpochId,
+        chunk_header: ShardChunkHeader,
+        main_state_transition: ChunkStateTransition,
+        source_receipt_proofs: HashMap<ChunkHash, ReceiptProof>,
+        applied_receipts_hash: CryptoHash,
+        transactions: Vec<SignedTransaction>,
+        implicit_transitions: Vec<ChunkStateTransition>,
+        new_transactions: Vec<SignedTransaction>,
+        new_transactions_validation_state: PartialState,
+    ) -> Self {
+        let inner = ChunkStateWitnessInner::new(
+            chunk_producer,
+            epoch_id,
+            chunk_header,
+            main_state_transition,
+            source_receipt_proofs,
+            applied_receipts_hash,
+            transactions,
+            implicit_transitions,
+            new_transactions,
+            new_transactions_validation_state,
+        );
+        let inner_len = borsh::to_vec(&inner).unwrap().len();
+        const WANTED_SIZE: usize = 30 * 1024 * 1024; // 30 MB
+        let padding_size = WANTED_SIZE - inner_len;
+        let padding = vec![0; padding_size];
+        Self { inner, padding }
+    }
+
+    pub fn chunk_production_key(&self) -> ChunkProductionKey {
+        self.inner.chunk_production_key()
+    }
+
+    pub fn new_dummy(height: BlockHeight, shard_id: ShardId, prev_block_hash: CryptoHash) -> Self {
+        let inner = ChunkStateWitnessInner::new_dummy(height, shard_id, prev_block_hash);
+        Self { inner, padding: vec![] }
     }
 }
 
 /// The state witness for a chunk; proves the state transition that the
 /// chunk attests to.
 #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, ProtocolSchema)]
-pub struct ChunkStateWitness {
+pub struct ChunkStateWitnessInner {
     // TODO(stateless_validation): Deprecate this field in the next version of the state witness.
     pub chunk_producer: AccountId,
     /// EpochId corresponds to the next block after chunk's previous block.
@@ -162,7 +210,7 @@ pub struct ChunkStateWitness {
     signature_differentiator: SignatureDifferentiator,
 }
 
-impl ChunkStateWitness {
+impl ChunkStateWitnessInner {
     pub fn new(
         chunk_producer: AccountId,
         epoch_id: EpochId,
diff --git a/core/primitives/src/utils/compression.rs b/core/primitives/src/utils/compression.rs
index 0b6fcce1103..ec69d2bef16 100644
--- a/core/primitives/src/utils/compression.rs
+++ b/core/primitives/src/utils/compression.rs
@@ -22,12 +22,11 @@ where
     fn encode(uncompressed: &T) -> std::io::Result<(Self, usize)> {
         // Flow of data: Original --> Borsh serialization --> Counting write --> zstd compression --> Bytes.
         // CountingWrite will count the number of bytes for the Borsh-serialized data, before compression.
-        let mut counting_write =
-            CountingWrite::new(zstd::stream::Encoder::new(Vec::new().writer(), COMPRESSION_LEVEL)?);
+        let mut counting_write = CountingWrite::new(Vec::new().writer());
         borsh::to_writer(&mut counting_write, uncompressed)?;
 
         let borsh_bytes_len = counting_write.bytes_written();
-        let encoded_bytes = counting_write.into_inner().finish()?.into_inner();
+        let encoded_bytes = counting_write.into_inner().into_inner();
 
         Ok((Self::from(encoded_bytes.into()), borsh_bytes_len.as_u64() as usize))
     }
@@ -44,10 +43,7 @@ where
     fn decode_with_limit(&self, limit: ByteSize) -> std::io::Result<(T, usize)> {
         // Flow of data: Bytes --> zstd decompression --> Counting read --> Borsh deserialization --> Original.
         // CountingRead will count the number of bytes for the Borsh-deserialized data, after decompression.
-        let mut counting_read = CountingRead::new_with_limit(
-            zstd::stream::Decoder::new(self.as_ref().reader())?,
-            limit,
-        );
+        let mut counting_read = CountingRead::new_with_limit(self.as_ref().reader(), limit);
 
         match borsh::from_reader(&mut counting_read) {
             Err(err) => {
diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs
index 436eb230501..fa687ec99f7 100644
--- a/integration-tests/src/test_loop/builder.rs
+++ b/integration-tests/src/test_loop/builder.rs
@@ -725,6 +725,7 @@ impl TestLoopBuilder {
             epoch_manager.clone(),
             runtime_adapter.clone(),
             Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
+            Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
         );
 
         let gc_actor = GCActor::new(
diff --git a/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs
index cdda3ba38e3..2401c0a073b 100644
--- a/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs
+++ b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs
@@ -175,7 +175,7 @@ fn setup_orphan_witness_test() -> OrphanWitnessTestEnv {
         "There should be no missing chunks."
    );
     let witness = witness_opt.unwrap();
-    assert_eq!(witness.chunk_header.chunk_hash(), block2.chunks()[0].chunk_hash());
+    assert_eq!(witness.inner.chunk_header.chunk_hash(), block2.chunks()[0].chunk_hash());
 
     for client_idx in clients_without_excluded {
         let blocks_processed = env.clients[client_idx]
@@ -290,7 +290,7 @@ fn test_orphan_witness_not_fully_validated() {
         setup_orphan_witness_test();
 
     // Make the witness invalid in a way that won't be detected during orphan witness validation
-    witness.source_receipt_proofs.insert(
+    witness.inner.source_receipt_proofs.insert(
         ChunkHash::default(),
         ReceiptProof(
             vec![],
@@ -316,7 +316,7 @@ fn modify_witness_header_inner(
     witness: &mut ChunkStateWitness,
     f: impl FnOnce(&mut ShardChunkHeaderV3),
 ) {
-    match &mut witness.chunk_header {
+    match &mut witness.inner.chunk_header {
         ShardChunkHeader::V3(header) => {
             f(header);
         }
diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs
index 8eecd5a410a..a6fe267d014 100644
--- a/integration-tests/src/tests/network/runner.rs
+++ b/integration-tests/src/tests/network/runner.rs
@@ -148,6 +148,7 @@ fn setup_network_node(
         epoch_manager,
         runtime,
         Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
     ));
     shards_manager_adapter.bind(shards_manager_actor.with_auto_span_context());
     let peer_manager = PeerManagerActor::spawn(
diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs
index 0b255586fa1..c7619d6c7b3 100644
--- a/nearcore/src/lib.rs
+++ b/nearcore/src/lib.rs
@@ -372,6 +372,7 @@ pub fn start_with_config_and_synchronization(
         epoch_manager.clone(),
         runtime.clone(),
         Arc::new(RayonAsyncComputationSpawner),
+        Arc::new(RayonAsyncComputationSpawner),
     ));
 
     let (_gc_actor, gc_arbiter) = spawn_actix_actor(GCActor::new(
diff --git a/tools/state-viewer/src/latest_witnesses.rs b/tools/state-viewer/src/latest_witnesses.rs
index 8d5a3aee1af..9fb1e846bd1 100644
--- a/tools/state-viewer/src/latest_witnesses.rs
+++ b/tools/state-viewer/src/latest_witnesses.rs
@@ -72,9 +72,9 @@ impl DumpWitnessesCmd {
             println!(
                 "#{} (height: {}, shard_id: {}, epoch_id: {:?})",
                 i,
-                witness.chunk_header.height_created(),
-                witness.chunk_header.shard_id(),
-                witness.epoch_id
+                witness.inner.chunk_header.height_created(),
+                witness.inner.chunk_header.shard_id(),
+                witness.inner.epoch_id
             );
             match self.mode {
                 DumpWitnessesMode::Pretty => {
@@ -84,9 +84,9 @@ impl DumpWitnessesCmd {
                 DumpWitnessesMode::Binary { ref output_dir } => {
                     let file_name = format!(
                         "witness_{}_{}_{}_{}.bin",
-                        witness.chunk_header.height_created(),
-                        witness.chunk_header.shard_id(),
-                        witness.epoch_id.0,
+                        witness.inner.chunk_header.height_created(),
+                        witness.inner.chunk_header.shard_id(),
+                        witness.inner.epoch_id.0,
                         i
                     );
                     let file_path = output_dir.join(file_name);
diff --git a/utils/near-cache/src/sync.rs b/utils/near-cache/src/sync.rs
index 4b971c9a655..8cec6a2cc33 100644
--- a/utils/near-cache/src/sync.rs
+++ b/utils/near-cache/src/sync.rs
@@ -30,6 +30,18 @@ where
         self.inner.lock().unwrap().is_empty()
     }
 
+    /// Returns true if the cache contains the key and false otherwise.
+    pub fn contains(&self, key: &K) -> bool {
+        self.inner.lock().unwrap().contains(key)
+    }
+
+    /// Pushes a key-value pair into the cache. If an entry with key `k` already exists in
+    /// the cache or another cache entry is removed (due to the lru's capacity),
+    /// then it returns the old entry's key-value pair. Otherwise, returns `None`.
+    pub fn push(&self, key: K, value: V) -> Option<(K, V)> {
+        self.inner.lock().unwrap().push(key, value)
+    }
+
     /// Return the value of they key in the cache otherwise computes the value and inserts it into
     /// the cache. If the key is already in the cache, they get moved to the head of
     /// the LRU list.