package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.core.util.ConcurrentHashSet;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.MessageAccessor;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.util.Iterator;
import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/consumer/SimpleFetchManager.class */
public class SimpleFetchManager implements FetchManager {
    private volatile boolean shutdown = false;
    private Thread[] fetchThreads;
    private FetchRequestRunner[] requestRunners;
    private volatile int fetchRequestCount;
    private FetchRequestQueue requestQueue;
    private final ConsumerConfig consumerConfig;
    private final InnerConsumer consumer;
    static final Log log = LogFactory.getLog(SimpleFetchManager.class);

    /* loaded from: input_file:com/taobao/metamorphosis/client/consumer/SimpleFetchManager$FetchRequestRunner.class */
    class FetchRequestRunner implements Runnable {
        private static final int DELAY_NPARTS = 10;
        private long lastLogNoConnectionTime;
        private volatile boolean stopped = false;
        private final ConcurrentHashSet<Thread> executorThreads = new ConcurrentHashSet<>();

        /**
         * Creates a runner. The field initializers leave it in the not-stopped state
         * with an empty set of tracked executor threads.
         */
        FetchRequestRunner() {
            super();
        }

        /**
         * Flags this runner as stopped. The {@link #run()} loop exits the next time it
         * evaluates its condition, and fetch requests are no longer re-queued by this runner.
         */
        void shutdown() {
            stopped = true;
        }

        /**
         * Runner loop: repeatedly takes a fetch request from the shared queue and processes it
         * until this runner has been stopped.
         */
        @Override
        public void run() {
            while (!stopped) {
                final FetchRequest pending;
                try {
                    pending = SimpleFetchManager.this.requestQueue.take();
                } catch (InterruptedException ignored) {
                    // An interrupt while waiting on the queue is only a wake-up call;
                    // the loop condition decides whether to keep running.
                    continue;
                }
                processRequest(pending);
            }
        }

        /**
         * Fetches messages for one request and hands them to the topic's listener.
         *
         * <p>If the fetch is interrupted the request is simply re-queued. Any other failure
         * increases the request's delay and is logged, after which the request is re-queued.
         *
         * @param fetchRequest the request to execute
         */
        void processRequest(FetchRequest fetchRequest) {
            try {
                final MessageIterator fetched = SimpleFetchManager.this.consumer.fetch(fetchRequest, -1L, null);
                final MessageListener listener =
                        SimpleFetchManager.this.consumer.getMessageListener(fetchRequest.getTopic());
                notifyListener(fetchRequest, fetched, listener);
            } catch (InterruptedException interrupted) {
                reAddFetchRequest2Queue(fetchRequest);
            } catch (Throwable failure) {
                // MetaClientException and every other failure are handled the same way.
                updateDelay(fetchRequest);
                LogAddRequest(fetchRequest, failure);
            }
        }

