package com.ai.aif.msgframe.consumer;

import com.ai.aif.msgframe.common.ConsumerModel;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.model.impl.BrokerModel;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.model.impl.SubjectModel;
import com.ai.aif.msgframe.common.scribe.SubscribeParam;
import com.ai.aif.msgframe.common.util.AmberUtil;
import com.ai.aif.msgframe.common.util.StringUtils;
import com.ai.aif.msgframe.consumer.facade.IConsumerProcessor;
import com.asiainfo.msgframe.Subscribes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/MfConsumerClient.class */
/**
 * Static facade used by applications to subscribe to (and unsubscribe from) msgframe destinations.
 *
 * <p>The class keeps two pieces of process-wide state:
 * <ul>
 *   <li>a registry of the tag expressions already subscribed per destination, used to reject
 *       overlapping subscriptions (guarded by the registry's own monitor);</li>
 *   <li>the list of every {@link ConsumerModel} a subscription was started on, which
 *       {@link #unsubscribe(String, String)} walks to stop them.</li>
 * </ul>
 *
 * <p>NOTE(review): the consumer list is a plain {@link ArrayList} and is appended to from
 * {@link #startRecoverConsumer(List, SubscribeParam)}, which is invoked from tasks registered on
 * brokers; if those tasks run on other threads the list needs external coordination - confirm.
 */
public class MfConsumerClient {
    private static final Logger log = LoggerFactory.getLogger(MfConsumerClient.class);
    private static final ContainerModel CONTAINER = ContainerModel.getInstance();

    /** Regular expression splitting a tag expression such as {@code "a||b"} into single tags. */
    private static final String TAG_SEPARATOR_REGEX = "\\|\\|";
    /** Tag substituted when the caller passes none; it also conflicts with every other tag. */
    private static final String WILDCARD_TAG = "*";
    /** System property whose value selects which consumers (by canary name) are kept. */
    private static final String CANARY_PROPERTY = "MSG_AZ";
    /** String form of the consume type that selects pull-style subscription. */
    private static final String PULL_CONSUME_TYPE = "pull";

    private static final String MSG_SUBSCRIPTION_NOT_FOUND = "警告：找不到主题的订阅信息，destination={},tags={},canary={}";
    private static final String MSG_SUBSCRIBE_SUCCESS = "订阅主题成功，destination={},tags={},canary={}";

    /** Destination name -> tag expressions already registered. Always accessed under its own monitor. */
    private static final Map<String, Set<String>> SUBSCRIBE_MAP = new HashMap<String, Set<String>>();
    /** Every consumer model a subscription has been started on, in subscription order. */
    private static List<ConsumerModel> modelList = new ArrayList<ConsumerModel>();

    /**
     * Subscribes to a destination using the subscription definition found in the configuration.
     *
     * @param str    destination (subject) name
     * @param str2   tag expression, single tags separated by {@code ||}; empty means {@code "*"}
     * @param strArr values handed to the consumer models as the subscription's "subclass" array
     */
    public static void subscribe(String str, String str2, String... strArr) {
        subscribe(str, str2, null, null, strArr);
    }

