package com.ai.aif.msgframe;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.ConsumerModel;
import com.ai.aif.msgframe.common.ProducerModel;
import com.ai.aif.msgframe.common.ex.exception.IExceptionPersitence;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.model.impl.SubjectModel;
import com.ai.aif.msgframe.common.route.ILoadBalanceStrategy;
import com.ai.aif.msgframe.common.route.impl.ConsistentHashStrategy;
import com.ai.aif.msgframe.common.route.impl.RandomStrategy;
import com.ai.aif.msgframe.common.util.ObjectInstanceLoaderHelper;
import com.ai.aif.msgframe.common.util.StringUtils;
import java.io.IOException;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/MfProducerInner.class */
/**
 * Producer-side entry point that routes a message to a producer for its topic and sends it,
 * with configurable resend attempts and a pluggable failure handler.
 *
 * <p>Resend delay and resend count are read from the producer configuration exposed by
 * {@link ContainerModel#getCfg()}; when no producer configuration is present the defaults
 * {@link #DEFAULT_RESEND_DELAY_MILLIS} and {@link #DEFAULT_RESEND_TIMES} are used.
 *
 * <p>The consistent-hash strategy is created lazily without synchronization.
 */
public final class MfProducerInner {
    private static final Logger logger = LoggerFactory.getLogger(MfProducerInner.class);
    protected static final ContainerModel CONTAINER = ContainerModel.getInstance();
    /** Strategy used to pick the producer for plain (non-ordered) sends. */
    private static final ILoadBalanceStrategy RANDOM_STRATEGY = new RandomStrategy();
    /** Delay between resend attempts, in milliseconds, when no producer configuration exists. */
    private static final int DEFAULT_RESEND_DELAY_MILLIS = 1000;
    /** Number of resend attempts when no producer configuration exists. */
    private static final int DEFAULT_RESEND_TIMES = 1;

    /** Lazily created strategy used for ordered sends and for synchronous request/reply routing. */
    private ConsistentHashStrategy consistentHashStrategy;

    /**
     * Sends a message to the given topic, resending on failure.
     *
     * <p>If the first attempt fails, the send is repeated up to the configured number of times,
     * sleeping for the configured delay before each attempt. If every attempt fails, or the
     * thread is interrupted while retrying, the failure handler is invoked and the exception
     * from the <em>first</em> attempt is rethrown.
     *
     * @param str topic name; also stored in the message header attribute {@code "topic"}
     * @param msgFMessage message to send
     * @param completionListener listener passed through to the underlying producer; may be {@code null}
     * @throws Exception the exception raised by the first failed attempt
     */
    public void send(String str, MsgFMessage msgFMessage, CompletionListener completionListener) throws Exception {
        try {
            msgFMessage.setHeaderAttribute("topic", str);
            sendInner(str, msgFMessage, completionListener);
        } catch (Exception e) {
            logger.error("发送消息时异常!topic=" + str + ",msgid=" + msgFMessage.getMsgId(), e);
            int delayMillis = resendDelayMillis();
            int maxResends = resendTimes();
            for (int attempt = 1; attempt <= maxResends; attempt++) {
                try {
                    logger.error("消息中间件发送消息异常,稍后" + delayMillis + "毫秒尝试第" + attempt + "重发,消息主题：" + str + " 消息信息:" + msgFMessage, e);
                    Thread.sleep(delayMillis);
                    sendInner(str, msgFMessage, completionListener);
                    logger.info("消息中间件发送消息异常,第" + attempt + "重发,重发已经成功,消息主题：" + str + " 消息信息:" + msgFMessage);
                    return;
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag for the caller and stop retrying: sleeping again
                    // would only ignore the cancellation request.
                    Thread.currentThread().interrupt();
                    logger.error("消息重发时异常!topic=" + str + ",msgid=" + msgFMessage.getMsgId(), interrupted);
                    break;
                } catch (Exception retryFailure) {
                    // Log the failure of THIS attempt (not the original one) and try again.
                    logger.error("消息重发时异常!topic=" + str + ",msgid=" + msgFMessage.getMsgId(), retryFailure);
                }
            }
            handleFailedMessages(str, msgFMessage, e);
            throw e;
        }
    }

