package com.ai.aif.msgframe;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.IMsgForTxProducerInner;
import com.ai.aif.msgframe.common.ProducerModel;
import com.ai.aif.msgframe.common.ex.exception.IExceptionPersitence;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.message.MsgFMessageTX;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.route.ILoadBalanceStrategy;
import com.ai.aif.msgframe.common.route.impl.ConsistentHashStrategy;
import com.ai.aif.msgframe.common.route.impl.RandomStrategy;
import com.ai.aif.msgframe.common.util.ObjectInstanceLoaderHelper;
import com.ai.aif.msgframe.common.util.StringUtils;
import com.ai.aif.msgframe.facade.IMsgForTxProducer;
import java.rmi.RemoteException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/MfProducerTxClient.class */
public class MfProducerTxClient implements IMsgForTxProducer {
    private volatile boolean committed;
    private ConsistentHashStrategy CONSISTENT_HASH_STRATEGY;
    private static final Logger logger = LoggerFactory.getLogger(MfProducerTxClient.class);
    protected static final ContainerModel CONTAINER = ContainerModel.getInstance();
    private static final ILoadBalanceStrategy RANDOM_STRATEGY = new RandomStrategy();
    private static final ThreadLocal<ConcurrentHashMap<String, SendTransModel>> threadLocalTX = new ThreadLocal<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/ai/aif/msgframe/MfProducerTxClient$SendTransModel.class */
    public class SendTransModel {
        private String center;
        private IMsgForTxProducerInner txProducer;

        SendTransModel() {
        }

        public String getCenter() {
            return this.center;
        }

        public void setCenter(String str) {
            this.center = str;
        }

        public IMsgForTxProducerInner getTxProducer() {
            return this.txProducer;
        }

        public void setTxProducer(IMsgForTxProducerInner iMsgForTxProducerInner) {
            this.txProducer = iMsgForTxProducerInner;
        }
    }

    public void send(String str, MsgFMessage msgFMessage) throws MsgFrameClientException, RemoteException {
        try {
            sendOrderMsg(str, msgFMessage, null);
        } catch (Exception e) {
            throw new MsgFrameClientException(e.getMessage(), e);
        }
    }

    public void sendOrderMsg(String str, MsgFMessage msgFMessage, String str2) throws Exception {
        checkCommitState(str);
        msgFMessage.setHeaderAttribute("topic", str);
        msgFMessage.setHeaderAttribute("orderId", str2);
        MsgFMessageTX msgFMessageTX = new MsgFMessageTX(msgFMessage, str, str2);
        ProducerModel producerModel = getProducerModel(msgFMessageTX);
        msgFMessageTX.setHeaderAttribute("topic", str);
        msgFMessageTX.setQueueType(producerModel.isQueueType());
        msgFMessageTX.setRealQueue(producerModel.getRealQueue());
        try {
            if (null == threadLocalTX.get()) {
                threadLocalTX.set(new ConcurrentHashMap<>());
            }
            if (!threadLocalTX.get().containsKey(str)) {
                threadLocalTX.get().put(str, new SendTransModel());
            }
            if (null == threadLocalTX.get().get(str).getTxProducer()) {
                IMsgForTxProducerInner txProducer = producerModel.getTxProducer();
                txProducer.setUsed(Boolean.TRUE.booleanValue());
                threadLocalTX.get().get(str).setTxProducer(txProducer);
            }
            threadLocalTX.get().get(str).getTxProducer().send(msgFMessageTX, (CompletionListener) null);
            if (logger.isDebugEnabled()) {
                logger.debug("事务消息发送成功,messageID=" + msgFMessage.getMsgId());
            }
        } catch (Throwable th) {
            excepMsgHandler(th);
            logger.error("事务消息发送失败,message=" + msgFMessage, th);
            rollback();
            throw new MsgFrameClientException(th.getMessage(), th);
        }
    }

    public SendResult sendMessageInTransaction(String str, MsgFMessage msgFMessage, LocalTransactionExecuter localTransactionExecuter) throws MsgFrameClientException {
        TransactionSendResult transactionSendResult = null;
        try {
            transactionSendResult = RANDOM_STRATEGY.routeProducer(CONTAINER.findDestination(str), msgFMessage, (Object) null).getTxProducer().sendMessageInTransaction(msgFMessage, localTransactionExecuter);
        } catch (Throwable th) {
            excepMsgHandler(th);
            logger.error("half消息发送异常", th);
        }
        return transactionSendResult;
    }

