跳至主要內容

SpringCloudStream-下

soulballad微服务SpringCloud NetfilxSpringCloud约 1494 字大约 5 分钟

Spring Cloud Streamopen in new window

RabbitMQ和 Kafka

RabbitMQ:AMQP、JMS 规范

Kafka : 相对松散的消息队列协议

《企业整合模式》: Enterprise Integration Patternsopen in new window

基本概念

Source:来源,近义词:Producer、Publisher

Sink:接收器,近义词:Consumer、Subscriber

Processor:对于上流而言是 Sink,对于下流而言是 Source

Reactive Streams :

  • Publisher
  • Subscriber
  • Processor

Spring Cloud Stream Binder Kafka

继续沿用 spring-cloud-stream-kakfa-demo 工程

  • 启动 Zookeeper
  • 启动 Kafka

消息大致分为两个部分:

  • 消息头(Headers)

  • 消息体(Body/Payload)

实现步骤

  1. maven 依赖

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    
  2. 定义 Producer,绑定 Source

    @Component
    @EnableBinding({Source.class})
    public class MessageProducerBean {
    
        @Autowired
        @Qualifier(Source.OUTPUT) // Bean名称
        private MessageChannel messageChannel;
    
        public void send(String message) {
            messageChannel.send(MessageBuilder.withPayload(message).build());
        }
    }
    
  3. 定义 Consumer,绑定 Sink,实现监听

    注:也可以不定义 consumer,使用 spring kafka 的 cosumer 进行监听。

    @Component
    @EnableBinding(Sink.class)
    public class MessageConsumerBean {
    
        @Autowired
        @Qualifier(Sink.INPUT)
        SubscribableChannel subscribableChannel;
    
        @Autowired
        private Sink sink;
    
        @PostConstruct
        public void init() {
            subscribableChannel.subscribe(new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    System.err.println("subscribe channel receive: "+message.getPayload());
                }
            });
        }
    }
    
  4. 配置 spring-kafka-binder

    这里这里不能有 kafka 的序列化配置

    kafka.topic = gupao
    
    spring.kafka.consumer.group-id=gupao-1
    
    # 定义 Spring Cloud Stream Source 消息去向
    # 针对 Kafka 而言,基本模式如下
    # spring.cloud.stream.bindings.${output-name}.destination=${kafka.topic}
    spring.cloud.stream.bindings.output.destination=${kafka.topic}
    
    spring.cloud.stream.bindings.input.destination=${kafka.topic}
    
  5. 在 web 中调用

    @RestController
    public class KafkaProducerController {
    
        private final KafkaTemplate<String, String> kafkaTemplate;
        private final MessageProducerBean messageProducerBean;
        private final String topic;
    
        @Autowired
        public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                       MessageProducerBean messageProducerBean,
                                       @Value("${kafka.topic}") String topic) {
            this.kafkaTemplate = kafkaTemplate;
            this.messageProducerBean = messageProducerBean;
            this.topic = topic;
        }
    
        @PostMapping("/message/send")
        public boolean sendMessageByTemplate(@RequestParam String message) {
            kafkaTemplate.send(topic, message);
            return true;
        }
    
        @GetMapping("/message/send")
        public boolean sendMessageByBinder(@RequestParam String message) {
            messageProducerBean.send(message);
            return true;
        }
    }
    
  6. 访问测试

    • 访问地址:

      • get: http://http://localhost:8080/message/send?message=gupao
    • 结果:

      1570438178453

自定义标准消息发送源

步骤:

  1. 自定义消息源 MessageSource

    public interface MessageSource {
    
        /**
         * 消息来源的管道名称:"gupao"
         */
        String OUTPUT = "gupao";
    
        @Output(OUTPUT)
        MessageChannel gupao();
    }
    
  2. 在 producer 中使用自定义消息源

    @Component
    @EnableBinding({Source.class, MessageSource.class})
    public class MessageProducerBean {
    
        @Autowired
        @Qualifier(Source.OUTPUT) // Bean名称
        private MessageChannel messageChannel;
    
        @Autowired
        @Qualifier(MessageSource.OUTPUT)
        private MessageChannel gupaoMessageChannel;
    
        public void send(String message) {
            messageChannel.send(MessageBuilder.withPayload(message).build());
        }
    
        public void sendToGupao(String message) {
            gupaoMessageChannel.send(MessageBuilder.withPayload(message).build());
        }
    }
    
  3. 在 application.properties 中配置消息源

    # 自定义消息源 gupao
    spring.cloud.stream.bindings.gupao.destination=test
    

Sink 监听的三种方式

先绑定 Sink

@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {

    @Autowired
    @Qualifier(Sink.INPUT) // Bean 名称
    private SubscribableChannel subscribableChannel;

    @Autowired
    private Sink sink;
}

三种方式:

  • 通过 SubscribableChannel 订阅消息

    // 当字段注入完成后的回调
    @PostConstruct
    public void init() {
        subscribableChannel.subscribe(new MessageHandler() {
            // 实现异步回调
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.err.println("subscribe channel receive: "+message.getPayload());
            }
        });
    }
    
  • 通过 @ServiceActivator 订阅消息

    //通过@ServiceActivator
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void onMessage(Object message) {
        System.err.println("service activator receive: " + message);
    }
    
  • 通过 @StreamListener 订阅消息

    @StreamListener(Sink.INPUT)
    public void onMessage(String message) {
        System.err.println("stream listener receive: " + message);
    }
    

