Amazon Kinesis 示例

Spring 团队目前在社区贡献的帮助下维护着 Spring Cloud Stream Kinesis Binder。您可以在 spring-cloud/spring-cloud-stream-binder-aws-kinesis 阅读有关 Binder 实现的更多信息。

不过,在本演练中,我们将回顾一个简单的用例,以展示如何将 Kinesis Binder 与 Spring Cloud Stream 一起使用。

先决条件

演示的唯一要求是 访问密钥密钥区域 凭证,您可以从您的 AWS 账户中获取这些凭证。

或者,如果您决定直接将应用程序作为 AWS EC2 实例运行,则无需显式提供任何这些凭证。它们会在引导时自动发现和自动配置。

应用

对于示例生产者和消费者应用程序,您可以克隆位于 spring-cloud-dataflow-samples/kinesisdemo 的存储库,以跟随本示例演练进行操作。

我们从一个简单的生产者开始,它每两秒生成一个新的随机 UUID。每个生成的 UUID 有效负载都会发布到 Kinesis 流,绑定到同一个 Kinesis 流的示例消费者会使用该有效负载并将其记录为结果。

以下清单显示了生产者和消费者应用程序的代码

@EnableScheduling
@EnableBinding(Source.class)
@SpringBootApplication
public class KinesisProducerApplication {

	public static void main(String[] args) {
		SpringApplication.run(KinesisProducerApplication.class, args);
	}

	@Autowired
	private Source source;

	@Scheduled(fixedRate = 2000L)
	public void sendMessage() {
		UUID id = UUID.randomUUID();
		System.out.println("Before sending : " + id);
		source.output().send(MessageBuilder.withPayload(id).build());
		System.out.println("After sending : " + id);
	}
}
@EnableBinding(Sink.class)
@SpringBootApplication
public class KinesisConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(KinesisConsumerApplication.class, args);
	}

	@StreamListener("input")
	public void input(String foo) {
		System.out.println("Hello: " + foo);
	}
}

这两个应用程序都需要在类路径中包含 spring-cloud-stream-binder-kinesis 依赖项。有关更多详细信息,请参阅 spring-cloud-dataflow-samples/kinesisdemo 演示。

以下清单显示了生产者和消费者的绑定器配置

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: test-kinesis-stream
          content-type: text/plain

cloud:
  aws:
    credentials:
      accessKey: # <YOUR_ACCESS_KEY>
      secretKey: # <YOUR_SECRET_KEY>
    region:
      static: # <YOUR_REGION>
    stack:
      auto: false
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-kinesis-stream
          group: test-kinesis-stream-group
          content-type: text/plain

cloud:
  aws:
    credentials:
      accessKey: # <YOUR_ACCESS_KEY>
      secretKey: # <YOUR_SECRET_KEY>
    region:
      static: # <YOUR_REGION>
    stack:
      auto: false

您需要将 <YOUR_ACCESS_KEY><YOUR_SECRET_KEY><YOUR_REGION> 替换为您的凭据。

部署

准备好使用 AWS Kinesis 进行测试后,您可以在 Maven 构建后启动生产者和消费者应用程序。

从克隆的目录启动生产者

java -jar kinesisproducer/target/kinesisproducer-0.0.1-SNAPSHOT.jar

从克隆的目录启动消费者

java -jar kinesisconsumer/target/kinesisconsumer-0.0.1-SNAPSHOT.jar

结果

您可以看到 test-kinesis-stream Kinesis 流在生产者应用程序启动时自动创建。

Kinesis Stream Listing

两个应用程序启动并运行后,您应该在控制台中看到以下内容。

生产者

Producer Output

消费者

Consumer Output

由于我们在处理了七条记录后停止了应用程序,因此您可以从 AWS 控制台的监控页面中看到 Kinesis 中处理了七条记录。

Total Number of Records

尽管这只是一个简单的演示,但 Kinesis 绑定器在生产者和消费者端的绑定器配置方面提供了全面的覆盖范围(包括对 DynamoDB Streams 的支持!)。有关更多详细信息,请参阅 绑定器文档