当前位置: 首页 > 开发知识 >

使用Spring Cloud Stream构建和测试消息驱动的微服务

作者:游戏app开发公司 阅读: 发布时间:2024-08-20 10:00

摘要:背景Spring Boot和Spring Cloud为我们提供了一个使用不同通信方式快速构建微服务方式。...

背景

Spring Boot和Spring Cloud为我们提供了一个使用不同通信方式快速构建微服务方式。我们可以基于Spring Cloud Netflix库创建同步REST微服务,可以创建使用Spring WebFlux在Netty上部署的异步,反应式微服务,并将其与Spring Cloud库成功结合,可以使用Spring Cloud Stream和Apache Kafka或RabbitMQ等消息代理,基于发布/订阅模型来实现消息驱动的微服务。我们演示基于RabbitMQ代理有效地构建,扩展,运行和测试消息传递微服务。

_消息驱动设计_微服务领域驱动设计

架构

采用Spring Cloud Stream的功能,设计一个示例系统,该系统使用发布/订阅模型进行服务间通信。它提供三种微服务:订购服务,产品服务和帐户服务。应用程序订单服务公开了一个HTTP端点,该端点负责处理发送到我们系统的订单。所有传入的订单都被异步处理-订单服务准备并向RabbitMQ交换发送消息,然后响应调用客户端该请求已被接受进行处理。应用程序帐户服务和产品服务正在侦听传入交换机的订单消息。微服务帐户服务负责检查客户帐户中是否有足够的资金来实现订单,然后从该帐户中提取现金。微服务产品服务检查商店中是否有足够数量的产品,并在处理订单后更改可用产品的数量。帐户服务和产品服务都通过RabbitMQ交换(这是使用直接交换的一对一通信)发送处于运行状态的异步响应。收到响应消息后,微服务订单服务会设置订单的适当状态,并通过REST端点GET / order / {id}向外部客户端公开。

启用Spring Cloud Stream

建议在项目中包含Spring Cloud Stream的方法是使用依赖项管理系统。 Spring Cloud Stream具有相对于整个Spring Cloud框架的独立发行培训管理。 但是,如果我们在依赖管理内部的Elmhurst.RELEASE版本中声明了spring-cloud-dependencies部分,我们不必在pom.xml中声明其他任何内容。 如果您只想使用Spring Cloud Stream项目,则应定义以下部分。


 
 
 org.springframework.cloud
 spring-cloud-stream-dependencies
 Elmhurst.RELEASE
 pom
 import
 
 

下一步是将spring-cloud-stream工件添加到项目依赖项中。 我还建议您至少包括spring-cloud-sleuth库,以提供与定单服务传入的源请求具有相同traceId的发送消息。


 org.springframework.cloud
 spring-cloud-stream


 org.springframework.cloud
 spring-cloud-sleuth

Spring Cloud Stream编程模型

要为您的应用程序启用到消息代理的连接,请使用@EnableBinding注释主类。 @EnableBinding批注将一个或多个接口作为参数。 您可以在Spring Cloud Stream提供的三个接口之间进行选择:

这是启用Spring Cloud Stream绑定的订单服务的主要类别。

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
 public static void main(String[] args) {
 new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
 }
}

添加消息代理

在Spring Cloud Stream术语中,负责与特定消息代理集成的实现称为绑定程序。 默认情况下,Spring Cloud Stream为Kafka和RabbitMQ提供绑定实现。 它能够自动检测和使用在类路径上找到的绑定。 任何特定于中间件的设置都可以通过Spring Boot支持的形式的外部配置属性来覆盖,例如应用程序参数,环境变量或只是application.yml文件。 为了包括对RabbitMQ(本文中用作消息代理)的支持,您应将以下依赖项添加到项目中。


 org.springframework.cloud
 spring-cloud-starter-stream-rabbit

现在,我们的应用程序需要连接一个RabbitMQ代理的共享实例。 这就是为什么我在默认5672端口上运行RabbitMQ暴露在外部的Docker映像的原因。 它还会启动Web仪表板,其地址为:15672。

docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

我们需要通过将属性spring.rabbitmq.host设置为Docker计算机IP 192.168.99.100,来为每个Spring Boot应用程序覆盖RabbitMQ的默认地址。

消息驱动设计_微服务领域驱动设计_

spring:
 rabbitmq:
 host: 192.168.99.100
 port: 5672

实现消息驱动的微服务

Spring Cloud Stream构建在Spring Integration项目之上。 Spring Integration扩展了Spring编程模型,以支持著名的企业集成模式(EIP)。 EIP定义了许多通常用于分布式系统中的业务流程的组件。 您可能已经听说过诸如消息通道,路由器,聚合器或端点之类的模式。

我们从订单服务开始,该服务负责接受订单,在共享主题上发布订单,然后从下游服务收集异步响应。 这是@Service,它使用Source bean生成一条消息并将其发布到远程主题。

@Service
public class OrderSender {
 @Autowired
 private Source source;
 public boolean send(Order order) {
 return this.source.output().send(MessageBuilder.withPayload(order).build());
 }
}

该@Service由控制器调用,该控制器公开HTTP端点,用于提交新订单并获取ID为ID的订单。

