/** Schedule acceptance of a consensus result.

    Adds a `jtACCEPT` job named "acceptLedger" to the application's job
    queue. When the job runs it calls doAccept() with the captured
    arguments and then calls endConsensus() on the network operations
    object.

    @param result          Consensus result forwarded to doAccept().
    @param prevLedger      Ledger the round was built on.
    @param closeResolution Close time resolution forwarded to doAccept().
    @param rawCloseTimes   Unrounded close times forwarded to doAccept().
    @param mode            Consensus mode forwarded to doAccept().
    @param consensusJson   JSON value; moved into the job and from there
                           into doAccept().
*/
void
RCLConsensus::Adaptor::onAccept(
    Result const& result,
    RCLCxLedger const& prevLedger,
    NetClock::duration const& closeResolution,
    ConsensusCloseTimes const& rawCloseTimes,
    ConsensusMode const& mode,
    Json::Value&& consensusJson)
{
    app_.getJobQueue().addJob(
        jtACCEPT,
        "acceptLedger",
        // The default by-copy capture stores copies of the objects the
        // reference parameters refer to, so the job does not hold
        // references to the caller's arguments. The JSON is moved in via an
        // init-capture; `mutable` is required so it can be moved out again.
        // NOTE(review): presumably the job runs asynchronously on a worker
        // thread - confirm against JobQueue::addJob.
        [=, this, cj = std::move(consensusJson)]() mutable {
            this->doAccept(
                result,
                prevLedger,
                closeResolution,
                rawCloseTimes,
                mode,
                std::move(cj));
            this->app_.getOPs().endConsensus();
        });
}

