package com.ai.aif.msgframe.producer.mq.rocketmq.api;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.IMsgForNormalProducer;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.hook.SendMsgContext;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.message.SendMode;
import com.ai.aif.msgframe.common.util.MessageCovertUtil;
import com.ai.aif.msgframe.common.util.StringUtils;
import com.ai.aif.msgframe.producer.mq.BaseProducer;
import com.ai.aif.msgframe.producer.mq.rocketmq.RocketMQProducerModel;
import java.rmi.RemoteException;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/producer/mq/rocketmq/api/RocketMQMsgForNormalProducer.class */
/**
 * Producer that publishes {@link MsgFMessage} instances through a RocketMQ
 * {@link DefaultMQProducer} obtained from {@link RocketMQResources}.
 *
 * <p>Supports synchronous, asynchronous (callback based), one-way, reply and
 * ordered sends. When a send-message hook is registered, a {@link SendMsgContext}
 * is built and handed to the hook before the send and after it completes or fails.
 */
public class RocketMQMsgForNormalProducer extends BaseProducer<RocketMQProducerModel, MsgFMessage> implements IMsgForNormalProducer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMsgForNormalProducer.class);
    private static final RocketMQResources RESOURCES = RocketMQResources.getInstance();

    /**
     * Creates a producer bound to the given model.
     *
     * @param rocketMQProducerModel model passed to the base producer; it is later used to
     *                              look up the cached RocketMQ producer and the destination
     */
    public RocketMQMsgForNormalProducer(RocketMQProducerModel rocketMQProducerModel) {
        super(rocketMQProducerModel);
    }

    /**
     * Sends one message, choosing the delivery style from the arguments:
     * <ul>
     *   <li>a non-null {@code completionListener} triggers an asynchronous send whose
     *       outcome is reported to the listener;</li>
     *   <li>otherwise, a message whose send mode equals {@link SendMode#ONEWAY} is sent one-way;</li>
     *   <li>otherwise the message is sent synchronously.</li>
     * </ul>
     *
     * @param msgFMessage        message to convert and send
     * @param completionListener optional listener; when present the send is asynchronous
     * @throws MsgFrameClientException propagated from the framework calls made here
     * @throws RemoteException         wraps any RocketMQ client/broker/remoting failure or an interruption
     */
    @Override // com.ai.aif.msgframe.producer.mq.BaseProducer
    protected void send0(final MsgFMessage msgFMessage, final CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        SendMsgContext sendMsgContext = null;
        try {
            Message rocketMessage = MessageCovertUtil.transRocketMQMessage(getModelInfo().getDestinationInfo(), msgFMessage);
            DefaultMQProducer producer = RESOURCES.getProducerByCache(getModelInfo());
            if (hasSendMessageHook()) {
                sendMsgContext = newHookContext(msgFMessage);
                executeSendMessageHookBefore(sendMsgContext);
            }
            if (null != completionListener) {
                producer.send(rocketMessage, new SendCallback() {
                    @Override
                    public void onException(Throwable th) {
                        if (th instanceof Exception) {
                            completionListener.onException(msgFMessage, (Exception) th);
                        } else {
                            // Keep the original throwable as the cause so it is not lost.
                            completionListener.onException(msgFMessage, new Exception("unknown Exception", th));
                        }
                    }

                    @Override
                    public void onSuccess(SendResult sendResult) {
                        msgFMessage.setHeaderAttribute("RocketMQ_SendResult", sendResult.toString());
                        completionListener.onCompletion(msgFMessage);
                    }
                });
                // NOTE(review): the after-hook below reports success as soon as the async send
                // has been handed to the client, not when the broker acknowledges it.
            } else if (StringUtils.isNotBlank(msgFMessage.getSendMode()) && SendMode.ONEWAY.getValue().equals(msgFMessage.getSendMode())) {
                producer.sendOneway(rocketMessage);
                // NOTE(review): one-way sends return without running the after-hook; this
                // matches the previous behavior — confirm whether it is intentional.
                return;
            } else {
                logSendResult(msgFMessage, producer.send(rocketMessage));
            }
            notifyHookSuccess(sendMsgContext);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw sendFailure(sendMsgContext, e);
        } catch (RemotingException e2) {
            throw sendFailure(sendMsgContext, e2);
        } catch (MQBrokerException e3) {
            throw sendFailure(sendMsgContext, e3);
        } catch (MQClientException e4) {
            throw sendFailure(sendMsgContext, e4);
        }
    }

    /**
     * Synchronously sends the message to the reply topic, i.e. the converted message's
     * topic with the suffix {@code "_reply"} appended. No send hooks are invoked.
     *
     * @param msgFMessage message to convert and send
     * @throws MsgFrameClientException propagated from the message conversion
     * @throws RemoteException         wraps any exception raised while sending
     */
    public void replySend(MsgFMessage msgFMessage) throws MsgFrameClientException, RemoteException {
        Message rocketMessage = MessageCovertUtil.transRocketMQMessage(getModelInfo().getDestinationInfo(), msgFMessage);
        try {
            rocketMessage.setTopic(rocketMessage.getTopic() + "_reply");
            RESOURCES.getProducerByCache(getModelInfo()).send(rocketMessage);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise swallow the interrupt status.
                Thread.currentThread().interrupt();
            }
            throw new RemoteException("发送消息异常，异常节点信息：" + getModelInfo().toString(), e);
        }
    }

    /**
     * Synchronously sends a message to a queue chosen from the hash of {@code str}, so that
     * messages sharing the same key are routed to the same queue index (for a given queue list).
     *
     * @param msgFMessage message to convert and send
     * @param str         ordering key; must not be {@code null}
     * @throws NullPointerException    if {@code str} is {@code null}
     * @throws MsgFrameClientException propagated from the framework calls made here
     * @throws RemoteException         wraps any RocketMQ client/broker/remoting failure or an interruption
     */
    public void sendOrderMsg(MsgFMessage msgFMessage, String str) throws MsgFrameClientException, RemoteException {
        if (str == null) {
            // Fail before the before-hook runs instead of part-way through the send.
            throw new NullPointerException("order key (str) must not be null");
        }
        // Math.abs(Integer.MIN_VALUE) is still negative and would yield a negative queue
        // index; map that single value to 0 and keep the mapping of every other key unchanged.
        int keyHash = str.hashCode();
        Integer queueSeed = Integer.valueOf(keyHash == Integer.MIN_VALUE ? 0 : Math.abs(keyHash));

        SendMsgContext sendMsgContext = null;
        try {
            if (hasSendMessageHook()) {
                sendMsgContext = newHookContext(msgFMessage);
                executeSendMessageHookBefore(sendMsgContext);
            }
            Message rocketMessage = MessageCovertUtil.transRocketMQMessage(getModelInfo().getDestinationInfo(), msgFMessage);
            SendResult result = RESOURCES.getProducerByCache(getModelInfo()).send(rocketMessage, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
                    return list.get(((Integer) obj).intValue() % list.size());
                }
            }, queueSeed);
            logSendResult(msgFMessage, result);
            notifyHookSuccess(sendMsgContext);
        } catch (RemotingException e) {
            throw sendFailure(sendMsgContext, e);
        } catch (MQBrokerException e2) {
            throw sendFailure(sendMsgContext, e2);
        } catch (MQClientException e3) {
            throw sendFailure(sendMsgContext, e3);
        } catch (InterruptedException e4) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw sendFailure(sendMsgContext, e4);
        }
    }

    /** Builds the hook context describing this producer, its broker address, queue and the message. */
    private SendMsgContext newHookContext(MsgFMessage msgFMessage) {
        SendMsgContext context = new SendMsgContext();
        context.setBrokerAddr(getUrl());
        context.setMsg(msgFMessage);
        context.setProducer(this);
        context.setQueue(getRealQueue());
        return context;
    }

    /** Marks the context as successful and runs the after-hook; does nothing when no context was created. */
    private void notifyHookSuccess(SendMsgContext context) throws MsgFrameClientException, RemoteException {
        if (context != null && hasSendMessageHook()) {
            context.setSendSucess(true);
            executeSendMessageHookAfter(context);
        }
    }

    /**
     * Records the failure on the context, runs the after-hook with that context, and returns
     * the {@link RemoteException} the caller should throw. The context may be {@code null}
     * when the failure happened before it was created; hooks are skipped in that case.
     */
    private RemoteException sendFailure(SendMsgContext context, Exception cause) throws MsgFrameClientException, RemoteException {
        if (context != null && hasSendMessageHook()) {
            context.setSendSucess(false);
            context.setException(cause);
            executeSendMessageHookAfter(context);
        }
        return new RemoteException("reocketmq网络交互异常,网络信息" + getModelInfo(), cause);
    }

    /** Logs the routing details of a completed synchronous send at debug level. */
    private void logSendResult(MsgFMessage msgFMessage, SendResult result) {
        if (log.isDebugEnabled()) {
            log.debug("开始发送消息，消息路由信息，消息信息" + msgFMessage + "--->" + getModelInfo() + ",OffsetMsgId=" + result.getOffsetMsgId() + ",queueI" + result.getMessageQueue().getQueueId() + ",Topic=" + result.getMessageQueue().getTopic() + ",brokerName=" + result.getMessageQueue().getBrokerName());
        }
    }
}
