package com.ai.aif.msgframe.common.model.impl;

import com.ai.aif.msgframe.common.ConsumerModel;
import com.ai.aif.msgframe.common.ProducerModel;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.health.StateHandler;
import com.ai.aif.msgframe.common.model.IMsgframeModel;
import com.ai.aif.msgframe.common.route.ICanaryRule;
import com.ai.aif.msgframe.common.route.IDestinationRule;
import com.ai.aif.msgframe.common.route.impl.DefaultCanaryRule;
import com.ai.aif.msgframe.common.route.impl.DestinationInfo;
import com.ai.aif.msgframe.common.thread.ActivemqReconnectPoolTask;
import com.ai.aif.msgframe.common.thread.ThreadFactoryImpl;
import com.asiainfo.msgframe.Clusters;
import com.asiainfo.msgframe.Destination;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.JMSException;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/common/model/impl/BrokerModel.class */
/**
 * Model of a single broker endpoint (URL plus credentials) that belongs to a
 * {@link ClusterModel}. It owns the producer and consumer models created for
 * that broker and forwards connection state changes to its parent cluster.
 */
public class BrokerModel extends StateHandler {
    private String url;
    private String user;
    private String password;
    private ClusterModel parentModel;
    private List<ProducerModel> producers = new ArrayList<ProducerModel>();
    private List<ConsumerModel> consumers = new ArrayList<ConsumerModel>();
    private Runnable subscribeTaskAfterConnect = null;
    private static final Logger logger = LoggerFactory.getLogger(BrokerModel.class);

    /** Configured MQ type value that selects the ActiveMQ implementation (matched case-insensitively). */
    private static final String ACTIVEMQ_TYPE = "activemq";

    /**
     * Shared pool used to run {@link ActivemqReconnectPoolTask}s. It is set to
     * {@code null} by {@link #shutDownPools()}, so every reader must cope with
     * {@code null}; {@code volatile} makes that change visible across threads.
     */
    private static volatile ExecutorService MQ_RECONNECT_POOL = Executors.newFixedThreadPool(4, new ThreadFactoryImpl("Check_DownBroker"));

    /**
     * Creates the broker model and immediately tries to build its producers and
     * consumers via {@link #init()}.
     *
     * <p>If {@code init()} fails with an {@link InvocationTargetException}, the
     * failure is logged and, for ActiveMQ connection-type failures, a reconnect
     * task is submitted to the shared reconnect pool. The constructor itself
     * does not rethrow that failure.
     *
     * @param clusterModel the cluster this broker belongs to
     * @param url          configuration entry supplying the broker URL, user and password
     * @throws MsgFrameClientException declared for callers; not thrown by the code in this constructor
     */
    public BrokerModel(ClusterModel clusterModel, Clusters.Cluster.Url url) throws MsgFrameClientException {
        this.url = url.getStringValue();
        this.user = url.getUser();
        this.password = url.getPassword();
        this.parentModel = clusterModel;
        try {
            init();
        } catch (InvocationTargetException e) {
            logger.error("创建生产者和消费者连接失败，broker=" + this, e);
            if (isRecoverableActivemqFailure(e)) {
                scheduleReconnect();
            }
        }
    }

    /**
     * Tells whether a failed {@link #init()} should be retried asynchronously:
     * the cluster type is ActiveMQ, the reflective target exception is a
     * {@link JMSException}, and its cause is a connect, transport-disposed or
     * socket-timeout error.
     */
    private boolean isRecoverableActivemqFailure(InvocationTargetException e) {
        String type = this.parentModel.getCfg().getType().toString();
        if (!ACTIVEMQ_TYPE.equalsIgnoreCase(type)) {
            return false;
        }
        Throwable target = e.getTargetException();
        if (!(target instanceof JMSException)) {
            return false;
        }
        Throwable cause = target.getCause();
        return cause instanceof ConnectException
                || cause instanceof TransportDisposedIOException
                || cause instanceof SocketTimeoutException;
    }

    /**
     * Submits a reconnect task for this broker, unless the shared pool has
     * already been shut down (in which case the field is {@code null} and a
     * blind {@code execute} would throw from the constructor).
     */
    private void scheduleReconnect() {
        ExecutorService pool = MQ_RECONNECT_POOL;
        if (pool == null || pool.isShutdown()) {
            logger.error("ActiveMQ重连线程池已关闭，无法尝试重新连接，broker=" + this);
            return;
        }
        logger.error("mq=activemq时，启动异步线程每5秒尝试重新连接，broker=" + this);
        pool.execute(new ActivemqReconnectPoolTask(this));
    }

