
Commit

Merge branch 'stefan/improved_parallel' into stefan/improved_parallel_forknet

stedfn committed Jan 3, 2025
2 parents dbba188 + 53246b4 commit a052e36
Showing 48 changed files with 1,021 additions and 618 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -11,13 +11,14 @@

## Reference implementation of NEAR Protocol

-![Buildkite](https://img.shields.io/buildkite/0eae07525f8e44a19b48fa937813e2c21ee04aa351361cd851)
+[![Buildkite](https://img.shields.io/buildkite/0eae07525f8e44a19b48fa937813e2c21ee04aa351361cd851)][buildkite]
![Stable Status][stable-release]
![Prerelease Status][prerelease]
[![codecov][codecov-badge]][codecov-url]
[![Discord chat][discord-badge]][discord-url]
[![Telegram Group][telegram-badge]][telegram-url]

+[buildkite]: https://github.com/near/nearcore/actions
[stable-release]: https://img.shields.io/github/v/release/nearprotocol/nearcore?label=stable
[prerelease]: https://img.shields.io/github/v/release/nearprotocol/nearcore?include_prereleases&label=prerelease
[ci-badge-master]: https://badge.buildkite.com/a81147cb62c585cc434459eedd1d25e521453120ead9ee6c64.svg?branch=master
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
@@ -3915,7 +3915,7 @@ impl Chain {
}

/// Function to check whether we need to create a new snapshot while processing the current block
-/// Note that this functions is called as a part of block preprocesing, so the head is not updated to current block
+/// Note that this function is called as part of block preprocessing, so the head is not updated to the current block
fn should_make_or_delete_snapshot(&mut self) -> Result<SnapshotAction, Error> {
// head value is that of the previous block, i.e. curr_block.prev_hash
let head = self.head()?;
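
While preprocessing a block, `self.head()` still returns the parent of the block being processed, so any epoch-boundary logic has to compare the head's epoch against the incoming block's epoch explicitly. A minimal, self-contained sketch of that shape — all types here are assumed stand-ins, not nearcore's real `SnapshotAction` logic:

```rust
enum SnapshotAction {
    MakeSnapshot,
    DoNothing,
}

struct HeadInfo {
    epoch_id: u64,
}

// During preprocessing of `block`, `head` still refers to block.prev_hash,
// so a new epoch shows up as a mismatch with the not-yet-updated head.
fn should_make_snapshot(head: &HeadInfo, block_epoch_id: u64) -> SnapshotAction {
    if head.epoch_id != block_epoch_id {
        SnapshotAction::MakeSnapshot
    } else {
        SnapshotAction::DoNothing
    }
}

fn main() {
    let head = HeadInfo { epoch_id: 7 };
    // The incoming block starts epoch 8, but `head` still reports epoch 7.
    assert!(matches!(should_make_snapshot(&head, 8), SnapshotAction::MakeSnapshot));
}
```
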
2 changes: 1 addition & 1 deletion chain/chain/src/orphan.rs
@@ -66,7 +66,7 @@ impl Orphan {
/// 2) size of the pool exceeds MAX_ORPHAN_SIZE and the orphan was added a long time ago
/// or the height is high
pub struct OrphanBlockPool {
-/// A map from block hash to a orphan block
+/// A map from block hash to an orphan block
orphans: HashMap<CryptoHash, Orphan>,
/// A set that contains all orphans for which we have requested missing chunks
/// An orphan can be added to this set when it was first added to the pool, or later
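
The eviction rule described in the struct's doc comment — drop entries once the pool exceeds `MAX_ORPHAN_SIZE`, preferring orphans that are old or high — can be sketched as follows. Names and the exact eviction order are assumptions, not the real `OrphanBlockPool`:

```rust
use std::collections::HashMap;
use std::time::Instant;

// Illustrative only: the real pool also tracks requested-chunk state and
// considers height; this sketch keeps just the age-based rule.
const MAX_ORPHAN_SIZE: usize = 2; // tiny for the demo

struct Orphan {
    added_at: Instant,
}

struct OrphanPool {
    orphans: HashMap<u64, Orphan>, // keyed by a stand-in for CryptoHash
}

impl OrphanPool {
    fn insert(&mut self, hash: u64) {
        self.orphans.insert(hash, Orphan { added_at: Instant::now() });
        // Over capacity: evict the orphan that was added longest ago.
        while self.orphans.len() > MAX_ORPHAN_SIZE {
            let oldest = *self.orphans.iter().min_by_key(|(_, o)| o.added_at).unwrap().0;
            self.orphans.remove(&oldest);
        }
    }
}

fn main() {
    let mut pool = OrphanPool { orphans: HashMap::new() };
    for h in 0..5 {
        pool.insert(h);
    }
    assert_eq!(pool.orphans.len(), MAX_ORPHAN_SIZE);
}
```
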
2 changes: 1 addition & 1 deletion chain/chunks/src/client.rs
@@ -122,7 +122,7 @@ impl ShardedTransactionPool {
/// the new shard layout.
/// It works by emptying the pools for old shard uids and re-inserting the
/// transactions back to the pool with the new shard uids.
-/// TODO check if this logic works in resharding V3
+/// TODO(resharding) check if this logic works in resharding V3
pub fn reshard(&mut self, old_shard_layout: &ShardLayout, new_shard_layout: &ShardLayout) {
tracing::debug!(
target: "resharding",
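
The doc comment spells out the algorithm: empty every pool keyed by an old shard uid, then re-insert each transaction under the shard the new layout assigns to its signer. A self-contained sketch of that idea with assumed types (the real code uses `ShardLayout`'s account-to-shard mapping, not a hash of the signer):

```rust
use std::collections::HashMap;

type ShardId = u64;

#[derive(Clone)]
struct Tx {
    signer: String,
}

// Stand-in for the ShardLayout account-to-shard mapping.
fn account_to_shard(signer: &str, num_shards: u64) -> ShardId {
    signer.bytes().map(u64::from).sum::<u64>() % num_shards
}

// Empty the old pools and re-insert everything under the new layout.
fn reshard(pools: &mut HashMap<ShardId, Vec<Tx>>, new_num_shards: u64) {
    let txs: Vec<Tx> = pools.drain().flat_map(|(_, txs)| txs).collect();
    for tx in txs {
        let shard = account_to_shard(&tx.signer, new_num_shards);
        pools.entry(shard).or_default().push(tx);
    }
}

fn main() {
    let mut pools: HashMap<ShardId, Vec<Tx>> = HashMap::new();
    pools.entry(0).or_default().push(Tx { signer: "alice.near".into() });
    pools.entry(1).or_default().push(Tx { signer: "bob.near".into() });
    reshard(&mut pools, 4);
    assert_eq!(pools.values().map(Vec::len).sum::<usize>(), 2);
}
```
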
2 changes: 1 addition & 1 deletion chain/client/src/client.rs
@@ -1684,7 +1684,7 @@ impl Client {
// layout is changing we need to reshard the transaction pool.
// TODO make sure transactions don't get added for the old shard
// layout after the pool resharding
-// TODO check if this logic works in resharding V3
+// TODO(resharding) check if this logic works in resharding V3
if self.epoch_manager.is_next_block_epoch_start(&block_hash).unwrap_or(false) {
let new_shard_layout =
self.epoch_manager.get_shard_layout_from_prev_block(&block_hash);
@@ -242,12 +242,11 @@ impl PartialWitnessActor {
witness_bytes,
&chunk_validators,
&signer,
-)?;
+);

if !contract_deploys.is_empty() {
self.send_chunk_contract_deploys_parts(key, contract_deploys)?;
}

Ok(())
}

@@ -259,7 +258,7 @@ impl PartialWitnessActor {
witness_bytes: EncodedChunkStateWitness,
chunk_validators: &[AccountId],
signer: &ValidatorSigner,
-) -> Result<Vec<(AccountId, PartialEncodedStateWitness)>, Error> {
+) -> Vec<(AccountId, PartialEncodedStateWitness)> {
tracing::debug!(
target: "client",
chunk_hash=?chunk_header.chunk_hash(),
@@ -271,7 +270,7 @@
let encoder = self.witness_encoders.entry(chunk_validators.len());
let (parts, encoded_length) = encoder.encode(&witness_bytes);

-Ok(chunk_validators
+chunk_validators
.iter()
.zip_eq(parts)
.enumerate()
@@ -288,7 +287,7 @@
);
(chunk_validator.clone(), partial_witness)
})
-.collect_vec())
+.collect_vec()
}

fn generate_contract_deploys_parts(
@@ -336,7 +335,7 @@ impl PartialWitnessActor {
witness_bytes: EncodedChunkStateWitness,
chunk_validators: &[AccountId],
signer: &ValidatorSigner,
-) -> Result<(), Error> {
+) {
// Capture these values first, as the sources are consumed before calling record_witness_sent.
let chunk_hash = chunk_header.chunk_hash();
let witness_size_in_bytes = witness_bytes.size_bytes();
@@ -352,7 +351,7 @@
witness_bytes,
chunk_validators,
signer,
-)?;
+);
encode_timer.observe_duration();

// Record the witness in order to match the incoming acks for measuring round-trip times.
@@ -367,37 +366,6 @@ impl PartialWitnessActor {
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple),
));
-Ok(())
}

-/// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part.
-fn _forward_state_witness_part(
-&self,
-partial_witness: PartialEncodedStateWitness,
-) -> Result<(), Error> {
-let ChunkProductionKey { shard_id, epoch_id, height_created } =
-partial_witness.chunk_production_key();
-let chunk_producer = self
-.epoch_manager
-.get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })?
-.take_account_id();
-
-// Forward witness part to chunk validators except the validator that produced the chunk and witness.
-let target_chunk_validators = self
-.epoch_manager
-.get_chunk_validator_assignments(&epoch_id, shard_id, height_created)?
-.ordered_chunk_validators()
-.into_iter()
-.filter(|validator| validator != &chunk_producer)
-.collect();
-
-self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
-NetworkRequests::PartialEncodedStateWitnessForward(
-target_chunk_validators,
-partial_witness,
-),
-));
-Ok(())
-}

/// Function to handle receiving partial_encoded_state_witness message from chunk producer.
@@ -431,24 +399,24 @@ impl PartialWitnessActor {

self.partial_witness_spawner.spawn("handle_partial_encoded_state_witness", move || {
// Validate the partial encoded state witness and forward the part to all the chunk validators.
-let validation = validate_partial_encoded_state_witness(
+match validate_partial_encoded_state_witness(
epoch_manager.as_ref(),
&partial_witness,
&signer,
runtime_adapter.store(),
-);
-match validation {
+) {
Ok(true) => {
-forward_state_witness_part_v2(
-partial_witness,
-target_chunk_validators,
-network_adapter,
-);
+network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
+NetworkRequests::PartialEncodedStateWitnessForward(
+target_chunk_validators,
+partial_witness,
+),
+));
}
Ok(false) => {
tracing::warn!(
target: "client",
"Received partial encoded state witness that is not valid"
"Received invalid partial encoded state witness"
);
}
Err(err) => {
@@ -479,22 +447,21 @@ impl PartialWitnessActor {
"handle_partial_encoded_state_witness_forward",
move || {
// Validate the partial encoded state witness and store the partial encoded state witness.
-let validation = validate_partial_encoded_state_witness(
+match validate_partial_encoded_state_witness(
epoch_manager.as_ref(),
&partial_witness,
&signer,
runtime_adapter.store(),
-);
-match validation {
+) {
Ok(true) => {
-partial_witness_tracker
-.store_partial_encoded_state_witness(partial_witness)
-.unwrap();
+if let Err(err) = partial_witness_tracker.store_partial_encoded_state_witness(partial_witness) {
+tracing::error!(target: "client", "Failed to store partial encoded state witness: {}", err);
+}
}
Ok(false) => {
tracing::warn!(
target: "client",
"Received partial encoded state witness that is not valid"
"Received invalid partial encoded state witness"
);
}
Err(err) => {
@@ -871,17 +838,3 @@ fn contracts_cache_contains_contract(
let cache_key = get_contract_cache_key(contract_hash.0, &runtime_config.wasm_config);
cache.memory_cache().contains(cache_key) || cache.has(&cache_key).is_ok_and(|has| has)
}

-/// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part.
-fn forward_state_witness_part_v2(
-partial_witness: PartialEncodedStateWitness,
-target_chunk_validators: Vec<AccountId>,
-network_adapter: PeerManagerAdapter,
-) {
-network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
-NetworkRequests::PartialEncodedStateWitnessForward(
-target_chunk_validators,
-partial_witness,
-),
-));
-}
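
The sending path above splits an encoded witness into one part per chunk validator through a Reed–Solomon encoder (`self.witness_encoders.entry(chunk_validators.len())`). A rough sketch of that encoding step using the `reed_solomon_erasure` crate (which requires adding it as a dependency); the 2/3 data, 1/3 parity split and the padding scheme are assumptions, not nearcore's actual ratios:

```rust
use reed_solomon_erasure::galois_8::ReedSolomon;

// Encode a witness into `num_validators` parts such that any `data_parts`
// of them are enough to reconstruct it.
fn encode_witness(witness: &[u8], num_validators: usize) -> Vec<Vec<u8>> {
    assert!(num_validators >= 2 && !witness.is_empty());
    let data_parts = std::cmp::max(1, num_validators * 2 / 3);
    let parity_parts = num_validators - data_parts;
    let rs = ReedSolomon::new(data_parts, parity_parts).unwrap();

    // Split into equal-length data shards, zero-padding the tail...
    let part_len = witness.len().div_ceil(data_parts);
    let mut shards: Vec<Vec<u8>> = witness
        .chunks(part_len)
        .map(|c| {
            let mut s = c.to_vec();
            s.resize(part_len, 0);
            s
        })
        .collect();
    // ...then append zeroed parity shards and let the encoder fill them in.
    shards.resize(data_parts + parity_parts, vec![0u8; part_len]);
    rs.encode(&mut shards).unwrap();
    shards // part i goes to chunk validator i
}

fn main() {
    let parts = encode_witness(b"example encoded state witness", 5);
    assert_eq!(parts.len(), 5);
}
```
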
6 changes: 3 additions & 3 deletions chain/epoch-manager/src/tests/mod.rs
@@ -2514,13 +2514,13 @@ fn test_epoch_validators_cache() {
#[test]
fn test_chunk_producers() {
let amount_staked = 1_000_000;
-// Make sure that last validator has at least 160/1'000'000 / num_shards of stake.
+// Make sure that last validator has at least 160/1'000'000 of stake.
// We're running with 2 shards and test1 + test2 has 2'000'000 tokens - so chunk_only should have over 160.
let validators = vec![
("test1".parse().unwrap(), amount_staked),
("test2".parse().unwrap(), amount_staked),
("chunk_only".parse().unwrap(), 200),
("not_enough_producer".parse().unwrap(), 100),
("chunk_only".parse().unwrap(), 321),
("not_enough_producer".parse().unwrap(), 320),
];

// There are 2 shards, and 2 block producers seats.
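
With the threshold fix, the required stake is the minimum-stake ratio times total stake, without the per-shard division — which is why the test's boundary values moved from 200/100 to 321/320. A quick check of the arithmetic (the round-up at the boundary is an assumption about the selection logic):

```rust
fn main() {
    // Total stake in the test: test1 + test2 + chunk_only + not_enough_producer.
    let total_stake: u128 = 1_000_000 + 1_000_000 + 321 + 320;
    // Required stake = total * 160 / 1_000_000, rounded up (assumed).
    let threshold = (total_stake * 160).div_ceil(1_000_000);
    assert_eq!(threshold, 321); // 2_000_641 * 160 / 1_000_000 ≈ 320.1
    assert!(321 >= threshold); // chunk_only is selected
    assert!(320 < threshold); // not_enough_producer is not
}
```
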
21 changes: 11 additions & 10 deletions chain/epoch-manager/src/validator_selection.rs
@@ -373,12 +373,13 @@ fn select_chunk_producers(
num_shards: u64,
protocol_version: ProtocolVersion,
) -> (Vec<ValidatorStake>, BinaryHeap<OrderedValidatorStake>, Balance) {
-select_validators(
-all_proposals,
-max_num_selected,
-min_stake_ratio * Ratio::new(1, num_shards as u128),
-protocol_version,
-)
+let min_stake_ratio =
+if checked_feature!("stable", FixChunkProducerStakingThreshold, protocol_version) {
+min_stake_ratio
+} else {
+min_stake_ratio * Ratio::new(1, num_shards as u128)
+};
+select_validators(all_proposals, max_num_selected, min_stake_ratio, protocol_version)
}

// Takes the top N proposals (by stake), or fewer if there are not enough or the
@@ -1037,10 +1038,10 @@ mod tests {
// Given `epoch_info` and `proposals` above, the sample at a given height is deterministic.
let height = 42;
let expected_assignments = vec![
-vec![(4, 56), (1, 168), (2, 300), (3, 84), (0, 364)],
-vec![(3, 70), (1, 300), (4, 42), (2, 266), (0, 308)],
-vec![(4, 42), (1, 238), (3, 42), (0, 450), (2, 196)],
-vec![(2, 238), (1, 294), (3, 64), (0, 378)],
+vec![(2, 192), (0, 396), (1, 280)],
+vec![(1, 216), (2, 264), (0, 396)],
+vec![(0, 396), (2, 288), (1, 192)],
+vec![(2, 256), (1, 312), (0, 312)],
];
assert_eq!(epoch_info.sample_chunk_validators(height), expected_assignments);
}
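
The behavioral change in `select_chunk_producers` is easiest to see on the ratios themselves: before the fix, the minimum-stake ratio was diluted by the shard count; after it, the ratio applies as-is, raising the entry threshold by a factor of `num_shards`. That stricter threshold is also why the sampled assignments in the test now draw from three validators rather than five. A sketch with the `num-rational` crate:

```rust
use num_rational::Ratio;

fn main() {
    let min_stake_ratio = Ratio::new(160u128, 1_000_000u128);
    let num_shards = 6u128;

    // Old behavior: threshold diluted by the number of shards.
    let old_threshold = min_stake_ratio * Ratio::new(1, num_shards);
    // New behavior (FixChunkProducerStakingThreshold): used as-is.
    let new_threshold = min_stake_ratio;

    assert_eq!(old_threshold, Ratio::new(160u128, 6_000_000u128));
    assert!(new_threshold > old_threshold);
}
```
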
4 changes: 2 additions & 2 deletions core/async/README.md
@@ -4,7 +4,7 @@ This crate contains helpers related to common asynchronous programming patterns
used in nearcore:

* messaging: common interfaces for sending messages between components.
-* test_loop: a event-loop-based test framework that can test multiple components
+* test_loop: an event-loop-based test framework that can test multiple components
together in a synchronous way.


@@ -52,4 +52,4 @@ In tests, the `TestLoopBuilder` provides the `sender()` function which also
implements `CanSend`, see the examples directory under this crate.

`AsyncSender<T>` is similar, except that calling `send_async` returns a future
-that carries the response to the message.
\ No newline at end of file
+that carries the response to the message.
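
A minimal sketch of the `CanSend` idea described in this README, with simplified signatures (the real traits in `core/async/src/messaging.rs` carry more bounds and helper derives). Components depend only on the trait, so production code and tests can plug in different senders:

```rust
use std::sync::mpsc;

trait CanSend<M> {
    fn send(&self, message: M);
}

// Any concrete channel can implement the interface; here, a std mpsc sender.
impl<M> CanSend<M> for mpsc::Sender<M> {
    fn send(&self, message: M) {
        // Ignore disconnection errors in this sketch.
        let _ = mpsc::Sender::send(self, message);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    let sender: &dyn CanSend<String> = &tx;
    sender.send("hello".to_string());
    assert_eq!(rx.recv().unwrap(), "hello");
}
```
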
2 changes: 1 addition & 1 deletion core/async/src/test_loop/futures.rs
@@ -6,7 +6,7 @@
//! To support this, pass test_loop.future_spawner() the &dyn FutureSpawner
//! to any component that needs to spawn futures.
//!
-//! This causes any futures spawned during the test to end up as an callback in the
+//! This causes any futures spawned during the test to end up as a callback in the
//! test loop. The event will eventually be executed by the drive_futures function,
//! which will drive the future until it is either suspended or completed. If suspended,
//! then the waker of the future (called when the future is ready to resume) will place
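
The module doc above describes the core trick: a spawned future is polled until it suspends, and its waker re-enqueues it as a new test-loop callback. Stripped of the test loop itself, the poll/wake cycle looks like this std-only sketch (not the real `drive_futures`):

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that records "ready to resume" in a flag. The test loop instead
// pushes a callback event; the flag is the simplest stand-in.
struct FlagWaker(AtomicBool);

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// Drive a future to completion, waiting whenever it is suspended.
fn drive<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let flag = Arc::new(FlagWaker(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => {
                // Suspended: wait for the waker to fire before polling again.
                while !flag.0.swap(false, Ordering::SeqCst) {
                    std::thread::yield_now();
                }
            }
        }
    }
}

// A future that suspends once, waking itself immediately.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.0 {
            Poll::Ready(42)
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    assert_eq!(drive(YieldOnce(false)), 42);
}
```
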
38 changes: 37 additions & 1 deletion core/chain-configs/src/test_genesis.rs
@@ -139,7 +139,7 @@ impl Default for TestEpochConfigBuilder {
fishermen_threshold: FISHERMEN_THRESHOLD,
protocol_upgrade_stake_threshold: PROTOCOL_UPGRADE_STAKE_THRESHOLD,
minimum_stake_divisor: 10,
-minimum_stake_ratio: Rational32::new(160i32, 1_000_000i32),
+minimum_stake_ratio: Rational32::new(16i32, 1_000_000i32),
chunk_producer_assignment_changes_limit: 5,
shuffle_shard_assignment_for_chunk_producers: false,
// consider them ineffective
@@ -628,6 +628,42 @@ pub struct GenesisAndEpochConfigParams<'a> {

/// Handy factory for building test genesis and epoch config store. Use it if it is enough to have
/// one epoch config for your test. Otherwise, just use builders directly.
+///
+/// ```
+/// use near_chain_configs::test_genesis::build_genesis_and_epoch_config_store;
+/// use near_chain_configs::test_genesis::GenesisAndEpochConfigParams;
+/// use near_chain_configs::test_genesis::ValidatorsSpec;
+/// use near_primitives::shard_layout::ShardLayout;
+/// use near_primitives::test_utils::create_test_signer;
+/// use near_primitives::types::AccountId;
+/// use near_primitives::types::AccountInfo;
+///
+/// const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000;
+///
+/// let protocol_version = 73;
+/// let epoch_length = 10;
+/// let accounts = (0..6).map(|i| format!("test{}", i).parse().unwrap()).collect::<Vec<AccountId>>();
+/// let shard_layout = ShardLayout::multi_shard(6, 1);
+/// let validators = vec![
+/// AccountInfo {
+/// account_id: accounts[0].clone(),
+/// public_key: create_test_signer(accounts[0].as_str()).public_key(),
+/// amount: 62500 * ONE_NEAR,
+/// },
+/// ];
+/// let validators_spec = ValidatorsSpec::raw(validators, 3, 3, 3);
+/// let (genesis, epoch_config_store) = build_genesis_and_epoch_config_store(
+/// GenesisAndEpochConfigParams {
+/// protocol_version,
+/// epoch_length,
+/// accounts: &accounts,
+/// shard_layout,
+/// validators_spec,
+/// },
+/// |genesis_builder| genesis_builder.genesis_height(10000).transaction_validity_period(1000),
+/// |epoch_config_builder| epoch_config_builder.shuffle_shard_assignment_for_chunk_producers(true),
+/// );
+/// ```
pub fn build_genesis_and_epoch_config_store<'a>(
params: GenesisAndEpochConfigParams<'a>,
customize_genesis_builder: impl FnOnce(TestGenesisBuilder) -> TestGenesisBuilder,
2 changes: 1 addition & 1 deletion core/o11y/src/metrics.rs
@@ -8,7 +8,7 @@
//! `observe()` method to record durations (e.g., block processing time).
//! - `IncCounter`: used to represent an ideally ever-growing, never-shrinking
//! integer (e.g., number of block processing requests).
-//! - `IntGauge`: used to represent an varying integer (e.g., number of
+//! - `IntGauge`: used to represent a varying integer (e.g., number of
//! attestations per block).
//!
//! ## Important
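
The three metric types the module doc lists come from the `prometheus` crate, which this crate wraps. An illustrative use of each (registry setup omitted; the `try_create_*` helpers in this module construct the same types):

```rust
use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge};

fn main() {
    let requests = IntCounter::new("block_requests_total", "Blocks requested").unwrap();
    let attestations = IntGauge::new("attestations", "Attestations per block").unwrap();
    let processing = Histogram::with_opts(HistogramOpts::new(
        "block_processing_seconds",
        "Block processing time",
    ))
    .unwrap();

    requests.inc(); // ever-growing, never-shrinking counter
    attestations.set(97); // a gauge can move in both directions
    attestations.dec();

    let timer = processing.start_timer(); // record a duration
    // ... do the work being measured ...
    timer.observe_duration();
}
```
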
11 changes: 4 additions & 7 deletions core/parameters/src/config_store.rs
@@ -1,4 +1,4 @@
-use crate::config::{CongestionControlConfig, RuntimeConfig};
+use crate::config::{CongestionControlConfig, RuntimeConfig, WitnessConfig};
use crate::parameter_table::{ParameterTable, ParameterTableDiff};
use crate::vm;
use near_primitives_core::types::ProtocolVersion;
@@ -147,11 +147,8 @@ impl RuntimeConfigStore {
near_primitives_core::chains::BENCHMARKNET => {
let mut config_store = Self::new(None);
let mut config = RuntimeConfig::clone(config_store.get_config(PROTOCOL_VERSION));
-config.congestion_control_config.max_tx_gas = 10u64.pow(16);
-config.congestion_control_config.min_tx_gas = 10u64.pow(16);
-config.witness_config.main_storage_proof_size_soft_limit = 999_999_999_999_999;
-config.witness_config.new_transactions_validation_state_size_soft_limit =
-999_999_999_999_999;
+config.congestion_control_config = CongestionControlConfig::test_disabled();
+config.witness_config = WitnessConfig::test_disabled();
let mut wasm_config = vm::Config::clone(&config.wasm_config);
wasm_config.limit_config.per_receipt_storage_proof_size_limit = 999_999_999_999_999;
config.wasm_config = Arc::new(wasm_config);
@@ -460,6 +457,6 @@ mod tests {
fn test_benchmarknet_config() {
let store = RuntimeConfigStore::for_chain_id(near_primitives_core::chains::BENCHMARKNET);
let config = store.get_config(PROTOCOL_VERSION);
-assert_eq!(config.witness_config.main_storage_proof_size_soft_limit, 999_999_999_999_999);
+assert_eq!(config.witness_config.main_storage_proof_size_soft_limit, usize::MAX);
}
}
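
The diff replaces hand-written magic limits with `test_disabled()` constructors: a single place that raises every soft limit so BENCHMARKNET runs are never throttled by witness-size or congestion checks. A sketch of that pattern — the field set here is assumed, the real `WitnessConfig` has more fields, and `CongestionControlConfig::test_disabled()` follows the same shape:

```rust
struct WitnessConfig {
    main_storage_proof_size_soft_limit: usize,
    new_transactions_validation_state_size_soft_limit: usize,
}

impl WitnessConfig {
    // One constructor that turns the limits off, instead of scattering
    // near-infinite literals at every call site.
    fn test_disabled() -> Self {
        Self {
            main_storage_proof_size_soft_limit: usize::MAX,
            new_transactions_validation_state_size_soft_limit: usize::MAX,
        }
    }
}

fn main() {
    let config = WitnessConfig::test_disabled();
    // Mirrors the updated assertion in test_benchmarknet_config.
    assert_eq!(config.main_storage_proof_size_soft_limit, usize::MAX);
}
```
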