Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3657276
  • 博文数量: 365
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2522
  • 用 户 组: 普通用户
  • 注册时间: 2019-10-28 13:40
文章分类

全部博文(365)

文章存档

2023年(8)

2022年(130)

2021年(155)

2020年(50)

2019年(22)

我的朋友

分类: Java

2021-07-20 17:25:07

private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {

// 定义了一个Anypromise

    val promise = Promise[Any]()

    val remoteAddr = message.receiver.address

    def onFailure(e: Throwable): Unit = {

      if (!promise.tryFailure(e)) {

        e match {

          case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")

          case _ => logWarning(s"Ignored failure: $e")

        }

      }

    }

/*

这里声明的onSuccess会被填充到RpcResponseCallbackonSuccess中,这个

RpcResponseCallback就是上面【图9】中的listener,当我们从Server端获取到response

注意,获取的不是RpcFailure类型的response,则都会进入到【图9】的

else if (message instanceof RpcResponse) { 分支中

*/

    def onSuccess(reply: Any): Unit = reply match {

      case RpcFailure(e) => onFailure(e)

      case rpcReply =>

/*

当返回的responseOK的没有问题后,onSuccesscallback,这里promisetrySuccess

进行call操作,这里就是上面所说的,为了一个promise铺设了一条future,从而可以执行

这个Future的线程了

*/

        if (!promise.trySuccess(rpcReply)) {

          logWarning(s"Ignored message: $reply")

        }

    }

    try {

      if (remoteAddr == address) {

        val p = Promise[Any]()

        p.future.onComplete {

          case Success(response) => onSuccess(response)

          case Failure(e) => onFailure(e)

        }(ThreadUtils.sameThread)

        dispatcher.postLocalMessage(message, p)

      } else {

        val rpcMessage = RpcOutboxMessage(message.serialize(this),

          onFailure,

          (client, response) => **onSuccess**(deserialize[Any](client, response)))

        postToOutbox(message.receiver, rpcMessage)

/*

如果是callbackFailure,则这里会被执行

*/

        promise.future.failed.foreach {

          case _: TimeoutException => rpcMessage.onTimeout()

          case _ =>

        }(ThreadUtils.sameThread)

      }

      val timeoutCancelable = timeoutScheduler.schedule(new Runnable {

        override def run(): Unit = {

          onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +

            s"in ${timeout.duration}"))

        }

      }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)

/*

promisefuture执行后,会调用这里的onComplete方法

*/

    promise.future.onComplete { v =>

        timeoutCancelable.cancel(true)

      }(ThreadUtils.sameThread)

    } catch {

      case NonFatal(e) =>

        onFailure(e)

    }

/*

利用RpcTimeout中的addMessageIfTimeout的偏函数再去模式匹配一下产生的Throwable内容

如果是RpcTimeoutException 则 直接throw这个ex

如果是TimeoutException 则包装成RpcTimeoutException后再throw出去

*/

    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)

  }

阅读(1239) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~