Stream消息驱动

微服务基本组成

Spring Cloud Stream

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

Spring Cloud Stream简介

bander

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离

binder绑定器

常用注解

stream常用注解

消息分组

微服务应用放置于同一个group中,就能保证消息只会被其中一个应用消费一次

不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个会被消费

可以避免消息丢失

stream消息分组

使用

生产者

pom

1
2
3
4
5
<!-- 调用rabbitMQ,引入这个 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
server:
port: 8801

spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址

service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface IMessageProvider   {
String send();
}

// 实现类
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

import javax.annotation.Resource;

import org.springframework.cloud.stream.messaging.Source;

import java.util.UUID;
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 消息发送管道

@Override
public String send()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}

controller

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
public class SendMessageController {

@Resource
private IMessageProvider messageProvider;

@GetMapping(value = "/sendMessage")
public String sendMessage(){
return messageProvider.send();
}

}

消费者

pom

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
server:
port: 8803

spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: atguiguA # 分组

eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8803.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址

controller

1
2
3
4
5
6
7
8
9
10
11
12
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@StreamListener(Sink.INPUT) // 定义消费者
public void input(Message<String> message)
{
// message.getPayload()获取消息内容
System.out.println("消费者2号,----->接受到的消息: "+message.getPayload());
}

相关文章

SpringCloud

服务注册与发现

SpringCloud-OpenFeign问题

SpringCloud-GateWay工具类

DockerCompose常用软件配置

SpringQuartz动态定时任务

Redis集群搭建

redis分布式锁

服务链路追踪

K8S