基本概念

Broker

每个Broker相当于一个服务器,多个Broker构成了一个kafka集群

Topic

主题做消息分类,一个Broker可以包含多个Topic

Partition

分区,一个Topic包含多个分区,分区有leader和follower的区分,由于follower一般起到备份的作用,所以leader和follower一般不在同一台服务器上

kafka架构

0.9之前,offset消费偏移量存储在zk中,0.9之后存储在本地

三个特征

1、一个分区只能被一个消费值组内的一个消费者消费,如果一个消费者组中有消费者消费了该分区,那么同一消费者组中的其他消费者不可以再消费该分区(基于此,消费者组中的消费者个数不应大于分区数)
2、消费者消费消息是以分区为单位的,一次消费一个分区
3、同一个消费者组内,同一时刻只能有一个消费者消费消息

kafka消息写入

写入消息时,有key、partition、value三个参数,
如果只指定value,消息会轮询写入分区,
如果指定key,则会根据key值哈希写入对应的分区

写入流程

分区副本

配置:default.replication.factor=N
此配置可以指定分区的副本(follower)的数量,producer和consumer只与leader分区进行交互

api

生产者

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka服务端的主机名和端口号
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //等待所有副本节点应答
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //发送消息重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        //一批消息处理大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //请求延时
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //发送缓存区内存大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        // public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
        ProducerRecord record =
                new ProducerRecord<String, Object>("testInfoTopic", null, System.currentTimeMillis(), null,
                        "value");
        kafkaProducer.send(record);

        /**
         * 回调发送
         */
        kafkaProducer.send(record, (metadata, exception) -> {
            if (metadata != null) {
                System.err.println(metadata.partition() + " : " + metadata.offset());
            }
        });
        kafkaProducer.close();

    }
}

消费者

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka服务端的主机名和端口号
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        //是否自动确认offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //自动确认offset的时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //定义consumer
        KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(properties);
        //订阅topic
        kafkaConsumer.subscribe(Arrays.asList("testInfoTopic"));

        while (true) {
            //每隔100ms读取一次数据
            ConsumerRecords<Object, Object> poll = kafkaConsumer.poll(100);
            //读取的是一批数据,需要遍历
            for (ConsumerRecord<Object, Object> record : poll) {
                System.err.println(record.offset() + "--" + record.key() + "--" + record.value());
            }
        }
    }
}

原文地址:http://www.cnblogs.com/MorningBell/p/16909999.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性