/** Build, announce and adopt the ledger resulting from a consensus round.

    In order, this:
      - records the proposer count and round time of the finished round,
      - determines the close time to use for the new ledger,
      - deserializes the agreed transaction set and builds the new ledger,
      - notifies peers of the accepted ledger,
      - runs the censorship detector (only when on the correct ledger and
        the result state is ConsensusState::Yes),
      - validates the new ledger when permitted,
      - queues disputed transactions we voted against for a retry and
        builds the next open ledger,
      - switches the last closed ledger, and
      - adjusts the local close time offset from the reported close times.

    @param result          Outcome of the consensus round.
    @param prevLedger      Ledger the round was built on.
    @param closeResolution Close time resolution used for rounding.
    @param rawCloseTimes   Our own and our peers' unrounded close times.
    @param mode            Mode we were in when the round was accepted.
    @param consensusJson   JSON value handed on to
                           LedgerMaster::consensusBuilt (moved from).
*/
void
RCLConsensus::Adaptor::doAccept(
    Result const& result,
    RCLCxLedger const& prevLedger,
    NetClock::duration closeResolution,
    ConsensusCloseTimes const& rawCloseTimes,
    ConsensusMode const& mode,
    Json::Value&& consensusJson)
{
    prevProposers_ = result.proposers;
    prevRoundTime_ = result.roundTime.read();

    // Assigned on both branches of the close time check below.
    bool closeTimeCorrect;

    const bool proposing = mode == ConsensusMode::proposing;
    const bool haveCorrectLCL = mode != ConsensusMode::wrongLedger;
    const bool consensusFail = result.state == ConsensusState::MovedOn;

    auto consensusCloseTime = result.position.closeTime();

    if (consensusCloseTime == NetClock::time_point{})
    {
        // A default-constructed close time is treated as "no usable close
        // time": use one second past the previous ledger's close time and
        // flag the close time as not correct.
        using namespace std::chrono_literals;
        consensusCloseTime = prevLedger.closeTime() + 1s;
        closeTimeCorrect = false;
    }
    else
    {
        // Otherwise derive the effective close time from the position's
        // close time, the resolution and the previous close time.
        consensusCloseTime = effCloseTime(
            consensusCloseTime, closeResolution, prevLedger.closeTime());
        closeTimeCorrect = true;
    }

    JLOG(j_.debug()) << "Report: Prop=" << (proposing ? "yes" : "no")
                     << " val=" << (validating_ ? "yes" : "no")
                     << " corLCL=" << (haveCorrectLCL ? "yes" : "no")
                     << " fail=" << (consensusFail ? "yes" : "no");
    JLOG(j_.debug()) << "Report: Prev = " << prevLedger.id() << ":"
                     << prevLedger.seq();

    //--------------------------------------------------------------------------
    // IDs of transactions that could not be deserialized. Passed to
    // buildLCL and, later, consulted by the censorship detector callback.
    std::set<TxID> failed;

    // The canonical set is keyed by the hash of the agreed transaction set.
    CanonicalTXSet retriableTxs{result.txns.map_->getHash().as_uint256()};

    JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key();

    for (auto const& item : *result.txns.map_)
    {
        try
        {
            retriableTxs.insert(
                std::make_shared<STTx const>(SerialIter{item.slice()}));
            JLOG(j_.debug()) << "    Tx: " << item.key();
        }
        catch (std::exception const& ex)
        {
            // A transaction that throws while being constructed is recorded
            // as failed instead of aborting the whole accept.
            failed.insert(item.key());
            JLOG(j_.warn())
                << "    Tx: " << item.key() << " throws: " << ex.what();
        }
    }

    auto built = buildLCL(
        prevLedger,
        retriableTxs,
        consensusCloseTime,
        closeTimeCorrect,
        closeResolution,
        result.roundTime.read(),
        failed);

    auto const newLCLHash = built.id();
    JLOG(j_.debug()) << "Built ledger #" << built.seq() << ": " << newLCLHash;

    // Announce the newly built ledger.
    notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL);

    // Censorship detection only runs when we were on the correct ledger and
    // the round ended in ConsensusState::Yes.
    if (haveCorrectLCL && result.state == ConsensusState::Yes)
    {
        std::vector<TxID> accepted;

        result.txns.map_->visitLeaves(
            [&accepted](boost::intrusive_ptr<SHAMapItem const> const& item) {
                accepted.push_back(item->key());
            });

        // Whatever is still in retriableTxs after buildLCL is added to the
        // failed set as well.
        for (auto const& r : retriableTxs)
            failed.insert(r.first.getTXID());

        censorshipDetector_.check(
            std::move(accepted),
            [curr = built.seq(),
             j = app_.journal("CensorshipDetector"),
             &failed](uint256 const& id, LedgerIndex seq) {
                // Returns true for transactions in the failed set.
                if (failed.count(id))
                    return true;

                auto const wait = curr - seq;

                // Warn once every censorshipWarnInternal ledgers of waiting.
                if (wait && (wait % censorshipWarnInternal == 0))
                {
                    std::ostringstream ss;
                    ss << "Potential Censorship: Eligible tx " << id
                       << ", which we are tracking since ledger " << seq
                       << " has not been included as of ledger " << curr
                       << ".";

                    JLOG(j.warn()) << ss.str();
                }

                return false;
            });
    }

    // Stop validating if the built ledger is not compatible according to
    // the ledger master.
    if (validating_)
        validating_ = ledgerMaster_.isCompatible(
            *built.ledger_, j_.warn(), "Not validating");

    if (validating_ && !consensusFail &&
        app_.getValidations().canValidateSeq(built.seq()))
    {
        validate(built, result.txns, proposing);
        JLOG(j_.info()) << "CNF Val " << newLCLHash;
    }
    else
        JLOG(j_.info()) << "CNF buildLCL " << newLCLHash;

    // Hand the built ledger, the transaction set id and the consensus JSON
    // to the ledger master.
    ledgerMaster_.consensusBuilt(
        built.ledger_, result.txns.id(), std::move(consensusJson));

    //--------------------------------------------------------------------------
    {
        // Disputed transactions that we voted against are added to
        // retriableTxs so they are offered to the next open ledger.
        bool anyDisputes = false;
        for (auto const& [_, dispute] : result.disputes)
        {
            (void)_;
            if (!dispute.getOurVote())
            {
                try
                {
                    JLOG(j_.debug())
                        << "Test applying disputed transaction that did"
                        << " not get in " << dispute.tx().id();

                    SerialIter sit(dispute.tx().tx_->slice());
                    auto txn = std::make_shared<STTx const>(sit);

                    // Pseudo-transactions are not retried.
                    if (isPseudoTx(*txn))
                        continue;

                    retriableTxs.insert(txn);

                    anyDisputes = true;
                }
                catch (std::exception const& ex)
                {
                    JLOG(j_.debug())
                        << "Failed to apply transaction we voted "
                           "NO on. Exception: "
                        << ex.what();
                }
            }
        }

        // Build the new open ledger while holding both the master mutex and
        // the ledger master mutex. std::lock acquires the pair using a
        // deadlock-avoidance algorithm; both are released at the end of
        // this scope.
        std::unique_lock lock{app_.getMasterMutex(), std::defer_lock};
        std::unique_lock sl{ledgerMaster_.peekMutex(), std::defer_lock};
        std::lock(lock, sl);

        // Rules come from the last validated ledger when there is one, and
        // from the configured features alone otherwise.
        auto const lastVal = ledgerMaster_.getValidatedLedger();
        std::optional<Rules> rules;
        if (lastVal)
            rules = makeRulesGivenLedger(*lastVal, app_.config().features);
        else
            rules.emplace(app_.config().features);
        app_.openLedger().accept(
            app_,
            *rules,
            built.ledger_,
            localTxs_.getTxSet(),
            anyDisputes,
            retriableTxs,
            tapNONE,
            "consensus",
            [&](OpenView& view, beast::Journal j) {
                // Let the transaction queue act on the new open view.
                return app_.getTxQ().accept(app_, view);
            });

        app_.getOPs().reportFeeChange();
    }

    //--------------------------------------------------------------------------
    {
        ledgerMaster_.switchLCL(built.ledger_);

        // Sanity checks: the closed ledger is the one just built, and the
        // current open ledger is its child.
        XRPL_ASSERT(
            ledgerMaster_.getClosedLedger()->info().hash == built.id(),
            "ripple::RCLConsensus::Adaptor::doAccept : ledger hash match");
        XRPL_ASSERT(
            app_.openLedger().current()->info().parentHash == built.id(),
            "ripple::RCLConsensus::Adaptor::doAccept : parent hash match");
    }

    //--------------------------------------------------------------------------
    // When we took part in the round (proposing or observing) and it did not
    // fail, compare our close time with the reported ones and feed the
    // difference to the time keeper.
    if ((mode == ConsensusMode::proposing ||
         mode == ConsensusMode::observing) &&
        !consensusFail)
    {
        auto closeTime = rawCloseTimes.self;

        JLOG(j_.info()) << "We closed at "
                        << closeTime.time_since_epoch().count();
        // Despite the alias name, std::chrono::duration's default period is
        // one second, so this is a 64-bit count of seconds.
        using usec64_t = std::chrono::duration<std::uint64_t>;
        usec64_t closeTotal =
            std::chrono::duration_cast<usec64_t>(closeTime.time_since_epoch());
        int closeCount = 1;

        // Weighted sum: each peer close time counts once per vote.
        for (auto const& [t, v] : rawCloseTimes.peers)
        {
            JLOG(j_.info())
                << std::to_string(v) << " time votes for "
                << std::to_string(t.time_since_epoch().count());
            closeCount += v;
            closeTotal +=
                std::chrono::duration_cast<usec64_t>(t.time_since_epoch()) * v;
        }

        // Adding half the divisor first rounds the integer division to the
        // nearest value instead of truncating.
        closeTotal += usec64_t(closeCount / 2);
        closeTotal /= closeCount;

        // Use a signed representation so the offset can be negative.
        using duration = std::chrono::duration<std::int32_t>;
        using time_point = std::chrono::time_point<NetClock, duration>;
        auto offset = time_point{closeTotal} -
            std::chrono::time_point_cast<duration>(closeTime);
        JLOG(j_.info()) << "Our close offset is estimated at " << offset.count()
                        << " (" << closeCount << ")";

        app_.timeKeeper().adjustCloseTime(offset);
    }
}

void

RCLConsensus::Adaptor::notify(

protocol::NodeEvent ne,

RCLCxLedger const& ledger,

bool haveCorrectLCL)

{

protocol::TMStatusChange s;