个人博客


Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。


这里生产者作为客户端来调用,消费者作为服务端接收请求然后响应给生产者。

1、同步调用

1.1、绑定队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class RPCRabbitConfig {
@Bean
public Queue RPCQueue() {
return new Queue("RPCQueue", true, false, false);
}

@Bean
public DirectExchange RPCExchange() {
return new DirectExchange("RPCExchange", true, false);
}

@Bean
public Binding bindingRPC() {
return BindingBuilder.bind(RPCQueue()).to(RPCExchange()).with("RPC");
}
}

1.2、消费者(服务端)

1
2
3
4
5
6
7
8
9
10
@Component
@RabbitListener(queues = "RPCQueue")
@Slf4j
public class RPCReceiver {
@RabbitHandler
public String process(String message) {
log.info("接收远程调用请求消息:[{}]", message);
return "remote procedure call success!";
}
}

1.3、生产者(客户端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@Slf4j
public class RPCController {
@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
// 同步调用设置远程调用响应超时时间,单位:毫秒
rabbitTemplate.setReplyTimeout(60000);
}

@PostMapping("/syncRPC")
public String syncRPC() {
Object response = rabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC同步调用");
String respMsg = response.toString();
log.info("远程调用响应:[{}]", respMsg);
return respMsg;
}
}

可以通过setReplyTimeout(long milliseconds)函数设置超时时间。

1.4、结果

1
2
接收远程调用请求消息:[RPC同步调用]
远程调用响应:[remote procedure call success!]

2、异步调用

2.1、配置Bean

1
2
3
4
5
6
7
8
9
10
/**
* 配置AsyncRabbitTemplate SpringBoot 没有默认的AsyncRabbitTemplate注入,所以这里需要自己配置
*
* @param rabbitTemplate
* @return
*/
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
return new AsyncRabbitTemplate(rabbitTemplate);
}

2.2、生产者(客户端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RestController
@Slf4j
public class RPCController {
@Autowired
private AsyncRabbitTemplate asyncRabbitTemplate;

@PostMapping("/asyncRPC")
public String asyncRPC() {
AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC异步调用");
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable throwable) {
log.error("异步调用失败", throwable);
}

@Override
public void onSuccess(Object o) {
log.info("异步调用响应:[{}}", o.toString());
}
});
return "ok";
}
}

2.3、结果

1
2
3
SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-nHw71SucAmOUHb6hGVjaZA identity=5fbed23f] started
接收远程调用请求消息:[RPC异步调用]
异步调用响应:[remote procedure call success!}

参考链接

代码地址