Linux运维kafka集群部署和使

  ApacheKafka是一个开放源代码的分布式事件流平台,成千上万的公司使用它来实现高性能数据管道,流分析,数据集成和关键任务应用程序。

  kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于日志分析,大数据实时处理领域。

  

kafka基础架构概述

  有关kakfa相关的术语如下所示:

  kafka的架构图如下所示:

  kafka高效读写数据的底层原理

  

顺序写磁盘

  kafka的producer生产数据,将数据顺序写入磁盘,从而优化的磁盘的写入效率

  官方有数据表明,同样的写能到M/s,而随机写只有K/s。这与磁盘的机械结构有关,顺序写之所以快,是因为节省去了大量磁头寻址时间

  

零拷贝技术

  如下图所示,DMA的英文拼写是"DirectMemoryAccess",汉语的意思就是直接内存访问,是一种不经过CPU而直接从内存存取数据的数据交换模式

  温馨提示:

  1、JDKNIO零拷贝实现分为两种方案,即mmap和sendFile

  1.mmap比较适合小文件读写,对文件大小有限制,一般在1.5GB~2.0GB之间;

  2.sendFile比较适合大文件传输,可以利用DMA方式,减少CPU拷贝;

  2、下图中的万兆网卡我指的是服务器的硬件网卡,但也有人喜欢使用专业术语NetworkInterfaceController(简称"NIC")来进行说明

  

异步刷盘

  kafka并不会将数据直接写入到磁盘,而是写入OS的cache,而后由OS实现数据的写入

  这样做的好处就是减少kafka源代码更多关于兼容各种厂商类型的磁盘驱动,而是交给更擅长和硬件打交道的操作系统来完成和磁盘的交互

  不得不说异步刷盘的确提高了效率,但也意味着带来了数据丢失的风险,假设数据已经写入到OS的cachepage,但数据并未落盘之前服务器断电,很可能会导致数据的丢失

  

分布式集群

  我们知道topic可以被划分多个partition,而partition可以分布在kafka集群的各个broker实例上

  分布式充分利用了各个broker节点的性能,包括但不限于CPU,内存,磁盘,网卡等

  kafka事务

  

kafka事务概述

  kafka从0.11版本开始引入了事务支持

  事务可以保证kafka在ExactlyOnce语义的基础上,生产和消费可以跨分区的会话,要么全部成功,要么全部失败

  

producer事务

  为了实现跨分区跨会话的事务,需要引入一个全局唯一的TransactionID,并将Producer获得的PID和TransactionID绑定。这样当Producer重启后就可以通过正在进行的TransactionID获得原来的PID

  为了管理Transaction,Kafka引入了新的组件TransactionCoordinator。Producer就是通过和TransactionCoordinator交互获得TransactionID对应的任务状态

  TransactionCoordinator还负责所有写入kafka的内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以的得到恢复,从而继续进行

  

Consumer事务

  上述事务机制主要从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其是无法保证Commit的信息被精确消费

  这是由于Consumer可以通过offset访问任意信息,而且不同的SegmentFile生命周期不同,同一事物的消费可能会出现重启后被删除的情况

  部署单机版的kafka环境

  

下载Kafka软件并解压到指定目录

wget   

创建符号连接并配置环境变量

ln-sv/oldboyedu/softwares/kafka_2.12-3.1.1kafka#配置环境变量cat/etc/profile.d/kafka.shEOF#!/bin/bashKAFKA_HOME="/oldboyedu/softwares/kafka"PATH=\$PATH:\$KAFKA_HOME/binEOF#变量生效source/etc/profile.d/kafka.sh

  

kafka的配置文件简介

  查看kafka的配置文件目录

ls-l/oldboyedu/softwares/kafka/config/

  查看broker默认的配置文件

#egrep-v"^#

^$"/oldboyedu/softwares/kafka/config/server.propertiesbroker.id=0num.network.threads=3num.io.threads=8socket.send.buffer.bytes=socket.receive.buffer.bytes=socket.request.max.bytes=104857log.dirs=/tmp/kafka-logsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=log.segment.bytes=log.retention.check.interval.ms=zookeeper.connect=localhost:zookeeper.connection.timeout.ms=group.initial.rebalance.delay.ms=0

  核心基础配置如下:

  broker.id

  log.dirs

  zookeeper.connect

  

修改kafka实例的配置文件

#grep^[a-Z]/oldboyedu/softwares/kafka/config/server.propertiesbroker.id=...log.dirs=/oldboyedu/data/kafka...zookeeper.connect=10.0.0.:,10.0.0.:,10.0.0.:...

  温馨提示:

  只需修改上述3个参数即可

  

修改broker的堆内存大小

