注册
关闭
区块链大帝

区块链大帝

发布于 1周前 阅读数 4320

精通Filecoin:Lotus真实数据处理之Client处理存储

接上文:《精通 Filecoin:Lotus 真实数据处理之 Client 初始化

客户端发起交易之后,最终调用 Lotus(Client)API 的 ClientStartDeal 方法对交易进行处理。这个方法最终调用 Client 对象的 ProposeStorageDeal 方法处理交易:
result, err := a.SMDealClient.ProposeStorageDeal(      ctx,      params.Walle   &providerInfo,      params.Data,    // 包含了客户端的数据      dealStart,      calcDealExpiration(params.MinBlocksDuration, md, dealStart),      params.EpochPrice,      big.Zero(),      rt,      params.FastRetrieval,      params.VerifiedDeal,  )  

Client 对象的 ProposeStorageDeal 方法处理如下:

  1. 计算客户数据的 commP。
    commP, pieceSize, err := clientutils.CommP(ctx, c.pio, rt, data)  
  2. 检查数据的大小。如果不符合,则直接返回。
  3. 创建交易提案对象。
    dealProposal := market.DealProposal{      PieceCID:             commP,      PieceSize:            pieceSize.Padded(),      Client:               addr,      Provider:             info.Address,      StartEpoch:           startEpoch,      EndEpoch:             endEpoch,      StoragePricePerEpoch: price,      ProviderCollateral:   abi.NewTokenAmount(int64(pieceSize)), // TODO: real calc      ClientCollateral:     big.Zero(),      VerifiedDeal:         verifiedDeal,  }  
  4. 对交易提案进行签名。
    clientDealProposal, err := c.node.SignProposal(ctx, addr, dealProposal)  
  5. 把交易提案转化为 ipld 节点对象。
    proposalNd, err := cborutil.AsIpld(clientDealProposal)  
  6. 创建客户端交易对象。
    deal := &storagemarket.ClientDeal{      ProposalCid:        proposalNd.Cid(),      ClientDealProposal: *clientDealProposal,      State:              storagemarket.StorageDealUnknown,      Miner:              info.PeerID,      MinerWorker:        info.Worker,      DataRef:            data,      FastRetrieval:      fastRetrieval,  }  
  7. 调用 fsm 状态组的 Begin 的方法,生成一个状态机,并开始跟踪客户端交易对象。
    err = c.statemachines.Begin(proposalNd.Cid(), deal)  
  8. 向 fsm 状态组发送 ClientEventOpen 事件。
    err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventOpen)  

    当状态机收到这个事件后,因为初始状态为默认值 0,即 StorageDealUnknown,事件处理器对象经过内部处理把状态从 StorageDealUnknown 修改为 StorageDealEnsureClientFunds,从而调用其处理函数 EnsureClientFunds 确定是否接收交易。

  9. 返回结果。
    return &storagemarket.ProposeStorageDealResult{          ProposalCid: deal.ProposalCid,      }, c.discovery.AddPeer(data.Root, retrievalmarket.RetrievalPeer{          Address:  dealProposal.Provider,          ID:       deal.Miner,          PieceCID: &commP,      })  


1、`EnsureClientFunds` 函数

这个函数用来确认客户端有足够的资金来开始交易提案。

这个函数的执行如下:

  1. 获取客户适配器对象。
    node := environment.Node()  
  2. 获取区块链最顶端 tipset 对应的 tipset key 和 tipset 的高度。
    tok, _, err := node.GetChainHead(ctx.Context())  
  3. 确认客户端有足够多的资金来进行交易。
    mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement(), tok)  

    EnsureFunds 方法在确认客户资金过程中,会调用 market Actor 对象的 AddBalance 方法进行处理。

  4. 如果客户有足够多的资金,则调用 fsm 上下文对象的 Trigger 方法,通过事件处理器生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ClientEventFundsEnsured
    if mcid == cid.Undef {      return ctx.Trigger(storagemarket.ClientEventFundsEnsured)  }  

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealEnsureClientFunds 修改为 StorageDealFundsEnsured,从而调用其处理函数 ProposeDeal 处理提议的交易。

2、`ProposeDeal` 函数

