package com.ai.aif.msgframe.producer.mq.activemq.api;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.IMsgForNormalProducer;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.hook.SendMsgContext;
import com.ai.aif.msgframe.common.io.transport.JdkSerialize;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.message.SendMode;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.ai.aif.msgframe.common.resources.ActiveMQResources;
import com.ai.aif.msgframe.common.util.StringUtils;
import com.ai.aif.msgframe.producer.mq.BaseProducer;
import com.ai.aif.msgframe.producer.mq.activemq.ActiveMQProducerModel;
import com.ai.ipu.msgframe.util.MessageCovertUtil;
import java.rmi.RemoteException;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/producer/mq/activemq/api/ActiveMQMsgForNormalProducer.class */
public class ActiveMQMsgForNormalProducer extends BaseProducer<ActiveMQProducerModel, MsgFMessage> implements IMsgForNormalProducer {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMQMsgForNormalProducer.class);
    private MessageProducer producer;
    private MessageProducer replyProducer;
    private long defaultLiveTime;
    private int defaultPriority;

    public ActiveMQMsgForNormalProducer(ActiveMQProducerModel activeMQProducerModel) {
        super(activeMQProducerModel);
        this.defaultLiveTime = 0L;
        this.defaultPriority = 4;
    }

    @Override // com.ai.aif.msgframe.producer.mq.BaseProducer
    protected void send0(MsgFMessage msgFMessage, CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        SendMsgContext sendMsgContext = null;
        long j = 0;
        try {
            if (StringUtils.isNotBlank(msgFMessage.getSendMode()) && SendMode.ONEWAY.getValue().equals(msgFMessage.getSendMode())) {
                throw new MsgFrameClientException("send exception:Oneway delivery mode for activemq does not support temporarily");
            }
            if (StringUtils.isNotBlank(msgFMessage.getLiveTimes())) {
                this.defaultLiveTime = Long.parseLong(msgFMessage.getLiveTimes());
            }
            if (StringUtils.isNotBlank(msgFMessage.getPriority())) {
                this.defaultPriority = Integer.parseInt(msgFMessage.getPriority());
            }
            if (hasSendMessageHook()) {
                sendMsgContext = new SendMsgContext();
                sendMsgContext.setBrokerAddr(getUrl());
                sendMsgContext.setMsg(msgFMessage);
                sendMsgContext.setProducer(this);
                sendMsgContext.setClusterName(getClusterName());
                sendMsgContext.setCenterCode(ContainerModel.getCfg().getName());
                sendMsgContext.setQueue(getRealQueue());
                executeSendMessageHookBefore(sendMsgContext);
                j = System.currentTimeMillis();
            }
            if (logger.isDebugEnabled()) {
                logger.debug("开始发送消息,subject=" + msgFMessage.getHeaderAttribute("topic") + ",msg=" + msgFMessage);
            }
            String url = super.getModelInfo().getUrl();
            if (url.contains("mqtt://")) {
                ActiveMQResources.getInstance().getMQTTConnection(super.getClusterName(), url, super.getModelInfo().getBroker()).publish(msgFMessage.getHeaderAttribute("topic"), new JdkSerialize().serialize(msgFMessage), 1, true);
            } else {
                getProducer().send(MessageCovertUtil.transActiveMQMessage(msgFMessage), 2, this.defaultPriority, this.defaultLiveTime * 1000);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("消息发送成功,subject=" + msgFMessage.getHeaderAttribute("topic") + ",msg=" + msgFMessage);
            }
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - j);
                sendMsgContext.setSendSucess(true);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(sendMsgContext);
            }
            if (null != completionListener) {
                completionListener.onCompletion(msgFMessage);
            }
        } catch (Exception e) {
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - 0);
                sendMsgContext.setSendSucess(false);
                sendMsgContext.setException(e);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(null);
            }
            if (null != completionListener) {
                completionListener.onException(msgFMessage, e);
            }
            throw new RemoteException("activemq网络错误", e);
        } catch (MsgFrameClientException e2) {
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - 0);
                sendMsgContext.setSendSucess(false);
                sendMsgContext.setException(e2);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(null);
            }
            if (null != completionListener) {
                completionListener.onException(msgFMessage, e2);
            }
            throw new MsgFrameClientException(e2.getMessage(), e2);
        }
    }

    public void replySend(MsgFMessage msgFMessage) throws MsgFrameClientException, RemoteException {
        try {
            getReplyProducer().send(MessageCovertUtil.transActiveMQMessage(msgFMessage));
        } catch (JMSException e) {
            throw new MsgFrameClientException("消息发送异常");
        } catch (RemoteException e2) {
            throw new RemoteException("activemq网络错误" + this, e2);
        }
    }

    private MessageProducer getProducer() throws RemoteException {
        if (this.producer == null) {
            try {
                Session createSession = getModelInfo().getConnection().createSession(Boolean.FALSE.booleanValue(), 1);
                this.producer = createSession.createProducer(getModelInfo().getDestinationInfo().isQueueType() ? createSession.createQueue(getRealQueue()) : createSession.createTopic(getRealQueue()));
            } catch (Exception e) {
                throw new RemoteException("activemq网络错误" + getModelInfo(), e);
            }
        }
        return this.producer;
    }

    private MessageProducer getReplyProducer() throws RemoteException {
        if (this.replyProducer == null) {
            try {
                Session createSession = getModelInfo().getConnection().createSession(Boolean.FALSE.booleanValue(), 1);
                this.replyProducer = createSession.createProducer(getModelInfo().getDestinationInfo().isQueueType() ? createSession.createQueue(getModelInfo().getRealQueue() + "_reply") : createSession.createTopic(getModelInfo().getRealQueue() + "_reply"));
            } catch (Exception e) {
                throw new RemoteException("activemq网络错误" + getModelInfo(), e);
            }
        }
        return this.replyProducer;
    }

    public void sendOrderMsg(MsgFMessage msgFMessage, String str) throws RemoteException {
        SendMsgContext sendMsgContext = null;
        long j = 0;
        try {
            msgFMessage.setOrderMsgGroupCode(str);
            if (StringUtils.isNotBlank(msgFMessage.getSendMode()) && SendMode.ONEWAY.getValue().equals(msgFMessage.getSendMode())) {
                throw new MsgFrameClientException("send exception:Oneway delivery mode for activemq does not support temporarily");
            }
            if (StringUtils.isNotBlank(msgFMessage.getLiveTimes())) {
                this.defaultLiveTime = Long.parseLong(msgFMessage.getLiveTimes());
            }
            if (StringUtils.isNotBlank(msgFMessage.getPriority())) {
                this.defaultPriority = Integer.parseInt(msgFMessage.getPriority());
            }
            if (hasSendMessageHook()) {
                sendMsgContext = new SendMsgContext();
                sendMsgContext.setBrokerAddr(getUrl());
                sendMsgContext.setMsg(msgFMessage);
                sendMsgContext.setProducer(this);
                sendMsgContext.setClusterName(getClusterName());
                sendMsgContext.setCenterCode(ContainerModel.getCfg().getName());
                sendMsgContext.setQueue(getRealQueue());
                executeSendMessageHookBefore(sendMsgContext);
                j = System.currentTimeMillis();
            }
            if (logger.isDebugEnabled()) {
                logger.debug("开始发送顺序消息,subject=" + msgFMessage.getHeaderAttribute("topic") + ",msg=" + msgFMessage);
            }
            getProducer().send(MessageCovertUtil.transActiveMQMessage(msgFMessage), 2, this.defaultPriority, this.defaultLiveTime * 1000);
            if (logger.isDebugEnabled()) {
                logger.debug("顺序消息发送成功,subject=" + msgFMessage.getHeaderAttribute("topic") + ",msg=" + msgFMessage);
            }
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - j);
                sendMsgContext.setSendSucess(true);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(sendMsgContext);
            }
        } catch (Exception e) {
            if (hasSendMessageHook()) {
                sendMsgContext.setCostTime(System.currentTimeMillis() - 0);
                sendMsgContext.setSendSucess(false);
                sendMsgContext.setException(e);
                sendMsgContext.setDestinationType(getModelInfo().getDestinationInfo().isQueueType() ? "queue" : "topic");
                executeSendMessageHookAfter(null);
            }
            throw new RemoteException("activemq网络错误" + this, e);
        }
    }
}
