写在前面

前面学习了Spring Cloud Bus整合RabbitMQ的相关内容,接下来开始学习Spring Cloud Bus整合Kafka,同样也能实现消息总线的功能。

Kafka简介

Kafka是一个由LinkedIn开发的分布式消息系统,于2011年年初开源,现在由Apache基金会负责维护与开发。Kafka使用Scala语言编写,被用作LinkedIn的活动流和运营数据处理的管道,现在也被许多企业广泛用作数据流管道和消息系统。

Kafka设计目标

Kafka是基于消息发布-订阅模式实现的消息系统,其主要的设计目标如下:(1)消息持久化。以时间复杂度为O(1)的方式提供消息持久化能力,对于TB级别以上的数据也能保证常数时间复杂度的访问性能。(2)高吞吐。在廉价的商用机器上也可以支持单机每秒1万条以上的吞吐量。(3)分布式。支持消息分区以及分布式消费,并保证分区内的消息顺序。(4)跨平台。支持不同语言的客户端,如Java、PHP、Python等。(5)实时性。支持实时数据处理和离线数据处理。(6)伸缩性。支持水平扩展。

Kafka中的一些基本概念

同样Kafka中有一些比较重要的基础概念,理解它们对于后续学习Kafka有非常大的帮助:
(1)Broker:Kafka集群中包含一个或多个服务器,这些服务器被称为Broker。

(2)Topic:逻辑上和RabbitMQ的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但是用户只需指定消息的Topic即可生产或者消费数据而不必关心数据到底存在于何处。)

(3)Partition:Partition是物理概念上的分区,为了提高系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹(存储对应分区的消息内容和索引文件)。

(4)Producer:消息生产者,负责生产消息并发送到Kafka Broker。

(5)Consumer:消息消费者,负责向Kafka Broker读取消息并处理的客户端。

