package com.taobao.metamorphosis.client.extension.producer;

import com.taobao.gecko.core.command.ResponseCommand;
import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.command.kernel.BooleanAckCommand;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.SingleRequestCallBackListener;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.extension.producer.AsyncMessageProducer;
import com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager;
import com.taobao.metamorphosis.client.producer.PartitionSelector;
import com.taobao.metamorphosis.client.producer.ProducerZooKeeper;
import com.taobao.metamorphosis.client.producer.SimpleMessageProducer;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.PutCommand;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/extension/producer/AsyncMetaMessageProducer.class */
/**
 * Message producer that sends asynchronously and bounds the amount of in-flight
 * payload with a {@link SlidingWindow}.
 *
 * <p>Each send acquires window permits sized by the payload length; the permits are
 * given back when the remoting callback fires, or immediately when the send could not
 * be handed to the remoting layer. Messages that cannot be sent are passed to the
 * configured {@link AsyncMessageProducer.IgnoreMessageProcessor} instead of failing the
 * caller.
 */
public class AsyncMetaMessageProducer extends SimpleMessageProducer implements AsyncMessageProducer, MessageRecoverManager.MessageRecoverer {
    private static final Log log = LogFactory.getLog(AsyncMetaMessageProducer.class);
    /** Window size used when the constructor is given a non-positive size. */
    private static final int DEFAULT_PERMITS = 20000;
    /** Timeout applied by {@link #asyncSendMessage(Message)}. */
    private static final long DEFAULT_SEND_TIMEOUT_MILLIS = 3000L;
    /** Status code placed in the synthetic response returned for an accepted send. */
    private static final int STATUS_SUCCESS = 200;
    private final SlidingWindow slidingWindow;
    private AsyncMessageProducer.IgnoreMessageProcessor ignoreMessageProcessor;

    /**
     * Callback attached to every asynchronous put. It returns the sliding-window permits
     * taken for the payload exactly once, whichever of response, exception, or a failed
     * hand-off in the enclosing producer happens first.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/taobao/metamorphosis/client/extension/producer/AsyncMetaMessageProducer$MessageSendCallBackListener.class */
    public class MessageSendCallBackListener implements SingleRequestCallBackListener {
        /** Payload length in bytes captured at construction; 0 when the command has no data. */
        final int dataLenth;
        /** Flips to true on the first release so the permits are never returned twice. */
        final AtomicBoolean released = new AtomicBoolean(false);

        MessageSendCallBackListener(PutCommand putCommand) {
            byte[] data = putCommand.getData();
            this.dataLenth = data != null ? data.length : 0;
        }

        /**
         * Releases the permits and logs a warning describing any non-successful response.
         *
         * @param responseCommand the response received for the put
         * @param connection the connection the response arrived on (unused)
         */
        public void onResponse(ResponseCommand responseCommand, Connection connection) {
            release();
            if (responseCommand.getResponseStatus() == ResponseStatus.NO_ERROR) {
                return;
            }
            StringBuilder sb = new StringBuilder();
            sb.append("onResponse. Status:").append(responseCommand.getResponseStatus());
            if (responseCommand instanceof BooleanCommand) {
                sb.append(",Code:").append(((BooleanCommand) responseCommand).getCode());
            }
            if (responseCommand instanceof BooleanAckCommand) {
                BooleanAckCommand ack = (BooleanAckCommand) responseCommand;
                sb.append(",ErrorMsg:").append(ack.getErrorMsg());
                sb.append(",ResponseHost:").append(ack.getResponseHost());
            }
            AsyncMetaMessageProducer.log.warn(sb.toString());
        }

        /**
         * Releases the permits and logs the failure together with its stack trace.
         *
         * @param exc the failure reported by the remoting layer
         */
        public void onException(Exception exc) {
            release();
            AsyncMetaMessageProducer.log.warn("onException. Async send failed", exc);
        }

        /** Returns the permits to the sliding window; only the first call has any effect. */
        private void release() {
            if (this.released.compareAndSet(false, true)) {
                AsyncMetaMessageProducer.this.slidingWindow.releaseByLenth(this.dataLenth);
            }
        }

        /**
         * @return always {@code null}; no dedicated executor is supplied for this callback
         */
        public ThreadPoolExecutor getExecutor() {
            return null;
        }
    }

