1 简介

MQ满天飞的时代,kafka也只是其中之一。

Kafka,一个基于分布式高吞吐量消息发布-订阅系统

具有快速持久可扩展处理大量不同消费者的特性。Kafka 不会关心消息的处理过程及消费者队列。

  • 磁盘线性读写,O(1)读写性能
  • 高吞吐量
  • 显式的分布式架构

详细的介绍可以看这里:http://blog.cloudera.com/blog/2014/09/apache-kafka-for-beginners/

2 名词解释

  • broker

kafka集群中的服务器(一个或多个)被称为broker

  • topic

顾名思义,topic就是主题的意思。也就是对消息的一个分类。kafka中的每个消息都有一个分类/类别,这个类别就是topic。

不同topic的消息在磁盘上是分开存储的,同时对消费者是透明的。也就是说当你消费消息的时候并不需要知道消息是怎么存储的,也不需要知道存储在哪里。

  • partition

可以认为是topic的物理分组,一个topic可以分为一个或多个partition。partition一两句话说不清,详情见后面的小节。

  • producer

这个好理解,就是消息的生产者。

  • consumer

和producer相对应,consumer就是消息的消费者。通常是各种高级语言写的客户端API,比如Java、Python甚至是JavaScript所写的客户端API。

  • consumer group

每个consumer都属于一个消费者组。不显示地指定消费者组的时候属于默认的消费者组。

  • message

消息是通信的基本单位,producer可以向topic发布message。新发布的消息就会广播给订阅了这个主题de consumer。message只能传输给某个group中的某一个consumer。

3 partition

kafka作为一个MQ或者日志系统,他的最终数据存储还是离不开磁盘的。kafka对每个消息做了分类,即有了topic,每个topic当然也是持久化在磁盘上的。当消费完之后太过于陈旧的消息(message/topic)将被删除。

鉴于此,将所有topic都存储在同一个目录里将导致磁盘文件太过于庞大,这样一来,管理不便,如果一个磁盘挂了,将导致所有数据丢失;而且磁盘的空间总是有限的。也就是说一个磁盘持久化所有数据在生成环境是不靠谱的。

所以,kafka可以将不同的topic分布式地存储于不同的磁盘上,这就有了kafka所谓的partition的概念。

partition是一个有序的、不可变的消息队列。每个消息都有一个连续的序列号即offset。每个partition又分为若干个segment

最简单的也就是kafka默认的partition分布存储策略就是hash了。简单理解就是所有可用的broker一次轮流一个一个地存储partition。

这么一搞,最终结果就是:

  • 每台可用的broker上的数据逗比总数据要少
  • 但是每个消息数据都有冗余
  • 一般情况下,一台broker宕机并不影响整个系统(当然如果你只有一台broker那就另说了……)

日志文件的存储位置(partition就在这里了)在server.properties中指定: log.dirs=/Users/hylexus/data/kafka/kafka-logs

3.1 单机版partition的磁盘存储

此处先从单机版kafka中partition存储方式说起。

单机版并没有partition带来的好处。

本人此处的设置是kafka数据存储在log.dirs=/Users/hylexus/data/kafka/kafka-logs

创建一个测试用的topic名为topic01

1
2
3
4
5
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 4 \ # 分区数为4
--topic topic01

查看磁盘上partition的存储

1
2
3
4
5
6
7
8
hylexus@hylexusPC kafka-logs $ pwd
/Users/hylexus/data/kafka/kafka-logs
hylexus@hylexusPC kafka-logs $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-0
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-1
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-2
drwxr-xr-x 5 hylexus staff 170 5 1 15:46 topic01-3
hylexus@hylexusPC kafka-logs $

单机版由于只有一个目录,partition分布较为简单。

3.2 多broker下partition的磁盘存储

此处的多broker是如下形式:

多broker

单机上启动三个broker,注册到同一个zookeeper。

创建一个topic名为topic02,用以测试