如果同时配置了三种监听

1570438781385

则会轮询接收消息

1570438829455

Spring Cloud Stream Binder RabbitMQ

实现步骤

  1. maven 依赖

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    
  2. 消息源

    @Component
    @EnableBinding({Source.class})
    public class MessageProducerBean {
    
        @Autowired
        @Qualifier(Source.OUTPUT) // Bean名称
        private MessageChannel messageChannel;
    
        public void send(String message) {
            messageChannel.send(MessageBuilder.withPayload(message).build());
        }
    }
    
  3. 消息监听

    @Component
    @EnableBinding(Sink.class)
    public class MessageConsumerBean {
    
        @Autowired
        @Qualifier(Sink.INPUT)
        SubscribableChannel subscribableChannel;
    
        @Autowired
        private Sink sink;
    
        @PostConstruct
        public void init() {
            subscribableChannel.subscribe(new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    System.err.println("subscribe channel receive: "+message.getPayload());
                }
            });
        }
    
        @ServiceActivator(inputChannel = Sink.INPUT)
        public void onMessage(Object message) {
            System.err.println("service activator receive: " + message);
        }
    
        @StreamListener(Sink.INPUT)
        public void onMessage(String message) {
            System.err.println("stream listener receive: " + message);
        }
    }
    
  4. rabbitmq 配置

    rabbitmq.topic = gupao
    # spring rabbitmq 配置
    spring.rabbitmq.addresses=172.16.11.125:5672
    
    # 定义 Spring Cloud Stream Source 消息去向
    # 针对 rabbitmq 而言,基本模式如下
    # spring.cloud.stream.bindings.${output-name}.destination=${kafka.topic}
    spring.cloud.stream.bindings.output.destination=${rabbitmq.topic}
    spring.cloud.stream.bindings.input.destination=${rabbitmq.topic}
    
  5. web 调用

    @RestController
    public class RabbitMQProducerController {
    
        private final MessageProducerBean messageProducerBean;
    
        @Autowired
        public RabbitMQProducerController(MessageProducerBean messageProducerBean) {
            this.messageProducerBean = messageProducerBean;
        }
    
        @GetMapping("/message/send")
        public boolean sendMessageByBinder(@RequestParam String message) {
            messageProducerBean.send(message);
            return true;
        }
    }
    
  6. 效果测试

    1570439270286

可以看到 sprin-cloud-stream-binder-rabbitmqspring-cloud-stream-binder-kafka 很相似,如果要从 kafka 切换到 rabbitmq,主要修改配置和调用就行;source 和 sink 这些已经通用化。

问答部分

  • @EnableBinding 有什么用?

    答:@EnableBindingSourceSink 以及 Processor 提升成相应的代理

  • @Autorwired Source source这种写法是默认用官方的实现?

    答:是官方的实现

  • 这么多消息框架 各自优点是什么 怎么选取

    答:RabbitMQ:AMQP、JMS 规范

    Kafka : 相对松散的消息队列协议

    ActiveMQ:AMQP、JMS 规范

    ​ AMQP v1.0 support
    ​ MQTT v3.1 support allowing for connections in an IoT environment.

    https://content.pivotal.io/rabbitmq/understanding-when-to-use-rabbitmq-or-apache-kafka

  • 如果中间件如果有问题怎么办,我们只管用,不用维护吗。现在遇到的很多问题不是使用,而是维护,中间件一有问题,消息堵塞或丢失都傻眼了,都只有一键重启

    答:消息中间件无法保证不丢消息,多数高一致性的消息背后还是有持久化的。

  • @EnableBinder, @EnableZuulProxy,@EnableDiscoverClient这些注解都是通过特定BeanPostProcessor实现的吗?

    答:不完全对,主要处理接口在@Import:

    • ImportSelector 实现类
    • ImportBeanDefinitionRegistrar 实现类
    • @Configuration 标注类
    • BeanPostProcessor 实现类
  • 我对流式处理还是懵懵的 到底啥是流式处理 怎样才能称为流式处理 一般应用在什么场景?

    答:Stream 处理简单地说,异步处理,消息是一种处理方式。

    提交申请,机器生成,对于高密度提交任务,多数场景采用异步处理,Stream、Event-Driven。举例说明:审核流程,鉴别黄图。

  • 如果是大量消息 怎么快速消费 用多线程吗?

    答:确实是使用多线程,不过不一定奏效。依赖于处理具体内容,比如:一个线程使用了

    25% CPU,四个线程就将CPU 耗尽。因此,并发 100个处理,实际上,还是 4个线程在处理。I/O 密集型、CPU 密集型。

  • 如果是大量消息 怎么快速消费 用多线程吗

    答:大多数是多线程,其实也单线程,流式非阻塞。

  • 购物车的价格计算可以使用流式计算来处理么?能说下思路么?有没有什么高性能的方式推荐?

    答:当商品添加到购物车的时候,就可以开始计算了。

上次编辑于:
贡献者: soulballad