(6)Consumer Group:每个Consumer属于一个特定的组(可为每个Consumer指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费的功能。

Kafka的安装

安装JDK

Kafka的运行需要依赖Java,因此首先是要安装JDK,相应的步骤如下:
第一步,点击 这里 选择合适的JDK版本进行下载,然后点击安装,基本上是一直next,详细过程忽略。

第二步,设置环境变量。右键点击“我的电脑” -> “高级系统设置” -> “环境变量”,在系统变量内设置JAVA_HOME,其值是JDK的安装路径:

接着编辑PATH变量,在里面添加;%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin,如下所示:

第三步,进入DOS状态,使用java -version命令来进行验证:

安装Zookeeper

由于Kafka的运行依赖于Zookeeper,因此在运行Kafka之前我们需要安装并运行Zookeeper,相应的步骤如下:

第一步,点击 这里 选择合适的Zookeeper版本进行下载,然后点击安装,详细过程忽略。

第二步,将apache-zookeeper-3.5.8\conf目录下的zoo_sample.cfg文件修改为zoo.cfg,并修改其中的dataDir参数为自己的目录,笔者为D:/Application/Zookeeper/apache-zookeeper-3.5.8/data,然后点击保存。

第三步,设置环境变量。右键点击“我的电脑” -> “高级系统设置” -> “环境变量”,在系统变量内设置ZOOKEEPER_HOME,其值是ZOOKEEPER_HOME的安装路径(D:\Application\Zookeeper\apache-zookeeper-3.5.8目录):

接着编辑PATH变量,在里面添加;%ZOOKEEPER_HOME%\bin,如下所示:

第四步,运行Zookeeper。 打开cmd进入到DOS状态,然后执行zkserver命令(注意这个窗口不能关闭):

安装Kafka

在完成上述步骤以后,接下来就是安装和运行Kafka了,相应的步骤如下所示:

第一步,点击 这里 选择合适的Kafka版本进行下载,然后点击安装,详细过程忽略。

第二步,修改kafka_2.12-2.5.0\config目录下server.properties文件中第60行的log.dirs参数为自己的目录,笔者为D:/Application/Kafka/kafka_2.12-2.5.0/kafka-logs,然后点击保存。接着修改zookeeper.properties文件中第16行的dataDir参数为自己的目录,注意这个必须与之前设置的zookeeper目录保持一致,即为D:/Application/Zookeeper/apache-zookeeper-3.5.8/data,然后点击保存。

第三步,打开一个新的CMD窗口,使用cd命令进入到Kafka的安装目录(如D:/Application/Kafka/kafka_2.12-2.5.0),并在里面执行如下命令(注意这个窗口不能关闭):

1
D:\Application\Kafka\kafka_2.12-2.5.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

其实就是使用config包下的server.properties配置文件信息来启动kafka服务,如下所示:

第四步,创建TOPICS。打开一个新的CMD窗口,使用cd命令进入到Kafka安装目录的windows目录(如D:/Application/Kafka/kafka_2.12-2.5.0/bin/windows)。接着创建一个TOPICS,使用的命令如下:

1
D:\Application\Kafka\kafka_2.12-2.5.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

第五步,创建生产者PRODUCER。打开一个新的CMD窗口,使用cd命令进入到Kafka安装目录的windows\bin目录中(如D:\Application\Kafka\kafka_2.12-2.5.0\bin\windows)。接着创建一个生产者PRODUCER,使用的命令如下:

1
D:\Application\Kafka\kafka_2.12-2.5.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test

启动之后就成了终端交互模式,后面可以通过在生产者发送信息,消费者即可得到该信息。

第六步,创建消费者CONSUMER。打开一个新的CMD窗口,使用cd命令进入到Kafka安装目录的windows\bin目录中(如D:\Application\Kafka\kafka_2.12-2.5.0\bin\windows)。接着创建一个消费者CONSUMER,使用的命令如下:

1
D:\Application\Kafka\kafka_2.12-2.5.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

第七步,测试。在生产者的终端交互模式中输入一些信息,如helloworld等,就可以在消费者的终端界面显示该消息:

这样关于Kafka的安装就完成了,将下来就对上述安装过程进行一个较为细致的介绍。

首先是Kafka的安装包解压之后的目录如下所示:

安装过程分析

安装Kafka。由于Kafka的设计中依赖了Zookeeper,因此在bin和config的目录中除了Kafka相关的内容外,还有Zookeeper相关的内容。上面图中的bin目录存放了Kafka和Zookeeper的命令行工具,bin目录中存放的是Kafka和Zookeeper的命令行工具,其中根目录下的sh脚本使用用Linux/Unix,而windows目录下的bat脚本则适用于Windows系统,这样开发者可以根据实际情况来选择使用不同的脚本。config目录下存放的则是关于Kafka和Zookeeper的配置信息。

其次启动Zookeeper。在上面我们使用zkserver命令来启动Zookeeper,实际上它是根据conf\zoo.cfg这一配置文件来绑定2181端口并启动服务的。如果启动失败,可以查看端口是否被占用,可以杀掉占用进程或者通过修改conf\zoo.cfg配置文件中的clientPort参数来使用其他端口启动Zookeeper。

再者是启动Kafka。在前面我们在Kafka安装目录下执行.\bin\windows\kafka-server-start.bat .\config\server.properties命令来启动Kafka,可以看到该命令指定了Kafka配置文件的位置,这都是默认的位置。如果开发者想使用外部其他环境的Zookeeper时,需要在config\server.properties文件中第123行处的zookeeper.connect参数来设置Zookeeper的地址和端口,它默认会连接本地的2181端口:

注意这个地址必须和之前Zookeeper配置文件中的端口号及IP地址保持一致。如果需要设置多个Zookeeper的节点,那么只需将该参数配置多个地址,并以逗号进行分隔。举个例子,如zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184等。同时该配置文件中还提供了关于服务端连接、日志等配置参数,具体的可以自行查看。

接着就是创建TOPIC。开发者使用了kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test命令创建了一个名为test的TOPIC,该TOPIC包含一个分区和一个Replica。在创建完成后,开发者可以使用kafka-topics --list --zookeeper localhost:2181命令来查看当前的TOPIC。

当然也可以不使用kafka-topics.bat命令来手工创建TOPIC,可以直接使用下面的内容进行消息创建时,会自动创建TOPIC。

创建消息生产者。我们进入到windows\bin目录中,去执行kafka-console-producer.bat --broker-list localhost:9092 --topic test命令。kafka-console-producer.bat脚本可以启动Kafka基于命令行的消息生产客户端,启动后可以直接在控制台中输入信息来发送,控制台中的每一行数据都会被视为一条消息来发送。由于此时没有消息消费者,因此这些输入的信息都会被阻塞在名为test的TOPICS中,直到有消费者将其消费掉。

创建消息消费者。我们进入到windows\bin目录中,去执行kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning命令。kafka-console-consumer.bat 脚本可以启动Kafka基于命令行的消息消费客户端,启动后在控制台可以看到输出了之前我们在消息生产客户端中发送的消息。开发者也可以打开两个窗口,在左侧的生产客户端中发送消息,并观察右侧的消费客户端中是否立即输出了对应的消息。

整合Spring Cloud Bus

还记得在前面我们是如何整合RabbitMQ来实现消息总线的吗?我们在pom.xml文件中引入了spring-cloud-starter-bus-amqp依赖,其实当我们想使用Kafka来实现消息总线的时候,只需要将上述依赖修改为spring-cloud-starter-bus-kafka即可。

如果在启动Kafka时均采用了默认配置,那么此时开发者不需要再做任何其他的配置就能在本地实现从RabbitMQ到Kafka的转换。请注意需要将之前的config-server和config-client这两个项目中的spring-cloud-starter-bus-amqp依赖修改为spring-cloud-starter-bus-kafka即可。

然后启动一个config-server实例和两个config-client实例,可以看到config-client实例启动时的控制台输出信息如下:

然后分别去访问http://localhost:4003/envythinkOnehttp://localhost:4002/envythinkOne链接,得到的信息如下所示:

同时查看此时的GItHub的配置信息为envythink=prod-1.0,如下所示:

接下来尝试将其修改为envythink=prod-122.0,如下所示:

并使用POSTMAN等测试工具发送POST请求到其中任意一个链接,如http://localhost:4002/actuator/bus-refresh这个,得到的结果如下所示:

接着分别去访问之前启动的两个config-client实例的/envythinkOne接口,此时两者都会返回config-repo/envy-prod.properties配置文件中envythink属性的最新值:

最后查看一下config-client项目的控制台,如下所示:

可以看到RefreshListener监听类收到了远程刷新请求,并刷新了envythink属性的日志。

Kafka的配置

在上面的例子中,由于Kafka和Zookeeper均运行于本地,在自动化配置的支持下,我们并没有在测试程序中通过配置信息来指定Kafka和Zookeeper的配置信息就完成了本地消息总线的测试。

但是在实际应用中,Kafka和Zookeeper一般都会独立部署,所以需要在应用中配置Kafka和Zookeeper相关的连接信息。需要注意Kafka的整合与RabbitMQ不同,Spring Boot中没有直接提供starter模块,而是采用了Spring Cloud Stream的Kafka模块,而spring-cloud-starter-bus-kafka依赖中已经包含了spring-cloud-starter-stream-kafka依赖,因此对于Kafka的配置均采用spring.cloud.stream.kafa前缀,关于Kafka具体的配置会在后续学习“绑定器配置”的时候进行学习。

深入理解

在整合Kafka实现消息总线之后,我们来做一个有趣的实验:使用POSTMAN等测试工具发送POST请求到其中任意一个链接,如http://localhost:4002/actuator/bus-refresh时,看看Kafka提供的消费者控制台会输出什么内容:

可以看到其实返回的是一个JSON对象,格式化后如下所示:

1
2
3
4
5
6
7
8
9
10
{
"type": "AckRemoteApplicationEvent",
"timestamp": 1601197006297,
"originService": "envy:4002:4d2ae6a6ab6192225fbe219276dc1810",
"destinationService": "**",
"id": "203a177b-a605-4cca-90b3-418337c5c0e2",
"ackId": "4ca3b6ad-553c-4e8b-b422-a49965cbaa6f",
"ackDestinationService": "**",
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}

接下来就详细分析上述消息中的内容:

  • type:表示消息的类型。上面的例子中包含了RefreshRemoteApplicationEvent和AckRemoteApplicationEvent。其中RefreshRemoteApplicationEvent事件是用来刷新配置的;而AckRemoteApplicationEvent则是响应消息已经正确接收的告知信息事件。

  • timestamp:表示消息的时间戳。

  • originService:表示消息的来源服务实例。

  • destinationService:表示消息的目标服务实例。可以看到此处的结果是**,这表示总线上的所有服务实例。如果开发者想指定服务或者是实例,只需通过使用destination参数来定位具体要刷新的应用实例即可。举个例子如发起/actuator/bus-refresh?destination=envy请求,就可以得到如下的刷新事件消息,其中的destinationService为**,表示总线上所有的服务的实例,这似乎和前面的没有什么区别:

    1
    2
    3
    4
    5
    6
    7
    {
    "type": "RefreshRemoteApplicationEvent",
    "timestamp": 1601197006288,
    "originService": "envy:4002:4d2ae6a6ab6192225fbe219276dc1810",
    "destinationService": "**",
    "id": "4ca3b6ad-553c-4e8b-b422-a49965cbaa6f"
    }
  • id:表示消息的唯一标识。

从上面的两段代码中可以看到,type、timestamp、originService、destinationService和id这5种key,其都是AckRemoteApplicationEvent和RefreshRemoteApplicationEvent两种类型都具有的内容。

而下面即将介绍的ackId、ackDestinationService和event这三个消息内容则是AckRemoteApplicationEvent类型独有的,相应的解释如下:

  • ackId:表示Ack消息对应的消息来源。我们可以看到最前面的一条AckRemoteApplicationEvent类型,它的ackId对应了RefreshRemoteApplicationEvent的id,说明这条Ack是告知该RefreshRemoteApplicationEvent事件的消息已经被收到,如下所示:

  • ackDestinationService:表示Ack消息的目标服务实例。可以看到这里使用的是”**”,表示消息总线上的所有服务实例都能收到该Ack消息。

  • event:表示Ack消息的来源事件。从上面的例子中可以知道,这两个Ack均来源于刷新配置的RefreshRemoteApplicationEvent事件,由于我们启动了两个config-client实例,因此会有两个实例接收到了配置刷新事件,同时它们都会返回一个Ack消息。由于ackDestinationService为**,因此两个config-client实例都会收到对RefreshRemoteApplicationEvent事件的Ack消息,如下所示:

这样关于Spring Cloud Bus消息总线整合Kafka的学习就到此为止,后续开始学习其他的知识。