SpringCloudStream-下
Spring Cloud Stream
RabbitMQ和 Kafka
RabbitMQ:AMQP、JMS 规范
Kafka : 相对松散的消息队列协议
《企业整合模式》: Enterprise Integration Patterns
基本概念
Source:来源,近义词:Producer、Publisher
Sink:接收器,近义词:Consumer、Subscriber
Processor:对于上流而言是 Sink,对于下流而言是 Source
Reactive Streams :
- Publisher
- Subscriber
- Processor
Spring Cloud Stream Binder Kafka
继续沿用 spring-cloud-stream-kakfa-demo 工程
- 启动 Zookeeper
- 启动 Kafka
消息大致分为两个部分:
消息头(Headers)
消息体(Body/Payload)
实现步骤
maven 依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>定义 Producer,绑定 Source
@Component @EnableBinding({Source.class}) public class MessageProducerBean { @Autowired @Qualifier(Source.OUTPUT) // Bean名称 private MessageChannel messageChannel; public void send(String message) { messageChannel.send(MessageBuilder.withPayload(message).build()); } }定义 Consumer,绑定 Sink,实现监听
注:也可以不定义 consumer,使用 spring kafka 的 cosumer 进行监听。
@Component @EnableBinding(Sink.class) public class MessageConsumerBean { @Autowired @Qualifier(Sink.INPUT) SubscribableChannel subscribableChannel; @Autowired private Sink sink; @PostConstruct public void init() { subscribableChannel.subscribe(new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.err.println("subscribe channel receive: "+message.getPayload()); } }); } }配置 spring-kafka-binder
这里这里不能有 kafka 的序列化配置
kafka.topic = gupao spring.kafka.consumer.group-id=gupao-1 # 定义 Spring Cloud Stream Source 消息去向 # 针对 Kafka 而言,基本模式如下 # spring.cloud.stream.bindings.${output-name}.destination=${kafka.topic} spring.cloud.stream.bindings.output.destination=${kafka.topic} spring.cloud.stream.bindings.input.destination=${kafka.topic}在 web 中调用
@RestController public class KafkaProducerController { private final KafkaTemplate<String, String> kafkaTemplate; private final MessageProducerBean messageProducerBean; private final String topic; @Autowired public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate, MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) { this.kafkaTemplate = kafkaTemplate; this.messageProducerBean = messageProducerBean; this.topic = topic; } @PostMapping("/message/send") public boolean sendMessageByTemplate(@RequestParam String message) { kafkaTemplate.send(topic, message); return true; } @GetMapping("/message/send") public boolean sendMessageByBinder(@RequestParam String message) { messageProducerBean.send(message); return true; } }访问测试
访问地址:
- get: http://http://localhost:8080/message/send?message=gupao
结果:

自定义标准消息发送源
步骤:
自定义消息源 MessageSource
public interface MessageSource { /** * 消息来源的管道名称:"gupao" */ String OUTPUT = "gupao"; @Output(OUTPUT) MessageChannel gupao(); }在 producer 中使用自定义消息源
@Component @EnableBinding({Source.class, MessageSource.class}) public class MessageProducerBean { @Autowired @Qualifier(Source.OUTPUT) // Bean名称 private MessageChannel messageChannel; @Autowired @Qualifier(MessageSource.OUTPUT) private MessageChannel gupaoMessageChannel; public void send(String message) { messageChannel.send(MessageBuilder.withPayload(message).build()); } public void sendToGupao(String message) { gupaoMessageChannel.send(MessageBuilder.withPayload(message).build()); } }在 application.properties 中配置消息源
# 自定义消息源 gupao spring.cloud.stream.bindings.gupao.destination=test
Sink 监听的三种方式
先绑定 Sink
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT) // Bean 名称
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
}
三种方式:
通过
SubscribableChannel订阅消息// 当字段注入完成后的回调 @PostConstruct public void init() { subscribableChannel.subscribe(new MessageHandler() { // 实现异步回调 @Override public void handleMessage(Message<?> message) throws MessagingException { System.err.println("subscribe channel receive: "+message.getPayload()); } }); }通过
@ServiceActivator订阅消息//通过@ServiceActivator @ServiceActivator(inputChannel = Sink.INPUT) public void onMessage(Object message) { System.err.println("service activator receive: " + message); }通过
@StreamListener订阅消息@StreamListener(Sink.INPUT) public void onMessage(String message) { System.err.println("stream listener receive: " + message); }
如果同时配置了三种监听