    /**
     * Sends a message once, without a completion listener and without resending.
     *
     * <p>On failure the failure handler is invoked and the exception is rethrown.
     *
     * @param str topic name; also stored in the message header attribute {@code "topic"}
     * @param msgFMessage message to send
     * @throws Exception whatever the underlying send raised
     */
    public void sendOneway(String str, MsgFMessage msgFMessage) throws Exception {
        try {
            msgFMessage.setHeaderAttribute("topic", str);
            sendInner(str, msgFMessage, null);
        } catch (Exception e) {
            handleFailedMessages(str, msgFMessage, e);
            throw e;
        }
    }

    /**
     * Sends an ordered message, routed by its group code, resending only on {@link IOException}.
     *
     * <p>A non-{@code IOException} failure (on the first attempt or on any resend) is passed to
     * the failure handler immediately and ends the operation. An {@code IOException} triggers up
     * to the configured number of resends, each preceded by the configured delay. In every
     * failure case the exception from the <em>first</em> attempt is the one rethrown.
     *
     * @param str topic name; also stored in the message header attribute {@code "topic"}
     * @param msgFMessage message to send
     * @param str2 group code used for routing; must not be empty
     * @throws Exception the exception raised by the first failed attempt
     */
    public void sendOrderMsg(String str, MsgFMessage msgFMessage, String str2) throws Exception {
        try {
            msgFMessage.setHeaderAttribute("topic", str);
            sendOrderMsgInner(str, msgFMessage, str2);
        } catch (Exception e) {
            if (!(e instanceof IOException)) {
                handleFailedMessages(str, msgFMessage, e);
                throw e;
            }
            // Read the retry settings only once we know a retry is warranted, so a configuration
            // problem cannot mask a non-retryable failure.
            int delayMillis = resendDelayMillis();
            int maxResends = resendTimes();
            for (int attempt = 1; attempt <= maxResends; attempt++) {
                try {
                    logger.error("消息中间件发送消息异常,稍后" + delayMillis + "毫秒尝试第" + attempt + "重发,消息主题：" + str + " 消息信息:" + msgFMessage, e);
                    Thread.sleep(delayMillis);
                    sendOrderMsgInner(str, msgFMessage, str2);
                    logger.info("消息中间件发送消息异常,第" + attempt + "重发,重发已经成功,消息主题：" + str + " 消息信息:" + msgFMessage);
                    return;
                } catch (IOException retryFailure) {
                    // Still an I/O failure: record it and move on to the next attempt.
                    logger.error("消息重发时异常!topic=" + str + ",msgid=" + msgFMessage.getMsgId(), retryFailure);
                } catch (Exception retryFailure) {
                    if (retryFailure instanceof InterruptedException) {
                        // Keep the interrupt visible to the caller.
                        Thread.currentThread().interrupt();
                    }
                    logger.error("msgframe消息重发遇到内部错误，", retryFailure);
                    handleFailedMessages(str, msgFMessage, retryFailure);
                    throw e;
                }
            }
            // Retries exhausted (or none configured): never return normally after a failed send.
            handleFailedMessages(str, msgFMessage, e);
            throw e;
        }
    }

    /**
     * Routes an ordered message with the consistent-hash strategy and hands it to the producer.
     *
     * @param str topic name
     * @param msgFMessage message to send
     * @param str2 group code used for routing
     * @throws Exception if {@code str2} is empty, or if routing or sending fails
     */
    private void sendOrderMsgInner(String str, MsgFMessage msgFMessage, String str2) throws Exception {
        if (StringUtils.isEmpty(str2)) {
            throw new Exception("顺序消息的分组编码不能为空！");
        }
        msgFMessage.setHeaderAttribute("topic", str);
        SubjectModel destination = CONTAINER.findDestination(str);
        ProducerModel producer = consistentHashStrategy().routeProducer(destination, msgFMessage, str2);
        if (logger.isDebugEnabled()) {
            logger.debug("开始发送顺序消息，分组编码:" + str2 + "  消息内容" + msgFMessage + "--->" + producer.toString());
        }
        producer.getNormalProducer().sendOrderMsg(msgFMessage, str2);
    }

