package com.ai.aif.msgframe.consumer.mq;

import com.ai.aif.msgframe.common.ConsumerModel;
import com.ai.aif.msgframe.common.ProducerModel;
import com.ai.aif.msgframe.common.ex.exception.IExceptionPersitence;
import com.ai.aif.msgframe.common.exception.DuplicateKeyException;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.hook.ConsumeMsgContext;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.message.ReceiveMsgFObjectMessage;
import com.ai.aif.msgframe.common.model.BaseModelInfo;
import com.ai.aif.msgframe.common.model.impl.BrokerModel;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.route.impl.DestinationInfo;
import com.ai.aif.msgframe.common.util.ObjectInstanceLoaderHelper;
import com.ai.aif.msgframe.common.util.StringUtils;
import com.ai.aif.msgframe.consumer.facade.IConsumerProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/mq/AConsumerProviderModel.class */
/**
 * Base consumer model that hands a received message to one or more
 * {@link IConsumerProcessor}s, surrounds each processor with the optional
 * consume hooks, and sends a reply message when the incoming message asks
 * for one.
 *
 * <p>Both {@code processMsg} overloads share the same contract: they never
 * propagate a processing failure to the caller. A failure is logged, handed to
 * the configured exception-persistence class (if any), reported to the consume
 * hook, and answered with an error reply when the message needs a reply.
 */
public abstract class AConsumerProviderModel extends BaseModelInfo implements ConsumerModel {
    private static final Logger log = LoggerFactory.getLogger(AConsumerProviderModel.class);

    /** Name of the header attribute whose integer value is passed to {@code findProducer} when replying. */
    private static final String MSG_INDEX_HEADER = "msg_index";

    public AConsumerProviderModel(BrokerModel brokerModel, DestinationInfo destinationInfo) {
        super(brokerModel, destinationInfo);
    }

    /**
     * Consumes a message with processors looked up by class name.
     *
     * @param msgFMessage the message to consume; {@code null} is treated as a failure
     * @param str         consumer id recorded on the hook context (may be {@code null})
     * @param str2        consumer group recorded on the hook context (may be {@code null})
     * @param strArr      class names resolved through
     *                    {@link ObjectInstanceLoaderHelper#getCacheObjectByClassName}, run in order
     * @return {@code null} when every processor completed or the failure was a
     *         {@link DuplicateKeyException}; {@link Boolean#FALSE} for any other failure
     */
    public Object processMsg(MsgFMessage msgFMessage, String str, String str2, String... strArr) {
        Boolean result = null;
        // Kept outside the try so the failure path can report on the processor that was in flight.
        ConsumeMsgContext hookContext = null;
        long hookStart = 0L;
        try {
            stampRoutingInfo(msgFMessage);
            if (log.isDebugEnabled()) {
                log.debug("开始消费消息，消息：" + msgFMessage);
            }
            for (String processorClass : strArr) {
                // Evaluate once so the before/after hooks are always paired on the same context.
                boolean hooked = hasConsumeMessageHook();
                if (hooked) {
                    hookContext = new ConsumeMsgContext();
                    executeHookBefore(hookContext, msgFMessage, str, str2);
                    hookStart = System.currentTimeMillis();
                }
                ((IConsumerProcessor) ObjectInstanceLoaderHelper.getCacheObjectByClassName(processorClass)).process(msgFMessage);
                if (hooked) {
                    executeHookAfter(hookContext, msgFMessage, true, hookStart, str, str2);
                    // This processor has been fully reported; do not reuse its context on a later failure.
                    hookContext = null;
                    hookStart = 0L;
                }
            }
            if (msgFMessage.isNeedReturn()) {
                if (log.isDebugEnabled()) {
                    log.debug("消息消费成功,开始发送确认消息！msgid=" + msgFMessage.getMsgId());
                }
                sendReply(msgFMessage, new ReceiveMsgFObjectMessage());
            }
        } catch (Error | Exception e) {
            log.error("处理消息失败,处理上下文信息=" + this + ",message=" + msgFMessage, e);
            result = handleFailure(msgFMessage, e, hookContext, hookStart, str, str2);
        }
        return result;
    }