    /**
     * Subscribes to a destination, optionally with an explicit subscription definition and an
     * explicit list of queues.
     *
     * <p>Failures reported as {@link MsgFrameClientException} (including a conflict with an
     * existing subscription) are logged, not rethrown.
     *
     * @param str       destination (subject) name
     * @param str2      tag expression, single tags separated by {@code ||}; empty means {@code "*"}
     * @param subscribe subscription definition to use, or {@code null} to look it up in the
     *                  configuration by destination name
     * @param list      queues to subscribe to; when {@code null} or empty the consumers are
     *                  resolved from the tags and then filtered by canary name
     * @param strArr    values handed to the consumer models as the subscription's "subclass" array
     */
    public static void subscribe(String str, String str2, Subscribes.Subscribe subscribe, List<String> list, String... strArr) {
        AmberUtil.getIsconsumed().compareAndSet(false, true);
        if (StringUtils.isEmpty(str2)) {
            str2 = WILDCARD_TAG;
        }
        log.info("开始订阅主题，主题名称：" + str + " 主题标记" + str2);
        try {
            checkSubscribe(str, str2);
            SubjectModel destination = CONTAINER.findDestination(str);

            // Describes this subscription for the recovery tasks registered on failed brokers.
            SubscribeParam subscribeParam = new SubscribeParam();
            subscribeParam.setSubject(str);
            subscribeParam.setRealQueues(list);
            subscribeParam.setSubclass(strArr);
            subscribeParam.setScribe(subscribe);
            subscribeParam.setTag(str2);
            checkFailbrokers(subscribeParam, destination);

            // The canary filter is only applied when consumers are resolved from tags.
            List<ConsumerModel> consumers;
            if (null == list || list.isEmpty()) {
                consumers = canaryFilter(destination.findConsumerModel(str2.split(TAG_SEPARATOR_REGEX)));
            } else {
                consumers = destination.findConsumerModel(list);
            }
            if (null == consumers || consumers.isEmpty()) {
                log.error(MSG_SUBSCRIPTION_NOT_FOUND, new Object[]{str, str2, getCanaryName()});
                return;
            }

            Subscribes.Subscribe effective = (null != subscribe) ? subscribe : findConfiguredSubscribe(str);
            if (null == effective) {
                log.error(MSG_SUBSCRIPTION_NOT_FOUND, new Object[]{str, str2, getCanaryName()});
                return;
            }

            boolean pull = isPullMode(effective);
            for (ConsumerModel consumer : consumers) {
                if (pull) {
                    consumer.pullSubscribe(str2, effective, strArr);
                } else {
                    consumer.pushSubscribe(str2, effective, strArr);
                }
                modelList.add(consumer);
            }
            log.info(MSG_SUBSCRIBE_SUCCESS, new Object[]{str, str2, getCanaryName()});
        } catch (MsgFrameClientException e) {
            logSubscribeFailure(str, str2, e);
        }
    }

    /**
     * Keeps only the consumers whose canary name equals the {@code MSG_AZ} system property.
     *
     * @param list candidate consumers, may be {@code null}
     * @return {@code list} itself when the property is blank or the list is {@code null}/empty;
     *         otherwise a new list holding the matching consumers (possibly empty)
     */
    private static List<ConsumerModel> canaryFilter(List<ConsumerModel> list) {
        String canary = getCanaryName();
        // Routine configuration trace: INFO, so it does not show up as an error on every subscribe.
        log.info("canary consumer config MSG_AZ : " + canary);
        if (!StringUtils.isNotBlank(canary)) {
            return list;
        }
        if (list == null || list.size() <= 0) {
            return list;
        }
        List<ConsumerModel> matching = new ArrayList<ConsumerModel>();
        for (ConsumerModel consumer : list) {
            if (canary.equals(consumer.getCanary())) {
                matching.add(consumer);
            }
        }
        return matching;
    }

    /** Returns the value of the {@code MSG_AZ} system property, or {@code null} when unset. */
    private static String getCanaryName() {
        return System.getProperty(CANARY_PROPERTY);
    }

    /**
     * Returns the first configured subscription whose destination equals {@code destination},
     * or {@code null} when the configuration contains none.
     */
    private static Subscribes.Subscribe findConfiguredSubscribe(String destination) {
        for (Subscribes.Subscribe candidate : ContainerModel.getCfg().getSubscribes().getSubscribeArray()) {
            if (candidate.getSubDestination().equals(destination)) {
                return candidate;
            }
        }
        return null;
    }

    /** Tells whether the subscription's consume type reads {@code "pull"}; anything else is treated as push. */
    private static boolean isPullMode(Subscribes.Subscribe subscribe) {
        return PULL_CONSUME_TYPE.equals(subscribe.getConsumeType().toString());
    }

    /** Logs a failed subscription attempt together with the exception that caused it. */
    private static void logSubscribeFailure(String destination, String tags, MsgFrameClientException cause) {
        log.error("订阅主题失败，destination=" + destination + ",tags=" + tags + ",canary=" + getCanaryName(), cause);
    }

