package com.ai.aif.msgframe;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.ProducerModel;
import com.ai.aif.msgframe.common.exception.MsgFrameException;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.message.MsgFMessageTX;
import com.ai.aif.msgframe.common.model.impl.ClusterModel;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.route.ILoadBalanceStrategy;
import com.ai.aif.msgframe.common.route.impl.RandomStrategy;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.SendResult;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/MfTXProducerClient.class */
public class MfTXProducerClient {
    private static final Logger log = LoggerFactory.getLogger(MfTXProducerClient.class);
    protected static final ContainerModel CONTAINER = ContainerModel.getInstance();
    private static final ILoadBalanceStrategy RANDOM_STRATEGY = new RandomStrategy();
    private ThreadLocal<List<MsgFMessageTX>> threadLocalTX = new ThreadLocal<>();
    private volatile boolean isCommit;

    public MfTXProducerClient() {
        this.threadLocalTX.set(new ArrayList());
    }

    private void checkCommitState() throws MsgFrameException {
        if (this.isCommit) {
            throw new MsgFrameException("事物已经提交，不允许send操作");
        }
    }

    public void send(String str, MsgFMessage msgFMessage) throws MsgFrameException, RemoteException {
        msgFMessage.setTopic(str);
        checkCommitState();
        this.threadLocalTX.get().add(new MsgFMessageTX(msgFMessage, str, (String) null));
    }

    public void sendOrderMsg(String str, MsgFMessage msgFMessage, String str2) throws Exception {
        checkCommitState();
        this.threadLocalTX.get().add(new MsgFMessageTX(msgFMessage, str, str2));
    }

    public boolean commit() throws MsgFrameException, RemoteException {
        this.isCommit = true;
        if (this.threadLocalTX.get().isEmpty()) {
            return false;
        }
        HashSet<String> hashSet = new HashSet();
        Iterator<MsgFMessageTX> it = this.threadLocalTX.get().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getSubject());
        }
        List list = null;
        for (String str : hashSet) {
            if (list != null) {
                ArrayList arrayList = new ArrayList();
                for (ClusterModel clusterModel : CONTAINER.findDestination(str).getClusters()) {
                    if (list.contains(clusterModel)) {
                        arrayList.add(clusterModel);
                    }
                }
                list = arrayList;
                if (list.isEmpty()) {
                    break;
                }
            } else {
                list = CONTAINER.findDestination(str).getClusters();
            }
        }
        if (list.isEmpty()) {
            throw new MsgFrameException(hashSet.toString() + "主题找不到公共的集群");
        }
        for (MsgFMessageTX msgFMessageTX : this.threadLocalTX.get()) {
            ProducerModel routeProducer = RANDOM_STRATEGY.routeProducer(CONTAINER.findDestination(msgFMessageTX.getSubject()), msgFMessageTX.getMessage(), list.get(0));
            msgFMessageTX.setRealQueue(routeProducer.getRealQueue());
            try {
                routeProducer.getNormalProducer().send(msgFMessageTX.getMessage(), (CompletionListener) null);
            } catch (Exception e) {
                throw new MsgFrameException(e.getMessage(), e);
            }
        }
        this.threadLocalTX.get().clear();
        return true;
    }

    public boolean rollback() throws MsgFrameException {
        this.threadLocalTX.get().clear();
        return true;
    }

    public SendResult sendMessageInTransaction(String str, MsgFMessage msgFMessage, LocalTransactionExecuter localTransactionExecuter) throws MsgFrameException {
        SendResult sendResult = null;
        try {
            sendResult = RANDOM_STRATEGY.routeProducer(CONTAINER.findDestination(str), msgFMessage, (Object) null).getTxProducer().sendMessageInTransaction(msgFMessage, localTransactionExecuter);
        } catch (Exception e) {
            log.error("half消息发送异常", e);
        }
        return sendResult;
    }

    public com.aliyun.openservices.ons.api.SendResult sendOnsInTransaction(String str, MsgFMessage msgFMessage, com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter localTransactionExecuter) throws MsgFrameException {
        com.aliyun.openservices.ons.api.SendResult sendResult = null;
        try {
            sendResult = RANDOM_STRATEGY.routeProducer(CONTAINER.findDestination(str), msgFMessage, (Object) null).getTxProducer().sendMessageInTransaction(msgFMessage, localTransactionExecuter);
        } catch (Exception e) {
            log.error("half消息发送异常", e);
        }
        return sendResult;
    }
}