则会轮询接收消息

Spring Cloud Stream Binder RabbitMQ
实现步骤
maven 依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>消息源
@Component @EnableBinding({Source.class}) public class MessageProducerBean { @Autowired @Qualifier(Source.OUTPUT) // Bean名称 private MessageChannel messageChannel; public void send(String message) { messageChannel.send(MessageBuilder.withPayload(message).build()); } }消息监听
@Component @EnableBinding(Sink.class) public class MessageConsumerBean { @Autowired @Qualifier(Sink.INPUT) SubscribableChannel subscribableChannel; @Autowired private Sink sink; @PostConstruct public void init() { subscribableChannel.subscribe(new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.err.println("subscribe channel receive: "+message.getPayload()); } }); } @ServiceActivator(inputChannel = Sink.INPUT) public void onMessage(Object message) { System.err.println("service activator receive: " + message); } @StreamListener(Sink.INPUT) public void onMessage(String message) { System.err.println("stream listener receive: " + message); } }rabbitmq 配置
rabbitmq.topic = gupao # spring rabbitmq 配置 spring.rabbitmq.addresses=172.16.11.125:5672 # 定义 Spring Cloud Stream Source 消息去向 # 针对 rabbitmq 而言,基本模式如下 # spring.cloud.stream.bindings.${output-name}.destination=${kafka.topic} spring.cloud.stream.bindings.output.destination=${rabbitmq.topic} spring.cloud.stream.bindings.input.destination=${rabbitmq.topic}web 调用
@RestController public class RabbitMQProducerController { private final MessageProducerBean messageProducerBean; @Autowired public RabbitMQProducerController(MessageProducerBean messageProducerBean) { this.messageProducerBean = messageProducerBean; } @GetMapping("/message/send") public boolean sendMessageByBinder(@RequestParam String message) { messageProducerBean.send(message); return true; } }效果测试

可以看到 sprin-cloud-stream-binder-rabbitmq 和 spring-cloud-stream-binder-kafka 很相似,如果要从 kafka 切换到 rabbitmq,主要修改配置和调用就行;source 和 sink 这些已经通用化。
问答部分
@EnableBinding有什么用?答:
@EnableBinding将Source、Sink以及Processor提升成相应的代理@Autorwired Source source这种写法是默认用官方的实现?
答:是官方的实现
这么多消息框架 各自优点是什么 怎么选取
答:RabbitMQ:AMQP、JMS 规范
Kafka : 相对松散的消息队列协议
ActiveMQ:AMQP、JMS 规范
AMQP v1.0 support
MQTT v3.1 support allowing for connections in an IoT environment.https://content.pivotal.io/rabbitmq/understanding-when-to-use-rabbitmq-or-apache-kafka
如果中间件如果有问题怎么办,我们只管用,不用维护吗。现在遇到的很多问题不是使用,而是维护,中间件一有问题,消息堵塞或丢失都傻眼了,都只有一键重启
答:消息中间件无法保证不丢消息,多数高一致性的消息背后还是有持久化的。
@EnableBinder, @EnableZuulProxy,@EnableDiscoverClient这些注解都是通过特定BeanPostProcessor实现的吗?
答:不完全对,主要处理接口在
@Import:ImportSelector实现类ImportBeanDefinitionRegistrar实现类@Configuration标注类BeanPostProcessor实现类
我对流式处理还是懵懵的 到底啥是流式处理 怎样才能称为流式处理 一般应用在什么场景?
答:Stream 处理简单地说,异步处理,消息是一种处理方式。
提交申请,机器生成,对于高密度提交任务,多数场景采用异步处理,Stream、Event-Driven。举例说明:审核流程,鉴别黄图。
如果是大量消息 怎么快速消费 用多线程吗?
答:确实是使用多线程,不过不一定奏效。依赖于处理具体内容,比如:一个线程使用了
25% CPU,四个线程就将CPU 耗尽。因此,并发 100个处理,实际上,还是 4个线程在处理。I/O 密集型、CPU 密集型。
如果是大量消息 怎么快速消费 用多线程吗
答:大多数是多线程,其实也单线程,流式非阻塞。
购物车的价格计算可以使用流式计算来处理么?能说下思路么?有没有什么高性能的方式推荐?
答:当商品添加到购物车的时候,就可以开始计算了。