public class RetryingAcknowledgingMessageListenerAdapter extends AbstractRetryingMessageListenerAdapter > implements AcknowledgingMessageListener { private final AcknowledgingMessageListener delegate; /** * Construct an instance with the provided template and delegate. The exception will * be thrown to the container after retries are exhausted. * @param messageListener the listener delegate. * @param retryTemplate the template. */ public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener messageListener, RetryTemplate retryTemplate) { this(messageListener, retryTemplate, null); } /** * Construct an instance with the provided template, callback and delegate. * @param messageListener the listener delegate. * @param retryTemplate the template. * @param recoveryCallback the recovery callback; if null, the exception will be * thrown to the container after retries are exhausted. */ public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener messageListener, RetryTemplate retryTemplate, RecoveryCallback recoveryCallback) { super(messageListener, retryTemplate, recoveryCallback); Assert.notNull(messageListener, "'messageListener' cannot be null"); this.delegate = messageListener; } @SuppressWarnings("unchecked") @Override public void onMessage(final ConsumerRecord record, final Acknowledgment acknowledgment) { getRetryTemplate().execute(new RetryCallback
public class RetryingMessageListenerAdapter extends AbstractRetryingMessageListenerAdapter > implements MessageListener { /** * Construct an instance with the provided template and delegate. The exception will * be thrown to the container after retries are exhausted. * @param messageListener the delegate listener. * @param retryTemplate the template. */ public RetryingMessageListenerAdapter(MessageListener messageListener, RetryTemplate retryTemplate) { this(messageListener, retryTemplate, null); } /** * Construct an instance with the provided template, callback and delegate. * @param messageListener the delegate listener. * @param retryTemplate the template. * @param recoveryCallback the recovery callback; if null, the exception will be * thrown to the container after retries are exhausted. */ public RetryingMessageListenerAdapter(MessageListener messageListener, RetryTemplate retryTemplate, RecoveryCallback recoveryCallback) { super(messageListener, retryTemplate, recoveryCallback); Assert.notNull(messageListener, "'messageListener' cannot be null"); } @SuppressWarnings("unchecked") @Override public void onMessage(final ConsumerRecord record) { getRetryTemplate().execute(new RetryCallback () { @Override public Void doWithRetry(RetryContext context) throws KafkaException { context.setAttribute(CONTEXT_RECORD, record); RetryingMessageListenerAdapter.this.delegate.onMessage(record); return null; } }, (RecoveryCallback ) getRecoveryCallback()); }}