    /**
     * Builds one producer and one consumer per destination and appends them to
     * this broker's lists.
     *
     * <p>The implementation classes are resolved reflectively from the cluster
     * type as {@code com.ai.aif.msgframe.{producer|consumer}.mq.<type-lowercase>.<Type>{Producer|Consumer}Model}.
     * Destinations come from the configured destination rule, post-processed
     * by the canary rule.
     *
     * @throws InvocationTargetException if a producer/consumer constructor (or
     *         any other reflective call in here that wraps its failure this
     *         way) throws; every other exception is logged and swallowed,
     *         leaving whatever models were already added in place
     */
    public void init() throws InvocationTargetException {
        Destination cfg = this.parentModel.getParentModel().getCfg();
        String type = this.parentModel.getCfg().getType().toString();
        if (ACTIVEMQ_TYPE.equalsIgnoreCase(type)) {
            // The ActiveMQ implementation classes use this exact capitalisation.
            type = "ActiveMQ";
        }
        // Locale.ROOT keeps the package name stable under locales with special
        // case mappings (e.g. Turkish dotless i).
        String packageSegment = type.toLowerCase(Locale.ROOT);
        String producerClassName = "com.ai.aif.msgframe.producer.mq." + packageSegment + "." + type + "ProducerModel";
        String consumerClassName = "com.ai.aif.msgframe.consumer.mq." + packageSegment + "." + type + "ConsumerModel";
        try {
            Class<?> producerClass = Class.forName(producerClassName);
            Class<?> consumerClass = Class.forName(consumerClassName);
            ICanaryRule canaryRule = (ICanaryRule) DefaultCanaryRule.getCanaryRuleImplClass().newInstance();
            IDestinationRule destinationRule = (IDestinationRule) Class.forName(cfg.getRuleClass()).newInstance();
            for (DestinationInfo destinationInfo : canaryRule.makeDestinations(this.parentModel.getParentModel(), destinationRule.makeDestinations(this.parentModel.getParentModel()))) {
                ProducerModel producerModel = (ProducerModel) producerClass.getConstructor(BrokerModel.class, DestinationInfo.class).newInstance(this, destinationInfo);
                ConsumerModel consumerModel = (ConsumerModel) consumerClass.getConstructor(BrokerModel.class, DestinationInfo.class).newInstance(this, destinationInfo);
                this.producers.add(producerModel);
                this.consumers.add(consumerModel);
            }
        } catch (InvocationTargetException e) {
            // Propagated so the caller can inspect the underlying failure.
            throw e;
        } catch (Exception e2) {
            // Deliberate best effort: configuration/reflection problems are only logged.
            logger.error("创建生产者和消费者连接失败", e2);
        }
    }

    /**
     * Stores a task to be executed by the next {@link #runSubscribeTask()} call,
     * replacing any previously registered task.
     *
     * @param runnable the task to remember; may be {@code null} to clear it
     */
    public void registerSubscribeTask(Runnable runnable) {
        this.subscribeTaskAfterConnect = runnable;
    }

    /**
     * Runs the registered subscribe task, if any, and then clears it. If the
     * task throws, it stays registered.
     */
    public void runSubscribeTask() {
        if (this.subscribeTaskAfterConnect != null) {
            this.subscribeTaskAfterConnect.run();
            this.subscribeTaskAfterConnect = null;
        }
    }

    /**
     * Returns the shared reconnect pool.
     *
     * @return the pool, or {@code null} once {@link #shutDownPools()} has run
     */
    public static ExecutorService getMqReconnectPool() {
        return MQ_RECONNECT_POOL;
    }

    /** Returns the live (mutable) list of producers owned by this broker. */
    public List<ProducerModel> getProducers() {
        return this.producers;
    }

    /** Returns the live (mutable) list of consumers owned by this broker. */
    public List<ConsumerModel> getConsumers() {
        return this.consumers;
    }

    /**
     * Shuts down the shared reconnect pool with {@code shutdownNow()} and clears
     * the static reference. Does nothing if the pool is already gone or shut
     * down; a failure during shutdown is logged and leaves the reference set.
     *
     * <p>NOTE(review): decompiler output indicates this method was originally
     * {@code protected}; it is kept {@code public} to match the decompiled
     * signature.
     */
    public void shutDownPools() {
        ExecutorService pool = MQ_RECONNECT_POOL;
        if (pool == null || pool.isShutdown()) {
            return;
        }
        try {
            pool.shutdownNow();
            MQ_RECONNECT_POOL = null;
        } catch (Exception e) {
            logger.error("ActiveMQ重试线程池关闭异常", e);
        }
    }

