package com.ai.aif.msgframe.consumer.mq.kafka;

import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.util.MessageCovertUtil;
import com.ai.aif.msgframe.consumer.facade.IConsumerProcessor;
import com.ai.aif.msgframe.consumer.mq.AConsumerProviderModel;
import com.ai.aif.msgframe.consumer.mq.PullConsumerScheduleService;
import com.asiainfo.msgframe.Subscribes;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/mq/kafka/KafkaPullConsumerScheduleService.class */
/**
 * Pull-style consumer service backed by a {@link KafkaConsumer}.
 *
 * <p>The task produced by {@link #createTask()} subscribes the consumer to the subject returned by
 * {@code getSubjectName()} and then polls in an endless loop. Every record value is converted with
 * {@code MessageCovertUtil.transKafkaMessage} and handed to {@code getModel().processMsg(...)}.
 */
public class KafkaPullConsumerScheduleService extends PullConsumerScheduleService {
    private static final Logger log = LoggerFactory.getLogger(KafkaPullConsumerScheduleService.class);

    /**
     * Maximum time, in milliseconds, a single poll blocks waiting for records.
     * The "no data" debug message below spells this value out as 5 seconds.
     */
    private static final long POLL_TIMEOUT_MS = 5000L;

    /** Consumer supplied by the caller; this class never closes it. */
    private final KafkaConsumer messageConsumer;

    /**
     * Creates the service around an already constructed consumer.
     *
     * @param kafkaConsumer consumer used for subscribing and polling
     * @param aConsumerProviderModel forwarded unchanged to the superclass constructor
     * @param str forwarded unchanged to the superclass constructor
     * @param subscribe forwarded unchanged to the superclass constructor
     */
    public KafkaPullConsumerScheduleService(KafkaConsumer kafkaConsumer, AConsumerProviderModel aConsumerProviderModel, String str, Subscribes.Subscribe subscribe) {
        super(aConsumerProviderModel, str, subscribe);
        this.messageConsumer = kafkaConsumer;
    }

    /**
     * Builds the polling task.
     *
     * <p>The returned task subscribes once and then polls forever. Any exception raised while
     * subscribing, polling, converting or processing is logged and ends the task; it is not
     * rethrown.
     *
     * @return the runnable that drives the poll loop
     */
    @Override // com.ai.aif.msgframe.consumer.mq.PullConsumerScheduleService
    protected Runnable createTask() {
        return new Runnable() { // from class: com.ai.aif.msgframe.consumer.mq.kafka.KafkaPullConsumerScheduleService.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    messageConsumer.subscribe(Arrays.asList(getSubjectName()));
                    while (true) {
                        if (log.isDebugEnabled()) {
                            log.debug("开始拉取数据," + getModel().toString());
                        }
                        ConsumerRecords<?, ?> records = messageConsumer.poll(POLL_TIMEOUT_MS);
                        // A timed-out poll yields an empty batch rather than null, so emptiness is
                        // the real "nothing fetched" signal; the null test is purely defensive.
                        if (records == null || records.isEmpty()) {
                            if (log.isDebugEnabled()) {
                                log.debug("等待5秒后没有拉取到数据");
                            }
                            continue;
                        }
                        for (ConsumerRecord<?, ?> record : records) {
                            // Record values are expected to be raw bytes; any other value type
                            // fails this cast and ends the task through the catch below.
                            MsgFMessage message = MessageCovertUtil.transKafkaMessage((byte[]) record.value());
                            // Looked up per record, exactly as before, in case the set changes.
                            IConsumerProcessor[] processors = getConsumerProcessor();
                            if (processors == null || processors.length <= 0) {
                                // No explicit processors: fall back to getSubclass().
                                getModel().processMsg(message, (String) null, (String) null, getSubclass());
                            } else {
                                getModel().processMsg(message, (String) null, (String) null, processors);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("订阅异常", e);
                }
            }
        };
    }
}