    public boolean commit() throws MsgFrameClientException {
        String producerExceptionClass;
        this.committed = Boolean.TRUE.booleanValue();
        try {
            try {
                if (null != threadLocalTX.get() && threadLocalTX.get().size() > 0) {
                    if (logger.isDebugEnabled()) {
                        StringBuffer stringBuffer = new StringBuffer();
                        for (Map.Entry<String, SendTransModel> entry : threadLocalTX.get().entrySet()) {
                            List message = entry.getValue().getTxProducer().getMessage(entry.getKey());
                            if (message != null && message.size() > 0) {
                                Iterator it = message.iterator();
                                while (it.hasNext()) {
                                    stringBuffer.append("msgId=" + ((MsgFMessageTX) it.next()).getMessage().getMsgId() + ";");
                                }
                            }
                        }
                        logger.debug("开始提交事务消息,当前事务对应的所有消息ID=" + stringBuffer.toString());
                    }
                    for (Map.Entry<String, SendTransModel> entry2 : threadLocalTX.get().entrySet()) {
                        try {
                            entry2.getValue().getTxProducer().commit();
                        } catch (Throwable th) {
                            excepMsgHandler(th);
                            try {
                                producerExceptionClass = ContainerModel.getCfg().getPersistence().getExceptionPersistence().getProducerExceptionClass();
                            } catch (Throwable th2) {
                                logger.error("消息提交异常记录消息出现错误", th2);
                            }
                            if (!StringUtils.isEmpty(producerExceptionClass)) {
                                List<MsgFMessageTX> message2 = entry2.getValue().getTxProducer().getMessage(entry2.getKey());
                                if (null != message2 && !message2.isEmpty()) {
                                    IExceptionPersitence iExceptionPersitence = (IExceptionPersitence) ObjectInstanceLoaderHelper.getCacheObjectByClassName(producerExceptionClass);
                                    for (MsgFMessageTX msgFMessageTX : message2) {
                                        logger.error("开始调用错误处理接口，消息对象 " + msgFMessageTX + " 接口名称" + producerExceptionClass, th);
                                        String headerAttribute = msgFMessageTX.getHeaderAttribute("topic");
                                        if (StringUtils.isBlank(headerAttribute)) {
                                            headerAttribute = msgFMessageTX.getMessage().getHeaderAttribute("topic");
                                        }
                                        iExceptionPersitence.processException(msgFMessageTX, CONTAINER.findDestination(headerAttribute).getDestination().getBelong(), headerAttribute, (Exception) th);
                                    }
                                }
                                logger.warn("提交事务消息出现异常,主题=" + entry2.getKey() + ",中心=" + entry2.getValue().getCenter());
                            }
                        }
                    }
                }
                threadLocalTX.remove();
                this.committed = Boolean.FALSE.booleanValue();
                return true;
            } catch (Exception e) {
                logger.error("事务消息提交失败", e);
                threadLocalTX.remove();
                this.committed = Boolean.FALSE.booleanValue();
                return true;
            }
        } catch (Throwable th3) {
            threadLocalTX.remove();
            this.committed = Boolean.FALSE.booleanValue();
            throw th3;
        }
    }

    public boolean rollback() throws MsgFrameClientException {
        this.committed = Boolean.TRUE.booleanValue();
        try {
            try {
                if (null != threadLocalTX.get() && threadLocalTX.get().size() > 0) {
                    if (logger.isDebugEnabled()) {
                        StringBuffer stringBuffer = new StringBuffer();
                        for (Map.Entry<String, SendTransModel> entry : threadLocalTX.get().entrySet()) {
                            List message = entry.getValue().getTxProducer().getMessage(entry.getKey());
                            if (message != null && message.size() > 0) {
                                Iterator it = message.iterator();
                                while (it.hasNext()) {
                                    stringBuffer.append("msgId=" + ((MsgFMessageTX) it.next()).getMessage().getMsgId() + ";");
                                }
                            }
                        }
                        logger.debug(stringBuffer.toString());
                    }
                    for (Map.Entry<String, SendTransModel> entry2 : threadLocalTX.get().entrySet()) {
                        try {
                            entry2.getValue().getTxProducer().rollback();
                        } catch (Throwable th) {
                            excepMsgHandler(th);
                            logger.error("回滚事务消息出现异常,主题=" + entry2.getKey() + ",中心=" + entry2.getValue().getCenter());
                        }
                    }
                }
                threadLocalTX.remove();
                this.committed = Boolean.FALSE.booleanValue();
                return true;
            } catch (Exception e) {
                logger.error("事务回滚失败", e);
                threadLocalTX.remove();
                this.committed = Boolean.FALSE.booleanValue();
                return true;
            }
        } catch (Throwable th2) {
            threadLocalTX.remove();
            this.committed = Boolean.FALSE.booleanValue();
            throw th2;
        }
    }

    private void excepMsgHandler(Throwable th) {
    }

    private void checkCommitState(String str) throws MsgFrameClientException {
        if (null == threadLocalTX.get()) {
            threadLocalTX.set(new ConcurrentHashMap<>());
        }
        if (this.committed) {
            throw new MsgFrameClientException("线程【" + Thread.currentThread().toString() + "】事物已经提交，不允许send操作");
        }
    }

    private void checkSubjectBelongCenter(String str) throws MsgFrameClientException {
        if (StringUtils.isBlank(threadLocalTX.get().get(str).getCenter())) {
            threadLocalTX.get().get(str).setCenter(CONTAINER.findDestination(str).getCfg().getBelong());
        } else if (!threadLocalTX.get().get(str).getCenter().equals(CONTAINER.findDestination(str).getCfg().getBelong())) {
            throw new MsgFrameClientException("事务消息暂时只支持同一个中心的主题，主题【name=" + str + "，belong=" + CONTAINER.findDestination(str).getCfg().getBelong() + "】所属中心与之前的主题所属的中心【" + threadLocalTX.get().get(str).getCenter() + "】互斥");
        }
    }

    private ProducerModel getProducerModel(MsgFMessageTX msgFMessageTX) throws MsgFrameClientException {
        ProducerModel routeProducer;
        if (msgFMessageTX.getOrderId() == null) {
            routeProducer = RANDOM_STRATEGY.routeProducer(CONTAINER.findDestination(msgFMessageTX.getSubject()), msgFMessageTX.getMessage(), (Object) null);
        } else {
            if (this.CONSISTENT_HASH_STRATEGY == null) {
                this.CONSISTENT_HASH_STRATEGY = new ConsistentHashStrategy();
            }
            routeProducer = this.CONSISTENT_HASH_STRATEGY.routeProducer(CONTAINER.findDestination(msgFMessageTX.getSubject()), msgFMessageTX.getMessage(), msgFMessageTX.getOrderId());
        }
        return routeProducer;
    }
}
