Skip to content

Commit

Permalink
triedb/pathdb: introduce lookup structure to optimize node query
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Oct 14, 2024
1 parent 16f6409 commit 64c34fc
Show file tree
Hide file tree
Showing 12 changed files with 1,253 additions and 54 deletions.
4 changes: 3 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var (

snapshotCommitTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/commits", nil)
triedbCommitTimer = metrics.NewRegisteredResettingTimer("chain/triedb/commits", nil)
prefetchWaitTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/wait", nil)

blockInsertTimer = metrics.NewRegisteredResettingTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredResettingTimer("chain/validation", nil)
Expand Down Expand Up @@ -1952,12 +1953,13 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
if statedb.StorageLoaded != 0 {
storageReadSingleTimer.Update(statedb.StorageReads / time.Duration(statedb.StorageLoaded))
}
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
accountUpdateTimer.Update(statedb.AccountUpdates - statedb.PrefetcherWait) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
triehash := statedb.AccountHashes // The time spent on tries hashing
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
blockExecutionTimer.Update(ptime - (statedb.AccountReads + statedb.StorageReads)) // The time spent on EVM processing
prefetchWaitTimer.Update(statedb.PrefetcherWait) // The time spent on waiting prefetcher to finish preload tasks
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
blockCrossValidationTimer.Update(xvtime) // The time spent on stateless cross validation

Expand Down
17 changes: 10 additions & 7 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,15 @@ type StateDB struct {
witness *stateless.Witness

// Measurements gathered during execution for debugging purposes
AccountReads time.Duration
AccountHashes time.Duration
AccountUpdates time.Duration
AccountCommits time.Duration
StorageReads time.Duration
StorageUpdates time.Duration
StorageCommits time.Duration
AccountReads time.Duration
AccountHashes time.Duration
AccountUpdates time.Duration
AccountCommits time.Duration
StorageReads time.Duration
StorageUpdates time.Duration
StorageCommits time.Duration

PrefetcherWait time.Duration
SnapshotCommits time.Duration
TrieDBCommits time.Duration

Expand Down Expand Up @@ -866,6 +868,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
} else {
s.trie = trie
}
s.PrefetcherWait = time.Since(start)
}
// Perform updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
Expand Down
35 changes: 26 additions & 9 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"errors"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -55,13 +56,15 @@ type triePrefetcher struct {
accountDupWriteMeter metrics.Meter
accountDupCrossMeter metrics.Meter
accountWasteMeter metrics.Meter
accountLoadTimer metrics.ResettingTimer

storageLoadReadMeter metrics.Meter
storageLoadWriteMeter metrics.Meter
storageDupReadMeter metrics.Meter
storageDupWriteMeter metrics.Meter
storageDupCrossMeter metrics.Meter
storageWasteMeter metrics.Meter
storageLoadTimer metrics.ResettingTimer
}

func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
Expand All @@ -78,13 +81,15 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads

accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil),
accountLoadTimer: metrics.GetOrRegisterResettingTimer(prefix+"/account/load/time", nil),
accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil),
accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil),
accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),

storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil),
storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil),
storageLoadTimer: metrics.GetOrRegisterResettingTimer(prefix+"/storage/load/time", nil),
storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil),
storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil),
storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil),
Expand Down Expand Up @@ -120,7 +125,10 @@ func (p *triePrefetcher) report() {
if fetcher.root == p.root {
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

total := len(fetcher.seenRead) + len(fetcher.seenWrite)
if total > 0 {
p.accountLoadTimer.Update(fetcher.readTime / time.Duration(total))
}
p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))
Expand All @@ -133,7 +141,10 @@ func (p *triePrefetcher) report() {
} else {
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

total := len(fetcher.seenRead) + len(fetcher.seenWrite)
if total > 0 {
p.storageLoadTimer.Update(fetcher.readTime / time.Duration(total))
}
p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))
Expand Down Expand Up @@ -221,12 +232,12 @@ func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string {
// main prefetcher is paused and either all requested items are processed or if
// the trie being worked on is retrieved from the prefetcher.
type subfetcher struct {
db Database // Database to load trie nodes through
state common.Hash // Root hash of the state to prefetch
owner common.Hash // Owner of the trie, usually account hash
root common.Hash // Root hash of the trie to prefetch
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes
db Database // Database to load trie nodes through
state common.Hash // Root hash of the state to prefetch
owner common.Hash // Owner of the trie, usually account hash
root common.Hash // Root hash of the trie to prefetch
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes

tasks []*subfetcherTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
Expand All @@ -237,6 +248,7 @@ type subfetcher struct {

seenRead map[string]struct{} // Tracks the entries already loaded via read operations
seenWrite map[string]struct{} // Tracks the entries already loaded via write operations
readTime time.Duration // Total time spent on resolving states

dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
Expand All @@ -254,7 +266,7 @@ type subfetcherTask struct {

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address}) *subfetcher {

Check failure on line 269 in core/state/trie_prefetcher.go

View workflow job for this annotation

GitHub Actions / build

syntax error: unexpected } in parameter list; possibly missing comma or )
sf := &subfetcher{
db: db,
state: state,
Expand Down Expand Up @@ -378,6 +390,7 @@ func (sf *subfetcher) loop() {
sf.tasks = nil
sf.lock.Unlock()

start := time.Now()
for _, task := range tasks {
key := string(task.key)
if task.read {
Expand Down Expand Up @@ -410,6 +423,10 @@ func (sf *subfetcher) loop() {
sf.seenWrite[key] = struct{}{}
}
}
// Count the time being spent on state resolving. While it's not very
// accurate due to some additional operations (e.g., filter out duplicated
// task), but it's already good enough for monitoring.
sf.readTime += time.Since(start)

case <-sf.stop:
// Termination is requested, abort if no more tasks are pending. If
Expand Down
3 changes: 3 additions & 0 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ type layer interface {
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer) error

// isStale returns whether this layer has become stale or if it's still live.
isStale() bool
}

// Config contains the settings for database.
Expand Down
29 changes: 26 additions & 3 deletions triedb/pathdb/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type diffLayer struct {
memory uint64 // Approximate guess as to how much memory we use

parent layer // Parent layer modified by this one, never nil, **can be changed**
lock sync.RWMutex // Lock used to protect parent
stale bool // Signals that the layer became stale (referenced disk layer became stale)
lock sync.RWMutex // Lock used to protect parent and stale fields
}

// newDiffLayer creates a new diff layer on top of an existing layer.
Expand Down Expand Up @@ -95,6 +96,25 @@ func (dl *diffLayer) parentLayer() layer {
return dl.parent
}

// isStale returns whether this layer has become stale or if it's still live.
func (dl *diffLayer) isStale() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.stale
}

// markStale sets the stale flag as true.
func (dl *diffLayer) markStale() {
dl.lock.Lock()
defer dl.lock.Unlock()

if dl.stale {
panic("triedb diff layer is stale")
}
dl.stale = true
}

// node implements the layer interface, retrieving the trie node blob with the
// provided node information. No error will be returned if the node is not found.
func (dl *diffLayer) node(owner common.Hash, path []byte, depth int) ([]byte, common.Hash, *nodeLoc, error) {
Expand All @@ -103,6 +123,9 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
dl.lock.RLock()
defer dl.lock.RUnlock()

if dl.stale {
return nil, common.Hash{}, nil, errSnapshotStale
}
// If the trie node is known locally, return it
subset, ok := dl.nodes[owner]
if ok {
Expand All @@ -125,7 +148,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes map
}

// persist flushes the diff layer and all its parent layers to disk layer.
func (dl *diffLayer) persist(force bool) (layer, error) {
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
if parent, ok := dl.parentLayer().(*diffLayer); ok {
// Hold the lock to prevent any read operation until the new
// parent is linked correctly.
Expand All @@ -147,7 +170,7 @@ func (dl *diffLayer) persist(force bool) (layer, error) {

// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
// it. The method will panic if called onto a non-bottom-most diff layer.
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
disk, ok := layer.parentLayer().(*diskLayer)
if !ok {
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))
Expand Down
2 changes: 1 addition & 1 deletion triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (dl *diskLayer) parentLayer() layer {
return nil
}

// isStale return whether this layer has become stale (was flattened across) or if
// isStale returns whether this layer has become stale (was flattened across) or if
// it's still live.
func (dl *diskLayer) isStale() bool {
dl.lock.RLock()
Expand Down
Loading

0 comments on commit 64c34fc

Please sign in to comment.