Spring Kafka(十)ConsumerAwareErrorHandler异常处理器


异常处理

代码异常十之八九,十段代码九个bug,哈哈哈哈。平常程序异常我们使用try catch捕获异常,在catch方法中根据异常类型进行相关处理,既然我们可以使用try catch处理异常,那为什么还要使用ConsumerAwareErrorHandler异常处理器去处理异常呢?
首先,KafkaListener要做的事只是监听Topic中的数据并消费,如果在KafkaListener中还需要对异常进行处理则会显得代码块非常臃肿不利于维护,我们可以把异常处理的这些代码抽象出来,构造成一个异常处理器,KafkaListener中所抛出的异常都会经过ConsumerAwareErrorHandler异常处理器进行处理,这样就非常方便我们进行后期维护,比如后期更改异常处理业务的时候,只需要修改ConsumerAwareErrorHandler处理器就行了,而不需要KafkaListener的一堆代码中去修改代码。这也是一种思想的体现。


单消息消费异常处理器

这里主要就是注册一个ConsumerAwareListenerErrorHandler 类型的异常处理器,bean的注册默认使用的是方法名,所以我们将这个异常处理的BeanName放到@KafkaListener注解的errorHandler属性里面。当KafkaListener抛出异常的时候,则会自动调用异常处理器。

@Component
public class ErrorListener {

    private static final Logger log= LoggerFactory.getLogger(ErrorListener.class);

    @KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler")
    public void errorListener(String data) {
        log.info("topic.quick.error  receive : " + data);
        throw new RuntimeException("fail");
    }

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
                return null;
            }
        };
    }

}


编写测试方法,发送一条消息到topic.quick.error中,运行测试方法后我们可以看到异常处理器已经能正常使用了。

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    public void testErrorHandler() {
        kafkaTemplate.send("topic.quick.error", "test error handle");
    }
2018-09-14 11:42:05.099  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener       : topic.quick.error  receive : test error handle
2018-09-14 11:42:05.101  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener       : consumerAwareErrorHandler receive : test error handle


批量消费异常处理器

批量消费代码也是差不多的,只不过传递过来的数据都是List集合方式,这里就不做其他代码的展示了。

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
                MessageHeaders headers = message.getHeaders();
                List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
                List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
                List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
                Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
          
                return null;
            }
        };
    }


声明:海苔Blog|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - Spring Kafka(十)ConsumerAwareErrorHandler异常处理器


海苔胖胖是胖还是不胖呢