package com.ai.aif.msgframe.common.resources;

import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.model.impl.BrokerModel;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.util.InetAddressUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IdGenerator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/common/resources/ActiveMQResources.class */
public class ActiveMQResources {
    private static final Logger log = LoggerFactory.getLogger(ActiveMQResources.class);
    private static volatile Map<String, ConnectionFactory> CONNECTION_FACTORY_MAP = new HashMap();
    private static volatile ConcurrentHashMap<String, List<Connection>> CONN_MAP = new ConcurrentHashMap<>();
    private static volatile ConcurrentHashMap<String, List<MqttClient>> MQTT_CONN_MAP = new ConcurrentHashMap<>();
    private final IdGenerator clientIdGenerator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ai/aif/msgframe/common/resources/ActiveMQResources$LazyHolder.class */
    public static class LazyHolder {
        private static final ActiveMQResources INSTANCE = new ActiveMQResources();

        private LazyHolder() {
        }
    }

    private ActiveMQResources() {
        this.clientIdGenerator = new IdGenerator();
    }

    private ConnectionFactory getConnectionFactory(String str) {
        if (!CONNECTION_FACTORY_MAP.containsKey(str)) {
            synchronized (CONNECTION_FACTORY_MAP) {
                if (!CONNECTION_FACTORY_MAP.containsKey(str)) {
                    if (str.contains("tcp://")) {
                        ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, str);
                        activeMQConnectionFactory.setSendTimeout(3000);
                        activeMQConnectionFactory.setTrustAllPackages(true);
                        CONNECTION_FACTORY_MAP.put(str, activeMQConnectionFactory);
                    } else if (str.contains("amqp://")) {
                        CONNECTION_FACTORY_MAP.put(str, new JmsConnectionFactory(str));
                    } else if (str.contains("mqtt://")) {
                    }
                    log.info("创建连接工厂成功，连接工厂信息：" + str);
                }
            }
        }
        return CONNECTION_FACTORY_MAP.get(str);
    }

    public MqttClient getMQTTConnection(String str, String str2, BrokerModel brokerModel) throws JMSException {
        String str3 = str + "_" + str2;
        if (!MQTT_CONN_MAP.containsKey(str3)) {
            synchronized (MQTT_CONN_MAP) {
                if (!MQTT_CONN_MAP.containsKey(str3)) {
                    int poolSize = brokerModel.getParentModel().getCfg().getPoolSize();
                    CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
                    for (int i = 0; i < poolSize; i++) {
                        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                        mqttConnectOptions.setCleanSession(true);
                        mqttConnectOptions.setConnectionTimeout(10);
                        mqttConnectOptions.setKeepAliveInterval(20);
                        try {
                            MqttClient mqttClient = new MqttClient(str2.replace("mqtt", "tcp"), "ip:" + InetAddressUtil.getIp() + "_containerId:" + InetAddressUtil.getHostName() + "_appId:" + InetAddressUtil.getAppId() + "_center:" + ContainerModel.getCfg().getName() + "_" + this.clientIdGenerator.generateId(), new MemoryPersistence());
                            mqttClient.connect(mqttConnectOptions);
                            log.error("创建连接成功，连接信息：" + brokerModel.getUrl());
                            copyOnWriteArrayList.add(mqttClient);
                        } catch (Exception e) {
                            log.error("创建至mq的mqtt连接出错!", e);
                        }
                    }
                    if (copyOnWriteArrayList.size() == 0) {
                        throw new JMSException("无法初始化连接池，请检broker地址是否可用");
                    }
                    MQTT_CONN_MAP.put(str3, copyOnWriteArrayList);
                }
            }
        }
        List<MqttClient> list = MQTT_CONN_MAP.get(str3);
        return list.get(Math.abs(brokerModel.hashCode()) % list.size());
    }

    public Connection getConnection(String str, String str2, BrokerModel brokerModel) throws JMSException {
        String str3 = str + "_" + str2;
        if (!CONN_MAP.containsKey(str3)) {
            synchronized (CONN_MAP) {
                if (!CONN_MAP.containsKey(str3)) {
                    CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
                    int poolSize = brokerModel.getParentModel().getCfg().getPoolSize();
                    for (int i = 0; i < poolSize; i++) {
                        Connection createConnection = getConnectionFactory(brokerModel.getUrl()).createConnection();
                        try {
                            createConnection.setClientID("ip:" + InetAddressUtil.getIp() + "_containerId:" + InetAddressUtil.getHostName() + "_appId:" + InetAddressUtil.getAppId() + "_center:" + ContainerModel.getCfg().getName() + "_" + this.clientIdGenerator.generateId());
                            createConnection.start();
                            log.error("创建连接成功，连接信息：" + brokerModel.getUrl());
                            copyOnWriteArrayList.add(createConnection);
                        } catch (Throwable th) {
                            createConnection.close();
                            log.error("创建连接失败，连接信息：" + brokerModel.getUrl(), th);
                            throw th;
                        }
                    }
                    if (copyOnWriteArrayList.size() == 0) {
                        throw new JMSException("无法初始化连接池，请检broker地址是否可用");
                    }
                    CONN_MAP.put(str3, copyOnWriteArrayList);
                }
            }
        }
        List<Connection> list = CONN_MAP.get(str3);
        return list.get(Math.abs(brokerModel.hashCode()) % list.size());
    }

    public void resetTranPortFailedConn(String str, String str2, BrokerModel brokerModel) throws JMSException {
        synchronized (CONN_MAP) {
            List<Connection> list = CONN_MAP.get(str + "_" + str2);
            if (list != null && list.size() > 0) {
                for (Connection connection : list) {
                    Connection createConnection = getConnectionFactory(brokerModel.getUrl()).createConnection();
                    try {
                        createConnection.start();
                        try {
                            connection.close();
                        } catch (Throwable th) {
                            log.error("关闭TransportFailed的连接时出现异常" + connection);
                        }
                        list.add(createConnection);
                        list.remove(connection);
                    } catch (Throwable th2) {
                        createConnection.close();
                        throw th2;
                    }
                }
            }
        }
    }

    public Session createSession(String str, String str2, BrokerModel brokerModel, boolean z) throws JMSException {
        return z ? getConnection(str, str2, brokerModel).createSession(Boolean.TRUE.booleanValue(), 1) : getConnection(str, str2, brokerModel).createSession(Boolean.FALSE.booleanValue(), 1);
    }

    public static final ActiveMQResources getInstance() {
        return LazyHolder.INSTANCE;
    }

    public boolean stopConnection(String str, String str2, String str3, BrokerModel brokerModel) {
        String str4 = str2 + "_" + str3;
        if (!CONN_MAP.containsKey(str4)) {
            return true;
        }
        try {
            for (Connection connection : CONN_MAP.get(str4)) {
                connection.stop();
                connection.close();
            }
            CONN_MAP.remove(str4);
            return true;
        } catch (JMSException e) {
            log.error("ActiveMQ连接信息停止失败", e);
            return true;
        }
    }

    public boolean stopConnection(String str) {
        if (!CONN_MAP.containsKey(str)) {
            return true;
        }
        try {
            for (Connection connection : CONN_MAP.get(str)) {
                connection.stop();
                connection.close();
            }
            CONN_MAP.remove(str);
            return true;
        } catch (JMSException e) {
            log.error("ActiveMQ连接信息停止失败", e);
            return true;
        }
    }

    public boolean clearConnResources() throws MsgFrameClientException {
        Iterator<List<Connection>> it = CONN_MAP.values().iterator();
        while (it.hasNext()) {
            try {
                for (Connection connection : it.next()) {
                    if (connection instanceof ActiveMQConnection) {
                        connection.stop();
                        connection.close();
                    }
                }
            } catch (Exception e) {
                log.error("ActiveMQ消费者停止失败", e);
            }
        }
        CONN_MAP.clear();
        return true;
    }

    public static ConcurrentHashMap<String, List<Connection>> getCONN_MAP() {
        return CONN_MAP;
    }
}
