package com.ai.aif.msgframe.producer.mq.activemq.api;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.IMsgForTxProducerInner;
import com.ai.aif.msgframe.common.ex.exception.IExceptionPersitence;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.hook.SendMsgContext;
import com.ai.aif.msgframe.common.interfaces.ITransactionCheckListener;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.message.MsgFMessageTX;
import com.ai.aif.msgframe.common.message.SendMode;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.thread.ThreadFactoryImpl;
import com.ai.aif.msgframe.common.util.MessageCovertUtil;
import com.ai.aif.msgframe.common.util.ObjectInstanceLoaderHelper;
import com.ai.aif.msgframe.common.util.StringUtils;
import com.ai.aif.msgframe.producer.mq.BaseProducer;
import com.ai.aif.msgframe.producer.mq.activemq.ActiveMQProducerModel;
import com.asiainfo.msgframe.Persistence;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageProducer;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/producer/mq/activemq/api/ActiveMQMsgForTxProducer.class */
public class ActiveMQMsgForTxProducer extends BaseProducer<ActiveMQProducerModel, MsgFMessageTX> implements IMsgForTxProducerInner {
    private boolean used;
    private int sessionTimeOut;
    private long defaultLiveTime;
    private int defaultPriority;
    private ITransactionCheckListener transactionCheckListener;
    private static final Logger logger = LoggerFactory.getLogger(ActiveMQMsgForTxProducer.class);
    private static final ThreadLocal<ConcurrentMap<String, ProducerModel>> threadLocalSession = new ThreadLocal<>();
    private static final ConcurrentMap<ProducerModel, Long> producerModelManger = new ConcurrentHashMap();
    private static ScheduledExecutorService scheduleSessionExecutor = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/ai/aif/msgframe/producer/mq/activemq/api/ActiveMQMsgForTxProducer$ProducerModel.class */
    public class ProducerModel {
        private ActiveMQSession session;
        private ConcurrentMap<String, MessageProducer> localMessageProducers = new ConcurrentHashMap();
        private ConcurrentHashMap<String, List<MsgFMessageTX>> localMessageMap = new ConcurrentHashMap<>();

        ProducerModel() {
        }

        public ConcurrentMap<String, MessageProducer> getLocalMessageProducers() {
            return this.localMessageProducers;
        }

        public void setLocalMessageProducers(ConcurrentMap<String, MessageProducer> concurrentMap) {
            this.localMessageProducers = concurrentMap;
        }

        public ActiveMQSession getSession() {
            return this.session;
        }

        public void setSession(ActiveMQSession activeMQSession) {
            this.session = activeMQSession;
        }

        public void addLocalMessage(String str, MsgFMessageTX msgFMessageTX) {
            if (this.localMessageMap.get(str) != null) {
                this.localMessageMap.get(str).add(msgFMessageTX);
                return;
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add(msgFMessageTX);
            this.localMessageMap.put(str, arrayList);
        }

        public List<MsgFMessageTX> getLocalMessages(String str) {
            return this.localMessageMap.get(str);
        }

        public ConcurrentHashMap<String, List<MsgFMessageTX>> getLocalAllMessages() {
            return this.localMessageMap;
        }
    }

    public ActiveMQMsgForTxProducer(ActiveMQProducerModel activeMQProducerModel) {
        super(activeMQProducerModel);
        this.sessionTimeOut = 300;
        this.defaultLiveTime = 0L;
        this.defaultPriority = 4;
    }

