package com.ai.aif.msgframe.producer.mq;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.IModelInfo;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.hook.ISendMessageHook;
import com.ai.aif.msgframe.common.hook.SendMsgContext;
import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.model.impl.ContainerModel;
import com.asiainfo.msgframe.ProducerCfg;
import com.google.common.util.concurrent.RateLimiter;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/producer/mq/BaseProducer.class */
/**
 * Base class for message producers.
 *
 * <p>An instance wraps a model-info object of type {@code K} and exposes its
 * properties by delegation. Sending goes through {@link #send}, which applies
 * the optional rate limit and then hands the message to the subclass-specific
 * {@link #send0}.
 *
 * <p>The send-message hooks and the rate limiter are {@code static}: they are
 * built once, when this class is initialized, from the configuration returned
 * by {@code ContainerModel.getCfg()}, and are shared by every producer
 * instance.
 *
 * @param <K> type of the wrapped model info
 * @param <V> type of message this producer sends
 */
public abstract class BaseProducer<K extends IModelInfo, V extends MsgFMessage> implements IModelInfo {
    private static final Logger logger = LoggerFactory.getLogger(BaseProducer.class);

    /** Hooks instantiated from the configured injection class names; filled only by the static initializer. */
    private static final List<ISendMessageHook> SEND_MESSAGE_HOOKS = new ArrayList<ISendMessageHook>();

    /** Shared limiter; stays {@code null} when there is no producer configuration or the threshold is not positive. */
    private static RateLimiter rateLimiter;

    /** Maximum time, in milliseconds, that {@link #send} waits for a permit. */
    private static long rateLimitWaitTime;

    /** Model info every delegating getter reads from. */
    private final K modelInfo;

    static {
        // Instantiate the configured send-message hooks. A class name that cannot be
        // loaded, instantiated or cast is logged and skipped so the rest still load.
        if (null != ContainerModel.getCfg().getGlobalCfg()) {
            String[] hookClassNames = ContainerModel.getCfg().getGlobalCfg().getProdInjectionArray();
            if (null != hookClassNames) {
                for (String str : hookClassNames) {
                    try {
                        SEND_MESSAGE_HOOKS.add((ISendMessageHook) Class.forName(str).getDeclaredConstructor().newInstance());
                    } catch (Throwable th) {
                        logger.error("生产端注入实现类" + str + "配置不正确，请检查配置", th);
                    }
                }
            }
        }

        // Set up flow control from the producer configuration, if there is one.
        ProducerCfg producerCfg = ContainerModel.getCfg().getProducerCfg();
        if (null != producerCfg) {
            logger.info("初始化消息发送流控。ThresholdNum=" + producerCfg.getRateLimitThresholdNum() + ",WaitTime=" + producerCfg.getRateLimitWaitTime());
            double permitsPerSecond = producerCfg.getRateLimitThresholdNum();
            if (permitsPerSecond > 0.0d) {
                rateLimiter = RateLimiter.create(permitsPerSecond);
                rateLimitWaitTime = producerCfg.getRateLimitWaitTime();
            } else {
                // RateLimiter.create rejects a non-positive rate with IllegalArgumentException.
                // Thrown from a static initializer, that would make this class (and therefore
                // every producer subclass) unusable, so flow control is left disabled instead.
                logger.warn("消息发送流控阀值配置无效(必须大于0)，跳过流控初始化。ThresholdNum=" + producerCfg.getRateLimitThresholdNum());
            }
        }
    }

    /**
     * Creates a producer backed by the given model info.
     *
     * @param k model info that the delegating getters read from
     */
    public BaseProducer(K k) {
        this.modelInfo = k;
    }

    /**
     * @return {@code true} if at least one send-message hook was loaded
     */
    public boolean hasSendMessageHook() {
        return !SEND_MESSAGE_HOOKS.isEmpty();
    }