    /**
     * Removes a disconnected child model. A producer that is currently
     * registered is removed from the producer list and, when that leaves no
     * producers, the disconnection is reported to the parent cluster. Anything
     * else is removed from the consumer list (a no-op if it is not there).
     *
     * @param iMsgframeModel the child model that lost its connection
     */
    @Override // com.ai.aif.msgframe.common.health.StateHandler
    protected void handleDisconnection(IMsgframeModel iMsgframeModel) {
        if (iMsgframeModel instanceof ProducerModel && this.producers.contains(iMsgframeModel)) {
            this.producers.remove(iMsgframeModel);
            if (this.producers.isEmpty()) {
                this.parentModel.handleDisconnection(this);
            }
            return;
        }
        this.consumers.remove(iMsgframeModel);
    }

    /**
     * Re-registers a reconnected child model. A producer that is not yet
     * registered is added and the reconnect is reported to the parent cluster;
     * a consumer is added unless it is already registered, so repeated
     * reconnect notifications cannot create duplicate entries.
     *
     * @param iMsgframeModel the child model that regained its connection
     */
    @Override // com.ai.aif.msgframe.common.health.StateHandler
    protected void handleReconnect(IMsgframeModel iMsgframeModel) {
        if (iMsgframeModel instanceof ProducerModel) {
            if (this.producers.contains(iMsgframeModel)) {
                return;
            }
            this.producers.add((ProducerModel) iMsgframeModel);
            this.parentModel.handleReconnect(this);
            return;
        }
        ConsumerModel consumerModel = (ConsumerModel) iMsgframeModel;
        if (!this.consumers.contains(consumerModel)) {
            this.consumers.add(consumerModel);
        }
    }

    /** Returns the broker URL taken from the configuration entry. */
    public String getUrl() {
        return this.url;
    }

    /** Returns the cluster this broker belongs to. */
    public ClusterModel getParentModel() {
        return this.parentModel;
    }

    /** Returns the user name taken from the configuration entry. */
    public String getUser() {
        return this.user;
    }

    /** Returns the password taken from the configuration entry. */
    public String getPassword() {
        return this.password;
    }

    /**
     * Looks up the producer with the given tag and index. If several producers
     * match, the last one in the list is returned.
     *
     * @param str the tag to match
     * @param i   the index to match
     * @return the matching producer, never {@code null}
     * @throws MsgFrameClientException if no producer matches
     */
    public ProducerModel findProducer(String str, int i) throws MsgFrameClientException {
        ProducerModel match = null;
        for (ProducerModel candidate : this.producers) {
            if (str != null && str.equals(candidate.getTag()) && candidate.getIndex() == i) {
                match = candidate;
            }
        }
        if (match != null) {
            return match;
        }
        String message = "找不到Broker：" + toString() + "下的" + toString() + " tag:" + str + " index:" + i + "生产者信息";
        logger.error(message);
        throw new MsgFrameClientException(message);
    }

    /** Returns the parent cluster's description followed by this broker's URL. */
    @Override
    public String toString() {
        return getParentModel().toString() + " [broker url]" + this.url;
    }

    /**
     * Appends to {@code list} the consumers matching each filter in turn. A
     * filter of {@code "*"} (after trimming) matches every consumer; any other
     * filter matches consumers whose tag equals the trimmed filter.
     *
     * @param list   receives the matching consumers
     * @param strArr tag filters to apply, in order
     * @throws MsgFrameClientException as soon as one filter matches no consumer;
     *         consumers added for earlier filters remain in {@code list}
     */
    public void findConsumerModel(List<ConsumerModel> list, String... strArr) throws MsgFrameClientException {
        for (String str : strArr) {
            int sizeBefore = list.size();
            // Trim once per filter; a null filter simply matches nothing.
            String filter = str == null ? null : str.trim();
            boolean matchAll = "*".equals(filter);
            for (ConsumerModel consumerModel : this.consumers) {
                if (matchAll || (filter != null && consumerModel.getTag().equals(filter))) {
                    list.add(consumerModel);
                }
            }
            if (sizeBefore == list.size()) {
                String message = getParentModel().getParentModel().getSubject() + "主题找不到单个过滤条件" + str;
                logger.error(message);
                throw new MsgFrameClientException(message);
            }
        }
    }

    /**
     * Appends to {@code list} every consumer whose real queue name is contained
     * in {@code list2}.
     *
     * @param list  receives the matching consumers
     * @param list2 the queue names to select
     * @throws MsgFrameClientException if no consumer matches
     */
    public void findConsumerModel(List<ConsumerModel> list, List<String> list2) throws MsgFrameClientException {
        int sizeBefore = list.size();
        for (ConsumerModel consumerModel : this.consumers) {
            if (list2.contains(consumerModel.getRealQueue())) {
                list.add(consumerModel);
            }
        }
        if (sizeBefore == list.size()) {
            String message = getParentModel().getParentModel().getSubject() + "主题找不到单个过滤条件,physicalQueue=" + list2;
            logger.error(message);
            throw new MsgFrameClientException(message);
        }
    }
}