    /**
     * Consumes a message with already-instantiated processors.
     *
     * @param msgFMessage           the message to consume; {@code null} is treated as a failure
     * @param str                   consumer id recorded on the hook context (may be {@code null})
     * @param str2                  consumer group recorded on the hook context (may be {@code null})
     * @param iConsumerProcessorArr processors to run, in order
     * @return {@code null} when every processor completed or the failure was a
     *         {@link DuplicateKeyException}; {@link Boolean#FALSE} for any other failure
     */
    public Object processMsg(MsgFMessage msgFMessage, String str, String str2, IConsumerProcessor... iConsumerProcessorArr) {
        Boolean result = null;
        // Kept outside the try so the failure path can report on the processor that was in flight.
        ConsumeMsgContext hookContext = null;
        long hookStart = 0L;
        try {
            stampRoutingInfo(msgFMessage);
            if (log.isDebugEnabled()) {
                log.debug("开始消费消息，消息：" + msgFMessage);
            }
            for (IConsumerProcessor processor : iConsumerProcessorArr) {
                // Evaluate once so the before/after hooks are always paired on the same context.
                boolean hooked = hasConsumeMessageHook();
                if (hooked) {
                    hookContext = new ConsumeMsgContext();
                    executeHookBefore(hookContext, msgFMessage, str, str2);
                    hookStart = System.currentTimeMillis();
                }
                processor.process(msgFMessage);
                if (hooked) {
                    executeHookAfter(hookContext, msgFMessage, true, hookStart, str, str2);
                    // This processor has been fully reported; do not reuse its context on a later failure.
                    hookContext = null;
                    hookStart = 0L;
                }
            }
            if (msgFMessage.isNeedReturn()) {
                if (log.isDebugEnabled()) {
                    log.debug("消息需要返回：" + msgFMessage);
                }
                sendReply(msgFMessage, new ReceiveMsgFObjectMessage());
            }
        } catch (Error | Exception e) {
            log.error("处理消息异常,处理上下文信息=" + this + ",message=" + msgFMessage, e);
            result = handleFailure(msgFMessage, e, hookContext, hookStart, str, str2);
        }
        return result;
    }

    /**
     * Rejects a {@code null} message and copies this model's cluster, real
     * queue, subject and tag onto the message before it is processed.
     */
    private void stampRoutingInfo(MsgFMessage message) throws MsgFrameClientException {
        if (message == null) {
            throw new MsgFrameClientException("消息序反列化失败");
        }
        message.setCluster(getClusterName());
        message.setRealQueueName(getRealQueue());
        message.setDestination(getSubject());
        message.setTag(getTag());
    }

    /**
     * Sends {@code reply} for {@code message} through the producer selected by
     * the message's filter tag and {@value #MSG_INDEX_HEADER} header; the reply
     * carries the id of the message it answers.
     */
    private void sendReply(MsgFMessage message, ReceiveMsgFObjectMessage reply) throws Exception {
        ProducerModel producer = findProducer(message.getFilterTag(), Integer.valueOf(message.getHeaderAttribute(MSG_INDEX_HEADER)).intValue());
        reply.setMsgId(message.getMsgId());
        producer.getNormalProducer().replySend(reply);
    }

    /**
     * Shared failure path of both {@code processMsg} overloads: persists the
     * failure, notifies the consume hook and sends an error reply if required.
     * Never throws {@link Exception}; secondary failures are logged.
     *
     * @param message     the message being consumed, possibly {@code null}
     * @param cause       what went wrong
     * @param hookContext context of the processor that was running, or {@code null}
     * @param hookStart   start time of that processor in epoch millis, or {@code 0}
     * @return {@code null} for a {@link DuplicateKeyException}, otherwise {@link Boolean#FALSE}
     */
    private Boolean handleFailure(MsgFMessage message, Throwable cause, ConsumeMsgContext hookContext, long hookStart, String consumerId, String consumerGroup) {
        processException(message, cause);
        Boolean outcome = null;
        if (!(cause instanceof DuplicateKeyException)) {
            outcome = Boolean.FALSE;
            // Guarded separately so a misbehaving hook cannot suppress the error reply below.
            try {
                if (hasConsumeMessageHook()) {
                    executeHookAfterandException(hookContext, message, false, hookStart, consumerId, consumerGroup, cause.getClass().toString(), cause.getMessage());
                }
            } catch (Exception hookFailure) {
                log.error("消费异常钩子执行失败", hookFailure);
            }
        }
        try {
            if (message != null && message.isNeedReturn()) {
                sendReply(message, new ReceiveMsgFObjectMessage(cause.getMessage()));
            }
        } catch (Exception replyFailure) {
            log.error("回调消息发送失败", replyFailure);
        }
        return outcome;
    }

