kafka局部调优

kafka常用命令
        cd /opt/kafka/kafka/bin/
        ##启动ZK
        ./zookeeper-server-start.sh -daemon /opt/kafka/kafka/config/zookeeper.properties  
        ##启动kafka
        ./kafka-server-start.sh -daemon /opt/kafka/kafka/config/server.properties

        ##删除topics
        ./kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic events
        ##创建topics  一个分区  单个副本
        ./kafka-topics.sh --create --topic events --bootstrap-server localhost:9092  --partitions 1 --replication-fator 3
        ##desc topics
        ./kafka-topics.sh --list --bootstrap-server localhost:9092
        ./kafka-topics.sh --describe --topic events --bootstrap-server localhost:9092
        ##分区数只能增加,不能减少
        ./kafka-topics.sh --alter --topic events --bootstrap-server localhost:9092 --partitions 1 



        ##创建命令行生产者
        ./kafka-console-producer.sh --topic events --bootstrap-server localhost:9092
        ##打印topic 增量消费
        ./kafka-console-consumer.sh --topic events --from-beginning --bootstrap-server localhost:9092
硬件
        服务器台数=2*(生产者峰值(20M/s)*副本数/100)+1
        磁盘:kafka顺序读写(固态和机械差不多) 100g*2个副本*3天/0.7 =1T
        内存:kafka内存=堆内存(10-15g)+页缓存(segment=1g 默认) 
            (分区数*1g*25%)/服务器台数
        jmap -heap 2321 查看内存堆栈信息
        CPU:num.io.threads=8(总cpu 50% 写磁盘的线程数)  
            num.replica.fetchers=1(1/3*总cpu50% 副本拉取线程数 )
            num.network.threads=3(2/3*总cpu50% 数据传输线程数 )  
            总CPU32建议kafka设置24个给系统预留8个
生产者参数
    read-only(必须重启) per-broker(动态针对单个broker节点) 
    cluster-wide(动态针对集群的节点)

        val ioproperties = new Properties()
        //批次大小
        ioproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")
        ioproperties.put(ProducerConfig.LINGER_MS_CONFIG,"70")//linger.ms
        //压缩 gzip snappy lz4 zstd
        ioproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy")
        //缓冲区大小
        ioproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432")
        //数据可靠性分区副本>=2
        ioproperties.put(ProducerConfig.ACKS_CONFIG,"-1")
        //重试次数为Int最大值  设置小一点
        ioproperties.put(ProducerConfig.RETRIES_CONFIG,"5")
        //幂等性(去除数据重复)
        ioproperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
boker
        replica.lag.time.max.ms=30s Leader未收到Follower的消息时间
        auto.leader.rebalance.enable=false 自动平衡,建议关闭
        leader.imbalance.per.broker.percentage=10//不平衡的比率   一般不开启
        leader.imbalance.check.interval.seconds=300s  负载均衡检查时间
        log.segment.bytes=1log日志大小
        log.index.interval.bytes=4K 每4k大小数据创建一个索引
        log.retention.hours 默认7天    log.retention.check.interval.ms 检查周期5分钟
        //delete策略和compact策略
        log.cleanup.policy=delete  最大时间戳作为该文件的最大时间戳
        log.cleanup.policy=compact 
            //对于相同key的不同value,只保留最后一个版本。(适用于用户信息等)
        num.io.threads=8 //写磁盘的线程数
        num.replica.fetchers=1 //副本拉取线程数
        num.network.threads //数据传输线程数
        log.flush.interval.message //强制页缓存刷写到磁盘的条数
        log.flush.interval.ms      //每隔多久刷写一次
消费者
        enable.auto.commit=true //自动提交偏移量
        auto.commit.interval.ms=5s  //提交偏移量的频率
        auto.offset.reset=5s  earliest:自动将偏移量重置为最早的偏移量 latest:最新的偏移量
            none:如果未找到先前偏移量,则消费者抛出异常
        offsets.topic.num.partitions=50 //默认50个分区
        heartbeat.interval.ms=3s //默认心跳时间
        session.timeout.ms=45s //消费者和coordinator 之间的超时时间
        max.poll.interval.ms=5//消费者处理消息的最大时长
        fetch.max.bytes=52428800(50m)//消费者获取服务器段数据最大字节数
        max.poll.records=500 //一次消费拉取最大条数

kafka总体调优

吞吐量
        生产者吞吐:
            ioproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")//批次大小  
            ioproperties.put(ProducerConfig.LINGER_MS_CONFIG,"70")//linger.ms

            //压缩 gzip snappy lz4 
            zstdioproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy")
            ioproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432")//缓冲区大小
        增加分区
        消费者吞吐:
            fetch.max.bytes=52428800(50m)//消费者获取服务器段数据最大字节数
            max.poll.records=500 //一次消费拉取最大条数
精确一次
        生产者:ack=-1、幂等性+事务
        broker:分区副本数>=2  ISR里应答>=2
        消费者:事务+手动offset  消费者输出必须支持事务
设置分区
        (1)创建一个分区压测。  目标吞吐量/min(生产者,消费者)(一般3-10个)
单条日志过大(1M以上)
        message.max.bytes=1//单条日志最大值
        max.request.size=1//生产者往broker请求的最大值。针对topic级别设置的消息体大小
        replica.fetch.max.bytes=1//副本同步数据,每批次最大值
        fetch.max.bytes=50//消费者获取服务器端一批数据最大值
压测
        生产者压测脚本;kafka-producer-perf-test.sh 
        ./kafka-producer-perf-test.sh --topic events  --record-size 1024 
            --num-records 1000000 --throughput 10000 
            --producer-props bootstrap-server=localhost:9092 
               batch.size=16384 linger.ms=0 compression.type=snappy 
               buffer.memory=67108864
        ##throughput:每秒多少条,-1不限流    

        消费者压测脚本;kafka-consumer-perf-test.sh 
        ./kafka-consumer-perf-test.sh --topic events 
            bootstrap-server=localhost:9092   
            --consumer-props max.poll.records=500 
                fetch.max.bytes=50m --messages 100000
        ##messages:需要消费的条数

原文地址:http://www.cnblogs.com/wuxiaolong4/p/16800575.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性