package com.ai.ipu.mq.kafka;

import com.ai.ipu.basic.log.ILogger;
import com.ai.ipu.basic.log.IpuLoggerFactory;
import com.ai.ipu.mq.AbstractMessageQueue;
import java.time.Duration;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/* loaded from: input_file:com/ai/ipu/mq/kafka/AbstractKafka.class */
/**
 * Kafka flavour of {@link AbstractMessageQueue} that supplies a blocking poll loop for a topic.
 *
 * <p>Subclasses provide the actual produce/consume hooks via {@link #producer0(String, String)}
 * and {@link #consumer0(String, String)}.
 */
public abstract class AbstractKafka extends AbstractMessageQueue {
    private static final ILogger LOGGER = IpuLoggerFactory.createLogger(AbstractKafka.class);

    /**
     * Producer hook implemented by subclasses.
     *
     * <p>NOTE(review): the meaning of the two arguments is not visible in this file; presumably
     * they are the topic and the message payload - confirm against the subclasses.
     *
     * @param str first argument (presumably the topic - TODO confirm)
     * @param str2 second argument (presumably the message - TODO confirm)
     * @throws Exception if the implementation fails
     */
    protected abstract void producer0(String str, String str2) throws Exception;

    /**
     * Consumer hook implemented by subclasses.
     *
     * <p>NOTE(review): not called directly in this class; presumably invoked through the
     * two-argument {@code consumer} overload used by the poll loop - confirm against
     * {@code AbstractMessageQueue}.
     *
     * @param str first argument (presumably the topic - TODO confirm)
     * @param str2 second argument (presumably the message - TODO confirm)
     * @throws Exception if the implementation fails
     */
    protected abstract void consumer0(String str, String str2) throws Exception;

    /**
     * Polls the Kafka consumer obtained for the given topic in an endless loop and hands the
     * string form of every record value to the two-argument {@code consumer} overload.
     *
     * <p>The loop ends normally when the poll is interrupted by a {@link WakeupException}, and
     * abnormally when any other exception escapes. In both cases the offsets are committed
     * synchronously and the consumer is closed; the consumer is closed even when the final
     * commit itself fails.
     *
     * <p>Records whose value is {@code null} are skipped instead of aborting the loop with a
     * {@link NullPointerException}.
     *
     * @param str the topic whose consumer is polled
     */
    public void consumer(String str) {
        KafkaConsumer<String, Object> consumer = KafkaManager.getConsumer(str);
        try {
            while (true) {
                ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(100L));
                LOGGER.debug(String.format("主题：%s, 记录数： %d", str, records.count()));
                for (ConsumerRecord<String, Object> record : records) {
                    Object value = record.value();
                    if (value == null) {
                        // A null value has nothing to dispatch; skipping keeps one such
                        // record from terminating the whole consumer loop.
                        LOGGER.debug(String.format("Skipping record with null value at offset %d", record.offset()));
                        continue;
                    }
                    String message = value.toString();
                    LOGGER.debug(String.format("偏移量：%d， 值:%s", record.offset(), message));
                    consumer(str, message);
                }
            }
        } catch (WakeupException e) {
            // Expected shutdown signal: fall through to the cleanup below and return normally.
        } finally {
            commitAndClose(consumer);
        }
    }

    /**
     * Commits the current offsets and closes the consumer. The close runs in a {@code finally}
     * so that a failing commit cannot leave the consumer open.
     */
    private static void commitAndClose(KafkaConsumer<String, Object> consumer) {
        try {
            consumer.commitSync();
        } finally {
            consumer.close();
            LOGGER.error("Closed consumer and we are done");
        }
    }
}
