package com.ai.aif.msgframe.producer.mq.rocketmq5.api;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.IMsgForNormalProducer;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.hook.SendMsgContext;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.producer.mq.BaseProducer;
import com.ai.aif.msgframe.producer.mq.rocketmq5.RocketMQ5ProducerModel;
import com.ai.ipu.msgframe.util.MessageCovertUtil;
import java.rmi.RemoteException;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/producer/mq/rocketmq5/api/RocketMQ5MsgForNormalProducer.class */
/**
 * Producer for normal (non-ordered) messages that sends through the RocketMQ 5 client API.
 *
 * <p>Messages are converted from {@link MsgFMessage} to a RocketMQ {@link Message} with
 * {@link MessageCovertUtil}, and the underlying {@link Producer} is obtained from the shared
 * {@link RocketMQ5Resources} instance for this producer's model.
 */
public class RocketMQ5MsgForNormalProducer extends BaseProducer<RocketMQ5ProducerModel, MsgFMessage> implements IMsgForNormalProducer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQ5MsgForNormalProducer.class);
    private static final RocketMQ5Resources RESOURCES = RocketMQ5Resources.getInstance();

    /**
     * Creates a producer bound to the given model.
     *
     * @param rocketMQ5ProducerModel model handed to the {@link BaseProducer} constructor
     */
    public RocketMQ5MsgForNormalProducer(RocketMQ5ProducerModel rocketMQ5ProducerModel) {
        super(rocketMQ5ProducerModel);
    }

    /**
     * Converts and sends one message, running the send-message hooks around the send when any
     * are registered.
     *
     * <p>Every failure, including a {@link MsgFrameClientException} raised inside the try
     * block, is reported to the hooks (if a hook context was created) and rethrown wrapped in a
     * {@link RemoteException} that carries the original exception as its cause.
     *
     * @param msgFMessage        message to send
     * @param completionListener not used by this implementation
     * @throws MsgFrameClientException declared by the overridden method
     * @throws RemoteException         if conversion, producer lookup, sending, or a hook fails
     */
    @Override // com.ai.aif.msgframe.producer.mq.BaseProducer
    protected void send0(MsgFMessage msgFMessage, CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        // Stays null until the "before" hook stage is reached; conversion or producer lookup
        // may fail first, so every later use must be null-checked.
        SendMsgContext sendMsgContext = null;
        try {
            Message transRocketMQMessage = MessageCovertUtil.transRocketMQMessage(getModelInfo(), msgFMessage);
            Producer producerByCache = RESOURCES.getProducerByCache(getModelInfo());
            if (hasSendMessageHook()) {
                sendMsgContext = new SendMsgContext();
                sendMsgContext.setBrokerAddr(getUrl());
                sendMsgContext.setMsg(msgFMessage);
                sendMsgContext.setProducer(this);
                sendMsgContext.setQueue(getRealQueue());
                executeSendMessageHookBefore(sendMsgContext);
            }
            SendReceipt send = producerByCache.send(transRocketMQMessage);
            if (log.isDebugEnabled()) {
                log.debug("开始发送消息，消息路由信息，消息信息" + msgFMessage + "--->" + getModelInfo() + ",messageId=" + send.getMessageId() + ",queueId=" + msgFMessage.getQueueId() + ",msgHost=" + msgFMessage.getMsgHost());
            }
            notifySendFinished(sendMsgContext, true, null);
        } catch (Exception e) {
            // Single handler: the former separate MsgFrameClientException handler was identical
            // and unreachable behind catch (Exception).
            notifySendFinished(sendMsgContext, false, e);
            throw new RemoteException("reocketmq网络交互异常,网络信息" + getModelInfo(), e);
        }
    }

    /**
     * Runs the "after" hooks with the outcome of a send attempt. Does nothing when no hook
     * context was created or when no hook is registered.
     */
    private void notifySendFinished(SendMsgContext sendMsgContext, boolean success, Exception failure) {
        if (sendMsgContext == null || !hasSendMessageHook()) {
            return;
        }
        sendMsgContext.setSendSucess(success);
        if (failure != null) {
            sendMsgContext.setException(failure);
        }
        executeSendMessageHookAfter(sendMsgContext);
    }

    /**
     * Sends a reply message. The message is converted with the model's destination subject name
     * suffixed by {@code "_reply"} passed as the third argument to
     * {@link MessageCovertUtil#transRocketMQMessage} (presumably the target topic; confirm
     * against that utility). Send-message hooks are not invoked here.
     *
     * @param msgFMessage reply message to send
     * @throws MsgFrameClientException declared for interface compatibility
     * @throws RemoteException         if conversion, producer lookup, or sending fails
     */
    public void replySend(MsgFMessage msgFMessage) throws MsgFrameClientException, RemoteException {
        try {
            String replySubject = getModelInfo().getDestinationInfo().getSubjectName() + "_reply";
            Message replyMessage = MessageCovertUtil.transRocketMQMessage(getModelInfo(), msgFMessage, replySubject);
            RESOURCES.getProducerByCache(getModelInfo()).send(replyMessage);
        } catch (Exception e) {
            throw new RemoteException("发送消息异常，异常节点信息：" + getModelInfo().toString(), e);
        }
    }

    /**
     * Ordered sending is not implemented by this producer; the method always throws.
     *
     * @param msgFMessage ignored
     * @param str         ignored
     * @throws MsgFrameClientException always
     */
    public void sendOrderMsg(MsgFMessage msgFMessage, String str) throws MsgFrameClientException, RemoteException {
        throw new MsgFrameClientException("不支持向RocketMQ5发送顺序消息!");
    }
}
