package com.ai.aif.msgframe.consumer.mq.kafka.api;

import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.consumer.mq.ISubscribeCallBack;
import com.ai.aif.msgframe.consumer.mq.kafka.KafkaConsumerModel;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.jsoup.helper.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/mq/kafka/api/KafkaResources.class */
/**
 * Singleton registry of {@link KafkaConsumer} instances used by the message frame.
 *
 * <p>Consumers are cached in a concurrent map under a name derived from the subject, tag and
 * cluster name of a {@link KafkaConsumerModel}. Creation is serialized by locking on the map so
 * that at most one consumer is built per key; {@link #clearConnResources()} closes and forgets
 * every cached consumer.
 */
public class KafkaResources {
    private static final Logger log = LoggerFactory.getLogger(KafkaResources.class);

    /** User/password value that is treated as "no credentials configured". */
    private static final String NO_AUTH_PLACEHOLDER = "admin";

    /** Cached consumers by registry key; also serves as the lock guarding consumer creation. */
    private final ConcurrentMap<String, KafkaConsumer<String, byte[]>> pushConsumers =
            new ConcurrentHashMap<String, KafkaConsumer<String, byte[]>>();

    /**
     * Initialization-on-demand holder for the singleton instance.
     *
     * <p>Decompiler note: the original access modifier of this class was {@code private}.
     */
    public static class LazyHolder {
        private static final KafkaResources INSTANCE = new KafkaResources();

        private LazyHolder() {
        }
    }

    /**
     * Returns the shared instance.
     *
     * @return the singleton {@code KafkaResources}
     */
    public static final KafkaResources getInstance() {
        return LazyHolder.INSTANCE;
    }

    /**
     * Creates the consumer for the given model and tag if none is registered yet, registers it
     * and hands it to the callback. Does nothing when a consumer is already registered.
     *
     * <p>The registry key and the Kafka {@code group.id} are the same generated name. The
     * callback is invoked while the creation lock is held, after the consumer has been
     * registered. If the callback throws, the consumer is unregistered and closed before the
     * exception is propagated, so that a later call can try again instead of silently
     * returning for a consumer that never subscribed.
     *
     * @param kafkaConsumerModel connection and destination settings
     * @param str tag of the subscription; {@code *} is mapped to {@code ALL} in the group name
     * @param iSubscribeCallBack receives the newly created consumer
     */
    public void startPushConsumer(KafkaConsumerModel kafkaConsumerModel, String str, ISubscribeCallBack<KafkaConsumer> iSubscribeCallBack) {
        String groupId = buildGroupName(kafkaConsumerModel, str);
        if (this.pushConsumers.containsKey(groupId)) {
            return;
        }
        synchronized (this.pushConsumers) {
            // Re-check under the lock: another thread may have created it meanwhile.
            if (this.pushConsumers.containsKey(groupId)) {
                return;
            }
            KafkaConsumer<String, byte[]> consumer = createConsumer(kafkaConsumerModel, groupId);
            this.pushConsumers.put(groupId, consumer);
            try {
                iSubscribeCallBack.startSubscribe(consumer);
            } catch (RuntimeException e) {
                // Do not keep a registered-but-unsubscribed consumer around.
                this.pushConsumers.remove(groupId, consumer);
                closeQuietly(consumer);
                throw e;
            }
        }
    }

    /**
     * Returns the cached consumer for the model's subject, tag, cluster and destination index,
     * creating and caching it on first use.
     *
     * <p>The Kafka {@code group.id} is the generated group name; the registry key is that name
     * with {@code "_" + index} appended, so each destination index gets its own consumer
     * within the same group.
     *
     * @param kafkaConsumerModel connection and destination settings
     * @return the cached or newly created consumer
     */
    public KafkaConsumer getConsumer(KafkaConsumerModel kafkaConsumerModel) {
        int index = kafkaConsumerModel.getDestinationInfo().getIndex();
        String groupId = buildGroupName(kafkaConsumerModel, kafkaConsumerModel.getTag());
        String key = groupId + "_" + index;
        KafkaConsumer<String, byte[]> consumer = this.pushConsumers.get(key);
        if (consumer == null) {
            synchronized (this.pushConsumers) {
                consumer = this.pushConsumers.get(key);
                if (consumer == null) {
                    consumer = createConsumer(kafkaConsumerModel, groupId);
                    this.pushConsumers.put(key, consumer);
                }
            }
        }
        return consumer;
    }

    /**
     * Closes every cached consumer and removes it from the registry. A failure to close one
     * consumer is logged and does not stop the others from being closed.
     *
     * <p>Entries are removed one by one before being closed, so a consumer registered while
     * this method runs is never dropped from the registry without having been closed.
     *
     * @return always {@code true}
     * @throws MsgFrameClientException declared for callers; not thrown by this implementation
     */
    public boolean clearConnResources() throws MsgFrameClientException {
        for (String key : this.pushConsumers.keySet()) {
            KafkaConsumer<String, byte[]> consumer = this.pushConsumers.remove(key);
            if (consumer != null) {
                closeQuietly(consumer);
            }
        }
        return true;
    }

    /** Builds the {@code CONSUMER_<SUBJECT>_<TAG>_<CLUSTER>_GROUP} name used as group id. */
    private static String buildGroupName(KafkaConsumerModel model, String tag) {
        return "CONSUMER_" + model.getSubject().toUpperCase() + "_" + tag.replace("*", "ALL").toUpperCase() + "_" + model.getClusterName().toUpperCase() + "_GROUP";
    }

    /** Creates a consumer for the model with the given group id and logs its creation. */
    private static KafkaConsumer<String, byte[]> createConsumer(KafkaConsumerModel model, String groupId) {
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(buildProperties(model, groupId));
        log.info("创建consumer :{}", groupId);
        return consumer;
    }

    /** Assembles the consumer configuration, adding SASL/PLAIN settings when credentials are set. */
    private static Properties buildProperties(KafkaConsumerModel model, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", model.getUrl());
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "3000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        String user = model.getUser();
        String password = model.getPassword();
        if (needsSasl(user, password)) {
            properties.setProperty("sasl.mechanism", "PLAIN");
            properties.setProperty("security.protocol", "SASL_PLAINTEXT");
            // Option values are quoted as in Kafka's documented JAAS form, so credentials that
            // are not plain words (leading digits, spaces, punctuation) are passed intact.
            String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=" + quoteJaasValue(user)
                    + " "
                    + "password=" + quoteJaasValue(password)
                    + ";";
            properties.setProperty("sasl.jaas.config", jaasConfig);
        }
        return properties;
    }

    /**
     * SASL is configured only when both user and password are non-blank and neither of them
     * equals the {@code admin} placeholder (case-insensitive).
     */
    private static boolean needsSasl(String user, String password) {
        return !StringUtil.isBlank(user)
                && !StringUtil.isBlank(password)
                && !NO_AUTH_PLACEHOLDER.equalsIgnoreCase(user)
                && !NO_AUTH_PLACEHOLDER.equalsIgnoreCase(password);
    }

    /**
     * Wraps a JAAS option value in double quotes, escaping backslashes and double quotes.
     * A value that is already enclosed in double quotes is returned unchanged so that
     * configurations which supplied pre-quoted credentials keep working.
     */
    private static String quoteJaasValue(String value) {
        if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
            return value;
        }
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Closes a consumer, logging instead of propagating any failure. */
    private static void closeQuietly(KafkaConsumer<String, byte[]> consumer) {
        try {
            consumer.close();
        } catch (Exception e) {
            log.error("kafka消费者停止失败", e);
        }
    }
}
