以太坊详解之交易池
交易池
架构设计
1.交易处理流程
下图是一笔交易从出生到交易进入区块的主要流程:
graph TD
A[用户] -->|发送交易| B[钱包/API]
B --> C[节点1]
C -->|广播交易| D[P2P网络]
D -->|接收交易| E[构建者节点]
C --> F[交易池]
E --> G[交易池]
G --> H[构建区块 + MEV优化]
H --> K[验证者节点]
K --> L[签署并上链]
L --> M[区块]
D -->|广播交易| G
subgraph 节点1
C
F
end
subgraph 构建者节点
E
G
H
end
subgraph 验证者节点
K
L
end
style A fill:#4096ff,stroke:#333
style B fill:#e74c3c,stroke:#333
style C fill:#f0f0f0,stroke:#ccc
style D fill:#4096ff,stroke:#333
style E fill:#f0f0f0,stroke:#ccc
style F fill:#ffffff,stroke:#ccc
style G fill:#ffffff,stroke:#ccc
style H fill:#ffeaa7,stroke:#e67e22
style K fill:#55efc4,stroke:#00b894
style L fill:#55efc4,stroke:#00b894
style M fill:#74b9ff,stroke:#0984e3
graph TD
A[用户] -->|发送交易| B[钱包/API]
B --> C[节点1]
C -->|广播交易| D[P2P网络]
D -->|接收交易| E[构建者节点]
C --> F[交易池]
E --> G[交易池]
G --> H[构建区块 + MEV优化]
H --> K[验证者节点]
K --> L[签署并上链]
L --> M[区块]
D -->|广播交易| G
subgraph 节点1
C
F
end
subgraph 构建者节点
E
G
H
end
subgraph 验证者节点
K
L
end
style A fill:#4096ff,stroke:#333
style B fill:#e74c3c,stroke:#333
style C fill:#f0f0f0,stroke:#ccc
style D fill:#4096ff,stroke:#333
style E fill:#f0f0f0,stroke:#ccc
style F fill:#ffffff,stroke:#ccc
style G fill:#ffffff,stroke:#ccc
style H fill:#ffeaa7,stroke:#e67e22
style K fill:#55efc4,stroke:#00b894
style L fill:#55efc4,stroke:#00b894
style M fill:#74b9ff,stroke:#0984e3
graph TD
A[用户] -->|发送交易| B[钱包/API]
B --> C[节点1]
C -->|广播交易| D[P2P网络]
D -->|接收交易| E[构建者节点]
C --> F[交易池]
E --> G[交易池]
G --> H[构建区块 + MEV优化]
H --> K[验证者节点]
K --> L[签署并上链]
L --> M[区块]
D -->|广播交易| G
subgraph 节点1
C
F
end
subgraph 构建者节点
E
G
H
end
subgraph 验证者节点
K
L
end
style A fill:#4096ff,stroke:#333
style B fill:#e74c3c,stroke:#333
style C fill:#f0f0f0,stroke:#ccc
style D fill:#4096ff,stroke:#333
style E fill:#f0f0f0,stroke:#ccc
style F fill:#ffffff,stroke:#ccc
style G fill:#ffffff,stroke:#ccc
style H fill:#ffeaa7,stroke:#e67e22
style K fill:#55efc4,stroke:#00b894
style L fill:#55efc4,stroke:#00b894
style M fill:#74b9ff,stroke:#0984e3graph TD
A[用户] -->|发送交易| B[钱包/API]
B --> C[节点1]
C -->|广播交易| D[P2P网络]
D -->|接收交易| E[构建者节点]
C --> F[交易池]
E --> G[交易池]
G --> H[构建区块 + MEV优化]
H --> K[验证者节点]
K --> L[签署并上链]
L --> M[区块]
D -->|广播交易| G
subgraph 节点1
C
F
end
subgraph 构建者节点
E
G
H
end
subgraph 验证者节点
K
L
end
style A fill:#4096ff,stroke:#333
style B fill:#e74c3c,stroke:#333
style C fill:#f0f0f0,stroke:#ccc
style D fill:#4096ff,stroke:#333
style E fill:#f0f0f0,stroke:#ccc
style F fill:#ffffff,stroke:#ccc
style G fill:#ffffff,stroke:#ccc
style H fill:#ffeaa7,stroke:#e67e22
style K fill:#55efc4,stroke:#00b894
style L fill:#55efc4,stroke:#00b894
style M fill:#74b9ff,stroke:#0984e3
首先,用户可以通过以太坊钱包或直接调用以太坊执行层节点(如 Geth)的 API,将交易发送至一个运行中的节点。
此时,由于交易是通过该节点的本地 API 接收的,该交易被视为本地交易(local transaction)。在经过基础验证(如签名、nonce、余额、gas 费用)后,交易被纳入该节点的交易池,并随后通过 P2P 网络广播给所有已连接的邻近节点。
当其他节点(例如区块构建者节点)接收到此交易时,在将其纳入自身交易池前,会将该交易标记为远程交易(remote transaction)。远程交易同样需经过完整验证,才能进入交易池,成为待打包的候选交易。
即使接收节点并非构建者节点也无妨 —— 所有全节点默认都会将合法交易继续转发给邻居节点。依托 P2P 网络的 gossip 机制,一笔交易通常在数秒内即可传播至网络中绝大多数节点,包括活跃的区块构建者。
交易在交易池中被区分为本地或远程,主要是为了体现节点对不同来源交易的处理策略差异。本地交易享有若干特权,例如:
- 豁免最低 Gas Price / Priority Fee 限制(若配置允许);
- 在交易池容量满时,优先保留,不易被驱逐;
- 可持久化到本地 journal 文件,节点重启后自动重载;
- 在 EIP-1559 模型下,仍按
maxPriorityFeePerGas(小费)排序,但本地交易在同等小费下可能获得轻微优先(取决于实现)。
📌 重要转变:在当前 PBS(提议者-构建者分离)架构下,交易池的主要服务对象已从“矿工”转变为“区块构建者”。普通验证者节点通常不再维护完整交易池或自行打包,而是通过 MEV-Boost 等中间件,从构建者市场中选择收益最高的预构建区块。因此,交易能否被打包,最终取决于构建者是否将其纳入区块并出价竞标成功,而不仅仅是交易池中的排序。
2.以太坊交易池设计
自2014年以来,以太坊交易池持续迭代优化。从这里也说明,交易池不仅仅重要,还需要高性能。
以太坊交易池的主要设计模块,分别是交易池配置、实时的区块链状态、交易管理容器、本地交易存储和新交易事件。各个模块相互影响,其中最重要的的交易管理。
在 Geth 实现中,交易池由 TxPool 管理,它聚合多个子池(如 LegacyPool 处理传统交易)。下文如无特别说明,‘交易池’指 LegacyPool 实现。
TxPool 自身不直接管理交易,而是将交易分发给子池(如 LegacyPool)处理。配置参数(如 PriceLimit、Locals)由各子池独立使用。
2.1 交易池配置
配置信息由 TxPoolConfig 所定义,各项信息如下:
// core/txpool/txpool.go
// TxPool 是多个特定类型交易池的聚合器,统一管理节点认为“值得关注”的所有交易。
// 交易在从网络接收到或本地提交时进入交易池;
// 在被纳入区块链或因资源限制被驱逐时离开交易池。
type TxPool struct {
subpools []SubPool // 用于处理不同类型交易的子池列表
chain BlockChain
signer types.Signer
stateLock sync.RWMutex // 保护 state 实例的读写锁
state *state.StateDB // 区块链头部区块对应的当前状态数据库
subs event.SubscriptionScope // 订阅作用域,用于在关闭时统一取消所有订阅
quit chan chan error // 退出通道,用于终止区块头更新器
term chan struct{} // 终止信号通道,用于检测交易池是否已关闭
sync chan chan error // 用于测试/模拟的通道,可阻塞等待内部重置完成
}
// core/txpool/legacypool/legacypool.go
// Config是交易池的配置参数。
type Config struct {
Locals []common.Address // 本地优先地址
NoLocals bool // 是否禁用本地交易特殊处理
Journal string // 本地交易日志文件路径(持久化未上链交易)
Rejournal time.Duration // 重新写入日志的间隔
PriceLimit uint64 // 最小 gas price 阈值(低于此值的交易会被拒绝)
PriceBump uint64 // 替换交易时要求的 gas price 最小涨幅(百分比,如 10 表示 10%)
AccountSlots uint64 // 每个账户在“可执行交易池”中保留的最小槽数
GlobalSlots uint64 // 整个可执行交易池最大交易数
AccountQueue uint64 // 每个账户在“排队交易池”中允许的最大交易数
GlobalQueue uint64 // 整个排队交易池最大交易数
Lifetime time.Duration // 交易在池中的最大存活时间(超时会被丢弃)
}Locals: 定义了一组视为 local 交易的账户地址。任何来自此清单的交易均被视为 local 交易。这些交易通常享有特权,例如豁免最低 Gas 价格限制 (PriceLimit) 和防止被非 local 交易替换。NoLocals: 指示是否禁用对 local 地址 (Locals) 的特殊处理。如果设置为true,则所有地址的交易都遵循相同的规则,没有特权。Journal: 指定本地交易日志文件路径,用于在节点重启后重新导入未上链的本地交易。Rejournal: 每隔多久将当前本地交易重新写入日志文件,防止节点崩溃导致交易丢失。PriceLimit: 交易进入交易池所需的最低 gas pricePriceBump: 当用户想用新交易替换同 nonce 的旧交易时,新交易的 gas price(或 tip)必须至少提高多少百分比。AccountSlots: 为每个账户在“可执行交易池(pending)”中保留的最小槽数。GlobalSlots: 设定整个“可执行交易池”(pending pool)所能容纳的最大交易数量。AccountQueue: 每个账户在“排队交易池(queue)”中允许存放的最大交易数量。GlobalQueue: “排队交易池”允许的全局最大交易数量。Lifetime: 允许 remote 的非可执行交易可在交易池存活的最长时间。交易池每分钟检查一次,一旦发现有超期的remote 账户,则移除该账户下的所有非可执行交易。默认为3小时。
在上述 LegacyPool 配置中,交易池内部将交易划分为两类核心状态:可执行交易(pending) 与 排队交易(queued,即“非可执行交易”)。
-
可执行交易(pending):指那些 nonce 连续、费用满足当前区块要求(如 EIP-1559 下
maxFeePerGas >= baseFee)、可被立即执行 的交易。区块构建者(Block Builders)在组装区块时,会优先从这部分交易中挑选并排序,以最大化区块收益(如 MEV 或 Priority Fee)。 -
排队交易(queued):指那些 因 nonce 不连续(如前面有缺失交易)或暂时不满足执行条件(如费用不足) 而无法立即执行的交易。它们被暂存在队列中,等待前置交易上链或费用条件满足后,才可能被提升为“可执行”状态。
📌 重要说明:在当前 PBS(提议者-构建者分离)架构下,验证者(Validators)通常不再直接访问或选择交易池中的交易。交易能否被打包,取决于构建者是否将其纳入预构建区块并成功竞标。因此,交易池的“pending/queued”状态,实质上是为构建者提供候选交易集合,而非为验证者服务。
并非“所有刚进入交易池的交易都属于非可执行状态”。实际上:
如果一笔交易的 nonce 正好等于账户当前链上 nonce + 1(即连续),且满足 gas price/tip 要求,它会直接进入 pending 池,成为“可执行交易”。
只有当交易的 nonce “跳号”(如当前 nonce=5,却收到 nonce=7)时,它才会被放入 queued 池,等待 nonce=6 的交易出现并被打包后,才可能被激活。
2.2 链状态
所有进入交易池的交易都必须经过校验,最基本的包括:
- 账户余额是否足够支付交易的 gas 和转账金额;
- 交易的 nonce 是否合法(比如不能小于当前账户已用 nonce)。
交易池内部会维护一个最新的区块状态(StateDB),用于做这些校验。当交易池收到新区块的通知时,会立即更新(重置)这个 StateDB,确保后续交易校验基于最新链上状态。
在交易池启动后,将订阅链的区块头事件:
// core/txpool/txpool.go
// loop是交易池的主事件循环,等待并响应外部区块链事件以及各种报告和交易驱逐事件。
func (p *TxPool) loop(head *types.Header) {
// Close the termination marker when the pool stops
defer close(p.term)
// 订阅链头事件以触发子池重置
var (
newHeadCh = make(chan core.ChainHeadEvent)
newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh)
)
...
}主循环通过 select 监听 newHeadCh。收到事件后,只是把新区块头赋值给 newHead,不执行任何重置逻辑。
select {
case event := <-newHeadCh:
// 仅更新 newHead,不立即触发 reset
newHead = event.Header
// 等待下一轮循环判断是否需要 reset
}之后检测状态变化,异步触发 reset
if newHead != oldHead || resetForced {
select {
case resetBusy <- struct{}{}:
// 更新 TxPool 的全局 state(非子池状态)
if statedb, err := p.chain.StateAt(newHead.Root); err == nil {
p.stateLock.Lock()
p.state = statedb
p.stateLock.Unlock()
}
// 异步触发所有子池 Reset
go func(oldHead, newHead *types.Header) {
for _, subpool := range p.subpools {
subpool.Reset(oldHead, newHead)
}
resetDone <- newHead
}(oldHead, newHead)
resetForced = false
default:
// reset 已在运行,跳过
}
}其中异步触发的部分:
// core/txpool/legacypool/legacypool.go
func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
wait := pool.requestReset(oldHead, newHead)
<-wait
}
// requestReset请求将池重置到新的头部块。
// 当重置完成后,返回的通道将被关闭。
func (pool *LegacyPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
select {
case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
return <-pool.reorgDoneCh
case <-pool.reorgShutdownCh:
return pool.reorgShutdownCh
}
}在 txpool 中 New 方法中:
func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
...
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil {
for j := i - 1; j >= 0; j-- {
subpools[j].Close()
}
return nil, err
}
}
go pool.loop(head)
return pool, nil
}LegacyPool 的 Init 方法:
func (pool *LegacyPool) Init() error {
// ... 初始化
go pool.scheduleReorgLoop() // ← 启动 reorg 调度循环
return nil
}
func (pool *LegacyPool) scheduleReorgLoop() {
for {
select {
case req := <-pool.reqResetCh:
// 收集 reset 请求
reset = &txpoolResetRequest{oldHead: req.oldHead, newHead: req.newHead}
case ... // 其他事件(如新交易)
default:
if reset != nil || len(dirtyAccounts) > 0 {
// 启动批量 reorg 处理
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
reset, dirtyAccounts, queuedEvents = nil, nil, nil
}
time.Sleep(100 * time.Millisecond) // 避免空转
}
}
}
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events TxEventQueue) {
defer func() { done <- struct{}{} }()
if reset != nil {
// 真正执行状态重置和交易池清理
pool.reset(reset.oldHead, reset.newHead)
}
// ... 处理 dirtyAccounts、events(如 promoteExecutables)
// 通知外部 reset 完成
select {
case pool.reorgDoneCh <- struct{}{}:
default:
}
}最终触发reset(),reset()做了三件事:
- 处理
reorg: 找回被丢弃但有效的交易,准备重新注入 - 更新状态: 同步最新区块状态(StateDB + Nonce)
- 重注入交易: 将找回的交易重新走一遍入池流程
2.3 本地交易
在交易池中将交易标记为 local 的有多种用途:
- 在本地磁盘存储已发送的交易。这样,本地交易不会丢失,重启节点时可以重新加载到交易池,实时广播出去。
- 可以作为外部程序和以太坊沟通的一个渠道。外部程序只需要监听文件内容变化,则可以获得交易清单。
- local 交易可优先于 remote 交易。对交易量的限制等操作,不影响 local 下的账户和交易。
对应本地交易存储,在启动交易池时根据配置开启本地交易存储能力:
// eth/backend.go
func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
...
if !config.TxPool.NoLocals {
rejournal := config.TxPool.Rejournal
if rejournal < time.Second {
log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
rejournal = time.Second
}
eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
stack.RegisterLifecycle(eth.localTxTracker)
}
...
}journal 并不是保存所有的本地交易以及历史,他仅仅是存储当前交易池中存在的本地交易。journal 文件仅持久化当前存在于交易池中的本地交易。每次 rotate 会用最新内容覆盖旧文件,确保节点重启后能恢复未上链的本地交易。
// core/txpool/locals/tx_tracker.go
func (tracker *TxTracker) loop() {
...
for {
select {
case <-tracker.shutdownCh:
return
case <-timer.C:
...
if checkJournal {
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
tracker.mu.Lock()
lastJournal = time.Now()
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
tracker.mu.Unlock()
}
timer.Reset(recheckInterval)
}
}
}2.4 新交易信号
进入交易池的交易将被广播到网络中。这是依赖于交易池支持外部订阅新交易事件信号。任何订阅此事件的子模块,在交易池出现新的可执行交易时,均可实时接受到此事件通知,并获得新交易信息。
需要注意的是并非所有进入交易池的交易均被通知外部,而是只有交易从非可执行状态变成可执行状态后才会发送信号。
// core/txpool/legacypool/legacypool.go
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) {
...
if len(events) > 0 {
var txs []*types.Transaction
for _, set := range events {
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
}
}在交易池中,有两处地方才会执行发送信号。一是交易时用于替换已经存在的可执行交易时。二是有新的一批交易从非可执行状态提升到可执行状态后。
外部只需要订阅 SubscribeNewTxsEvent 新可执行交易事件,则可实时接受交易。在 geth 中网络层将订阅交易事件,以便实时广播。
// eth/handler.go
func (h *handler) Start(maxPeers int) {
...
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop()
...
}
// txBroadcastLoop announces new transactions to connected peers.
func (h *handler) txBroadcastLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.txsCh:
h.BroadcastTransactions(event.Txs)
case <-h.txsSub.Err():
return
}
}
}交易存储
下图是交易池对本地待处理交易的磁盘存储管理流程,涉及加载、实时写入和定期更新维护。
flowchart LR
%% --- Part 1: 启动流程 ---
A((交易池启动)) --> B{开启交易
journaling 吗?};
B -- 是 --> C[从 journal 文件中加载
交易到交易池];
C --> D[更新 journal 文件];
%% --- Part 2: 交易循环 ---
subgraph 交易循环 [Transaction Loop]
direction LR
loop_trigger((🕐
每 1 小时)) --> check_journaling{开启交易
journaling 吗?};
check_journaling -- 是 --> update_journal[更新 journal 文件];
%% 新增的箭头,形成一个闭环,表示任务完成后等待下一次触发
update_journal --> loop_trigger;
end
%% --- 流程之间的连接 ---
%% 启动过程完成后,开始周期性循环
D --> loop_trigger;
B -- 否 --> loop_trigger;
flowchart LR
%% --- Part 1: 启动流程 ---
A((交易池启动)) --> B{开启交易
journaling 吗?};
B -- 是 --> C[从 journal 文件中加载
交易到交易池];
C --> D[更新 journal 文件];
%% --- Part 2: 交易循环 ---
subgraph 交易循环 [Transaction Loop]
direction LR
loop_trigger((🕐
每 1 小时)) --> check_journaling{开启交易
journaling 吗?};
check_journaling -- 是 --> update_journal[更新 journal 文件];
%% 新增的箭头,形成一个闭环,表示任务完成后等待下一次触发
update_journal --> loop_trigger;
end
%% --- 流程之间的连接 ---
%% 启动过程完成后,开始周期性循环
D --> loop_trigger;
B -- 否 --> loop_trigger;
flowchart LR
%% --- Part 1: 启动流程 ---
A((交易池启动)) --> B{开启交易
journaling 吗?};
B -- 是 --> C[从 journal 文件中加载
交易到交易池];
C --> D[更新 journal 文件];
%% --- Part 2: 交易循环 ---
subgraph 交易循环 [Transaction Loop]
direction LR
loop_trigger((🕐
每 1 小时)) --> check_journaling{开启交易
journaling 吗?};
check_journaling -- 是 --> update_journal[更新 journal 文件];
%% 新增的箭头,形成一个闭环,表示任务完成后等待下一次触发
update_journal --> loop_trigger;
end
%% --- 流程之间的连接 ---
%% 启动过程完成后,开始周期性循环
D --> loop_trigger;
B -- 否 --> loop_trigger;flowchart LR
%% --- Part 1: 启动流程 ---
A((交易池启动)) --> B{开启交易
journaling 吗?};
B -- 是 --> C[从 journal 文件中加载
交易到交易池];
C --> D[更新 journal 文件];
%% --- Part 2: 交易循环 ---
subgraph 交易循环 [Transaction Loop]
direction LR
loop_trigger((🕐
每 1 小时)) --> check_journaling{开启交易
journaling 吗?};
check_journaling -- 是 --> update_journal[更新 journal 文件];
%% 新增的箭头,形成一个闭环,表示任务完成后等待下一次触发
update_journal --> loop_trigger;
end
%% --- 流程之间的连接 ---
%% 启动过程完成后,开始周期性循环
D --> loop_trigger;
B -- 否 --> loop_trigger;
flowchart LR
E((接收到新交易)) --> F[添加交易
到交易池];
F --> G{是本地交易吗?};
G -- 是 --> H[添加到
journal 文件];
flowchart LR
E((接收到新交易)) --> F[添加交易
到交易池];
F --> G{是本地交易吗?};
G -- 是 --> H[添加到
journal 文件];
flowchart LR
E((接收到新交易)) --> F[添加交易
到交易池];
F --> G{是本地交易吗?};
G -- 是 --> H[添加到
journal 文件];flowchart LR
E((接收到新交易)) --> F[添加交易
到交易池];
F --> G{是本地交易吗?};
G -- 是 --> H[添加到
journal 文件];
首先 main.go 根据 app.Action 执行 geth 函数
//cmd/geth/main.go
func init() {
// Initialize the CLI app and start Geth
app.Action = geth
...
}
func main() {
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}然后会进入下面的流程:geth()->startNode->utils.StartNode->node.Start->Txtracker.Start
// cmd/geth/main.go
func geth(ctx *cli.Context) error {
if args := ctx.Args().Slice(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
prepare(ctx)
stack := makeFullNode(ctx)
defer stack.Close()
startNode(ctx, stack, false)
stack.Wait()
return nil
}
func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) {
// Start up the node itself
utils.StartNode(ctx, stack, isConsole)
...
}
func StartNode(ctx *cli.Context, stack *node.Node, isConsole bool) {
if err := stack.Start(); err != nil {
Fatalf("Error starting protocol stack: %v", err)
}
...
}
// node/node.go
func (n *Node) Start() error {
...
n.state = runningState
// open networking and RPC endpoints
err := n.openEndpoints()
lifecycles := make([]Lifecycle, len(n.lifecycles))
copy(lifecycles, n.lifecycles)
n.lock.Unlock()
...
var started []Lifecycle
for _, lifecycle := range lifecycles {
if err = lifecycle.Start(); err != nil {
break
}
started = append(started, lifecycle)
}
...
return err
}其中的lificycle.Start会调用 Txtracker.Start:
// core/txpool/locals/tx_tracker.go
// Start implements node.Lifecycle interface
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
func (tracker *TxTracker) Start() error {
tracker.wg.Add(1)
go tracker.loop()
return nil
}其中的loop()就是本文的主要内容.
加载已存储交易
在交易池首次启动 journal 时,主动将已存储的交易加载到交易池。
// core/txpool/locals/tx_tracker.go
func (tracker *TxTracker) loop() {
defer tracker.wg.Done()
if tracker.journal != nil {
tracker.journal.load(func(transactions []*types.Transaction) []error {
tracker.TrackAll(transactions)
return nil
})
defer tracker.journal.close()
}
...
} lo处理时,如果文件不存在则直接返回 nil(跳过加载),这是兼容性更强的文件存在性判断方式(使用 errors.Is + fs.ErrNotExist)。若打开失败且非“不存在”,则返回错误。成功打开后 defer 关闭文件句柄,防止资源泄漏。
// core/txpool/locals/journal.go
input, err := os.Open(journal.path)
if errors.Is(err, fs.ErrNotExist) {
return nil
}
if err != nil {
return err
}
defer input.Close()在加载过程中,临时将 journal 的写入器替换为 devNull(空设备),目的是防止在加载历史交易时触发新的 journal 写入(避免重复记录)。加载完成后通过 defer 恢复 writer 为 nil,确保后续正常写入。
// core/txpool/locals/journal.go
journal.writer = new(devNull)
defer func() { journal.writer = nil }()因为 journal 中存储的是 RLP 编码的交易序列,因此使用 rlp.NewStream 初始化流式解码器 ,为逐笔解码交易做准备。
// core/txpool/locals/journal.go
stream := rlp.NewStream(input, 0) 定义 loadBatch 函数用于批量提交交易到交易池。每笔交易若添加失败(如 nonce 冲突、余额不足等),则记录 dropped 计数 ,但不中断整体加载流程。
// core/txpool/locals/journal.go
var (
failure error
batch types.Transactions
)
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) {
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
}
}
}进入循环,逐笔从 RLP 流中解码交易。每解码一笔,total 计数加一。交易采用批处理模式,每积累 1024 笔即调用 loadBatch 提交 ,以减少交易池频繁更新(如排序、驱逐等)的开销。循环在遇到 io.EOF(正常结束)或其它错误时退出,退出前若当前批次仍有未提交交易,则强制提交。
// core/txpool/locals/journal.go
for {
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil {
if err != io.EOF {
failure = err
}
if batch.Len() > 0 {
loadBatch(batch)
}
break
}
total++
if batch = append(batch, tx); batch.Len() > 1024 {
loadBatch(batch)
batch = batch[:0]
}
}最终打印加载统计:共加载 total 笔交易,其中 dropped 笔因各种原因未能成功加入交易池。
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)load 方法实现了 journal 文件的原子性、批量化、安全加载。通过临时禁用写入避免重复记录,通过批量提交优化性能,通过错误隔离保证部分失败不影响整体恢复,是交易池持久化恢复机制的关键实现。
load的完整实现:
// core/txpool/locals/journal.go
// 加载从磁盘解析交易日志转储,将其内容加载到指定的池中。
func (journal *journal) load(add func([]*types.Transaction) []error) error {
// 打开 journal 文件,准备加载历史交易
input, err := os.Open(journal.path)
if errors.Is(err, fs.ErrNotExist) {
// 如果 journal 文件根本不存在,则跳过解析,直接返回
return nil
}
if err != nil {
// 其他打开错误则直接返回
return err
}
defer input.Close() // 确保函数结束时关闭文件
// 临时禁用 journal 写入(防止加载时重复写入)
journal.writer = new(devNull)
defer func() { journal.writer = nil }() // 函数退出前恢复 writer
// 将 journal 中的所有交易注入交易池
stream := rlp.NewStream(input, 0) // 创建 RLP 解码流
total, dropped := 0, 0 // 统计总交易数和被丢弃的交易数
// 定义一个内部函数:用于加载一小批交易,并更新计数器
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) { // 调用外部传入的 add 函数添加交易
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++ // 记录失败数量
}
}
}
var (
failure error // 记录最终的解析错误(非 EOF)
batch types.Transactions // 当前批次的交易缓存
)
// 循环读取并解析每笔交易
for {
// 解析下一笔交易,出错则终止循环
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil {
if err != io.EOF { // 如果不是正常结束(EOF),记录错误
failure = err
}
// 如果当前批次还有未处理交易,先处理完
if batch.Len() > 0 {
loadBatch(batch)
}
break // 退出循环
}
// 成功解析一笔交易,加入批次
total++ // 总交易数 +1
batch = append(batch, tx)
// 如果批次大小超过 1024,则立即处理并清空批次
if batch.Len() > 1024 {
loadBatch(batch)
batch = batch[:0] // 重置批次,保留底层数组以复用内存
}
}
// 加载完成后打印日志:共加载多少笔,丢弃多少笔
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
return failure // 返回最终错误(如解析格式错误等)
}存储交易
journal.insert将交易实时写入文件流中,相当于实时存储到磁盘。而在写入时,将交易进行RLP编码。
// core/txpool/locals/journal.go
// insert将指定事务添加到本地磁盘日志中。
func (journal *journal) insert(tx *types.Transaction) error {
if journal.writer == nil {
return errNoActiveJournal
}
if err := rlp.Encode(journal.writer, tx); err != nil {
return err
}
return nil
}定期更新 journal
rotate 函数的作用是根据当前交易池的内容重新生成交易日志文件(journal),确保磁盘上持久化的交易记录与内存中交易池的本地交易状态保持一致。
func (tracker *TxTracker) loop() {
...
var (
lastJournal = time.Now()
timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom.
)
for {
select {
case <-tracker.shutdownCh:
return
case <-timer.C:
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal)
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false)
}
if checkJournal {
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
tracker.mu.Lock()
lastJournal = time.Now()
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
tracker.mu.Unlock()
}
...
}
}
}rotate函数首先检查并关闭当前已打开的日志写入器(journal.writer),避免资源泄漏或写入冲突。接着,它创建一个临时新文件(路径为原路径加“.new”后缀),以只写、创建、截断模式打开,权限设为 0644。随后,函数遍历传入的 all 参数——这是一个从地址到交易列表的映射,代表当前交易池中所有本地账户待处理的交易。对每笔交易,使用 RLP 编码写入临时文件。写入过程中若发生错误,立即关闭文件并返回错误。写入完成后,统计写入交易总数(journaled)。
// core/txpool/locals/journal.go
// Close the current journal (if any is open)
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}
// Generate a new journal with the contents of the current pool
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()
return err
}
}
journaled += len(txs)
}
replacement.Close()完成写入后,函数通过 os.Rename 将临时文件原子性地重命名为原日志文件名,实现覆盖原文件。这一步保证了即使在写入过程中系统崩溃,原日志文件也不会被破坏,因为替换是原子操作。
替换成功后,函数重新打开新生成的日志文件,以追加写入模式(O_WRONLY|O_APPEND)打开,并将文件句柄赋值给 journal.writer,为后续可能的增量写入做准备。
最后,根据交易数量决定日志级别:若无交易,则记录 Debug 级别日志;否则记录 Info 级别日志,输出本次重生成的交易总数和涉及的账户数,便于运维监控。
// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
journal.writer = sink
logger := log.Info
if len(all) == 0 {
logger = log.Debug
}
logger("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))整个 rotate 过程确保了 journal 文件内容始终反映交易池当前有效的本地交易,同时通过临时文件 + 重命名机制保障了写入的原子性和数据一致性,避免因中途失败导致日志损坏。
rotate完整实现:
// rotate regenerates the transaction journal based on the current contents of
// the transaction pool.
func (journal *journal) rotate(all map[common.Address]types.Transactions) error {
// Close the current journal (if any is open)
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}
// Generate a new journal with the contents of the current pool
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()
return err
}
}
journaled += len(txs)
}
replacement.Close()
// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
journal.writer = sink
logger := log.Info
if len(all) == 0 {
logger = log.Debug
}
logger("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))
return nil
}其次,交易池通过 TxTracker 的后台协程定期执行交易重检与本地交易日志轮转。该协程启动后,首先从磁盘日志中加载本地交易并追踪,随后进入循环,每隔 recheckInterval(默认10秒后首次,之后周期性)触发一次检查:
若距离上次日志轮转已超过 rejournal 间隔,则触发本地交易日志的持久化轮转(即将当前本地交易写入磁盘);同时,协程会执行交易重检(recheck),筛选出需要重新提交的交易,并将其重新加入交易池。
日志轮转操作在互斥锁保护下执行,以避免与交易追踪(TrackAll)过程中的日志写入发生并发冲突。
func (tracker *TxTracker) loop() {
...
var (
lastJournal = time.Now()
timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom.
)
for {
select {
case <-tracker.shutdownCh:
return
case <-timer.C:
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal)
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false)
}
if checkJournal {
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
tracker.mu.Lock()
lastJournal = time.Now()
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
tracker.mu.Unlock()
}
timer.Reset(recheckInterval)
}
}
}一笔交易如何进入交易池
以太坊节点的交易池(Transaction Pool)负责管理待处理的交易。其核心入口之一是 TxTracker,它会定期重新提交(resubmit)因价格等原因可能需要重试的交易。
TxTracker 在其主循环 loop 中,会定时检查 resubmits 列表。如果其中有交易,它会调用 TxPool 的 Add 方法将这些交易重新注入池中。
func (tracker *TxTracker) loop() {
// ...
for {
select {
// ...
case <-timer.C:
// ...
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false)
}
// ...
}
}
}TxPool 是一个高层管理者,它可以包含多个不同类型的子池(subpool),例如处理常规交易的 LegacyPool 和处理 EIP-4844 Blob 交易的 BlobPool。TxPool.Add 方法会根据交易类型将交易分发到对应的子池进行处理。
func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
// ...
// txsets 会根据交易类型被分到不同的集合
for i := 0; i < len(p.subpools); i++ {
errsets[i] = p.subpools[i].Add(txsets[i], sync)
}
// ...
return errs
}我们重点关注 LegacyPool 的处理流程。
LegacyPool.Add 函数是批量添加交易的入口,它在获取主锁之前,会进行高效的初步筛选,以尽快排除无效或已知的交易。
- 过滤已知交易:遍历所有传入的交易,通过
pool.all这个包含池中所有交易的集合检查交易哈希。如果交易已存在,则标记为ErrAlreadyKnown并跳过。 - 基础有效性验证:对未知交易调用
pool.ValidateTxBasics方法进行无状态(stateless)验证。
// Add enqueues a batch of transactions into the pool if they are valid.
func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error {
var (
errs = make([]error, len(txs))
news = make([]*types.Transaction, 0, len(txs)) // 收集通过初筛的新交易
)
for i, tx := range txs {
if pool.all.Get(tx.Hash()) != nil {
errs[i] = txpool.ErrAlreadyKnown
continue
}
if err := pool.ValidateTxBasics(tx); err != nil {
errs[i] = err
continue
}
news = append(news, tx)
}
if len(news) == 0 {
return errs
}
// 对所有新交易进行加锁和核心处理
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news)
pool.mu.Unlock()
// 合并错误并触发后续处理
// ...
done := pool.requestPromoteExecutables(dirtyAddrs) // 触发可执行交易的提升检查
// ...
return errs
}ValidateTxBasics 是一个关键的预处理步骤,它在不访问链上状态(如账户余额、Nonce)的情况下,检查交易的内在有效性,包括签名、Gas 设置、交易大小、交易类型支持等。这确保了只有格式正确的交易才会进入下一步更消耗资源的处理。
// ValidateTxBasics checks whether a transaction is valid according to the consensus rules.
func (pool *LegacyPool) ValidateTxBasics(tx *types.Transaction) error {
opts := &txpool.ValidationOptions{
Config: pool.chainconfig,
Accept: 0 | // 定义节点接受的交易类型位掩码
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType |
1<<types.SetCodeTxType,
MaxSize: txMaxSize,
MinTip: pool.gasTip.Load().ToBig(), // 检查是否满足节点的最低 Gas Tip 要求
}
return txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts)
}通过初筛的交易被批量传递给 addTxsLocked。该函数在持有锁的情况下,遍历交易并逐个调用 pool.add 方法,这是处理单个交易入池的核心所在。
// addTxsLocked attempts to queue a batch of transactions if they are valid.
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer) // 记录受影响的账户,用于后续处理
errs := make([]error, len(txs))
for i, tx := range txs {
replaced, err := pool.add(tx)
errs[i] = err
if err == nil && !replaced {
dirty.addTx(tx) // 记录成功添加新交易的账户
}
}
return errs, dirty
}func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) 函数是交易池管理的核心,其逻辑严密而复杂,确保了交易池的健康和高效。一笔交易进入此函数后,会经历以下步骤:
第一步:账户预留与状态检查
在多交易池架构(如同时存在 LegacyPool 和 BlobPool)下,必须确保在任何时刻只有一个子池能处理特定账户的交易,以防状态冲突。reserver 机制实现了这一点。
如果一个交易来自一个在池中完全未知的账户(pending 和 queue 中都没有记录),系统会先尝试“持有”(Hold)该账户。如果持有失败(意味着另一子池正在处理),交易将被拒绝。
// core/txpool/legacypool.go
if !hasPending && !hasQueued {
if err := pool.reserver.Hold(from); err != nil { // ❶ 账户预留机制
return false, err
}
}第二步:容量管理与驱逐逻辑
当交易池接近饱和时,节点必须决定是否接纳新交易,以及是否要为新交易腾出空间。
-
容量检查:系统会精确计算新交易将占用的“槽位”(
slots),并检查加入后是否会超过GlobalSlots(可执行+待处理总容量)和GlobalQueue(待处理容量)的限制。 -
价格门槛:如果交易池已满,新交易必须在价格上优于池中已经存在的、价格最低的交易(通过
pool.priced.Underpriced(tx)判断)。所有交易(不论来源)都遵循同样的价格准入规则。 -
防抖动机制:为防止因网络波动导致交易池频繁换入换出(churn),系统引入了一个“防抖动”阈值。如果自上次重组以来,被驱逐的交易数量过多(
changesSinceReorg超过阈值),即使新交易价格很高,也会被暂时拒绝,以维持池的稳定性。 -
驱逐保护:如果需要驱逐交易来腾出空间,系统会执行一个关键的保护性检查。它会防止一个 Nonce 不连续的未来交易(gapped transaction)挤掉一个当前可以被打包执行的
pending状态的交易。这是为了优先保障矿工的出块效率,避免高价但暂不可用的交易替换掉立即可用的交易。 -
执行驱逐:如果上述所有检查都通过,系统会从
priced排序列表中丢弃价格最低的一批交易,为新交易腾出空间。
// 简化后的容量管理逻辑
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { // ❷ 容量检查
if pool.priced.Underpriced(tx) { // ❸ 价格门槛
return false, txpool.ErrUnderpriced
}
if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) { // ❹ 防抖动机制
return false, ErrTxPoolOverflow
}
drop, success := pool.priced.Discard(...) // ❺ 准备驱逐
if pool.isGapped(from, tx) { // ❻ 驱逐保护
if replacesPending { // 如果这个未来交易会导致一个 pending 交易被驱逐
return false, ErrFutureReplacePending // 则拒绝它
}
}
// ... 执行驱逐 ...
for _, tx := range drop {
pool.removeTx(tx.Hash(), false, ...)
}
}第三步:交易入队(pending 或 queue)
通过容量检查后,交易将被尝试放入 pending(可执行)或 queue(待处理)集合。
- 尝试放入
pending集合:- 系统检查该交易发送方的
pending列表中是否已存在相同 Nonce 的交易。 - 如果存在,新交易必须满足价格上浮(Price Bump) 规则(通常要求 Gas Price/Tip 至少提高 10%)才能成功替换旧交易。
- 如果替换成功,旧交易会从
all和priced数据结构中被彻底移除,新交易取而代之。
- 系统检查该交易发送方的
if list := pool.pending[from]; list != nil && list.Contains(tx.Nonce()) { // ❼
inserted, old := list.Add(tx, pool.config.PriceBump) // ❽
if !inserted {
return false, txpool.ErrReplaceUnderpriced
}
if old != nil { // ❾ 替换成功,移除旧交易
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
}
pool.all.Add(tx)
pool.priced.Put(tx)
return old != nil, nil
}- 尝试放入
queue集合:- 如果
pending中没有可替换的交易(即 Nonce 大于下一个期望 Nonce),交易将被尝试放入queue集合。 enqueueTx函数会执行与pending类似的操作:检查queue中有无相同 Nonce 的交易,并根据价格上浮规则进行替换。
- 如果
replaced, err = pool.enqueueTx(hash, tx, true) // ❿
if err != nil {
return false, err
}