@RestController
public class OrderController {
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
 private ObjectMapper mapper = new ObjectMapper();
 @Autowired
 OrderRepository repository;
 @Autowired
 OrderSender sender;
 @PostMapping
 public Order process(@RequestBody Order order) throws JsonProcessingException {
 Order o = repository.add(order);
 LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
 boolean isSent = sender.send(o);
 LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
 return o;
 }
 @GetMapping("/{id}")
 public Order findById(@PathVariable("id") Long id) {
 return repository.findById(id);
 }
}

现在,让我们在消费者方面进行仔细研究。 由OrderSender bean从订单服务发送的消息由帐户服务和产品服务接收。 要从主题交流中接收消息,我们只需要使用@StreamListener注释将Order对象作为参数的方法即可。 我们还必须为侦听器定义目标通道-在这种情况下,它是Processor.INPUT。

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderApplication.class);
 @Autowired
 OrderService service;
 public static void main(String[] args) {
 new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
 }
 @StreamListener(Processor.INPUT)
 public void receiveOrder(Order order) throws JsonProcessingException {
 LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
 service.process(order);
 }
}

然后由AccountService bean处理收到的订单。 订单服务可以接受或拒绝订单,具体取决于客户帐户中足够的资金以实现订单。 带有接受状态的响应将通过OrderSender bean调用的输出通道发送回订单服务。

最后一步是配置。 它在application.yml文件中提供。 我们必须正确定义频道的目的地。 当订购服务将订购目的地分配到输出通道,将订购目的地分配给输入通道时,帐户服务和产品服务则相反。 这是合乎逻辑的,因为订购服务通过其输出目的地发送的消息是由消费服务通过其输入目的地接收的。 但是,它仍然是共享经纪人交易所的相同目的地。 这是订购服务的配置设置。

spring:
 cloud:
 stream:
 bindings:
 output:
 destination: orders-out
 input:
 destination: orders-in
 rabbit:
 bindings:
 input:
 consumer:
 exchangeType: direct

spring:
 cloud:
 stream:
 bindings:
 output:
 destination: orders-in
 input:
 destination: orders-out
 rabbit:
 bindings:
 output:
 producer:
 exchangeType: direct
 routingKeyExpression: '"#"'

现在,我们只需要运行每个微服务的单个实例。 您可以通过运行模块订购服务内部源代码存储库中提供的JUnit测试类OrderControllerTest轻松生成一些测试请求。

自动化测试

您可以轻松测试您的微服务,而无需连接到消息代理。 为此,您需要在项目依赖项中包含spring-cloud-stream-test-support。 它包含TestSupportBinder bean,使用它可以与绑定的通道进行交互,并检查应用程序发送和接收的所有消息。


 org.springframework.cloud
 spring-cloud-stream-test-support
 test

在测试类中,我们需要声明MessageCollector bean,该bean负责接收TestSupportBinder保留的消息。 这是我的帐户服务测试课程。 使用处理器bean,我将测试命令发送到输入通道。 然后,MessageCollector接收通过输出通道发送回订单服务的消息。 测试方法testAccepted创建一个应由帐户服务接受的订单,而testRejected方法设置的订单价格过高,导致拒绝该订单。

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class OrderReceiverTest {
 private static final Logger LOGGER = LoggerFactory.getLogger(OrderReceiverTest.class);
 @Autowired
 private Processor processor;
 @Autowired
 private MessageCollector messageCollector;
 @Test
 @SuppressWarnings("unchecked")
 public void testAccepted() {
 Order o = new Order();
 o.setId(1 L);
 o.setAccountId(1 L);
 o.setCustomerId(1 L);
 o.setPrice(500);
 o.setProductIds(Collections.singletonList(2 L));
 processor.input().send(MessageBuilder.withPayload(o).build());
 Message received = (Message) messageCollector.forChannel(processor.output()).poll();
 LOGGER.info("Order response received: {}", received.getPayload());
 assertNotNull(received.getPayload());
 assertEquals(OrderStatus.ACCEPTED, received.getPayload().getStatus());
 }
 @Test
 @SuppressWarnings("unchecked")
 public void testRejected() {
 Order o = new Order();
 o.setId(1 L);
 o.setAccountId(1 L);
 o.setCustomerId(1 L);
 o.setPrice(100000);
 o.setProductIds(Collections.singletonList(2 L));
 processor.input().send(MessageBuilder.withPayload(o).build());
 Message received = (Message) messageCollector.forChannel(processor.output()).poll();
 LOGGER.info("Order response received: {}", received.getPayload());
 assertNotNull(received.getPayload());
 assertEquals(OrderStatus.REJECTED, received.getPayload().getStatus());
 }
}

结论

每当您不需要API的同步响应时,消息驱动的微服务就是一个不错的选择。 微服务之间的服务间通信采用发布/订阅模型一个不错的选择。

  • 原标题:使用Spring Cloud Stream构建和测试消息驱动的微服务

  • 本文由游戏app开发公司小编,整理排版发布,转载请注明出处。部分文章图片来源于网络,如有侵权,请与迪集网络联系删除。
  • 微信二维码

    CLWL6868

    长按复制微信号,添加好友

    微信联系

    在线咨询

    点击这里给我发消息QQ客服专员

    点击这里给我发消息电话客服专员

    在线咨询

    免费通话


    24h咨询☎️:132-5572-7217


    🔺🔺 24小时客服热线电话 🔺🔺

    免费通话
    返回顶部