statement-distribution: RFC103 implementation #5883

Draft: wants to merge 2 commits into base: sandreim/node_v2_descriptors
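At a glance, this draft wires the RFC103 candidate-receipt checks into statement-distribution v2: each relay parent now carries a transposed claim queue, each session records whether the `CandidateReceiptV2` node feature is enabled, and candidates fetched from peers are rejected, with a reputation cost, when their descriptor version, session index, or core index is inconsistent. Below is a condensed sketch of those response-path checks, with types approximated (`CommittedCandidateReceipt` is assumed for the candidate) and error plumbing omitted; see the `handle_response` hunk near the bottom for the exact code.

// Hypothetical consolidation of the three new checks from `handle_response`.
// On failure, the returned `Rep` is the cost applied to the sending peer.
fn rfc103_sanity_checks(
    candidate: &CommittedCandidateReceipt,
    per_session: &PerSessionState,
    latest_session: SessionIndex,
    transposed_cq: &TransposedClaimQueue,
) -> Result<(), Rep> {
    // V2 descriptors are only acceptable once the runtime enables the feature.
    if !per_session.candidate_receipt_v2_enabled() &&
        candidate.descriptor.version() == CandidateDescriptorVersion::V2
    {
        return Err(COST_INVALID_DESCRIPTOR_VERSION)
    }
    // A v2 descriptor that carries a session index must reference the latest session.
    if candidate.descriptor.session_index().is_some_and(|s| s != latest_session) {
        return Err(COST_INVALID_SESSION_INDEX)
    }
    // The descriptor's core index must match a claim queue assignment.
    candidate
        .check_core_index(transposed_cq)
        .map_err(|_| COST_INVALID_CORE_INDEX)
}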
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions polkadot/node/network/statement-distribution/Cargo.toml
@@ -44,6 +44,7 @@ polkadot-primitives = { workspace = true, features = ["test"] }
polkadot-primitives-test-helpers = { workspace = true }
rand_chacha = { workspace = true, default-features = true }
polkadot-subsystem-bench = { workspace = true }
rstest = { workspace = true }

[[bench]]
name = "statement-distribution-regression-bench"
2 changes: 1 addition & 1 deletion polkadot/node/network/statement-distribution/src/error.rs
@@ -82,7 +82,7 @@ pub enum Error {
FetchValidatorGroups(RuntimeApiError),

#[error("Fetching claim queue failed {0:?}")]
FetchClaimQueue(runtime::Error),
FetchClaimQueue(RuntimeApiError),

#[error("Attempted to share statement when not a validator or not assigned")]
InvalidShare,
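The variant's payload changes from the `runtime::Error` wrapper to the raw `RuntimeApiError` because the claim queue is now requested directly rather than through the `fetch_claim_queue` utility. How the reworked variant is used at the fetch site (quoted from the `v2/mod.rs` hunk below; a closed channel and a runtime API failure map to distinct variants):

let claim_queue = ClaimQueueSnapshot(
    polkadot_node_subsystem_util::request_claim_queue(new_relay_parent, ctx.sender())
        .await                                      // send the runtime API request
        .await                                      // await the oneshot response
        .map_err(JfyiError::RuntimeApiUnavailable)? // response channel closed
        .map_err(JfyiError::FetchClaimQueue)?,      // runtime returned an error
);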
189 changes: 126 additions & 63 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -46,12 +46,17 @@ use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
runtime::{
fetch_claim_queue, request_min_backing_votes, ClaimQueueSnapshot, ProspectiveParachainsMode,
request_min_backing_votes, request_node_features, ClaimQueueSnapshot,
ProspectiveParachainsMode,
},
};
use polkadot_primitives::{
vstaging::CoreState, AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex,
GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, SessionIndex, SessionInfo,
node_features::FeatureIndex,
vstaging::{
transpose_claim_queue, CandidateDescriptorVersion, CoreState, TransposedClaimQueue,
},
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, GroupIndex,
GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures, SessionIndex, SessionInfo,
SignedStatement, SigningContext, UncheckedSignedStatement, ValidatorId, ValidatorIndex,
};

@@ -69,7 +74,7 @@ use futures::{
use std::{
collections::{
hash_map::{Entry, HashMap},
HashSet,
BTreeMap, HashSet,
},
time::{Duration, Instant},
};
@@ -137,6 +142,12 @@ const COST_UNREQUESTED_RESPONSE_STATEMENT: Rep =
Rep::CostMajor("Un-requested Statement In Response");
const COST_INACCURATE_ADVERTISEMENT: Rep =
Rep::CostMajor("Peer advertised a candidate inaccurately");
const COST_INVALID_DESCRIPTOR_VERSION: Rep =
Rep::CostMajor("Candidate Descriptor version is invalid");
const COST_INVALID_CORE_INDEX: Rep =
Rep::CostMajor("Candidate Descriptor contains an invalid core index");
const COST_INVALID_SESSION_INDEX: Rep =
Rep::CostMajor("Candidate Descriptor contains an invalid session index");

const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
const COST_INVALID_REQUEST_BITFIELD_SIZE: Rep =
@@ -156,6 +167,7 @@ struct PerRelayParentState {
statement_store: StatementStore,
seconding_limit: usize,
session: SessionIndex,
transposed_cq: TransposedClaimQueue,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
disabled_validators: HashSet<ValidatorIndex>,
}
@@ -219,10 +231,17 @@ struct PerSessionState {
// getting the topology from the gossip-support subsystem
grid_view: Option<grid::SessionTopologyView>,
local_validator: Option<LocalValidatorIndex>,
// `true` if v2 candidate receipts are allowed by the runtime
v2_receipts: bool,
}

impl PerSessionState {
fn new(session_info: SessionInfo, keystore: &KeystorePtr, backing_threshold: u32) -> Self {
fn new(
session_info: SessionInfo,
keystore: &KeystorePtr,
backing_threshold: u32,
v2_receipts: bool,
) -> Self {
let groups = Groups::new(session_info.validator_groups.clone(), backing_threshold);
let mut authority_lookup = HashMap::new();
for (i, ad) in session_info.discovery_keys.iter().cloned().enumerate() {
@@ -235,7 +254,14 @@ impl PerSessionState {
)
.map(|(_, index)| LocalValidatorIndex::Active(index));

PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
PerSessionState {
session_info,
groups,
authority_lookup,
grid_view: None,
local_validator,
v2_receipts,
}
}

fn supply_topology(
@@ -271,6 +297,11 @@
fn is_not_validator(&self) -> bool {
self.grid_view.is_some() && self.local_validator.is_none()
}

/// Returns `true` if v2 candidate receipts are enabled
fn candidate_receipt_v2_enabled(&self) -> bool {
self.v2_receipts
}
}

pub(crate) struct State {
@@ -280,7 +311,7 @@ pub(crate) struct State {
implicit_view: ImplicitView,
candidates: Candidates,
per_relay_parent: HashMap<Hash, PerRelayParentState>,
per_session: HashMap<SessionIndex, PerSessionState>,
per_session: BTreeMap<SessionIndex, PerSessionState>,
// Topology might be received before first leaf update, where we
// initialize the per_session_state, so cache it here until we
// are able to use it.
@@ -299,7 +330,7 @@ impl State {
implicit_view: Default::default(),
candidates: Default::default(),
per_relay_parent: HashMap::new(),
per_session: HashMap::new(),
per_session: BTreeMap::new(),
peers: HashMap::new(),
keystore,
authorities: HashMap::new(),
@@ -615,8 +646,18 @@ pub(crate) async fn handle_active_leaves_update<Context>(

let minimum_backing_votes =
request_min_backing_votes(new_relay_parent, session_index, ctx.sender()).await?;
let mut per_session_state =
PerSessionState::new(session_info, &state.keystore, minimum_backing_votes);
let node_features =
request_node_features(new_relay_parent, session_index, ctx.sender()).await?;
let mut per_session_state = PerSessionState::new(
session_info,
&state.keystore,
minimum_backing_votes,
node_features
.unwrap_or(NodeFeatures::EMPTY)
.get(FeatureIndex::CandidateReceiptV2 as usize)
.map(|b| *b)
.unwrap_or(false),
);
if let Some(topology) = state.unused_topologies.remove(&session_index) {
per_session_state.supply_topology(&topology.topology, topology.local_index);
}
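`request_node_features` yields an `Option` (`None` on runtimes that predate the API), which the call above defaults to an empty bitvec before reading the `CandidateReceiptV2` bit. The same lookup as a free-standing helper, a sketch only (the diff inlines it):

// Missing feature bitvecs (old runtimes) and out-of-range bits both read as `false`.
fn read_v2_receipts_feature(node_features: Option<NodeFeatures>) -> bool {
    node_features
        .unwrap_or(NodeFeatures::EMPTY)
        .get(FeatureIndex::CandidateReceiptV2 as usize)
        .map(|b| *b)
        .unwrap_or(false)
}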
@@ -662,12 +703,11 @@ pub(crate) async fn handle_active_leaves_update<Context>(
.map_err(JfyiError::FetchValidatorGroups)?
.1;

let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent)
.await
.unwrap_or_else(|err| {
gum::debug!(target: LOG_TARGET, ?new_relay_parent, ?err, "handle_active_leaves_update: `claim_queue` API not available");
None
});
let claim_queue = ClaimQueueSnapshot(polkadot_node_subsystem_util::request_claim_queue(new_relay_parent, ctx.sender())
.await
.await
.map_err(JfyiError::RuntimeApiUnavailable)?
.map_err(JfyiError::FetchClaimQueue)?);
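// Note: the first `.await` completes sending the `ClaimQueue` runtime API
// request, the second awaits the oneshot reply; a closed channel maps to
// `RuntimeApiUnavailable` and a runtime-side failure to `FetchClaimQueue`.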

let local_validator = per_session.local_validator.and_then(|v| {
if let LocalValidatorIndex::Active(idx) = v {
@@ -676,9 +716,8 @@ pub(crate) async fn handle_active_leaves_update<Context>(
&per_session.groups,
&availability_cores,
&group_rotation_info,
&maybe_claim_queue,
&claim_queue,
seconding_limit,
max_candidate_depth,
)
} else {
Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None })
@@ -688,11 +727,12 @@ pub(crate) async fn handle_active_leaves_update<Context>(
let groups_per_para = determine_groups_per_para(
availability_cores,
group_rotation_info,
&maybe_claim_queue,
max_candidate_depth,
&claim_queue,
)
.await;

let transposed_cq = transpose_claim_queue(claim_queue.0);

state.per_relay_parent.insert(
new_relay_parent,
PerRelayParentState {
Expand All @@ -702,6 +742,7 @@ pub(crate) async fn handle_active_leaves_update<Context>(
session: session_index,
groups_per_para,
disabled_validators,
transposed_cq,
},
);
}
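`transpose_claim_queue` re-indexes the snapshot so `check_core_index` can ask, per para, which cores it may occupy and at which claim queue depth. A minimal sketch of the transformation, assuming the vstaging shape `BTreeMap<ParaId, BTreeMap<u8, BTreeSet<CoreIndex>>>` (para → depth → cores) and the primitives types in scope; the real implementation lives in `polkadot_primitives::vstaging`:

use std::collections::{BTreeMap, BTreeSet, VecDeque};

fn transpose(
    claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
) -> BTreeMap<ParaId, BTreeMap<u8, BTreeSet<CoreIndex>>> {
    let mut transposed: BTreeMap<ParaId, BTreeMap<u8, BTreeSet<CoreIndex>>> =
        BTreeMap::new();
    for (core, paras) in claim_queue {
        // `depth` is the para's position in this core's claim queue.
        for (depth, para) in paras.into_iter().enumerate() {
            transposed
                .entry(para)
                .or_default()
                .entry(depth as u8)
                .or_default()
                .insert(core);
        }
    }
    transposed
}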
@@ -743,9 +784,8 @@ fn find_active_validator_state(
groups: &Groups,
availability_cores: &[CoreState],
group_rotation_info: &GroupRotationInfo,
maybe_claim_queue: &Option<ClaimQueueSnapshot>,
claim_queue: &ClaimQueueSnapshot,
seconding_limit: usize,
max_candidate_depth: usize,
) -> Option<LocalValidatorState> {
if groups.all().is_empty() {
return None
@@ -754,22 +794,7 @@ fn find_active_validator_state(
let our_group = groups.by_validator_index(validator_index)?;

let core_index = group_rotation_info.core_for_group(our_group, availability_cores.len());
let paras_assigned_to_core = if let Some(claim_queue) = maybe_claim_queue {
claim_queue.iter_claims_for_core(&core_index).copied().collect()
} else {
availability_cores
.get(core_index.0 as usize)
.and_then(|core_state| match core_state {
CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
.next_up_on_available
.as_ref()
.map(|scheduled_core| scheduled_core.para_id),
CoreState::Free | CoreState::Occupied(_) => None,
})
.into_iter()
.collect()
};
let paras_assigned_to_core = claim_queue.iter_claims_for_core(&core_index).copied().collect();
let group_validators = groups.get(our_group)?.to_owned();

Some(LocalValidatorState {
@@ -2176,37 +2201,18 @@ async fn provide_candidate_to_grid<Context>(
async fn determine_groups_per_para(
availability_cores: Vec<CoreState>,
group_rotation_info: GroupRotationInfo,
maybe_claim_queue: &Option<ClaimQueueSnapshot>,
max_candidate_depth: usize,
claim_queue: &ClaimQueueSnapshot,
) -> HashMap<ParaId, Vec<GroupIndex>> {
let n_cores = availability_cores.len();

// Determine the core indices occupied by each para at the current relay parent. To support
// on-demand parachains we also consider the core indices at next blocks.
let schedule: HashMap<CoreIndex, Vec<ParaId>> = if let Some(claim_queue) = maybe_claim_queue {
let schedule: HashMap<CoreIndex, Vec<ParaId>> =
claim_queue
.iter_all_claims()
.map(|(core_index, paras)| (*core_index, paras.iter().copied().collect()))
.collect()
} else {
availability_cores
.into_iter()
.enumerate()
.filter_map(|(index, core)| match core {
CoreState::Scheduled(scheduled_core) =>
Some((CoreIndex(index as u32), vec![scheduled_core.para_id])),
CoreState::Occupied(occupied_core) =>
if max_candidate_depth >= 1 {
occupied_core.next_up_on_available.map(|scheduled_core| {
(CoreIndex(index as u32), vec![scheduled_core.para_id])
})
} else {
None
},
CoreState::Free => None,
})
.collect()
};
.collect();


let mut groups_per_para = HashMap::new();
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
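The hunks above rebuild the per-core schedule straight from the claim queue instead of falling back to the availability cores. A toy illustration of the snapshot helpers used here and in `find_active_validator_state` (illustrative para/core values; assumes `ClaimQueueSnapshot` remains the thin tuple wrapper constructed earlier in this diff):

use std::collections::{BTreeMap, VecDeque};

// Core 0 is claimed twice by para 1000; core 1 by paras 1000 then 2000.
let snapshot = ClaimQueueSnapshot(BTreeMap::from([
    (CoreIndex(0), VecDeque::from([ParaId::from(1000), ParaId::from(1000)])),
    (CoreIndex(1), VecDeque::from([ParaId::from(1000), ParaId::from(2000)])),
]));

let paras: Vec<ParaId> = snapshot.iter_claims_for_core(&CoreIndex(1)).copied().collect();
assert_eq!(paras, vec![ParaId::from(1000), ParaId::from(2000)]);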
@@ -2353,7 +2359,7 @@ async fn handle_incoming_manifest_common<'a, Context>(
peer: PeerId,
peers: &HashMap<PeerId, PeerState>,
per_relay_parent: &'a mut HashMap<Hash, PerRelayParentState>,
per_session: &'a HashMap<SessionIndex, PerSessionState>,
per_session: &'a BTreeMap<SessionIndex, PerSessionState>,
candidates: &mut Candidates,
candidate_hash: CandidateHash,
relay_parent: Hash,
@@ -3106,11 +3112,12 @@ pub(crate) async fn handle_response<Context>(
) {
let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } =
response.candidate_identifier();
let peer = response.requested_peer().clone();

gum::trace!(
target: LOG_TARGET,
?candidate_hash,
peer = ?response.requested_peer(),
?peer,
"Received response",
);

@@ -3174,6 +3181,62 @@ pub(crate) async fn handle_response<Context>(
"Successfully received candidate"
);

if !per_session.candidate_receipt_v2_enabled() &&
candidate.descriptor.version() == CandidateDescriptorVersion::V2
{
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?peer,
"Version 2 candidate receipts are not enabled by the runtime"
);

// Punish peer.
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_INVALID_DESCRIPTOR_VERSION,
)
.await;
return
}

// Get the latest session index & check candidate descriptor session index.
match (candidate.descriptor.session_index(), state.per_session.last_key_value()) {
(Some(session_index), Some((latest_session_index, _))) => {
if &session_index != latest_session_index {
// Punish peer.
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_INVALID_SESSION_INDEX,
)
.await;
return
}
// TODO: determine if we need to buffer candidates at session boundaries.
},
_ => {},
}

// Validate the core index.
if let Err(err) = candidate.check_core_index(&relay_parent_state.transposed_cq) {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?err,
?peer,
"Received candidate has invalid core index"
);

// Punish peer.
modify_reputation(reputation, ctx.sender(), peer, COST_INVALID_CORE_INDEX)
.await;
return
}

(candidate, persisted_validation_data, statements)
},
};
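A side note on the session check above: `per_session` switched from `HashMap` to `BTreeMap` in an earlier hunk precisely so the newest session can be read with `last_key_value`. A minimal illustration (`SessionIndex` is a `u32` alias):

use std::collections::BTreeMap;

let mut per_session: BTreeMap<u32, &str> = BTreeMap::new();
per_session.insert(100, "previous session");
per_session.insert(101, "current session");
// The ordered map makes "latest session" a cheap lookup.
assert_eq!(per_session.last_key_value(), Some((&101, &"current session")));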