这个函数处理提议的交易,把交易发送给 provider 对象。

  1. 生成交易提议对象。
    proposal := network.Proposal{      DealProposal:  &deal.ClientDealProposal,      Piece:         deal.DataRef,      FastRetrieval: deal.FastRetrieval,  }  
  2. 创建一个到矿工的交易流。
    s, err := environment.NewDealStream(ctx.Context(), deal.Miner)  

    客户环境对象的 NewDealStream 方法直接底层网络对象(libp2pStorageMarketNetwork)的同名方法进行处理。后者以指定的协议创建到指定矿工的流,并返回包装之后的流对象。包装之后的流对象是 dealStream 对象(storagemarket/network/deal_stream.go)。

    当存储矿工收到这个请求后,使用自身的 handleNewDealStream 方法处理处理这个流。在这个方法中对流进行简单包装,然后调用矿工对象的 HandleDealStream 方法处理客户交易。

  3. 通过流把交易提案提交到矿工。
    err := s.WriteDealProposal(proposal);  

    这个过程会把交易提案对象转化为 cbor 编码的对象。

  4. 读取流返回的响应对象。
    resp, err := s.ReadDealResponse()  

    等待矿工确定是否接收交易。结果是矿工签名的消息。内容如下:

    network.Response{      State:    storagemarket.StorageDealWaitingForData,      Proposal: deal.ProposalCid,  }  
  5. 关闭流。
    err = s.Close()  
  6. 获取当前链顶部相关信息。
    tok, _, err := environment.Node().GetChainHead(ctx.Context())  
  7. 验证矿工返回的响应。
    err := clientutils.VerifyResponse(ctx.Context(), resp, deal.MinerWorker, tok, environment.Node().VerifySignature);  
  8. 如果响应状态不是等待数据,则发送 ClientEventUnexpectedDealState 事件。
    if resp.Response.State != storagemarket.StorageDealWaitingForData {      return ctx.Trigger(storagemarket.ClientEventUnexpectedDealState, resp.Response.State, resp.Response.Message)  }  
  9. 发送初始化数据传输事件。
    return ctx.Trigger(storagemarket.ClientEventInitiateDataTransfer)  

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealFundsEnsured 修改为 StorageDealStartDataTransfer,从而调用其处理函数 InitiateDataTransfer 处理提议的交易。

3、`InitiateDataTransfer` 函数

初始化数据传输。

  1. 检查传输类型。
    if deal.DataRef.TransferType == storagemarket.TTManual {      log.Infof("manual data transfer for deal %s", deal.ProposalCid)      return ctx.Trigger(storagemarket.ClientEventDataTransferComplete)  }  
  2. 推送数据到 miner。
    err := environment.StartDataTransfer(ctx.Context(),      deal.Miner,      &requestvalidation.StorageDataTransferVoucher{Proposal: deal.ProposalCid},      deal.DataRef.Root,      shared.AllSelector(),  )  

    环境对象的 StartDataTransfer 方法内容如下:

    _, err := c.c.dataTransfer.OpenPushDataChannel(ctx, to, voucher, baseCid, selector)  return err  

    dataTransfer 对象在初始 Client 对象时传递进来的,默认为 manager 对象(go-data-transfer 类库 impl/impl.go)。

  3. 如果有错误,则发送错误事件。
    if err != nil {      return ctx.Trigger(storagemarket.ClientEventDataTransferFailed, xerrors.Errorf("failed to open push data channel: %w", err))  }  
  4. 正常情况下,发送传输初始化完毕事件。
    return ctx.Trigger(storagemarket.ClientEventDataTransferInitiated)  

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealStartDataTransfer 修改为 StorageDealTransferring,因为这个状态没有任何处理函数,所有直接返回。

    因为上面已经打开通道进行数据传输,所以状态机一直等待数据传输。在正常情况下,当数据传输完成之后,发送 ClientEventDataTransferComplete 事件。

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealTransferring 修改为 StorageDealCheckForAcceptance,从而调用其处理函数 CheckForDealAcceptance 处理交易。

5、`CheckForDealAcceptance`

这个函数用来检查矿工对数据验证的结果,并根据结果进行不同的。

  1. 调用环境对象的 GetProviderDealState 方法,获取 provider 对交易的处理。
    dealState, err := environment.GetProviderDealState(ctx.Context(), deal.ProposalCid)  

    环境对象的这个方法直接调用 Client 对象的 GetProviderDealState 方法进行处理。后者创建一个交易状态的流,然后向矿工进行请求,并处理结果。

    Provider 对象使用自身的 HandleDealStatusStream 方法对这个流进行处理。它会从状态机中获取交易对象,并把交易对象通过流返回到这里。

  2. 如果交易对象状态为错误,则发送被拒事件。
    if isFailed(dealState.State) {      return ctx.Trigger(storagemarket.ClientEventDealRejected, dealState.State, dealState.Message)  }  
  3. 如果状态为已接收,则发送接收事件。
    if isAccepted(dealState.State) {      if *dealState.ProposalCid != deal.ProposalCid {          return ctx.Trigger(storagemarket.ClientEventResponseDealDidNotMatch, *dealState.ProposalCid, deal.ProposalCid)      }
    return ctx.Trigger(storagemarket.ClientEventDealAccepted, dealState.PublishCid)  
    }  

    当状态机收到 ClientEventDealAccepted 这个事件后,经过事件处理器把状态从 StorageDealCheckForAcceptance 修改为 StorageDealProposalAccepted,从而调用其处理函数 ValidateDealPublished 处理交易。

  4. 如果状态不是前面需要的状态,则一直等待矿工处理交易。
    return waitAgain(ctx, environment, false)  

    waitAgain 函数内容如下:

    func waitAgain(ctx fsm.Context, environment ClientDealEnvironment, pollError bool) error {      t := time.NewTimer(environment.PollingInterval())
    go func() {      select {      case <-t.C:          _ = ctx.Trigger(storagemarket.ClientEventWaitForDealState, pollError)      case <-ctx.Context().Done():          t.Stop()          return      }  }()

    return nil

    }  