    @Override // com.ai.aif.msgframe.producer.mq.BaseProducer
    public void send0(MsgFMessageTX msgFMessageTX, CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        SendMsgContext sendMsgContext = null;
        long j = 0;
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("开始发送事务消息发送,subject=" + msgFMessageTX.getSubject() + ",msg=" + msgFMessageTX.getMessage());
            }
            MsgFMessage message = msgFMessageTX.getMessage();
            if (StringUtils.isNotBlank(message.getSendMode()) && SendMode.ONEWAY.getValue().equals(message.getSendMode())) {
                throw new MsgFrameClientException("send exception:Oneway delivery mode for activemq does not support temporarily");
            }
            if (StringUtils.isNotBlank(message.getLiveTimes())) {
                this.defaultLiveTime = Long.parseLong(message.getLiveTimes());
            }
            if (StringUtils.isNotBlank(message.getPriority())) {
                this.defaultPriority = Integer.parseInt(message.getPriority());
            }
            if (hasSendMessageHook()) {
                sendMsgContext = new SendMsgContext();
                sendMsgContext.setBrokerAddr(getUrl());
                sendMsgContext.setMsg(message);
                sendMsgContext.setCenterCode(ContainerModel.getCfg().getName());
                sendMsgContext.setTxProducer(this);
                sendMsgContext.setQueue(getRealQueue());
                executeSendMessageHookBefore(sendMsgContext);
                j = System.currentTimeMillis();
            }
            getProducer(msgFMessageTX).send(MessageCovertUtil.transActiveMQMessage(message), 2, this.defaultPriority, this.defaultLiveTime * 1000);
            if (logger.isDebugEnabled()) {
                logger.debug("事务消息发送成功,subject=" + msgFMessageTX.getSubject() + ",msg=" + msgFMessageTX.getMessage());
            }
            setMessage(msgFMessageTX);
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - j);
                sendMsgContext.setBrokerAddr(((ProducerModel) ((ConcurrentHashMap) threadLocalSession.get()).get(msgFMessageTX.getSubject())).getSession().getConnection().getTransport().getRemoteAddress());
                sendMsgContext.setSendSucess(true);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(sendMsgContext);
            }
        } catch (MsgFrameClientException e) {
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - 0);
                sendMsgContext.setSendSucess(false);
                sendMsgContext.setException(e);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(null);
            }
            throw new MsgFrameClientException(e.getMessage(), e);
        } catch (Exception e2) {
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - 0);
                sendMsgContext.setSendSucess(false);
                sendMsgContext.setException(e2);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(null);
            }
            throw new RemoteException("activemq网络错误" + this, e2);
        }
    }

    private MessageProducer getProducer(MsgFMessageTX msgFMessageTX) throws RemoteException {
        RemoteException remoteException;
        String subject = msgFMessageTX.getSubject();
        if (null == threadLocalSession.get()) {
            threadLocalSession.set(new ConcurrentHashMap());
        }
        if (null != ((ConcurrentHashMap) threadLocalSession.get()).get(subject)) {
            MessageProducer messageProducer = ((ProducerModel) ((ConcurrentHashMap) threadLocalSession.get()).get(subject)).getLocalMessageProducers().get(msgFMessageTX.getRealQueue());
            if (messageProducer != null) {
                return messageProducer;
            }
            try {
                ActiveMQSession session = ((ProducerModel) ((ConcurrentHashMap) threadLocalSession.get()).get(subject)).getSession();
                MessageProducer createProducer = session.createProducer(msgFMessageTX.isQueueType() ? session.createQueue(msgFMessageTX.getRealQueue()) : session.createTopic(msgFMessageTX.getRealQueue()));
                MessageProducer putIfAbsent = ((ProducerModel) ((ConcurrentHashMap) threadLocalSession.get()).get(subject)).getLocalMessageProducers().putIfAbsent(msgFMessageTX.getRealQueue(), createProducer);
                return null != putIfAbsent ? putIfAbsent : createProducer;
            } finally {
            }
        }
        ProducerModel producerModel = new ProducerModel();
        try {
            ActiveMQSession activeMQSession = (ActiveMQSession) getModelInfo().getConnection().createSession(Boolean.TRUE.booleanValue(), 1);
            producerModel.setSession(activeMQSession);
            MessageProducer createProducer2 = activeMQSession.createProducer(msgFMessageTX.isQueueType() ? activeMQSession.createQueue(msgFMessageTX.getRealQueue()) : activeMQSession.createTopic(msgFMessageTX.getRealQueue()));
            producerModel.getLocalMessageProducers().put(msgFMessageTX.getRealQueue(), createProducer2);
            checkSessionExecutor(producerModel);
            ((ConcurrentHashMap) threadLocalSession.get()).put(subject, producerModel);
            return createProducer2;
        } finally {
        }
    }

    public boolean commit() throws MsgFrameClientException {
        try {
            try {
                if (threadLocalSession.get() != null) {
                    for (Map.Entry entry : ((ConcurrentHashMap) threadLocalSession.get()).entrySet()) {
                        try {
                            if (logger.isDebugEnabled()) {
                                String remoteAddress = ((ProducerModel) entry.getValue()).getSession().getConnection().getTransport().getRemoteAddress();
                                String clientID = ((ProducerModel) entry.getValue()).getSession().getConnection().getClientID();
                                StringBuffer stringBuffer = new StringBuffer();
                                Iterator<MsgFMessageTX> it = ((ProducerModel) entry.getValue()).getLocalMessages((String) entry.getKey()).iterator();
                                while (it.hasNext()) {
                                    stringBuffer.append("faileMessage=" + it.next().getMessage().toString() + ";");
                                }
                                logger.debug("开始准备提交事务消息,连接地址=" + remoteAddress + ",clientID=" + clientID + ",msgBody=" + stringBuffer.toString());
                            }
                            ((ProducerModel) entry.getValue()).session.commit();
                        } catch (Throwable th) {
                            String remoteAddress2 = ((ProducerModel) entry.getValue()).getSession().getConnection().getTransport().getRemoteAddress();
                            String clientID2 = ((ProducerModel) entry.getValue()).getSession().getConnection().getClientID();
                            StringBuffer stringBuffer2 = new StringBuffer();
                            for (MsgFMessageTX msgFMessageTX : ((ProducerModel) entry.getValue()).getLocalMessages((String) entry.getKey())) {
                                handleFailedMessages(msgFMessageTX.getSubject(), msgFMessageTX.getMessage(), (Exception) th);
                                stringBuffer2.append("faileMessage=" + msgFMessageTX.getMessage().toString() + ";");
                            }
                            logger.error("事务提交异常,连接地址=" + remoteAddress2 + ",clientID=" + clientID2 + ",msgBody=" + stringBuffer2.toString());
                        }
                    }
                }
                return true;
            } catch (Throwable th2) {
                logger.error("处理事务提交异常消息消息出现异常", th2);
                clearResources();
                return true;
            }
        } finally {
            clearResources();
        }
    }

    public boolean rollback() throws MsgFrameClientException {
        try {
            if (threadLocalSession.get() != null) {
                for (Map.Entry entry : ((ConcurrentHashMap) threadLocalSession.get()).entrySet()) {
                    try {
                        ((ProducerModel) entry.getValue()).session.rollback();
                        if (logger.isDebugEnabled()) {
                            StringBuffer stringBuffer = new StringBuffer();
                            Iterator<MsgFMessageTX> it = ((ProducerModel) entry.getValue()).getLocalMessages((String) entry.getKey()).iterator();
                            while (it.hasNext()) {
                                stringBuffer.append("回滚消息RollbackMessage=" + it.next().getMessage().toString() + ";");
                            }
                            logger.debug(stringBuffer.toString());
                        }
                    } catch (Throwable th) {
                        String remoteAddress = ((ProducerModel) entry.getValue()).getSession().getConnection().getTransport().getRemoteAddress();
                        String clientID = ((ProducerModel) entry.getValue()).getSession().getConnection().getClientID();
                        StringBuffer stringBuffer2 = new StringBuffer();
                        Iterator<MsgFMessageTX> it2 = ((ProducerModel) entry.getValue()).getLocalMessages((String) entry.getKey()).iterator();
                        while (it2.hasNext()) {
                            stringBuffer2.append("faileMessage=" + it2.next().getMessage().toString() + ";");
                        }
                        logger.error("事务回滚异常,连接地址=" + remoteAddress + ",clientID=" + clientID + ",msgBody=" + stringBuffer2.toString());
                    }
                }
            }
            clearResources();
            return true;
        } catch (Throwable th2) {
            clearResources();
            throw th2;
        }
    }

    private void clearResources() {
        setUsed(Boolean.FALSE.booleanValue());
        if (null != threadLocalSession.get()) {
            Iterator it = ((ConcurrentHashMap) threadLocalSession.get()).entrySet().iterator();
            while (it.hasNext()) {
                for (Map.Entry entry : ((ProducerModel) ((Map.Entry) it.next()).getValue()).localMessageProducers.entrySet()) {
                    try {
                        if (null != entry.getValue()) {
                            ((ActiveMQMessageProducer) entry.getValue()).close();
                        }
                    } catch (Throwable th) {
                        logger.error("关闭队列" + ((String) entry.getKey()) + "的生产者异常", th);
                    }
                }
            }
        }
        if (null != threadLocalSession.get()) {
            for (Map.Entry entry2 : ((ConcurrentHashMap) threadLocalSession.get()).entrySet()) {
                try {
                    try {
                        ActiveMQSession session = ((ProducerModel) entry2.getValue()).getSession();
                        if (!session.isClosed()) {
                            session.close();
                        }
                        producerModelManger.remove(entry2.getValue());
                    } catch (Throwable th2) {
                        logger.error("关闭session异常", th2);
                        producerModelManger.remove(entry2.getValue());
                    }
                } catch (Throwable th3) {
                    producerModelManger.remove(entry2.getValue());
                    throw th3;
                }
            }
        }
        threadLocalSession.remove();
    }

    private void handleFailedMessages(String str, MsgFMessage msgFMessage, Exception exc) {
        String producerExceptionClass = ContainerModel.getCfg().getPersistence().getExceptionPersistence().getProducerExceptionClass();
        try {
            if (StringUtils.isEmpty(producerExceptionClass)) {
                return;
            }
            logger.error("事务消息提交失败,开始调用错误处理接口，消息对象 " + msgFMessage + " 接口名称" + producerExceptionClass, exc);
            ((IExceptionPersitence) ObjectInstanceLoaderHelper.getCacheObjectByClassName(producerExceptionClass)).processException(msgFMessage, ContainerModel.getInstance().findDestination(str).getDestination().getBelong(), str, exc);
        } catch (Exception e) {
            logger.error("生产端异常处理类配置错误" + producerExceptionClass, e);
        }
    }

    public boolean isUsed() {
        return this.used;
    }

    public void setUsed(boolean z) {
        this.used = z;
    }

    private void setMessage(MsgFMessageTX msgFMessageTX) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("开始保存事务消息到集合中...." + msgFMessageTX.getMessage().getMsgId());
        }
        if (null != ((ConcurrentHashMap) threadLocalSession.get()).get(msgFMessageTX.getSubject())) {
            ((ProducerModel) ((ConcurrentHashMap) threadLocalSession.get()).get(msgFMessageTX.getSubject())).addLocalMessage(msgFMessageTX.getSubject(), msgFMessageTX);
        }
    }

    public List<MsgFMessageTX> getMessage(String str) throws Exception {
        return ((ConcurrentHashMap) threadLocalSession.get()).get(str) != null ? ((ProducerModel) ((ConcurrentHashMap) threadLocalSession.get()).get(str)).getLocalMessages(str) : new ArrayList();
    }

    private void checkSessionExecutor(ProducerModel producerModel) {
        producerModelManger.put(producerModel, Long.valueOf(System.currentTimeMillis()));
        if (scheduleSessionExecutor == null) {
            synchronized (this) {
                if (scheduleSessionExecutor == null) {
                    scheduleSessionExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("Check_SessionTimeOut_Tread_"));
                    scheduleSessionExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.ai.aif.msgframe.producer.mq.activemq.api.ActiveMQMsgForTxProducer.1
                        @Override // java.lang.Runnable
                        public void run() {
                            ActiveMQMsgForTxProducer.this.ckeckTimeOutSession();
                        }
                    }, 5L, 15L, TimeUnit.SECONDS);
                    initTransactionCheckListener();
                }
            }
        }
    }

    private void initTransactionCheckListener() {
        Persistence persistence = ContainerModel.getCfg().getPersistence();
        if (persistence == null || persistence.getExceptionPersistence() == null || persistence.getExceptionPersistence().getTransactionCheckListener() == null) {
            return;
        }
        try {
            this.transactionCheckListener = (ITransactionCheckListener) ObjectInstanceLoaderHelper.getCacheObjectByClassName(persistence.getExceptionPersistence().getTransactionCheckListener());
        } catch (ClassNotFoundException e) {
            logger.error("超时事务回查实现类找不到,请检查配置是否正确", e);
        } catch (InstantiationException e2) {
            logger.error("反射实例化回调实现类出现异常,请检查配置是否正确", e2);
        } catch (Throwable th) {
            logger.error("初始化回调实现类出现异常,请检查配置是否正确", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ckeckTimeOutSession() {
        for (Map.Entry<ProducerModel, Long> entry : producerModelManger.entrySet()) {
            if ((System.currentTimeMillis() - entry.getValue().longValue()) / 1000 > this.sessionTimeOut) {
                logger.error("检测到有超时未处理的会话信息.....");
                StringBuffer stringBuffer = new StringBuffer();
                List<MsgFMessageTX> list = null;
                Iterator<Map.Entry<String, List<MsgFMessageTX>>> it = entry.getKey().getLocalAllMessages().entrySet().iterator();
                while (it.hasNext()) {
                    list = it.next().getValue();
                    Iterator<MsgFMessageTX> it2 = list.iterator();
                    while (it2.hasNext()) {
                        stringBuffer.append("会话超时对应的消息Message=" + it2.next().getMessage() + ";");
                    }
                }
                if (this.transactionCheckListener != null) {
                    ActiveMQSession session = entry.getKey().getSession();
                    try {
                        if (!session.isClosed()) {
                            if (checkTransState(list)) {
                                session.commit();
                            } else {
                                session.rollback();
                            }
                        }
                    } catch (Throwable th) {
                        logger.error("超时会话回滚异常.....");
                    }
                    for (Map.Entry<String, MessageProducer> entry2 : entry.getKey().getLocalMessageProducers().entrySet()) {
                        try {
                            entry2.getValue().close();
                        } catch (Throwable th2) {
                            logger.error("关闭队列" + entry2.getKey() + "的生产者异常", th2);
                        }
                    }
                    try {
                        try {
                            if (!session.isClosed()) {
                                session.close();
                            }
                            producerModelManger.remove(entry.getKey());
                        } catch (Throwable th3) {
                            logger.error("关闭session异常", th3);
                            producerModelManger.remove(entry.getKey());
                        }
                    } catch (Throwable th4) {
                        producerModelManger.remove(entry.getKey());
                        throw th4;
                    }
                } else {
                    producerModelManger.remove(entry.getKey());
                }
                logger.error(stringBuffer.toString());
            }
        }
    }

    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:8:0x0018. Please report as an issue. */
    private boolean checkTransState(List<MsgFMessageTX> list) {
        boolean z = false;
        if (this.transactionCheckListener != null) {
            try {
                switch (this.transactionCheckListener.checkTransactionState(list).ordinal()) {
                    case 1:
                        z = true;
                        break;
                    case 2:
                        z = false;
                        break;
                    case 3:
                        z = false;
                        break;
                }
            } catch (Throwable th) {
                logger.error("业务回查超时事务时出现异常", th);
                return false;
            }
        }
        return z;
    }

    public /* bridge */ /* synthetic */ void send(MsgFMessageTX msgFMessageTX, CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        super.send((ActiveMQMsgForTxProducer) msgFMessageTX, completionListener);
    }
}