    /**
     * Sends a message and waits for the reply on the consumer selected for it.
     *
     * <p>The consumer is chosen with the consistent-hash strategy keyed by the message id; its
     * subject, cluster, tag, index and URL are copied into the message before it is sent through
     * {@link #send(String, MsgFMessage, CompletionListener)}.
     *
     * @param str topic name
     * @param msgFMessage message to send
     * @return the value returned by the consumer's {@code receive} call
     * @throws Exception if routing, sending or receiving fails
     */
    public Serializable syncSend(String str, MsgFMessage msgFMessage) throws Exception {
        SubjectModel destination = CONTAINER.findDestination(str);
        ConsistentHashStrategy strategy = consistentHashStrategy();
        msgFMessage.setHeaderAttribute("topic", str);
        ConsumerModel replyConsumer = strategy.routeConsumer(destination, msgFMessage, msgFMessage.getMsgId());
        msgFMessage.setHeaderAttribute("msg_isNeedReturn", "true");
        msgFMessage.setHeaderAttribute("msg_subject_name", replyConsumer.getSubject());
        msgFMessage.setHeaderAttribute("msg_cluster", replyConsumer.getClusterName());
        msgFMessage.setFilterTag(replyConsumer.getTag());
        msgFMessage.setHeaderAttribute("msg_index", String.valueOf(replyConsumer.getIndex()));
        msgFMessage.setHeaderAttribute("msg_broker_url", replyConsumer.getUrl());
        send(str, msgFMessage, null);
        if (logger.isDebugEnabled()) {
            logger.debug("开始监听回调消息，消息路由信息--->" + replyConsumer.toString());
        }
        // NOTE(review): unlike the resend settings, the producer configuration is dereferenced
        // here without a null check — confirm it is always present when syncSend is used.
        return replyConsumer.receive(msgFMessage.getMsgId(), ContainerModel.getCfg().getProducerCfg().getReplyTimeOut());
    }

    /**
     * Passes a failed message to the configured producer exception handler, if one is configured.
     *
     * <p>Does nothing when the configured handler class name is empty. Any exception raised while
     * locating or running the handler is logged and not propagated.
     *
     * @param str topic name
     * @param msgFMessage message that could not be sent
     * @param exc failure to report to the handler
     */
    private void handleFailedMessages(String str, MsgFMessage msgFMessage, Exception exc) {
        try {
            String handlerClassName = ContainerModel.getCfg().getPersistence().getExceptionPersistence().getProducerExceptionClass();
            if (StringUtils.isEmpty(handlerClassName)) {
                return;
            }
            logger.error("开始调用错误处理接口，消息对象 " + msgFMessage + " 接口名称" + handlerClassName, exc);
            IExceptionPersitence handler = (IExceptionPersitence) ObjectInstanceLoaderHelper.getCacheObjectByClassName(handlerClassName);
            handler.processException(msgFMessage, CONTAINER.findDestination(str).getDestination().getBelong(), str, exc);
        } catch (Exception e) {
            logger.error("生产端异常处理失败", e);
        }
    }

    /**
     * Routes a message with {@link #RANDOM_STRATEGY} and hands it to the selected producer.
     *
     * @param str topic name
     * @param msgFMessage message to send
     * @param completionListener listener passed to the producer; may be {@code null}
     * @throws Exception if routing or sending fails
     */
    private void sendInner(String str, MsgFMessage msgFMessage, CompletionListener completionListener) throws Exception {
        msgFMessage.setHeaderAttribute("topic", str);
        ProducerModel producer = RANDOM_STRATEGY.routeProducer(CONTAINER.findDestination(str), msgFMessage, (Object) null);
        if (logger.isDebugEnabled()) {
            logger.debug("开始发送消息，消息路由信息--->{},message={}", producer.toString(), msgFMessage);
        }
        producer.getNormalProducer().send(msgFMessage, completionListener);
    }

    /** Returns the consistent-hash strategy, creating it on first use. */
    private ConsistentHashStrategy consistentHashStrategy() {
        if (this.consistentHashStrategy == null) {
            this.consistentHashStrategy = new ConsistentHashStrategy();
        }
        return this.consistentHashStrategy;
    }

    /** Returns the configured delay before each resend, in milliseconds, or the default. */
    private static int resendDelayMillis() {
        if (null == ContainerModel.getCfg().getProducerCfg()) {
            return DEFAULT_RESEND_DELAY_MILLIS;
        }
        return ContainerModel.getCfg().getProducerCfg().getResenddelay();
    }

    /** Returns the configured number of resend attempts, or the default. */
    private static int resendTimes() {
        if (null == ContainerModel.getCfg().getProducerCfg()) {
            return DEFAULT_RESEND_TIMES;
        }
        return ContainerModel.getCfg().getProducerCfg().getResendTimes();
    }
}