    /**
     * Fills the hook context with the message, cluster, center code, broker
     * address, queue and (when non-null) consumer group / consumer id, then
     * runs the inherited before-hook.
     */
    private void executeHookBefore(ConsumeMsgContext consumeMsgContext, MsgFMessage msgFMessage, String str, String str2) throws Exception {
        consumeMsgContext.setMessage(msgFMessage);
        consumeMsgContext.setClusterName(getClusterName());
        consumeMsgContext.setCenterCode(ContainerModel.getCfg().getName());
        consumeMsgContext.setBrokerAddr(getUrl());
        if (str2 != null) {
            consumeMsgContext.setConsumerGroup(str2);
        }
        if (str != null) {
            consumeMsgContext.setConsumerId(str);
        }
        consumeMsgContext.setQueue(getRealQueue());
        executeHookBefore(consumeMsgContext);
    }

    /**
     * Records elapsed time since {@code j}, the outcome flag {@code z} and the
     * destination type on the hook context, then runs the inherited after-hook.
     * {@code str} and {@code str2} are accepted for signature symmetry and not used.
     */
    private void executeHookAfter(ConsumeMsgContext consumeMsgContext, MsgFMessage msgFMessage, boolean z, long j, String str, String str2) throws Exception {
        consumeMsgContext.setCostTime(System.currentTimeMillis() - j);
        consumeMsgContext.setMessage(msgFMessage);
        consumeMsgContext.setQueue(getRealQueue());
        consumeMsgContext.setCenterCode(ContainerModel.getCfg().getName());
        consumeMsgContext.setSuccess(z);
        consumeMsgContext.setDestinationType(getDestinationInfo().isQueueType() ? "queue" : "topic");
        executeHookAfter(consumeMsgContext);
    }

    /**
     * Same as {@code executeHookAfter} but additionally records the exception
     * type ({@code str3}) and message ({@code str4}).
     *
     * <p>When no context exists yet (the failure happened before any
     * before-hook ran) a fresh one is created and populated with the cluster,
     * broker address and consumer identity, instead of dereferencing
     * {@code null}. When no start time is known ({@code j <= 0}) the cost time
     * is reported as {@code 0}.
     */
    private void executeHookAfterandException(ConsumeMsgContext consumeMsgContext, MsgFMessage msgFMessage, boolean z, long j, String str, String str2, String str3, String str4) throws Exception {
        ConsumeMsgContext context = consumeMsgContext;
        if (context == null) {
            context = new ConsumeMsgContext();
            context.setClusterName(getClusterName());
            context.setBrokerAddr(getUrl());
            if (str2 != null) {
                context.setConsumerGroup(str2);
            }
            if (str != null) {
                context.setConsumerId(str);
            }
        }
        context.setCostTime(j > 0L ? System.currentTimeMillis() - j : 0L);
        context.setMessage(msgFMessage);
        context.setQueue(getRealQueue());
        context.setCenterCode(ContainerModel.getCfg().getName());
        context.setSuccess(z);
        context.setDestinationType(getDestinationInfo().isQueueType() ? "queue" : "topic");
        context.setExcType(str3);
        context.setException(str4);
        executeHookAfter(context);
    }

    /**
     * Hands a failed message to the configured consumer exception-persistence
     * class, if one is configured. Any problem while resolving or invoking that
     * class is logged and swallowed so it cannot mask the original failure.
     */
    private void processException(MsgFMessage msgFMessage, Throwable th) {
        String handlerClass = null;
        try {
            handlerClass = ContainerModel.getCfg().getPersistence().getExceptionPersistence().getConsumerExceptionClass();
            if (StringUtils.isEmpty(handlerClass)) {
                return;
            }
            IExceptionPersitence handler = (IExceptionPersitence) ObjectInstanceLoaderHelper.getCacheObjectByClassName(handlerClass);
            log.error("开始调用错误处理接口，消息对象 " + msgFMessage + " 接口名称" + handlerClass);
            String subject = getSubject();
            handler.processException(msgFMessage, ContainerModel.getInstance().findDestination(subject).getDestination().getBelong(), subject, th);
        } catch (Exception e) {
            log.error("异常处理类配置错误，类名：" + handlerClass, e);
        }
    }
}
