消息队列的常见问题
- 如何保证消息队列的高可用?
- 如何保证消息不被重复消费?
- 如何保证消息不丢失?
- 如何保证消息的消费顺序?
kafka怎么解决这些问题
1.kafka高可用
kafka可以搭建集群,生成者发送消息到topic,topic内的partition分为learder和多个follower,分别在不同的broker节点,leader所在的broker节点挂了,follower会自动升为leader与producer和consumer交互。数据完全一致。
2.避免消息重复消费
partition中每条消息都有一个连续的序列号叫offset,consumer消费消息时可以提交offset标记这条消息已经消费过,下次消费会找到这条消息的下一条开始消费。但是由于网络等原因,生产者发送消息和消费者消费消息都有极低的可能导致重复消费相同信息,所以跟使用其他消息队列时一样,最好还是在消费端使用数据库或者redis做幂等性处理。
3.如何保证消息不丢失
a.生产者丢失消息
生产者发送消息可以配置重试次数
可以同步发送,根据发送消息的结果做记录
b.MQ丢失消息
设置acks: all,保证所有副本都同步完数据落到磁盘再响应给producer。但是这样会影响性能,根据实际业务配置
扩展:partition同步数据
ISR:leader会维持一个与其保持同步的replica集合,该集合就是ISR,
:保证消费数据的一致性和副本数据一致性

c.消费者丢失消息
consumer消费消息时可手动或自动提交offset,如果由于异常或其他原因没有提交offset,consumer会重复消费10次。(在非批量消费模式下测试结果)
4.实现顺序消费
producer发送消息到固定的partition(上一章有讲具体开发代码)
consumer消费时不能批量消费
kafka实战开发
配置文件
spring:
kafka:
bootstrap-servers: ip:port
producer:
# # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
retries: 1
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
acks: all
#事务id
#transaction-id-prefix: test-tran-
consumer:
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
auto-offset-reset: latest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
#手动提交时设置为此值
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
#手工ack,调用ack后立刻提交offset
ack-mode: manual_immediate
#容器运行的线程数
concurrency: 4
producer
配置文件中配置了事务属性,则代码里有需要加事务注解,配置事务后,如果第一次消息发送成功后出现异常,则第一次的消息也会被清除,不会被consumer消费
public class UserController {
private static final String TOPIC_NAME = "default_topic";
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@GetMapping("/api/v1/{num}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage1(@PathVariable("num") String num){
kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
System.err.println("发送成功:topic="+topic+", partition="+partition+",offset ="+offset);
},failure->{
System.err.println("发送消息失败:"+failure.getMessage());
});
}
/**
* 注解方式的事务
* @param num
*/
@GetMapping("/api/v1/tran1")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage2(int num){
kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 1 i="+num);
if(num == 0){
throw new RuntimeException();
}
kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 2 i="+num);
}
}
consumer
@Component
public class MQListener {
@KafkaListener(topics = {"default_topic"},groupId = "test-gp2")
public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
System.err.println("消费消息:"+record.topic()+"----"+record.partition()+"----"+record.value());
ack.acknowledge();
}
}
暂无评论内容