    /**
     * Raised when a send is rejected because the sliding window is exhausted or the
     * remoting layer reported one of its flow-limit errors.
     */
    /* loaded from: input_file:com/taobao/metamorphosis/client/extension/producer/AsyncMetaMessageProducer$MetaMessageOverflowException.class */
    public static class MetaMessageOverflowException extends NotifyRemotingException {
        private static final long serialVersionUID = -1842231102008256662L;

        public MetaMessageOverflowException(String str) {
            super(str);
        }

        public MetaMessageOverflowException(Throwable th) {
            super(th);
        }
    }

    /**
     * Creates the producer.
     *
     * @param metaMessageSessionFactory session factory, also used to obtain the client
     *        config for the default ignore-message processor
     * @param remotingClientWrapper remoting client used to send commands
     * @param partitionSelector partition selection strategy passed to the superclass
     * @param producerZooKeeper producer ZooKeeper helper passed to the superclass
     * @param str passed through unchanged as the last superclass constructor argument
     * @param i sliding window size; a non-positive value selects {@code DEFAULT_PERMITS}
     * @param ignoreMessageProcessor handler for messages that could not be sent; when
     *        {@code null} an {@code AsyncIgnoreMessageProcessor} is created
     */
    public AsyncMetaMessageProducer(MetaMessageSessionFactory metaMessageSessionFactory, RemotingClientWrapper remotingClientWrapper, PartitionSelector partitionSelector, ProducerZooKeeper producerZooKeeper, String str, int i, AsyncMessageProducer.IgnoreMessageProcessor ignoreMessageProcessor) {
        super(metaMessageSessionFactory, remotingClientWrapper, partitionSelector, producerZooKeeper, str);
        this.slidingWindow = new SlidingWindow(i > 0 ? i : DEFAULT_PERMITS);
        this.ignoreMessageProcessor = ignoreMessageProcessor != null ? ignoreMessageProcessor : new AsyncIgnoreMessageProcessor(metaMessageSessionFactory.getMetaClientConfig(), this);
    }