    /**
     * Calls {@code sendMessageBefore} on every loaded hook, in load order.
     * An exception thrown by one hook is logged and does not stop the others.
     *
     * @param sendMsgContext context passed to each hook
     */
    public void executeSendMessageHookBefore(SendMsgContext sendMsgContext) {
        for (ISendMessageHook hook : SEND_MESSAGE_HOOKS) {
            try {
                hook.sendMessageBefore(sendMsgContext);
            } catch (Exception e) {
                logger.error("执行hook的sendMessageBefore异常,context={}", sendMsgContext, e);
            }
        }
    }

    /**
     * Calls {@code sendMessageAfter} on every loaded hook, in load order.
     * An exception thrown by one hook is logged and does not stop the others.
     *
     * @param sendMsgContext context passed to each hook
     */
    public void executeSendMessageHookAfter(SendMsgContext sendMsgContext) {
        for (ISendMessageHook hook : SEND_MESSAGE_HOOKS) {
            try {
                hook.sendMessageAfter(sendMsgContext);
            } catch (Exception e) {
                logger.error("执行hook的sendMessageAfter异常,context={}", sendMsgContext, e);
            }
        }
    }

    /** @return the subject reported by the wrapped model info */
    public String getSubject() {
        return this.modelInfo.getSubject();
    }

    /** @return the cluster name reported by the wrapped model info */
    public String getClusterName() {
        return this.modelInfo.getClusterName();
    }

    /** @return the center reported by the wrapped model info */
    public String getCenter() {
        return this.modelInfo.getCenter();
    }

    /** @return the tag reported by the wrapped model info */
    public String getTag() {
        return this.modelInfo.getTag();
    }

    /** @return the canary value reported by the wrapped model info */
    public String getCanary() {
        return this.modelInfo.getCanary();
    }

    /** @return the index reported by the wrapped model info */
    public int getIndex() {
        return this.modelInfo.getIndex();
    }

    /** @return the URL reported by the wrapped model info */
    public String getUrl() {
        return this.modelInfo.getUrl();
    }

    /** @return the real queue name reported by the wrapped model info */
    public String getRealQueue() {
        return this.modelInfo.getRealQueue();
    }

    /** @return the queue-type flag reported by the wrapped model info */
    public boolean isQueueType() {
        return this.modelInfo.isQueueType();
    }

    /** @return the compression threshold reported by the wrapped model info */
    public int getCompressMsgBodyOverHowmuch() {
        return this.modelInfo.getCompressMsgBodyOverHowmuch();
    }

    /** @return the maximum message size reported by the wrapped model info */
    public int getMaxMessageSize() {
        return this.modelInfo.getMaxMessageSize();
    }

    /** @return the wrapped model info itself */
    public K getModelInfo() {
        return this.modelInfo;
    }

    /**
     * Performs the actual send. Called by {@link #send} once the rate-limit
     * check has passed or has been skipped.
     *
     * @param v                  message to send
     * @param completionListener listener handed through from {@link #send}
     * @throws MsgFrameClientException declared for implementations to report send failures
     * @throws RemoteException         declared for implementations to report remote failures
     */
    protected abstract void send0(V v, CompletionListener completionListener) throws MsgFrameClientException, RemoteException;

    /**
     * Sends a message, applying the shared rate limit first.
     *
     * <p>The limit is bypassed when no limiter is configured or when the
     * message's header map contains the key {@code SKIP_RATELIMITER}.
     * Otherwise a permit is requested, waiting at most the configured wait
     * time in milliseconds.
     *
     * @param v                  message to send
     * @param completionListener listener passed on to {@link #send0}
     * @throws MsgFrameClientException with code {@code FLOW_CONTROL} when no permit
     *                                 could be acquired, or as thrown by {@link #send0}
     * @throws RemoteException         as thrown by {@link #send0}
     */
    public void send(V v, CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        if (rateLimiter != null) {
            boolean skipRateLimit = v.getHeaderMap() != null && v.getHeaderMap().containsKey("SKIP_RATELIMITER");
            if (!skipRateLimit && !rateLimiter.tryAcquire(rateLimitWaitTime, TimeUnit.MILLISECONDS)) {
                throw new MsgFrameClientException("FLOW_CONTROL", "达到流量控制阀值!");
            }
        }
        send0(v, completionListener);
    }
}