    /**
     * Subscribes to a destination with processor objects instead of names, using the subscription
     * definition found in the configuration.
     *
     * <p>Unlike the {@code String...} overload, this method does not register recovery tasks on
     * failed brokers. Failures reported as {@link MsgFrameClientException} are logged, not rethrown.
     *
     * @param str                   destination (subject) name
     * @param str2                  tag expression, single tags separated by {@code ||}; empty means {@code "*"}
     * @param iConsumerProcessorArr processors handed to the consumer models
     */
    public static void subscribe(String str, String str2, IConsumerProcessor... iConsumerProcessorArr) {
        AmberUtil.getIsconsumed().compareAndSet(false, true);
        if (StringUtils.isEmpty(str2)) {
            str2 = WILDCARD_TAG;
        }
        log.info("开始订阅主题，主题名称：" + str + " 主题标记" + str2);
        try {
            checkSubscribe(str, str2);
            SubjectModel destination = CONTAINER.findDestination(str);
            // Resolved once; the previous code performed the same lookup twice and dropped the first result.
            List<ConsumerModel> consumers = canaryFilter(destination.findConsumerModel(str2.split(TAG_SEPARATOR_REGEX)));
            if (null == consumers || consumers.isEmpty()) {
                log.error(MSG_SUBSCRIPTION_NOT_FOUND, new Object[]{str, str2, getCanaryName()});
                return;
            }

            Subscribes.Subscribe configured = findConfiguredSubscribe(str);
            if (null == configured) {
                log.error(MSG_SUBSCRIPTION_NOT_FOUND, new Object[]{str, str2, getCanaryName()});
                return;
            }

            boolean pull = isPullMode(configured);
            for (ConsumerModel consumer : consumers) {
                if (pull) {
                    consumer.pullSubscribe(str2, configured, iConsumerProcessorArr);
                } else {
                    consumer.pushSubscribe(str2, configured, iConsumerProcessorArr);
                }
                modelList.add(consumer);
            }
            log.info(MSG_SUBSCRIBE_SUCCESS, new Object[]{str, str2, getCanaryName()});
        } catch (MsgFrameClientException e) {
            logSubscribeFailure(str, str2, e);
        }
    }

    /**
     * Registers a tag expression for a destination, rejecting it when it overlaps an expression
     * that is already registered.
     *
     * <p>Two expressions overlap when, after splitting on {@code ||} and trimming, they share a
     * tag or either of them contains {@code "*"}. The first expression for a destination is
     * stored as given; later ones are stored as their individual tags.
     *
     * <p>The whole check-and-register sequence runs under the registry's monitor so that
     * concurrent subscribers can neither both pass the check nor touch the underlying
     * {@link HashMap}/{@link HashSet} at the same time.
     *
     * @param str  destination (subject) name
     * @param str2 requested tag expression
     * @throws MsgFrameClientException when the requested expression overlaps a registered one
     */
    private static void checkSubscribe(String str, String str2) throws MsgFrameClientException {
        synchronized (SUBSCRIBE_MAP) {
            Set<String> registered = SUBSCRIBE_MAP.get(str);
            if (registered == null) {
                Set<String> first = new HashSet<String>();
                first.add(str2);
                SUBSCRIBE_MAP.put(str, first);
                return;
            }

            String[] requestedTags = str2.split(TAG_SEPARATOR_REGEX);
            for (String existing : registered) {
                for (String existingTag : existing.split(TAG_SEPARATOR_REGEX)) {
                    String known = existingTag.trim();
                    for (String requestedTag : requestedTags) {
                        String wanted = requestedTag.trim();
                        if (known.equals(wanted) || WILDCARD_TAG.equals(known) || WILDCARD_TAG.equals(wanted)) {
                            throw new MsgFrameClientException("存在互斥的订阅关系，队列名称：" + str + " 请求的订阅关系:" + str2 + " 存在的订阅关系:" + existing);
                        }
                    }
                }
            }
            // No overlap: remember every requested tag (untrimmed, as received).
            for (String requestedTag : requestedTags) {
                registered.add(requestedTag);
            }
        }
    }

    /**
     * Registers, on every broker the subject reports as failed, a task that re-creates this
     * subscription on that broker's consumers.
     *
     * @param subscribeParam description of the subscription to restore
     * @param subjectModel   subject whose failed brokers are queried
     */
    private static void checkFailbrokers(final SubscribeParam subscribeParam, SubjectModel subjectModel) {
        // The accumulator's element type is declared by getFailBrokers, which is not visible here; left raw.
        for (final BrokerModel brokerModel : subjectModel.getFailBrokers(new CopyOnWriteArrayList())) {
            brokerModel.registerSubscribeTask(new Runnable() {
                @Override
                public void run() {
                    if (brokerModel.getConsumers().size() <= 0) {
                        return;
                    }
                    // Filled by findConsumerModel; raw for the same reason as above.
                    ArrayList recovered = new ArrayList();
                    try {
                        if (subscribeParam.getRealQueues() != null) {
                            brokerModel.findConsumerModel(recovered, subscribeParam.getRealQueues());
                        } else {
                            brokerModel.findConsumerModel(recovered, subscribeParam.getTag().split(TAG_SEPARATOR_REGEX));
                        }
                        startRecoverConsumer(recovered, subscribeParam);
                    } catch (MsgFrameClientException e) {
                        // Keep the cause in the log; it used to be dropped.
                        log.error(subscribeParam.getSubject() + "主题找不到单个过滤条件,physicalQueue=" + subscribeParam.getRealQueues(), e);
                    }
                }
            });
        }
    }