    /**
     * Sends the message asynchronously with the default timeout of 3000 ms.
     *
     * @param message the message to send
     */
    @Override // com.taobao.metamorphosis.client.extension.producer.AsyncMessageProducer
    public void asyncSendMessage(Message message) {
        asyncSendMessage(message, DEFAULT_SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends the message asynchronously and never propagates a failure to the caller.
     *
     * <p>Invalid messages and illegal producer state are only logged. Any other failure
     * hands the message to the ignore-message processor. On interruption the thread's
     * interrupt flag is restored and the message is not handed to the processor.
     *
     * @param message the message to send
     * @param j send timeout
     * @param timeUnit unit of {@code j}
     */
    @Override // com.taobao.metamorphosis.client.extension.producer.AsyncMessageProducer
    public void asyncSendMessage(Message message, long j, TimeUnit timeUnit) {
        try {
            super.sendMessage(message, j, timeUnit);
        } catch (InvalidMessageException e) {
            // Retrying an invalid message cannot succeed, so it is logged and dropped.
            log.warn("Invalid message, not sent", e);
        } catch (IllegalStateException e2) {
            log.warn("Producer is in an illegal state, message not sent", e2);
        } catch (MetaClientException e3) {
            if (log.isDebugEnabled()) {
                log.debug("save to local strage,and waitting for recover. cause:" + e3.getMessage());
            }
            handleSendFailMessage(message);
        } catch (InterruptedException e4) {
            Thread.currentThread().interrupt();
        } catch (Throwable th) {
            // Deliberately broad: this method must not fail the caller.
            if (log.isDebugEnabled()) {
                log.debug("save to local strage,and waitting for recover. cause:", th);
            }
            handleSendFailMessage(message);
        }
    }

    /**
     * Replaces the handler used for messages that could not be sent.
     *
     * @param ignoreMessageProcessor the new handler; with {@code null}, failed messages
     *        are dropped with a warning and overflow errors are rethrown
     */
    @Override // com.taobao.metamorphosis.client.extension.producer.AsyncMessageProducer
    public void setIgnoreMessageProcessor(AsyncMessageProducer.IgnoreMessageProcessor ignoreMessageProcessor) {
        this.ignoreMessageProcessor = ignoreMessageProcessor;
    }

    /**
     * Sends the put command without waiting for the broker's answer. When the send is
     * rejected for overflow, the message is diverted to the ignore-message processor.
     *
     * @return a synthetic success response; it does not carry a broker result
     * @throws NotifyRemotingException when the remoting layer fails for a reason other
     *         than overflow, or on overflow when no ignore-message processor is set
     */
    @Override // com.taobao.metamorphosis.client.producer.SimpleMessageProducer
    protected BooleanCommand invokeToGroup(String str, Partition partition, PutCommand putCommand, Message message, long j, TimeUnit timeUnit) throws InterruptedException, TimeoutException, NotifyRemotingException {
        try {
            return trySend(str, putCommand, j, timeUnit);
        } catch (MetaMessageOverflowException e) {
            if (log.isDebugEnabled()) {
                log.debug("save to local strage,and waitting for recover. cause:" + e.getMessage());
            }
            return processOverMessage(partition, putCommand, message, e);
        }
    }

    /**
     * Acquires window permits for the payload and hands the command to the remoting
     * client. If the hand-off fails for any reason, the permits are returned through the
     * listener's once-only release, so they are neither leaked nor returned twice.
     *
     * @throws MetaMessageOverflowException when the window has no room, or the remoting
     *         layer reports one of its flow-limit errors
     * @throws NotifyRemotingException for any other remoting failure
     */
    private BooleanCommand trySend(String str, PutCommand putCommand, long j, TimeUnit timeUnit) throws NotifyRemotingException, InterruptedException {
        final byte[] data = putCommand.getData();
        final int length = data != null ? data.length : 0;
        if (!this.slidingWindow.tryAcquireByLength(length)) {
            throw new MetaMessageOverflowException("发送消息流量超过滑动窗口单位总数：" + this.slidingWindow.getWindowsSize());
        }
        final MessageSendCallBackListener listener = new MessageSendCallBackListener(putCommand);
        boolean handedOff = false;
        try {
            this.remotingClient.sendToGroup(str, putCommand, listener, j, timeUnit);
            handedOff = true;
        } catch (NotifyRemotingException e) {
            if (isFlowLimitError(e)) {
                throw new MetaMessageOverflowException(e);
            }
            throw e;
        } finally {
            if (!handedOff) {
                // Covers runtime failures as well as NotifyRemotingException.
                listener.release();
            }
        }
        return acceptedResponse(putCommand);
    }

    /**
     * Tells whether the remoting failure is one of the flow-limit errors, identified by
     * the marker text in its message. A missing message counts as "not a flow-limit error".
     */
    private static boolean isFlowLimitError(NotifyRemotingException e) {
        final String msg = e.getMessage();
        return msg != null && (msg.contains("超过流量限制") || msg.contains("超过允许的最大CallBack个数"));
    }

    /** Builds the synthetic success response returned for an accepted or diverted send. */
    private static BooleanCommand acceptedResponse(PutCommand putCommand) {
        return new BooleanCommand(STATUS_SUCCESS, "-1 " + putCommand.getPartition() + " -1", putCommand.getOpaque());
    }

    /**
     * Handles a send rejected for overflow: rethrows when no ignore-message processor is
     * configured, otherwise diverts the message to it and reports success.
     */
    /* JADX WARN: Multi-variable type inference failed */
    private BooleanCommand processOverMessage(Partition partition, PutCommand putCommand, Message message, MetaMessageOverflowException metaMessageOverflowException) throws MetaMessageOverflowException {
        if (this.ignoreMessageProcessor == null) {
            throw metaMessageOverflowException;
        }
        handleSendFailMessage(message);
        return acceptedResponse(putCommand);
    }

    /**
     * Passes a message that could not be sent to the ignore-message processor. Failures
     * of the processor are logged and not propagated.
     */
    private void handleSendFailMessage(Message message) {
        // Snapshot the field so the null check and the call see the same processor.
        final AsyncMessageProducer.IgnoreMessageProcessor processor = this.ignoreMessageProcessor;
        if (processor == null) {
            log.warn("No IgnoreMessageProcessor configured, message that failed to send is dropped");
            return;
        }
        try {
            processor.handle(message);
        } catch (Exception e) {
            log.warn("IgnoreMessageProcessor failed to handle message", e);
        }
    }

    /**
     * Recovery callback: sends the recovered message again via
     * {@link #asyncSendMessage(Message)}.
     *
     * @param message the message to send again
     */
    @Override // com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager.MessageRecoverer
    public void handle(Message message) throws Exception {
        asyncSendMessage(message);
    }

    /** @return the current ignore-message processor, possibly {@code null} */
    AsyncMessageProducer.IgnoreMessageProcessor getIgnoreMessageProcessor() {
        return this.ignoreMessageProcessor;
    }
}
