Kafka快速学习三进一步了解及实战开发

消息队列的常见问题

  1. 如何保证消息队列的高可用?
  2. 如何保证消息不被重复消费?
  3. 如何保证消息不丢失?
  4. 如何保证消息的消费顺序?

kafka怎么解决这些问题

1.kafka高可用

    kafka可以搭建集群,生成者发送消息到topic,topic内的partition分为learder和多个follower,分别在不同的broker节点,leader所在的broker节点挂了,follower会自动升为leader与producer和consumer交互。数据完全一致。

2.避免消息重复消费

partition中每条消息都有一个连续的序列号叫offsetconsumer消费消息时可以提交offset标记这条消息已经消费过,下次消费会找到这条消息的下一条开始消费。但是由于网络等原因,生产者发送消息和消费者消费消息都有极低的可能导致重复消费相同信息,所以跟使用其他消息队列时一样,最好还是在消费端使用数据库或者redis做幂等性处理。

3.如何保证消息不丢失

a.生产者丢失消息

生产者发送消息可以配置重试次数

可以同步发送,根据发送消息的结果做记录

b.MQ丢失消息

设置acks: all,保证所有副本都同步完数据落到磁盘再响应给producer。但是这样会影响性能,根据实际业务配置

扩展:partition同步数据

ISR:leader会维持一个与其保持同步的replica集合,该集合就是ISR,

HighWatermark
:保证消费数据的一致性和副本数据一致性
Follower发送故障
follower故障后会被临时剔除ISR,等该follower恢复后,会读取本地磁盘记录的上次的HW,并将该log文件高于HW的部分截取掉,从HW开始向Leader进行同步,等该follower的LEO大于等于该partition的HW,即follower追上leader后,就可以重新加入ISR
Leader发生故障
leader发生故障后,会从ISR中选取一个新的leader,为了保证多个副本之间数据一致性,其余的follower会先将各自的log文件高于HW的部分截取掉,(新leader自己不会截取掉),然后从新的leader同步数据。
Kafka快速学习三进一步了解及实战开发

c.消费者丢失消息

consumer消费消息时可手动或自动提交offset,如果由于异常或其他原因没有提交offset,consumer会重复消费10次。(在非批量消费模式下测试结果)

4.实现顺序消费

producer发送消息到固定的partition(上一章有讲具体开发代码)

consumer消费时不能批量消费

kafka实战开发

配置文件

spring:
  kafka:
    bootstrap-servers: ip:port

    producer:
      # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      acks: all
      #事务id
      #transaction-id-prefix: test-tran-

    consumer:
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      #手动提交时设置为此值
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      #手工ack,调用ack后立刻提交offset
      ack-mode: manual_immediate
      #容器运行的线程数
      concurrency: 4

producer

配置文件中配置了事务属性,则代码里有需要加事务注解,配置事务后,如果第一次消息发送成功后出现异常,则第一次的消息也会被清除,不会被consumer消费

public class UserController  {

    private static final String TOPIC_NAME = "default_topic";

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    @GetMapping("/api/v1/{num}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage1(@PathVariable("num") String num){
        kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{
            String topic = success.getRecordMetadata().topic();
            int partition = success.getRecordMetadata().partition();
            long offset = success.getRecordMetadata().offset();
            System.err.println("发送成功:topic="+topic+", partition="+partition+",offset ="+offset);
        },failure->{
            System.err.println("发送消息失败:"+failure.getMessage());
        });

    }


    /**
     * 注解方式的事务
     * @param num
     */
    @GetMapping("/api/v1/tran1")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage2(int num){
        kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 1 i="+num);
        if(num == 0){
            throw new RuntimeException();
        }
        kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 2 i="+num);
    }
}

consumer

@Component
public class MQListener {
    
    @KafkaListener(topics = {"default_topic"},groupId = "test-gp2")
    public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        System.err.println("消费消息:"+record.topic()+"----"+record.partition()+"----"+record.value());
        ack.acknowledge();
    }
}

© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容