6、`ValidateDealPublished` 函数

这个函数用来确认交易信息已经被发布到链上。

  1. 调用 Lotus Client 适配器验证交易已经被发布到链上。
    dealID, err := environment.Node().ValidatePublishedDeal(ctx.Context(), deal)

    if err != nil {     return ctx.Trigger(storagemarket.ClientEventDealPublishFailed, err) }

    Lotus Client 适配器的这个方法等待交易信息被发布到链上,默认等待5个区块确认。

  2. 发送交易已被发布到链上事件。
    return ctx.Trigger(storagemarket.ClientEventDealPublished, dealID)  

    当状态机收到 ClientEventDealPublished 这个事件后,经过事件处理器把状态从 StorageDealProposalAccepted 修改为 StorageDealSealing,从而调用其处理函数 VerifyDealActivated 处理交易。在调用函数之前,使用指定的交易 ID 修改客户交易对象为参数指定的 dealID

7、`VerifyDealActivated` 函数

这个函数用来确认交易被密封在某个扇区,并且是活跃的。

  1. 生成一个回调函数。
    cb := func(err error) {      if err != nil {          _ = ctx.Trigger(storagemarket.ClientEventDealActivationFailed, err)      } else {          _ = ctx.Trigger(storagemarket.ClientEventDealActivated)      }  }  
  2. 调用 Lotus Client 适配器的 OnDealSectorCommitted 方法,确认交易被密封在某个扇区,并且是活跃的。
    if err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb); err != nil {      return ctx.Trigger(storagemarket.ClientEventDealActivationFailed, err)  }  

    Lotus Client 适配器的 OnDealSectorCommitted 方法会调用状态管理器来获取扇区的相关状态,比如是否预提交上链,是否提交已经上链等。

    无论能否确认都会调用回调函数,如果确认是 Ok的,则发送 ClientEventDealActivated 事件,如果不 OK,则发送 ClientEventDealActivationFailed 事件。

    如果在确认过程中出现错误,则发送 ClientEventDealActivationFailed 事件。

    对于正常情况,当状态机收到 ClientEventDealActivated 这个事件后,经过事件处理器把状态从 StorageDealSealing 修改为 StorageDealActive,从而调用其处理函数 WaitForDealCompletion 处理交易。

  3. 如果确认成功,则直接返回。
    return nil  

8、`WaitForDealCompletion` 函数

  1. 获取 Lotus Client 适配器。
    node := environment.Node()  
  2. 生成失效回调函数。
    expiredCb := func(err error) {      if err != nil {          _ = ctx.Trigger(storagemarket.ClientEventDealCompletionFailed, xerrors.Errorf("deal expiration err: %w", err))      } else {          _ = ctx.Trigger(storagemarket.ClientEventDealExpired)      }  }  
  3. 生成惩罚回调函数。
    slashedCb := func(slashEpoch abi.ChainEpoch, err error) {      if err != nil {          _ = ctx.Trigger(storagemarket.ClientEventDealCompletionFailed, xerrors.Errorf("deal slashing err: %w", err))      } else {          _ = ctx.Trigger(storagemarket.ClientEventDealSlashed, slashEpoch)      }  }  
  4. 调用 Lotus Client 适配器的 OnDealExpiredOrSlashed 方法,当交易失效或惩罚时候通知客户端。
    if err := node.OnDealExpiredOrSlashed(ctx.Context(), deal.DealID, expiredCb, slashedCb); err != nil {      return ctx.Trigger(storagemarket.ClientEventDealCompletionFailed, err)  }  
  5. 返回。
    return nil  

本文链接:https://www.8btc.com/media/628117
转载请注明文章出处

  • 0
区块链大帝
区块链大帝

0 条评论