    /**
     * Starts the subscription described by {@code subscribeParam} on the given consumers, after
     * applying the canary filter.
     *
     * @param list           candidate consumers
     * @param subscribeParam subscription description; when it carries no subscription definition
     *                       the definition is looked up in the configuration by subject name
     * @return {@code true} when the subscription loop ran; {@code false} when no consumer is left
     *         after filtering or no subscription definition could be found
     */
    public static boolean startRecoverConsumer(List<ConsumerModel> list, SubscribeParam subscribeParam) {
        List<ConsumerModel> consumers = canaryFilter(list);
        if (null == consumers || consumers.isEmpty()) {
            return false;
        }

        Subscribes.Subscribe scribe = subscribeParam.getScribe();
        if (null == scribe) {
            scribe = findConfiguredSubscribe(subscribeParam.getSubject());
        }
        if (null == scribe) {
            log.error(MSG_SUBSCRIPTION_NOT_FOUND, new Object[]{subscribeParam.getSubject(), subscribeParam.getTag(), getCanaryName()});
            return false;
        }

        boolean pull = isPullMode(scribe);
        for (ConsumerModel consumer : consumers) {
            if (pull) {
                // NOTE(review): pull mode handles both arrays independently while push mode treats
                // them as alternatives; kept as found - confirm which one is intended.
                if (subscribeParam.getSubclass() != null) {
                    consumer.pullSubscribe(subscribeParam.getTag(), scribe, subscribeParam.getSubclass());
                    modelList.add(consumer);
                }
                if (subscribeParam.getSubclassProcessor() != null) {
                    consumer.pullSubscribe(subscribeParam.getTag(), scribe, subscribeParam.getSubclassProcessor());
                    modelList.add(consumer);
                }
            } else if (subscribeParam.getSubclass() != null) {
                consumer.pushSubscribe(subscribeParam.getTag(), scribe, subscribeParam.getSubclass());
                modelList.add(consumer);
            } else if (subscribeParam.getSubclassProcessor() != null) {
                consumer.pushSubscribe(subscribeParam.getTag(), scribe, subscribeParam.getSubclassProcessor());
                modelList.add(consumer);
            }
        }
        log.info(MSG_SUBSCRIBE_SUCCESS, new Object[]{subscribeParam.getSubject(), subscribeParam.getTag(), getCanaryName()});
        return true;
    }

    /**
     * Returns the live list of consumer models subscriptions were started on (not a copy).
     */
    public static List<ConsumerModel> getModelList() {
        return modelList;
    }

    /**
     * Unsubscribes every consumer model recorded so far and empties the tag registry.
     *
     * <p>Each consumer is unsubscribed with its own subject and tag; a failure on one consumer is
     * logged and does not stop the others.
     *
     * <p>NOTE(review): both parameters are ignored - the call always affects all subscriptions -
     * and the consumer list itself is not emptied afterwards; confirm this is intended.
     *
     * @param str  unused
     * @param str2 unused
     */
    public static void unsubscribe(String str, String str2) {
        for (ConsumerModel consumer : modelList) {
            try {
                consumer.unsubscribe(consumer.getSubject(), consumer.getTag());
            } catch (Exception e) {
                log.error("消费者停止出现异常", e);
            }
        }
        // Same monitor as checkSubscribe, so a concurrent subscribe never sees a half-cleared registry.
        synchronized (SUBSCRIBE_MAP) {
            SUBSCRIBE_MAP.clear();
        }
    }

    /**
     * Command-line entry point; delegates to {@code MfServiceStartup.main}.
     *
     * @param strArr command-line arguments, passed through unchanged
     */
    public static void main(String... strArr) {
        MfServiceStartup.main(strArr);
    }

    /**
     * Returns the live destination-to-tags registry. Callers that read or modify it should
     * synchronize on the returned map, as this class does.
     */
    protected static Map<String, Set<String>> getSubscribeMap() {
        return SUBSCRIBE_MAP;
    }
}
