A Look at Retry in Spring Kafka
Published: 2019-06-21


This post takes a look at how retry works in Spring for Apache Kafka.

AbstractRetryingMessageListenerAdapter

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java

It has two main implementations: RetryingAcknowledgingMessageListenerAdapter and RetryingMessageListenerAdapter.

RetryingAcknowledgingMessageListenerAdapter

public class RetryingAcknowledgingMessageListenerAdapter<K, V>
        extends AbstractRetryingMessageListenerAdapter<K, V, AcknowledgingMessageListener<K, V>>
        implements AcknowledgingMessageListener<K, V> {

    private final AcknowledgingMessageListener<K, V> delegate;

    /**
     * Construct an instance with the provided template and delegate. The exception will
     * be thrown to the container after retries are exhausted.
     * @param messageListener the listener delegate.
     * @param retryTemplate the template.
     */
    public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener,
            RetryTemplate retryTemplate) {
        this(messageListener, retryTemplate, null);
    }

    /**
     * Construct an instance with the provided template, callback and delegate.
     * @param messageListener the listener delegate.
     * @param retryTemplate the template.
     * @param recoveryCallback the recovery callback; if null, the exception will be
     * thrown to the container after retries are exhausted.
     */
    public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener,
            RetryTemplate retryTemplate, RecoveryCallback<? extends Object> recoveryCallback) {
        super(messageListener, retryTemplate, recoveryCallback);
        Assert.notNull(messageListener, "'messageListener' cannot be null");
        this.delegate = messageListener;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment) {
        getRetryTemplate().execute(new RetryCallback<Void, KafkaException>() {

            @Override
            public Void doWithRetry(RetryContext context) throws KafkaException {
                context.setAttribute(CONTEXT_RECORD, record);
                context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                RetryingAcknowledgingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
                return null;
            }

        }, (RecoveryCallback<Void>) getRecoveryCallback());
    }

}
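To make the control flow concrete, here is a minimal, dependency-free sketch of the behavior the Javadoc above describes: retry the delegate, then either hand over to the recovery callback or rethrow once retries are exhausted. `RetrySketch` and its `execute` method are hypothetical names made up for illustration. This is not Spring Retry's implementation, and it assumes a fixed attempt count, no back-off, and that every RuntimeException is retryable.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for the retry/recover flow; not Spring Retry's actual code. */
final class RetrySketch {

    /**
     * Calls the action up to maxAttempts times. If every attempt fails, the
     * recovery function receives the last exception when one was supplied;
     * with a null recovery the last exception is rethrown to the caller,
     * which plays the role of the listener container here.
     */
    static <T> T execute(int maxAttempts, Supplier<T> action, Function<RuntimeException, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get(); // success ends the loop immediately
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        if (recovery != null) {
            return recovery.apply(last); // retries exhausted: recover
        }
        throw last; // retries exhausted and no recovery: propagate
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = execute(3, () -> {
            calls[0]++;
            throw new IllegalStateException("listener failed");
        }, e -> "recovered after " + calls[0] + " attempts");
        System.out.println(result); // prints "recovered after 3 attempts"
    }
}
```

Passing `null` as the third argument mirrors the two-argument constructor above: the last exception simply propagates to the caller.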

RetryingMessageListenerAdapter

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java

public class RetryingMessageListenerAdapter<K, V>
        extends AbstractRetryingMessageListenerAdapter<K, V, MessageListener<K, V>>
        implements MessageListener<K, V> {

    /**
     * Construct an instance with the provided template and delegate. The exception will
     * be thrown to the container after retries are exhausted.
     * @param messageListener the delegate listener.
     * @param retryTemplate the template.
     */
    public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate) {
        this(messageListener, retryTemplate, null);
    }

    /**
     * Construct an instance with the provided template, callback and delegate.
     * @param messageListener the delegate listener.
     * @param retryTemplate the template.
     * @param recoveryCallback the recovery callback; if null, the exception will be
     * thrown to the container after retries are exhausted.
     */
    public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate,
            RecoveryCallback<? extends Object> recoveryCallback) {
        super(messageListener, retryTemplate, recoveryCallback);
        Assert.notNull(messageListener, "'messageListener' cannot be null");
    }

    @SuppressWarnings("unchecked")
    @Override
    public void onMessage(final ConsumerRecord<K, V> record) {
        getRetryTemplate().execute(new RetryCallback<Void, KafkaException>() {

            @Override
            public Void doWithRetry(RetryContext context) throws KafkaException {
                context.setAttribute(CONTEXT_RECORD, record);
                RetryingMessageListenerAdapter.this.delegate.onMessage(record);
                return null;
            }

        }, (RecoveryCallback<Void>) getRecoveryCallback());
    }

}
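Both adapters store the record in the retry context under CONTEXT_RECORD before calling the delegate, which lets a recovery callback find out which record failed. The sketch below models that hand-off with plain Java. `RetryContextSketch`, its `Context` class and the dead-letter list are hypothetical, and the string value of the constant is an assumption; only the constant's name comes from the source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical model of passing the failed record to recovery via context attributes. */
final class RetryContextSketch {

    // Same constant name as in the source; the string value is an assumption.
    static final String CONTEXT_RECORD = "record";

    /** Minimal attribute bag standing in for a retry context. */
    static final class Context {
        private final Map<String, Object> attributes = new HashMap<>();

        void setAttribute(String name, Object value) {
            attributes.put(name, value);
        }

        Object getAttribute(String name) {
            return attributes.get(name);
        }
    }

    /** Retries the delegate; on exhaustion the recovery callback receives the context. */
    static <R> void onMessage(R record, int maxAttempts, Consumer<R> delegate, Consumer<Context> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Context context = new Context();
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Stored on every attempt, as in doWithRetry above.
                context.setAttribute(CONTEXT_RECORD, record);
                delegate.accept(record);
                return;
            } catch (RuntimeException e) {
                last = e;
            }
        }
        if (recovery == null) {
            throw last; // no recovery callback: propagate to the caller
        }
        recovery.accept(context);
    }

    public static void main(String[] args) {
        List<Object> deadLetters = new ArrayList<>();
        onMessage("order-42", 3,
                record -> { throw new IllegalStateException("cannot process " + record); },
                context -> deadLetters.add(context.getAttribute(CONTEXT_RECORD)));
        System.out.println(deadLetters); // prints [order-42]
    }
}
```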

Which kind of listener is actually used?

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
    Object messageListener = createMessageListener(container, messageConverter);
    Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
    if (this.retryTemplate != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
                    this.recoveryCallback);
        }
        else {
            messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback);
        }
    }
    if (this.recordFilterStrategy != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof MessageListener) {
            messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.recordFilterStrategy);
        }
        else if (messageListener instanceof BatchAcknowledgingMessageListener) {
            messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
                    (BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof BatchMessageListener) {
            messageListener = new FilteringBatchMessageListenerAdapter<>(
                    (BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
        }
    }
    container.setupMessageListener(messageListener);
}

If retryTemplate is not null, the endpoint first checks whether the listener is an instance of AcknowledgingMessageListener. If it is, a RetryingAcknowledgingMessageListenerAdapter is created; otherwise a RetryingMessageListenerAdapter is created.

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(MessageConverter messageConverter) {
    if (isBatchListener()) {
        BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
                this.bean, this.method);
        if (messageConverter instanceof BatchMessageConverter) {
            messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
        }
        return messageListener;
    }
    else {
        RecordMessagingMessageListenerAdapter<K, V> messageListener =
                new RecordMessagingMessageListenerAdapter<K, V>(this.bean, this.method);
        if (messageConverter instanceof RecordMessageConverter) {
            messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
        }
        return messageListener;
    }
}

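Here, RecordMessagingMessageListenerAdapter implements the AcknowledgingMessageListener interface, so for record (non-batch) listener methods with a retryTemplate configured, the endpoint wraps it in a RetryingAcknowledgingMessageListenerAdapter: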

public class RecordMessagingMessageListenerAdapter<K, V>
        extends MessagingMessageListenerAdapter<K, V>
        implements MessageListener<K, V>, AcknowledgingMessageListener<K, V> {
    //......
}
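The selection logic can be modeled in a few lines of plain Java. The sketch below uses hypothetical, simplified interfaces named after the real ones (no generics, String records, a Runnable in place of the acknowledgment) and returns the adapter name as a string instead of constructing anything. It only illustrates why a listener that implements both interfaces ends up on the acknowledging branch: the `instanceof AcknowledgingMessageListener` check comes first.

```java
/** Hypothetical, simplified model of the adapter selection; not the Spring Kafka classes. */
final class AdapterSelectionSketch {

    interface MessageListener {
        void onMessage(String record);
    }

    interface AcknowledgingMessageListener {
        void onMessage(String record, Runnable acknowledgment);
    }

    /** Implements both interfaces, like the record adapter shown above. */
    static final class RecordAdapter implements MessageListener, AcknowledgingMessageListener {
        @Override
        public void onMessage(String record) {
            onMessage(record, () -> { });
        }

        @Override
        public void onMessage(String record, Runnable acknowledgment) {
            acknowledgment.run();
        }
    }

    /** Mirrors the order of the instanceof checks in the retry branch of the endpoint. */
    static String selectRetryAdapter(Object listener, boolean retryConfigured) {
        if (!retryConfigured) {
            return "none"; // no retry template: the listener is not wrapped for retry
        }
        if (listener instanceof AcknowledgingMessageListener) {
            return "RetryingAcknowledgingMessageListenerAdapter";
        }
        return "RetryingMessageListenerAdapter";
    }

    public static void main(String[] args) {
        MessageListener plain = record -> { };
        System.out.println(selectRetryAdapter(plain, true));
        // prints RetryingMessageListenerAdapter
        System.out.println(selectRetryAdapter(new RecordAdapter(), true));
        // prints RetryingAcknowledgingMessageListenerAdapter
    }
}
```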

Reposted from: http://npxna.baihongyu.com/