        /**
         * Logs a fetch failure and puts the request back on the queue.
         *
         * <p>Failures that are a {@link MetaClientException} caused by a
         * {@link NotifyRemotingException} whose message contains "无可用连接" are logged at most
         * once every 30 seconds; every other failure is always logged.
         *
         * <p>The request is re-queued in a {@code finally} block so it is not lost even if
         * building or writing the log entry fails.
         *
         * @param fetchRequest the request that failed
         * @param th           the failure to log
         */
        private void LogAddRequest(FetchRequest fetchRequest, Throwable th) {
            try {
                boolean noConnection = false;
                if (th instanceof MetaClientException && th.getCause() instanceof NotifyRemotingException) {
                    // Throwable.getMessage() may legitimately return null; guard before inspecting it.
                    final String message = th.getMessage();
                    noConnection = message != null && message.contains("无可用连接");
                }
                if (!noConnection) {
                    SimpleFetchManager.log.error("获取消息失败,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), th);
                    return;
                }
                final long now = System.currentTimeMillis();
                if (this.lastLogNoConnectionTime <= 0 || now - this.lastLogNoConnectionTime > 30000) {
                    SimpleFetchManager.log.error("获取消息失败,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), th);
                    this.lastLogNoConnectionTime = now;
                }
            } finally {
                reAddFetchRequest2Queue(fetchRequest);
            }
        }

        /**
         * Recovers from an invalid message by asking the consumer for a usable offset, applying
         * it to the request (unless this runner has been stopped) and re-queueing the request.
         *
         * <p>The request is re-queued exactly once, whether the offset query succeeds, fails with
         * a {@link MetaClientException} (which is logged), or fails with an unchecked exception
         * (which propagates to the caller).
         *
         * @param fetchRequest            the request whose fetched data contained an invalid message
         * @param invalidMessageException the exception that triggered the offset query
         */
        private void getOffsetAddRequest(FetchRequest fetchRequest, InvalidMessageException invalidMessageException) {
            try {
                final long offset = SimpleFetchManager.this.consumer.offset(fetchRequest);
                fetchRequest.resetRetries();
                if (!this.stopped) {
                    fetchRequest.setOffset(offset, fetchRequest.getLastMessageId(), fetchRequest.getPartitionObject().isAutoAck());
                }
            } catch (MetaClientException e) {
                // Log the actual offset-query failure as the cause; the triggering invalid-message
                // exception is kept in the message text so neither is lost.
                SimpleFetchManager.log.error("查询offset失败,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition()
                        + ",invalidMessage=" + invalidMessageException, e);
            } finally {
                reAddFetchRequest2Queue(fetchRequest);
            }
        }

        /**
         * Interrupts every thread currently registered in {@code executorThreads}, i.e. the
         * listener-executor threads that are in the middle of delivering messages for this runner.
         * Threads whose interrupt flag is already set are left alone.
         */
        public void interruptExecutor() {
            // Typed for-each instead of a raw Iterator with an unchecked cast.
            for (final Thread worker : this.executorThreads) {
                if (!worker.isInterrupted()) {
                    worker.interrupt();
                }
            }
        }

        /**
         * Delivers fetched messages to the listener.
         *
         * <p>Nothing happens when there is no listener. When the listener supplies no executor the
         * messages are delivered on the calling thread. Otherwise delivery is submitted to the
         * listener's executor; the executing thread is tracked in {@code executorThreads} for the
         * duration of the delivery so it can be interrupted. If the executor rejects the task the
         * rejection is logged and the request is re-queued.
         *
         * @param fetchRequest    the request the messages belong to
         * @param messageIterator the fetched messages
         * @param messageListener the listener for the request's topic, possibly {@code null}
         */
        private void notifyListener(final FetchRequest fetchRequest, final MessageIterator messageIterator, final MessageListener messageListener) {
            if (messageListener == null) {
                return;
            }
            if (messageListener.getExecutor() == null) {
                receiveMessages(fetchRequest, messageIterator, messageListener);
                return;
            }
            final Runnable deliveryTask = new Runnable() {
                @Override
                public void run() {
                    final Thread worker = Thread.currentThread();
                    FetchRequestRunner.this.executorThreads.add(worker);
                    try {
                        FetchRequestRunner.this.receiveMessages(fetchRequest, messageIterator, messageListener);
                    } finally {
                        // Always stop tracking the thread, whether delivery succeeded or threw.
                        FetchRequestRunner.this.executorThreads.remove(worker);
                    }
                }
            };
            try {
                messageListener.getExecutor().execute(deliveryTask);
            } catch (RejectedExecutionException rejected) {
                SimpleFetchManager.log.error("MessageListener线程池繁忙，无法处理消息,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), rejected);
                reAddFetchRequest2Queue(fetchRequest);
            }
        }

        /**
         * Puts the request back on the manager's queue, unless this runner has been stopped,
         * in which case the request is dropped.
         *
         * @param fetchRequest the request to re-queue
         */
        private void reAddFetchRequest2Queue(FetchRequest fetchRequest) {
            if (!this.stopped) {
                SimpleFetchManager.this.addFetchRequest(fetchRequest);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Handles the result of one fetch.
         *
         * <p>When the iterator has messages: first gives the retry-limit handling a chance to
         * consume the request, then delivers the messages to the listener and, unless delivery
         * already re-queued the request, performs the post-delivery offset handling.
         *
         * <p>When there is nothing to iterate: if the request has exceeded the "increase" retry
         * threshold and data was actually returned, the request's max size is increased and a
         * warning is logged. A non-null iterator counts as one more retry (a null iterator does
         * not). The request's delay is then increased and the request is re-queued.
         *
         * @param fetchRequest    the request that was executed
         * @param messageIterator the fetched data, possibly {@code null}
         * @param messageListener the listener to deliver messages to
         */
        public void receiveMessages(FetchRequest fetchRequest, MessageIterator messageIterator, MessageListener messageListener) {
            final boolean hasMessages = messageIterator != null && messageIterator.hasNext();
            if (!hasMessages) {
                if (SimpleFetchManager.this.isRetryTooManyForIncrease(fetchRequest) && messageIterator != null && messageIterator.getDataLength() > 0) {
                    fetchRequest.increaseMaxSize();
                    SimpleFetchManager.log.warn("警告，第" + fetchRequest.getRetries() + "次无法拉取topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartitionObject() + "的消息，递增maxSize=" + fetchRequest.getMaxSize() + " Bytes");
                }
                if (messageIterator != null) {
                    fetchRequest.incrementRetriesAndGet();
                }
                updateDelay(fetchRequest);
                reAddFetchRequest2Queue(fetchRequest);
                return;
            }

            if (processWhenRetryTooMany(fetchRequest, messageIterator)) {
                return;
            }
            final Partition partition = fetchRequest.getPartitionObject();
            final boolean alreadyHandled = processReceiveMessage(fetchRequest, messageIterator, messageListener, partition);
            if (!alreadyHandled) {
                postReceiveMessage(fetchRequest, messageIterator, partition);
            }
        }

        /**
         * Delivers the messages of one fetch result to the listener, one at a time.
         *
         * <p>This body was reconstructed from the instruction listing the decompiler left behind;
         * the previous body was a stub that unconditionally threw
         * {@link UnsupportedOperationException}.
         *
         * <p>For each message the partition is attached and the listener is invoked. With an
         * auto-ack partition every delivered message is counted and iteration continues. With a
         * manual-ack partition iteration stops as soon as the partition reports acked (the message
         * is counted) or rollback (it is not counted); otherwise the message is counted and
         * iteration continues.
         *
         * <p>If delivery is interrupted or fails with an arbitrary error, the iterator is moved
         * back to the offset it had before the failing message, the error is logged and iteration
         * stops. If a message is invalid, the invalid-message statistic is recorded and the request
         * is re-queued via an offset query; in that case no delivered-count statistic is recorded.
         *
         * @param fetchRequest    the request the messages belong to
         * @param messageIterator the fetched messages
         * @param messageListener the listener receiving each message
         * @param partition       the partition attached to each message and queried for ack state
         * @return {@code true} if an invalid message was hit and the request has already been
         *         re-queued; {@code false} if the caller still has to do post-delivery handling
         */
        private boolean processReceiveMessage(FetchRequest fetchRequest, MessageIterator messageIterator, MessageListener messageListener, Partition partition) {
            int delivered = 0;
            while (messageIterator.hasNext()) {
                // Remember where this message starts so a failed delivery can be retried from here.
                final int offsetBeforeMessage = messageIterator.getOffset();
                try {
                    final Message message = messageIterator.next();
                    MessageAccessor.setPartition(message, partition);
                    messageListener.recieveMessages(message);
                    if (partition.isAutoAck()) {
                        delivered++;
                    } else if (partition.isAcked()) {
                        // An explicit ack ends this batch; the acked message counts as delivered.
                        delivered++;
                        break;
                    } else if (partition.isRollback()) {
                        // A rollback ends this batch without counting the message.
                        break;
                    } else {
                        // Neither acked nor rolled back: just count it and keep going.
                        delivered++;
                    }
                } catch (InterruptedException e) {
                    messageIterator.setOffset(offsetBeforeMessage);
                    SimpleFetchManager.log.error("Process messages thread was interrupted,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), e);
                    break;
                } catch (InvalidMessageException e) {
                    MetaStatLog.addStat((String) null, "cli_invalid_message", fetchRequest.getTopic());
                    getOffsetAddRequest(fetchRequest, e);
                    return true;
                } catch (Throwable e) {
                    messageIterator.setOffset(offsetBeforeMessage);
                    SimpleFetchManager.log.error("Process messages failed,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), e);
                    break;
                }
            }
            MetaStatLog.addStatValue2((String) null, "cli_get_msg_count", fetchRequest.getTopic(), (long) delivered);
            return false;
        }

        /**
         * Skips the next message when the request has exceeded the maximum number of fetch retries.
         *
         * <p>When the limit is exceeded, the next message is taken from the iterator, recorded in
         * the skip statistic and handed to the consumer as a message that could not be processed.
         * The request's retries are reset, its offset is moved past the skipped message (unless
         * this runner is stopped), its delay is cleared and it is re-queued. An invalid message
         * triggers an offset query instead; any other failure is logged and the request re-queued.
         *
         * @param fetchRequest    the request being processed
         * @param messageIterator the fetched messages; must have a next element
         * @return {@code true} if the retry limit was exceeded and the request has been dealt with
         *         here; {@code false} if the limit was not exceeded and nothing was done
         */
        private boolean processWhenRetryTooMany(FetchRequest fetchRequest, MessageIterator messageIterator) {
            if (SimpleFetchManager.this.isRetryTooMany(fetchRequest)) {
                try {
                    final Message skipped = messageIterator.next();
                    MessageAccessor.setPartition(skipped, fetchRequest.getPartitionObject());
                    MetaStatLog.addStat((String) null, "cli_skip_msg_count", skipped.getTopic());
                    SimpleFetchManager.this.consumer.appendCouldNotProcessMessage(skipped);
                    fetchRequest.resetRetries();
                    if (!this.stopped) {
                        fetchRequest.setOffset(
                                fetchRequest.getOffset() + messageIterator.getOffset(),
                                messageIterator.getPrevMessage().getId(),
                                true);
                    }
                    fetchRequest.setDelay(0L);
                    reAddFetchRequest2Queue(fetchRequest);
                } catch (InvalidMessageException invalid) {
                    MetaStatLog.addStat((String) null, "cli_invalid_message", fetchRequest.getTopic());
                    getOffsetAddRequest(fetchRequest, invalid);
                } catch (Throwable failure) {
                    LogAddRequest(fetchRequest, failure);
                }
                // Every path above has taken responsibility for the request.
                return true;
            }
            return false;
        }

        /**
         * Updates retries and offset after a delivery pass and re-queues the request.
         *
         * <p>If the iterator did not advance (offset 0) the pass counts as a retry; otherwise the
         * retries are reset. Then, depending on the partition state:
         * <ul>
         *   <li>auto-ack: the offset is advanced with the ack flag set;</li>
         *   <li>rollback: the request's offset is rolled back and the partition state reset;</li>
         *   <li>acked: the partition state is reset and the offset advanced with the ack flag set;</li>
         *   <li>otherwise: the offset is advanced with the ack flag cleared.</li>
         * </ul>
         *
         * @param fetchRequest    the request that was processed
         * @param messageIterator the iterator that was consumed
         * @param partition       the partition whose ack state decides the outcome
         */
        private void postReceiveMessage(FetchRequest fetchRequest, MessageIterator messageIterator, Partition partition) {
            final boolean nothingConsumed = messageIterator.getOffset() == 0;
            if (nothingConsumed) {
                fetchRequest.incrementRetriesAndGet();
            } else {
                fetchRequest.resetRetries();
            }

            if (partition.isAutoAck()) {
                ackRequest(fetchRequest, messageIterator, true);
            } else if (partition.isRollback()) {
                fetchRequest.rollbackOffset();
                partition.reset();
                addRequst(fetchRequest);
            } else if (partition.isAcked()) {
                partition.reset();
                ackRequest(fetchRequest, messageIterator, true);
            } else {
                ackRequest(fetchRequest, messageIterator, false);
            }
        }

        /**
         * Advances the request's offset by the amount the iterator consumed (skipped when this
         * runner is stopped) and re-queues the request with its retry delay.
         *
         * @param fetchRequest    the request to advance
         * @param messageIterator the iterator whose offset and previous message are used
         * @param z               flag passed through as the third argument of
         *                        {@code FetchRequest.setOffset}
         */
        private void ackRequest(FetchRequest fetchRequest, MessageIterator messageIterator, boolean z) {
            if (!this.stopped) {
                final long advancedOffset = fetchRequest.getOffset() + messageIterator.getOffset();
                final long lastMessageId;
                if (messageIterator.getPrevMessage() == null) {
                    // No message was consumed; -1 stands for "no last message id".
                    lastMessageId = -1L;
                } else {
                    lastMessageId = messageIterator.getPrevMessage().getId();
                }
                fetchRequest.setOffset(advancedOffset, lastMessageId, z);
            }
            addRequst(fetchRequest);
        }

        /**
         * Sets the request's delay from its current retry count and re-queues it.
         *
         * @param fetchRequest the request to delay and re-queue
         */
        private void addRequst(FetchRequest fetchRequest) {
            final long retryDelay = getRetryDelay(fetchRequest);
            fetchRequest.setDelay(retryDelay);
            reAddFetchRequest2Queue(fetchRequest);
        }

        /**
         * Computes a delay proportional to the request's retry count: one tenth of the configured
         * maximum delay per retry, capped at the maximum.
         *
         * @param fetchRequest the request whose retry count is used
         * @return the delay in milliseconds, never more than the configured maximum
         */
        private long getRetryDelay(FetchRequest fetchRequest) {
            final long ceiling = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            final long step = ceiling / 10;
            return Math.min(step * fetchRequest.getRetries(), ceiling);
        }

        /**
         * Raises the request's delay by one step (see {@code getNextDelay}).
         *
         * @param fetchRequest the request whose delay is increased
         */
        private void updateDelay(FetchRequest fetchRequest) {
            final long increased = getNextDelay(fetchRequest);
            fetchRequest.setDelay(increased);
        }

        /**
         * Computes the request's current delay plus one tenth of the configured maximum delay,
         * capped at the maximum.
         *
         * @param fetchRequest the request whose current delay is the starting point
         * @return the increased delay in milliseconds, never more than the configured maximum
         */
        private long getNextDelay(FetchRequest fetchRequest) {
            final long ceiling = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            final long increased = fetchRequest.getDelay() + (ceiling / 10);
            return Math.min(increased, ceiling);
        }
    }

    /**
     * Creates a fetch manager. No queue or threads exist until {@link #resetFetchState()} is
     * called.
     *
     * @param consumerConfig configuration supplying runner count, group name, retry limits and
     *                       the maximum fetch delay
     * @param innerConsumer  consumer used to fetch messages, query offsets and look up listeners
     */
    public SimpleFetchManager(ConsumerConfig consumerConfig, InnerConsumer innerConsumer) {
        this.consumer = innerConsumer;
        this.consumerConfig = consumerConfig;
    }

    /**
     * Returns the request count captured when the runners were last started (the queue size at
     * that moment); it is reset to 0 when the runners are stopped.
     *
     * @return the recorded fetch request count
     */
    @Override
    public int getFetchRequestCount() {
        return fetchRequestCount;
    }

    /**
     * Tells whether the runners have been stopped and not started again since.
     *
     * @return {@code true} after {@link #stopFetchRunner()}, {@code false} initially and after
     *         {@link #startFetchRunner()}
     */
    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * Stops all fetch runners.
     *
     * <p>Marks the manager as shut down, then for every runner thread: flags the runner as
     * stopped, interrupts the thread, interrupts any executor threads delivering messages for
     * that runner, and waits up to 100 ms for the thread to finish. If the calling thread is
     * interrupted while waiting, its interrupt flag is restored and the loop continues. Finally
     * the recorded fetch request count is reset to 0.
     *
     * @throws InterruptedException declared by the interface; this implementation restores the
     *                              interrupt flag instead of throwing
     */
    @Override
    public void stopFetchRunner() throws InterruptedException {
        this.shutdown = true;
        if (this.fetchThreads != null) {
            for (int index = 0; index < this.fetchThreads.length; index++) {
                final Thread worker = this.fetchThreads[index];
                final FetchRequestRunner runner = this.requestRunners[index];
                if (worker == null) {
                    continue;
                }
                runner.shutdown();
                worker.interrupt();
                runner.interruptExecutor();
                try {
                    worker.join(100L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.fetchRequestCount = 0;
    }

    /**
     * Discards the current queue and runners and creates fresh ones: a new empty request queue
     * plus one runner and one (not yet started) thread per configured fetch runner. Threads are
     * named {@code <group>Fetch-Runner-<index>}.
     */
    @Override
    public void resetFetchState() {
        this.requestQueue = new FetchRequestQueue();
        this.fetchThreads = new Thread[this.consumerConfig.getFetchRunnerCount()];
        this.requestRunners = new FetchRequestRunner[this.consumerConfig.getFetchRunnerCount()];
        for (int index = 0; index < this.fetchThreads.length; index++) {
            final FetchRequestRunner runner = new FetchRequestRunner();
            this.requestRunners[index] = runner;
            final String threadName = this.consumerConfig.getGroup() + "Fetch-Runner-" + index;
            this.fetchThreads[index] = new Thread(runner, threadName);
        }
    }

    /**
     * Records the current queue size as the fetch request count, clears the shutdown flag and
     * starts every runner thread. Requires {@link #resetFetchState()} to have been called so
     * that the queue and threads exist.
     */
    @Override
    public void startFetchRunner() {
        this.fetchRequestCount = this.requestQueue.size();
        this.shutdown = false;
        final Thread[] workers = this.fetchThreads;
        for (int index = 0; index < workers.length; index++) {
            workers[index].start();
        }
    }

    /**
     * Offers a fetch request to the request queue.
     *
     * @param fetchRequest the request to enqueue
     */
    @Override
    public void addFetchRequest(FetchRequest fetchRequest) {
        final FetchRequestQueue queue = this.requestQueue;
        queue.offer(fetchRequest);
    }

    /**
     * Takes the next fetch request from the request queue.
     *
     * @return the request taken from the queue
     * @throws InterruptedException if the queue's {@code take} is interrupted
     */
    FetchRequest takeFetchRequest() throws InterruptedException {
        final FetchRequest taken = this.requestQueue.take();
        return taken;
    }

    /**
     * Tells whether the request has been retried more often than the configured maximum number
     * of fetch retries.
     *
     * @param fetchRequest the request to check
     * @return {@code true} if the retry count is strictly greater than the configured limit
     */
    boolean isRetryTooMany(FetchRequest fetchRequest) {
        final int limit = this.consumerConfig.getMaxFetchRetries();
        return fetchRequest.getRetries() > limit;
    }

    /**
     * Tells whether the request has been retried more often than the configured threshold after
     * which the fetch size is increased.
     *
     * @param fetchRequest the request to check
     * @return {@code true} if the retry count is strictly greater than the configured threshold
     */
    boolean isRetryTooManyForIncrease(FetchRequest fetchRequest) {
        final int threshold = this.consumerConfig.getMaxIncreaseFetchDataRetries();
        return fetchRequest.getRetries() > threshold;
    }

    /**
     * Returns the configured upper bound for a fetch request's delay.
     *
     * @return the maximum fetch delay in milliseconds, as reported by the consumer configuration
     */
    long getMaxDelayFetchTimeInMills() {
        final long maxDelay = this.consumerConfig.getMaxDelayFetchTimeInMills();
        return maxDelay;
    }
}
