个人博客
Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。

这里生产者作为客户端来调用,消费者作为服务端接收请求然后响应给生产者。
1、同步调用
1.1、绑定队列
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | @Configurationpublic class RPCRabbitConfig {
 @Bean
 public Queue RPCQueue() {
 return new Queue("RPCQueue", true, false, false);
 }
 
 @Bean
 public DirectExchange RPCExchange() {
 return new DirectExchange("RPCExchange", true, false);
 }
 
 @Bean
 public Binding bindingRPC() {
 return BindingBuilder.bind(RPCQueue()).to(RPCExchange()).with("RPC");
 }
 }
 
 | 
1.2、消费者(服务端)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | @Component@RabbitListener(queues = "RPCQueue")
 @Slf4j
 public class RPCReceiver {
 @RabbitHandler
 public String process(String message) {
 log.info("接收远程调用请求消息:[{}]", message);
 return "remote procedure call success!";
 }
 }
 
 | 
1.3、生产者(客户端)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 
 | @RestController@Slf4j
 public class RPCController {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 
 @PostConstruct
 public void init() {
 
 rabbitTemplate.setReplyTimeout(60000);
 }
 
 @PostMapping("/syncRPC")
 public String syncRPC() {
 Object response = rabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC同步调用");
 String respMsg = response.toString();
 log.info("远程调用响应:[{}]", respMsg);
 return respMsg;
 }
 }
 
 | 
可以通过setReplyTimeout(long milliseconds)函数设置超时时间。
1.4、结果
| 12
 
 | 接收远程调用请求消息:[RPC同步调用]远程调用响应:[remote procedure call success!]
 
 | 
2、异步调用
2.1、配置Bean
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | 
 
 
 
 
 @Bean
 public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
 return new AsyncRabbitTemplate(rabbitTemplate);
 }
 
 | 
2.2、生产者(客户端)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 
 | @RestController@Slf4j
 public class RPCController {
 @Autowired
 private AsyncRabbitTemplate asyncRabbitTemplate;
 
 @PostMapping("/asyncRPC")
 public String asyncRPC() {
 AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC异步调用");
 future.addCallback(new ListenableFutureCallback<Object>() {
 @Override
 public void onFailure(Throwable throwable) {
 log.error("异步调用失败", throwable);
 }
 
 @Override
 public void onSuccess(Object o) {
 log.info("异步调用响应:[{}}", o.toString());
 }
 });
 return "ok";
 }
 }
 
 | 
2.3、结果
| 12
 3
 
 | SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-nHw71SucAmOUHb6hGVjaZA identity=5fbed23f] started接收远程调用请求消息:[RPC异步调用]
 异步调用响应:[remote procedure call success!}
 
 | 
参考链接
代码地址