以太坊DPOS源码分析
时间: 2018-08-19来源:OSCHINA
前景提要
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
一、前言:
任何共识机制都必须回答包括但不限于如下的问题: 下一个添加到数据库的新区块应该由谁来生成? 下一个块应该何时产生? 该区块应包含哪些交易? 怎样对本协议进行修改? 该如何解决交易历史的竞争问题?
二、功能描述:
每一个持有比特股的人按照持股比例进行对候选受托人的投票;从中选取投票数最多的前21位代表(也可以是其他数字,具体由区块链项目方决定) 成为权力完全相等的21个超级节点(真正:受托人/见证人);通过每隔3秒轮询方式产出区块;而其他候选受托人无权产出区块;
1、持股人:比特股持有所有人;每个账户按照持币数给证人投票;可以随时更换投票;也可以不投;但投只能投赞成票;
2、见证人(受托人/代表,类似比特币的矿工):
注册成为候选受托人需要支付一笔保证金,这笔保证金就是为了防止节点出现作恶的情况,一般来说,如果成为受托人,也就成为超级节点进行挖矿,超级节点需要大概两周的时间才能达到损益平衡,这就促使超级节点至少挖满两周不作恶。
3、选定代表(实现步骤中未考虑实现它)
代表也是通过类似选举证人的方式选举出来。 创始账户(the genesis account)有权对网络参数提出修改,而代表就是该特殊账户的共同签署者。这些参数包括交易费用,区块大小,证人工资和区块间隔等。 在大多数代表批准了提议的变更后,股东有2周的复审期(review period),在此期间他们可以投票踢出代表并作废被提议的变更。
4.出块规则:每隔3秒轮询受托人;并且每个证人会轮流地在固定的预先计划好的2秒内生产一个区块。 当所有证人轮完之后,将被洗牌。 如果某个证人没有在他自己的时间段内生产一个区块,那么该时间段将被跳过,下一个证人继续产生下一个区块。每当证人生产一个区块时,他们都会获取相应的服务费。 证人的薪酬水平由股东选出的代表(delegate)来制定。 如果某个证人没有生产出区块,那么就不会给他支付薪酬,并可能在未来被投票踢出。
5.算法主要包含两个核心部分:块验证人选举和块验证人调度
(1)第一批块验证人由创世块指定,后续每个周期(周期由具体实现定义)都会在周期开始的第一个块重新选举。验证人选举过程如下: 踢掉上个周期出块不足的验证人 统计截止选举块(每个周期的第一块)产生时候选人的票数,选出票数最高的前 N 个作为验证人 随机打乱验证人出块顺序,验证人根据随机后的结果顺序出块
(2)验证人调度根据选举结果进行出块,其他节点根据选举结果验证出块顺序和选举结果是否一致,不一致则认为此块不合法,直接丢弃。
三、以太坊DPOS共识机制源码分析

1、启动入口:
以太坊入口调试脚本:

以太坊项目的启动:main.go中的init()函数-->调用geth方法-->调用startNode-->backend.go中的函数StartMining-->miner.go中的start函数 func init() { // Initialize the CLI app and start Geth app.Action = geth } func geth(ctx *cli.Context) error { //根据上下文配置信息获取全量节点并将该节点注册到以太坊服务 //makeFullNode函数-->flags.go中函数RegisterEthService中-->eth.New-->handler.go中NewProtocolManager-->InsertChain内进行dpos的区块信息校验 node := makeFullNode(ctx) //启动节点 startNode(ctx, node) node.Wait() return nil }
启动节点说明 func startNode(ctx *cli.Context, stack *node.Node) { //启动当前节点:utils.StartNode(stack) //解锁注册钱包事件并自动派生钱包 //监听钱包事件 if ctx.GlobalBool(utils.MiningEnabledFlag.Name) { //判断是否全量节点,只有全量节点才有 挖矿 权利: var ethereum *eth.Ethereum if err := stack.Service(ðereum); err != nil { utils.Fatalf("ethereum service not running: %v", err) } //设置gas价格 ethereum.TxPool().SetGasPrice(utils.GlobalBig(ctx, utils.GasPriceFlag.Name)) //验证是否当前出块受托人:validator, err := s.Validator() ,启动服务打包区块 if err := ethereum.StartMining(true); err != nil { utils.Fatalf("Failed to start mining: %v", err) } } }
上面StartMining方法会调用miner.go中的start函数,调用启动函数之前已经启动全量节点,并进行相关初始化工作(具体初始化内容如下); func (self *Miner) Start(coinbase common.Address) { atomic.StoreInt32(&self.shouldStart, 1) self.worker.setCoinbase(coinbase) self.coinbase = coinbase if atomic.LoadInt32(&self.canStart) == 0 { log.Info("Network syncing, will start miner afterwards") return } atomic.StoreInt32(&self.mining, 1) log.Info("Starting mining operation") //获取当前节点地址,启动服务 self.worker.start() }
worker.go的start函数调用mintLoop函数 func (self *worker) mintLoop() { ticker := time.NewTicker(time.Second).C //for循环不断监听self信号,当监测到self停止时,则调用关闭操作代码,并直接挑出循环监听,函数退出。 for { select { case now := <-ticker: self.mintBlock(now.Unix())//打包块 case <-self.stopper: close(self.quitCh) self.quitCh = make(chan struct{}, 1) self.stopper = make(chan struct{}, 1) return } } }
2.相关角色说明
dpos_context.go type DposContext struct { epochTrie *trie.Trie //记录每个周期的验证人列表 delegateTrie *trie.Trie //记录验证人以及对应投票人的列表 voteTrie *trie.Trie //记录投票人对应验证人 candidateTrie *trie.Trie //记录候选人列表 mintCntTrie *trie.Trie //记录验证人在周期内的出块数目 db ethdb.Database }
以太坊MPT(Trie树, Patricia Trie, 和Merkle树)树形结构存储,并定期同步[k,v]型底层数据库是LevelDB数据库
3.相关交易类型说明
以太坊DPOS共识算法中,将"成为候选人"、"退出候选人"、"投票(授权)"、"取消投票(取消授权)"等操作均定义为以太坊的一种交易类型
transaction.go const (//交易类型 Binary TxType = iota //之前的交易主要是转账或者合约调用 LoginCandidate //成为候选人 LogoutCandidate //退出候选人 Delegate //投票(授权) UnDelegate //取消投票(取消授权) )
在一个新块打包时会执行所有块内的交易,如果发现交易类型不是之前的转账或者合约调用类型,那么会调用 applyDposMessage 进行处理
在worker.go的createNewWork()-->commitTransactions函数-->commitTransaction函数-->调用state_processor.go中的ApplyTransaction函数-->applyDposMessage func ApplyTransaction(config *params.ChainConfig, dposContext *types.DposContext, bc *BlockChain, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *big.Int, cfg vm.Config) (*types.Receipt, *big.Int, error) { msg, err := tx.AsMessage(types.MakeSigner(config, header.Number)) if err != nil { return nil, nil, err } if msg.To() == nil && msg.Type() != types.Binary { return nil, nil, types.ErrInvalidType } // 创建EVM环境的上下文 context := NewEVMContext(msg, header, bc, author) // Create a new environment which holds all relevant information // 创建EVM虚拟机处理交易及智能合约 vmenv := vm.NewEVM(context, statedb, config, cfg) // Apply the transaction to the current state (included in the env) _, gas, failed, err := ApplyMessage(vmenv, msg, gp) if err != nil { return nil, nil, err } if msg.Type() != types.Binary { //如果是非转账或者合约调用类型交易 if err = applyDposMessage(dposContext, msg); err != nil { return nil, nil, err } } } func applyDposMessage(dposContext *types.DposContext, msg types.Message) error { switch msg.Type() { case types.LoginCandidate://成为候选人 dposContext.BecomeCandidate(msg.From()) case types.LogoutCandidate://取消候选人 dposContext.KickoutCandidate(msg.From()) case types.Delegate://投票 //投票之前需要先检查该账号是否候选人;如果投票人之前已经给其他人投过票则先取消之前投票,再进行投票 dposContext.Delegate(msg.From(), *(msg.To())) case types.UnDelegate://取消投票 dposContext.UnDelegate(msg.From(), *(msg.To())) default: return types.ErrInvalidType } return nil }
4.打包出块过程

worker.go func (self *worker) mintBlock(now int64) { engine, ok := self.engine.(*dpos.Dpos) if !ok { log.Error("Only the dpos engine was allowed") return } //矿工会定时(每隔3秒)检查当前的 validator 是否为当前节点,如果是则说明轮询到自己出块了; err := engine.CheckValidator(self.chain.CurrentBlock(), now) if err != nil { switch err { case dpos.ErrWaitForPrevBlock, dpos.ErrMintFutureBlock, dpos.ErrInvalidBlockValidator, dpos.ErrInvalidMintBlockTime: log.Debug("Failed to mint the block, while ", "err", err) default: log.Error("Failed to mint the block", "err", err) } return } //创建一个新的打块任务 work, err := self.createNewWork() if err != nil { log.Error("Failed to create the new work", "err", err) return } //Seal 会对新块进行签名 result, err := self.engine.Seal(self.chain, work.Block, self.quitCh) if err != nil { log.Error("Failed to seal the block", "err", err) return } //将新块广播到邻近的节点,其他节点接收到新块会根据块的签名以及选举结果来看新块是否应该由该验证人来出块 self.recv <- &Result{work, result} } func (self *worker) createNewWork() (*Work, error) { //...... num := parent.Number() header := &types.Header{ ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), GasLimit: core.CalcGasLimit(parent), GasUsed: new(big.Int), Extra: self.extra, Time: big.NewInt(tstamp), } // 仅在挖掘时设置coinbase(避免伪块奖励) if atomic.LoadInt32(&self.mining) == 1 { header.Coinbase = self.coinbase } //初始化块头基础信息 if err := self.engine.Prepare(self.chain, header); err != nil { return nil, fmt.Errorf("got error when preparing header, err: %s", err) } //主要是从 transaction pool 按照 gas price 将交易打包到块中 txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) work.commitTransactions(self.mux, txs, self.chain, self.coinbase) // 打包区块 var ( uncles []*types.Header badUncles []common.Hash ) for hash, uncle := range self.possibleUncles { if len(uncles) == 2 { break } if err := self.commitUncle(work, uncle.Header()); err != nil { log.Trace("Bad uncle found and will be removed", "hash", hash) log.Trace(fmt.Sprint(uncle)) badUncles = append(badUncles, hash) } else { log.Debug("Committing new uncle to block", "hash", hash) uncles = append(uncles, uncle.Header()) } } for _, hash := range badUncles { delete(self.possibleUncles, hash) } // 将 prepare 和 CommitNewWork 内容打包成新块,同时里面还有包含出块奖励、选举、更新打块计数等功能 if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts, work.dposContext); err != nil { return nil, fmt.Errorf("got error when finalize block for sealing, err: %s", err) } work.Block.DposContext = work.dposContext return work, nil }
疑问:
(1).这里面没看到跟pow一样的工作量难度证明的哈希函数计算,即当有出块权益时,打包验证好交易后是否直接打包,那如何会出现规定时间打包失败的情况呢?是否是只有类似断网或网络不好时会出现?
(2).Seal会对新块进行封装签名;在pow算法中seal是核心是计算工作量得出随机符合条件hash,而在dpos共识中seal是否只做了封装签名操作?从源码中看是这样
5.选举分析
(1)选举实现步骤: 根据上个周期出块的情况把一些被选上但出块数达不到要求的候选人踢掉 截止到上一块为止,选出票数最高的前 N 个候选人作为验证人 打乱验证人顺序
当调用dpos.go中Finalize函数打包新块时 func (d *Dpos) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt, dposContext *types.DposContext) (*types.Block, error) { // 累积块奖励并提交最终状态根 AccumulateRewards(chain.Config(), state, header, uncles) header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) parent := chain.GetHeaderByHash(header.ParentHash) epochContext := &EpochContext{ statedb: state, DposContext: dposContext, TimeStamp: header.Time.Int64(), } if timeOfFirstBlock == 0 { if firstBlockHeader := chain.GetHeaderByNumber(1); firstBlockHeader != nil { timeOfFirstBlock = firstBlockHeader.Time.Int64() } } genesis := chain.GetHeaderByNumber(0) //打包每个块之前调用 tryElect 来看看当前块是否是新周期的第一块,如果是第一块则需要触发选举。 err := epochContext.tryElect(genesis, parent) if err != nil { return nil, fmt.Errorf("got error when elect next epoch, err: %s", err) } //更新验证人在周期内的出块数目 updateMintCnt(parent.Time.Int64(), header.Time.Int64(), header.Validator, dposContext) header.DposContext = dposContext.ToProto() return types.NewBlock(header, txs, uncles, receipts), nil }
epoch_context.go中的tryElect选举函数 func (ec *EpochContext) tryElect(genesis, parent *types.Header) error { genesisEpoch := genesis.Time.Int64() / epochInterval prevEpoch := parent.Time.Int64() / epochInterval currentEpoch := ec.TimeStamp / epochInterval prevEpochIsGenesis := prevEpoch == genesisEpoch if prevEpochIsGenesis && prevEpoch < currentEpoch { prevEpoch = currentEpoch - 1 } prevEpochBytes := make([]byte, 8) binary.BigEndian.PutUint64(prevEpochBytes, uint64(prevEpoch)) iter := trie.NewIterator(ec.DposContext.MintCntTrie().PrefixIterator(prevEpochBytes)) //根据当前块和上一块的时间计算当前块和上一块是否属于同一周期,如果是同一周期,意味着当前块不是周期第一块,不需要触发选举;如果不是同一周期,说明当前块是该周期的第一块,则触发选举 for i := prevEpoch; i < currentEpoch; i++ { // 如果前一个周期不是创世周期,触发踢出候选人规则; if !prevEpochIsGenesis && iter.Next() { //踢出规则主要看上一周期是否存在候选人出块少于特定阈值(50%),如果存在则踢出:if cnt < epochDuration/blockInterval/ maxValidatorSize /2 { if err := ec.kickoutValidator(prevEpoch); err != nil { return err } } //对候选人进行计票 votes, err := ec.countVotes() if err != nil { return err } candidates := sortableAddresses{} for candidate, cnt := range votes { candidates = append(candidates, &sortableAddress{candidate, cnt}) } if len(candidates) < safeSize { return errors.New("too few candidates") } //将候选人按照票数由高到低排序 sort.Sort(candidates) if len(candidates) > maxValidatorSize {//如果候选人大于预定受托人数量常量maxValidatorSize,则选出前maxValidatorSize个为受托人 candidates = candidates[:maxValidatorSize] } // 重排受托人,由于使用seed是由父块的hash以及当前周期编号组成,所以每个节点计算出来的受托人列表也会一致; seed := int64(binary.LittleEndian.Uint32(crypto.Keccak512(parent.Hash().Bytes()))) + i r := rand.New(rand.NewSource(seed)) for i := len(candidates) - 1; i > 0; i-- { j := int(r.Int31n(int32(i + 1))) candidates[i], candidates[j] = candidates[j], candidates[i] } sortedValidators := make([]common.Address, 0) for _, candidate := range candidates { sortedValidators = append(sortedValidators, candidate.address) } //保存受托人列表 epochTrie, _ := types.NewEpochTrie(common.Hash{}, ec.DposContext.DB()) ec.DposContext.SetEpoch(epochTrie) ec.DposContext.SetValidators(sortedValidators) log.Info("Come to new epoch", "prevEpoch", i, "nextEpoch", i+1) } return nil }
(2)计票实现 先找出候选人对应投票人的列表 所有投票人的余额作为票数累积到候选人的总票数中
计票实现函数是epoch_context.go中的tryElect选举函数中的countVotes函数 func (ec *EpochContext) countVotes() (votes map[common.Address]*big.Int, err error) { votes = map[common.Address]*big.Int{} delegateTrie := ec.DposContext.DelegateTrie()//记录验证人以及对应投票人的列表 candidateTrie := ec.DposContext.CandidateTrie()//获取候选人列表 statedb := ec.statedb iterCandidate := trie.NewIterator(candidateTrie.NodeIterator(nil)) existCandidate := iterCandidate.Next() if !existCandidate { return votes, errors.New("no candidates") } //遍历候选人列表 for existCandidate { candidate := iterCandidate.Value candidateAddr := common.BytesToAddress(candidate) delegateIterator := trie.NewIterator(delegateTrie.PrefixIterator(candidate)) existDelegator := delegateIterator.Next() if !existDelegator { votes[candidateAddr] = new(big.Int) existCandidate = iterCandidate.Next() continue } //遍历后续人对应的投票人列表 for existDelegator { delegator := delegateIterator.Value score, ok := votes[candidateAddr] if !ok { score = new(big.Int) } delegatorAddr := common.BytesToAddress(delegator) //获取投票人的余额作为票数累积到候选人的票数中 weight := statedb.GetBalance(delegatorAddr) score.Add(score, weight) votes[candidateAddr] = score existDelegator = delegateIterator.Next() } existCandidate = iterCandidate.Next() } return votes, nil }

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

热门排行