关于此 Codelab
1. 概览
Spring Integration 为您提供了通过 MessageChannels
交换 Messages
的消息传递机制。它使用通道适配器与外部系统进行通信。
在本练习中,我们将创建两个应用,它们使用 Spring Cloud GCP 提供的 Spring Integration 通道适配器进行通信。Spring Integration 通过这些适配器使用 Google Cloud Pub/Sub 作为消息交换后端。
您将学习如何使用 Cloud Shell 和 Cloud SDK gcloud 命令。
本教程使用 Spring Boot 入门指南中的示例代码。
学习内容
- 如何使用 Spring Integration 和 Spring Cloud GCP 在应用与 Google Cloud Pub/Sub 之间交换消息
所需条件
您将如何使用本教程?
您如何评价自己在构建 HTML/CSS Web 应用方面的经验水平?
您如何评价自己在使用 Google Cloud Platform 服务方面的经验水平?
2. 设置和要求
自定进度的环境设置
如果您还没有 Google 帐号(Gmail 或 Google Apps),则必须创建一个。登录 Google Cloud Platform Console (console.cloud.google.com) 并创建一个新项目:
请记住项目 ID,它在所有 Google Cloud 项目中都是唯一名称(很抱歉,上述名称已被占用,您无法使用!)。它稍后将在此 Codelab 中被称为 PROJECT_ID
。
接下来,您需要在 Cloud Console 中启用结算功能,才能使用 Google Cloud 资源。
在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高(请参阅本文档末尾的“清理”部分)。
Google Cloud Platform 的新用户有资格获享 $300 免费试用。
Google Cloud Shell
虽然 Google Cloud 可以从笔记本电脑远程操作,但在此 Codelab 中,我们将使用 Google Cloud Shell,这是一个在云端运行的命令行环境。
激活 Google Cloud Shell
在 GCP 控制台中,点击右上角工具栏上的 Cloud Shell 图标:
然后点击“启动 Cloud Shell”:
配置和连接到环境应该只需要片刻时间:
这个虚拟机已加载了您需要的所有开发工具。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行,从而大大增强了网络性能和身份验证功能。只需使用一个浏览器或 Google Chromebook 即可完成本实验中的大部分(甚至全部)工作。
在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的 PROJECT_ID。
在 Cloud Shell 中运行以下命令以确认您已通过身份验证:
gcloud auth list
命令输出
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
命令输出
[core] project = <PROJECT_ID>
如果不是上述结果,您可以使用以下命令进行设置:
gcloud config set project <PROJECT_ID>
命令输出
Updated property [core/project].
3. 预配 Pub/Sub 资源
导航到 Google Cloud Pub/Sub 主题页面并启用该 API。
点击创建主题。
输入 exampleTopic
作为主题名称,然后点击创建。
创建主题后,留在“主题”页面中。找到您刚刚创建的主题,按行末的三个垂直点,然后点击新建订阅。
在订阅名称文本框中输入 exampleSubscription
,然后点击创建。
4. 初始化 Spring Boot 应用
在 Cloud Shell 启动后,您可以使用 Spring Initializr 通过命令行生成两个新的 Spring Boot 应用:
$ curl https://start.spring.io/starter.tgz \
-d bootVersion=2.0.6.RELEASE \
-d dependencies=web \
-d baseDir=spring-integration-sender | tar -xzvf -
$ curl https://start.spring.io/starter.tgz \
-d bootVersion=2.0.6.RELEASE \
-d baseDir=spring-integration-receiver | tar -xzvf -
5. 创建用于发送消息的应用
现在,我们来创建消息发送应用。切换到正在发送的应用的目录。
$ cd spring-integration-sender
我们希望应用向通道写入消息。消息进入通道后,将由出站通道适配器提取。该适配器将消息从常规 Spring 消息转换为 Google Cloud Pub/Sub 消息,并将其发布到 Google Cloud Pub/Sub 主题。
为了让我们的应用向通道写入数据,可以使用 Spring Integration 消息传递网关。使用来自 vim
、emacs
或 nano
的文本编辑器在 DemoApplication
类中声明 PubsubOutboundGateway
接口。
src/main/java/com/example/demo/DemoApplication.java
...
import org.springframework.integration.annotation.MessagingGateway;
@SpringBootApplication
public class DemoApplication {
...
@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubsubOutboundGateway {
void sendToPubsub(String text);
}
}
我们现在有一种向通道发送消息的机制,但是这些消息在进入通道后去了哪里?
我们需要一个出站通道适配器来接收通道中的新消息,并将其发布到 Google Cloud Pub/Sub 主题。
src/main/java/com/example/demo/DemoApplication.java
...
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageHandler;
@SpringBootApplication
public class DemoApplication {
...
@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
}
}
@ServiceActivator
注释可将此 MessageHandler
应用于 inputChannel
中的任何新消息。在本例中,我们将调用出站通道适配器 PubSubMessageHandler
,将消息发布到 Google Cloud Pub/Sub 的 exampleTopic
主题。
设置好通道适配器后,我们现在可以自动连接 PubsubOutboundGateway
对象,并使用它向通道写入消息。
src/main/java/com/example/demo/DemoApplication.java
...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;
@SpringBootApplication
public class DemoApplication {
...
@Autowired
private PubsubOutboundGateway messagingGateway;
@PostMapping("/postMessage")
public RedirectView postMessage(@RequestParam("message") String message) {
this.messagingGateway.sendToPubsub(message);
return new RedirectView("/");
}
}
得益于 @PostMapping
注释,我们现在有了一个端点来侦听 HTTP POST 请求,但还要向 DemoApplication
类添加 @RestController
注释来将其标记为 REST 控制器。
src/main/java/com/example/demo/DemoApplication.java
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
public class DemoApplication {
...
}
我们只需添加所需的依赖项就能让应用运行。
pom.xml
<project>
...
<!-- Add Spring Cloud GCP Dependency BOM -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-dependencies</artifactId>
<version>1.0.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
...
<!-- Add Pub/Sub Starter -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
<!-- Add Spring Integration -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
</dependencies>
</project>
运行发送者应用。
# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list \
--format 'value(core.project)'`
$ ./mvnw spring-boot:run
该应用正在侦听包含端口 8080 和端点 /postMessage
上消息的 POST 请求,但我们稍后会对此进行说明。
6. 创建用于接收消息的应用
我们刚刚创建了一个通过 Google Cloud Pub/Sub 发送消息的应用。现在,我们将再创建一个接收这些消息并进行处理的应用。
点击 + 打开新的 Cloud Shell 会话。
然后,在新的 Cloud Shell 会话中,将目录更改为接收者应用的目录:
$ cd spring-integration-receiver
在上一个应用中,消息传递网关声明为我们创建了出站通道。由于我们不使用消息传递网关来接收消息,因此需要声明自己的 MessageChannel
(收到的消息将到达此处)。
src/main/java/com/example/demo/DemoApplication.java
...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
@SpringBootApplication
public class DemoApplication {
...
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
}
我们需要入站通道适配器接收来自 Google Cloud Pub/Sub 的消息,并将其中继到 pubsubInputChannel
。
src/main/java/com/example/demo/DemoApplication.java
...
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
@SpringBootApplication
public class DemoApplication {
...
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
adapter.setOutputChannel(inputChannel);
return adapter;
}
}
此适配器会将其自身绑定到 pubsubInputChannel
,并侦听 Google Cloud Pub/Sub exampleSubscription
订阅中的新消息。
我们有一个通道,收到的消息会发布到其中,但该如何处理这些消息呢?
让我们使用在 pubsubInputChannel
收到新消息时触发的 @ServiceActivator
来处理它们。
src/main/java/com/example/demo/DemoApplication.java
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;
@SpringBootApplication
public class DemoApplication {
...
private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload) {
LOGGER.info("Message arrived! Payload: " + payload);
}
}
在本例中,我们只需记录消息载荷。
我们需要添加必要的依赖项。
pom.xml
<project>
...
<!-- Add Spring Cloud GCP Dependency BOM -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-dependencies</artifactId>
<version>1.0.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
...
<!-- Add Pub/Sub Starter -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
<!-- Add Spring Integration -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
</dependencies>
</project>
运行接收者应用。
$ ./mvnw spring-boot:run
现在,您发送到发送者应用的任何消息都会记录在接收者应用中。如需进行测试,请打开新的 Cloud Shell 会话,并向发送者应用发出 HTTP POST 请求。
$ curl --data "message=Hello world!" localhost:8080/postMessage
然后,验证接收者应用是否记录了您发送的消息!
INFO: Message arrived! Payload: Hello world!
7. 清理
删除在此练习中创建的订阅和主题。
$ gcloud beta pubsub subscriptions delete exampleSubscription
$ gcloud beta pubsub topics delete exampleTopic
8. 总结
您设置了两个 Spring Boot 应用,它们使用适用于 Google Cloud Pub/Sub 的 Spring Integration 通道适配器。它们彼此之间交换消息,而无需与 Google Cloud Pub/Sub API 进行交互。
9. 恭喜!
您已经了解如何使用适用于 Google Cloud Pub/Sub 的 Spring Integration 通道适配器!
了解详情
- Google Cloud Pub/Sub:https://cloud.google.com/pubsub/
- GCP 上的 Spring 项目:http://cloud.spring.io/spring-cloud-gcp/
- GCP GitHub 代码库上的 Spring:https://github.com/spring-cloud/spring-cloud-gcp
- Google Cloud Platform 上的 Java:https://cloud.google.com/java/
许可
此作品已获得 Creative Commons Attribution 2.0 通用许可授权。