#grepKAFKA_HEAP_OPTS/oldboyedu/softwares/kafka/bin/kafka-server-start.sh...if["x$KAFKA_HEAP_OPTS"="x"];then#exportKAFKA_HEAP_OPTS="-Xmx1G-Xms1G"exportKAFKA_HEAP_OPTS="-XmxM-XmsM"...

  

启动kafka集群

kafka-server-start.sh-daemon$KAFKA_HOME/config/server.propertiesss-ntl

grep

  

查看zookeeper中有关kafka的信息

  使用zkWeb服务通过web页面查看zookeeper,并添加zookeeper服务器配置

  查看当前zookeeper服务器中的kafka节点

  部署集群版的kafka

  

将kafka部署在其他服务器上

#发送kafka到其它服务器上scp-rp/oldboyedu/softwares/kafkaroot

10.0.0.:/oldboyedu/softwares/scp-rp/oldboyedu/softwares/kafkaroot

10.0.0.:/oldboyedu/softwares/#发送kafka环境变量文件到其它服务器上scp-rp/etc/profile.d/kafka.shroot

10.0.0.:/etc/profile.d/kafka.shscp-rp/etc/profile.d/kafka.shroot

10.0.0.:/etc/profile.d/kafka.sh

  

修改相应节点的配置文件

在节点上#grep^broker/oldboyedu/softwares/kafka/config/server.propertiesbroker.id=在节点上#grep^broker/oldboyedu/softwares/kafka/config/server.propertiesbroker.id=

  

编写kafka集群管理脚本

#安装ansibleyum-yinstallansible#配置ansiblecat/etc/ansible/hostsEOF[kafka]10.0.0.10.0.0.10.0.0.EOF#编写启动脚本cat/oldboyedu/softwares/kafka/bin/manager-kafka.shEOF#!/bin/bash#判断用户是否传参if[\$#-ne1];thenecho"无效参数,用法为:\$0{start

stop}"exitfi#获取用户输入的命令cmd=\$1for((i=;i=;i++));dotputsetaf2echo"******zk\${i}.oldboyedu.   

启动kafka集群

#使用kafka启动脚本来启动kafka集群manager-kafka.shstatusmanager-kafka.shstartmanager-kafka.shstatus

  

使用zkWeb工具查看kafka在zookeeper存储的数据信息

  相关znode功能说明如下:

  kafka集群topic管理

  

查看现有的topic名称

kafka-topics.sh--bootstrap-server10.0.0.:--list

  温馨提示:

  我们可以基于"--zookeeper"指令去zookeeper查询现有的topic信息;

  我们也可以基于"--bootstrap-server"指令去kafkabroker查询现有的topic信息,官方推荐使用"--bootstrap-server"的方式去管理topic

  

创建topic

#创建topic#注意,创建的副本数必须小于等于集群的数量!kafka-topics.sh--bootstrap-server10.0.0.:--create--topicdemo--partitions2--replication-factor3#查看创建结果kafka-topics.sh--bootstrap-server10.0.0.:--list

  温馨提示:

  

查看某个topic的详细信息

kafka-topics.sh--bootstrap-server10.0.0.:--describe--topicdemo

  温馨提示:

  如果不使用"--topic"指定topic的名称,则默认查看所有的topic信息哟

  

修改topic的信息

#修改topic的信息kafka-topics.sh--bootstrap-server10.0.0.:--alter--topicdemo--partitions8#查看topic的详细信息kafka-topics.sh--bootstrap-server10.0.0.:--describe--topicdemo#将分区数设置为5会失败,因为分区数只能扩容,无法缩容!kafka-topics.sh--bootstrap-server10.0.0.:--alter--topicdemo--partitions5

  

删除topic

kafka-topics.sh--bootstrap-server10.0.0.:--delete--topicdemokafka-topics.sh--bootstrap-server10.0.0.:--list

  kafka集群生产者管理

  

创建一个生产者往topic写入数据

kafka-console-producer.sh--topicmyNginx--broker-list10.0.0.:

  温馨提示:

  生产者创建成功后,我们可以就手动自定义写入测试数据,而后开启一个消费者进行数据消费,如果能正常获取数据,则说明集群启动是正常运行的

  kafka集群消费者管理

  

从头开始消费数据

kafka-console-consumer.sh--topicmyNginx--bootstrap-server10.0.0.:--from-beginning

  温馨提示:

  我们可以不使用"--from-beginning"参数,则有可能生产者要比消费者先启动一段时间,而且也将数据写入到kafka集群中了,但如果我们不想要在消费者启动时之前的数据可以不加该参数,如果使用该参数则表示从头进行消费

  

查看消费者组信息

kafka-consumer-groups.sh--bootstrap-server10.0.0.:--list

文章来源于:


转载请注明:http://www.aierlanlan.com/tzrz/4050.html