1
2
3
4
5
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 2 \ # 2份拷贝
--partitions 4 \ # 4个partition
--topic topic02

查看磁盘磁盘上partition存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
hylexus@hylexusPC kafka-logs $ pwd
/Users/hylexus/data/kafka/kafka-logs
hylexus@hylexusPC kafka-logs $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-0
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-2
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-3
hylexus@hylexusPC kafka-logs-02 $ pwd
/Users/hylexus/data/kafka/kafka-logs-02
hylexus@hylexusPC kafka-logs-02 $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-0
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-1
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-3
hylexus@hylexusPC kafka-logs-03 $ pwd
/Users/hylexus/data/kafka/kafka-logs-03
hylexus@hylexusPC kafka-logs-03 $ ll
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-1
drwxr-xr-x 5 hylexus staff 170 5 1 16:29 topic02-2

此时的partition磁盘分布如下:

partitions

记broker个数为n,则有:

  • 第i个partition分配到第(i % n)个broker
  • 第i个Partition的第j个拷贝分配到第((i + j) % n)个broker

3.3 partition磁盘存储总结

  • 每个partition为一个目录
  • partiton命名规则为topic名称-有序序号
    • 第一个partiton有序序号从0开始
    • 序号最大值为partitions数量减1
  • 如果你只是调用kafka提供的客户端程序的话,你没有必要清楚每个partition是怎么分布的,因为你只是调用客户端消费数据而已

3.4 partition中segment磁盘存储

接着上面说的,其实每个partition就相当于一个大型文件(整个消息记录)被分配到多个大小相等的文件中存储。大小相等但是消息个数不一定相等了,这样利于管理,可以快速的删除陈旧的文件,有效地提高了磁盘的利用率。

此处所说的每个大小相等的文件就是segment(partition的再次细分)了。

既然数据是存储在磁盘上的,即便比不上磁盘高效,但也要在一个MQ系统能接受的范围内。

所以,索引就必不可少了,kafka的segment文件分为两个主要部分:index+log

  • index主要是索引文件,log才是真正的消息数据。
  • segment文件命名时:
    • 第一个segment文件名称从零开始,前导零填充至19位
    • 其他每个segment文件名为上一个segment文件最后一个message的offset

此处就以单机版的kafka为例:

为测试方便,此处我将log.segment.bytes=1024调整为1024,以便快速看到效果,生成一些message之后,topic01-0磁盘存储如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
hylexus@hylexusPC topic01-0 $ pwd
/Users/hylexus/data/kafka/kafka-logs/topic01-0
hylexus@hylexusPC kafka-logs $ ll topic01-0
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000000.index
-rw-r--r-- 1 hylexus staff 990 5 1 19:43 00000000000000000000.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000012.index
-rw-r--r-- 1 hylexus staff 990 5 1 19:43 00000000000000000012.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000024.index
-rw-r--r-- 1 hylexus staff 998 5 1 19:43 00000000000000000024.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000036.index
-rw-r--r-- 1 hylexus staff 994 5 1 19:43 00000000000000000036.log
-rw-r--r-- 1 hylexus staff 0 5 1 19:43 00000000000000000048.index
-rw-r--r-- 1 hylexus staff 1004 5 1 19:43 00000000000000000048.log

3.5 通过offset查找message

就拿上面的segment来说,第一个segment的命名为零(前导零).{index,log}。

另外,上面我将每个segment的大小设置为1024即log.segment.bytes=1024

1
2
3
4
00000000000000000000 offset=0
00000000000000000012 offset=12+1==13
00000000000000000024 offset=24+1==25
……

因此,通过offset查找具体消息步骤如下:

  • 二分查找定位到segment
  • 在某个具体的segment文件中顺序查找到具体message
  • 另外,为效率考虑,index所以文件是直接映射到内存的

例如:查找offset==23的message

  • 二分查找到具体segment为00000000000000000012
  • 在00000000000000000012内部顺序找到offset=23的message

log4jdbc.md
slf4j 简介.md