个人博客
Remote Procedure Call:
远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。
这里生产者作为客户端来调用,消费者作为服务端接收请求然后响应给生产者。
1、同步调用
1.1、绑定队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Configuration public class RPCRabbitConfig { @Bean public Queue RPCQueue() { return new Queue("RPCQueue", true, false, false); }
@Bean public DirectExchange RPCExchange() { return new DirectExchange("RPCExchange", true, false); }
@Bean public Binding bindingRPC() { return BindingBuilder.bind(RPCQueue()).to(RPCExchange()).with("RPC"); } }
|
1.2、消费者(服务端)
1 2 3 4 5 6 7 8 9 10
| @Component @RabbitListener(queues = "RPCQueue") @Slf4j public class RPCReceiver { @RabbitHandler public String process(String message) { log.info("接收远程调用请求消息:[{}]", message); return "remote procedure call success!"; } }
|
1.3、生产者(客户端)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @RestController @Slf4j public class RPCController { @Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setReplyTimeout(60000); }
@PostMapping("/syncRPC") public String syncRPC() { Object response = rabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC同步调用"); String respMsg = response.toString(); log.info("远程调用响应:[{}]", respMsg); return respMsg; } }
|
可以通过setReplyTimeout(long milliseconds)
函数设置超时时间。
1.4、结果
1 2
| 接收远程调用请求消息:[RPC同步调用] 远程调用响应:[remote procedure call success!]
|
2、异步调用
2.1、配置Bean
1 2 3 4 5 6 7 8 9 10
|
@Bean public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) { return new AsyncRabbitTemplate(rabbitTemplate); }
|
2.2、生产者(客户端)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @RestController @Slf4j public class RPCController { @Autowired private AsyncRabbitTemplate asyncRabbitTemplate;
@PostMapping("/asyncRPC") public String asyncRPC() { AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC异步调用"); future.addCallback(new ListenableFutureCallback<Object>() { @Override public void onFailure(Throwable throwable) { log.error("异步调用失败", throwable); }
@Override public void onSuccess(Object o) { log.info("异步调用响应:[{}}", o.toString()); } }); return "ok"; } }
|
2.3、结果
1 2 3
| SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-nHw71SucAmOUHb6hGVjaZA identity=5fbed23f] started 接收远程调用请求消息:[RPC异步调用] 异步调用响应:[remote procedure call success!}
|
参考链接
代码地址