package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.ZkClientChangedListener;
import com.taobao.metamorphosis.client.consumer.storage.OffsetStorage;
import com.taobao.metamorphosis.cluster.Broker;
import com.taobao.metamorphosis.cluster.Cluster;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.network.RemotingUtils;
import com.taobao.metamorphosis.utils.MetaZookeeper;
import com.taobao.metamorphosis.utils.ThreadUtils;
import com.taobao.metamorphosis.utils.ZkUtils;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;

/* loaded from: input_file:com/taobao/metamorphosis/client/consumer/ConsumerZooKeeper.class */
public class ConsumerZooKeeper implements ZkClientChangedListener {
    /** ZooKeeper client used for registry access; replaced in {@code onZkClientChanged} and null-checked in {@code registerConsumerInternal}. */
    protected ZkClient zkClient;
    /** Remoting client used to connect to and close broker connections when the fetch set is rebuilt. */
    private final RemotingClientWrapper remotingClient;
    /** ZooKeeper settings; its {@code zkSyncTimeMs} is the pause between failed rebalance attempts. */
    private final ZkUtils.ZKConfig zkConfig;
    /** Provides the cluster view, partition listings and the ZooKeeper directory helpers. */
    protected final MetaZookeeper metaZookeeper;
    /** Logger shared with the nested listener classes. */
    static final Log log = LogFactory.getLog(ConsumerZooKeeper.class);
    /** One registration task per fetch manager; the task's result is that consumer's rebalance listener. */
    protected final ConcurrentHashMap<FetchManager, FutureTask<ZKLoadRebalanceListener>> consumerLoadBalanceListeners = new ConcurrentHashMap<>();
    /** Sequence number appended to generated consumer ids (see {@code getConsumerUUID}). */
    private final AtomicInteger counter = new AtomicInteger(0);

    /* loaded from: input_file:com/taobao/metamorphosis/client/consumer/ConsumerZooKeeper$ZKLoadRebalanceListener.class */
    public class ZKLoadRebalanceListener implements IZkChildListener {
        /** ZooKeeper directories of the consumer group; {@code consumerRegistryDir} is read from it. */
        private final MetaZookeeper.ZKGroupDirs dirs;
        /** Consumer group name, taken from the consumer config at construction. */
        private final String group;
        /** Identifier under which this consumer is registered in ZooKeeper. */
        protected final String consumerIdString;
        /** Maximum number of rebalance attempts made by one {@code syncedRebalance} call. */
        static final int MAX_N_RETRIES = 7;
        /** Strategy that picks this consumer's partitions out of all partitions of a topic. */
        private final LoadBalanceStrategy loadBalanceStrategy;
        /** Subscription info keyed by topic. */
        private final ConcurrentHashMap<String, SubscriberInfo> topicSubcriberRegistry;
        private final ConsumerConfig consumerConfig;
        /** Storage used to load, initialise and commit partition offsets. */
        private final OffsetStorage offsetStorage;
        /** Fetch manager whose fetch requests are rebuilt after each rebalance. */
        private final FetchManager fetchManager;
        /** Consumers per topic as seen by the last successful rebalance. */
        Map<String, List<String>> oldConsumersPerTopicMap = new HashMap<>();
        /** Partitions per topic as seen by the last successful rebalance. */
        Map<String, List<String>> oldPartitionsPerTopicMap = new HashMap<>();
        /** Serialises rebalance runs of this listener. */
        private final Lock rebalanceLock = new ReentrantLock();
        /** Partitions currently registered by this consumer, keyed by topic and then by partition. */
        final ConcurrentHashMap<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> topicRegistry = new ConcurrentHashMap<>();
        /** Brokers that had fetch requests after the previous {@code updateFetchRunner} call. */
        Set<Broker> oldBrokerSet = new HashSet<>();
        /** Cluster view recorded by the last rebalance; compared in {@code checkClusterChange}. */
        private Cluster oldCluster = new Cluster();

        /**
         * Creates a rebalance listener for one consumer.
         *
         * @param fetchManager        fetch manager driven by this listener
         * @param zKGroupDirs         ZooKeeper directories of the consumer group
         * @param str                 consumer id string used for registration
         * @param consumerConfig      consumer configuration; its group is cached in {@code group}
         * @param offsetStorage       storage for partition offsets
         * @param concurrentHashMap   subscription info keyed by topic
         * @param loadBalanceStrategy strategy used to pick this consumer's partitions
         */
        public ZKLoadRebalanceListener(FetchManager fetchManager, MetaZookeeper.ZKGroupDirs zKGroupDirs, String str, ConsumerConfig consumerConfig, OffsetStorage offsetStorage, ConcurrentHashMap<String, SubscriberInfo> concurrentHashMap, LoadBalanceStrategy loadBalanceStrategy) {
            // Identity of the consumer.
            this.consumerIdString = str;
            this.consumerConfig = consumerConfig;
            this.group = consumerConfig.getGroup();
            this.dirs = zKGroupDirs;

            // Collaborators.
            this.fetchManager = fetchManager;
            this.offsetStorage = offsetStorage;
            this.loadBalanceStrategy = loadBalanceStrategy;
            this.topicSubcriberRegistry = concurrentHashMap;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Commits the offsets of every partition currently held in the topic registry. */
        public void commitOffsets() {
            final String consumerGroup = this.consumerConfig.getGroup();
            final List<TopicPartitionRegInfo> registered = getTopicPartitionRegInfos();
            this.offsetStorage.commitOffset(consumerGroup, registered);
        }

        /**
         * Initialises the offset in the offset storage and builds the matching registration info.
         *
         * @param str       first argument forwarded to {@code offsetStorage.initOffset}; callers pass the topic
         * @param str2      second argument forwarded to {@code offsetStorage.initOffset}
         * @param partition partition being registered
         * @param j         initial offset
         * @return a new registration info for {@code str}/{@code partition} at offset {@code j}
         */
        private TopicPartitionRegInfo initTopicPartitionRegInfo(String str, String str2, Partition partition, long j) {
            this.offsetStorage.initOffset(str, str2, partition, j);
            final TopicPartitionRegInfo created = new TopicPartitionRegInfo(str, partition, j);
            return created;
        }

        /**
         * Returns the partitions currently registered for each topic.
         *
         * @return a new map from topic to the key-set view of that topic's registry map
         */
        public Map<String, Set<Partition>> getTopicPartitions() {
            final Map<String, Set<Partition>> partitionsByTopic = new HashMap<String, Set<Partition>>();
            final Iterator<Map.Entry<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>>> topics = this.topicRegistry.entrySet().iterator();
            while (topics.hasNext()) {
                final Map.Entry<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> topicEntry = topics.next();
                partitionsByTopic.put(topicEntry.getKey(), topicEntry.getValue().keySet());
            }
            return partitionsByTopic;
        }

        /**
         * Flattens the topic registry into a single list.
         *
         * @return a new list holding the registration info of every registered partition
         */
        List<TopicPartitionRegInfo> getTopicPartitionRegInfos() {
            final List<TopicPartitionRegInfo> all = new ArrayList<TopicPartitionRegInfo>();
            for (ConcurrentHashMap<Partition, TopicPartitionRegInfo> perTopic : this.topicRegistry.values()) {
                // ConcurrentHashMap.values() never returns null, so no null guard is required.
                all.addAll(perTopic.values());
            }
            return all;
        }

        /**
         * Loads the stored registration info of a partition for this consumer's group.
         *
         * @param str       topic
         * @param partition partition
         * @return whatever {@code offsetStorage.load} returns; callers treat {@code null} as "nothing stored"
         */
        private TopicPartitionRegInfo loadTopicPartitionRegInfo(String str, Partition partition) {
            final String consumerGroup = this.consumerConfig.getGroup();
            return this.offsetStorage.load(str, consumerGroup, partition);
        }

        /**
         * Child-change callback: any change under a watched path triggers a full rebalance.
         *
         * @param str  path whose children changed (not read)
         * @param list current children of that path (not read)
         */
        public void handleChildChange(String str, List<String> list) throws Exception {
            this.syncedRebalance();
        }

        /**
         * Runs {@link #rebalance()} under the rebalance lock, retrying up to
         * {@code MAX_N_RETRIES} times.
         *
         * <p>After a failed attempt all partition ownerships are released, the local
         * state is reset and the thread sleeps for {@code zkConfig.zkSyncTimeMs} before
         * the next try. If every attempt fails an error is logged and the method
         * returns normally.
         *
         * @throws Exception if releasing ownership, resetting state or the sleep fails
         *         (for example {@link InterruptedException} while sleeping)
         */
        void syncedRebalance() throws Exception {
            this.rebalanceLock.lock();
            // A single finally block guarantees the lock is released on every exit path.
            try {
                for (int i = 0; i < MAX_N_RETRIES; i++) {
                    ConsumerZooKeeper.log.info("begin rebalancing consumer " + this.consumerIdString + " try #" + i);
                    boolean succeeded;
                    try {
                        succeeded = rebalance();
                    } catch (Throwable th) {
                        // Any failure inside one attempt is treated as "try again".
                        ConsumerZooKeeper.log.warn("unexpected exception occured while try rebalancing", th);
                        succeeded = false;
                    }
                    ConsumerZooKeeper.log.info("end rebalancing consumer " + this.consumerIdString + " try #" + i);
                    if (succeeded) {
                        ConsumerZooKeeper.log.info("rebalance success.");
                        return;
                    }
                    ConsumerZooKeeper.log.warn("rebalance failed,try #" + i);
                    // Drop everything we hold so that other consumers can make progress.
                    releaseAllPartitionOwnership();
                    resetState();
                    Thread.sleep(ConsumerZooKeeper.this.zkConfig.zkSyncTimeMs);
                }
                ConsumerZooKeeper.log.error("rebalance failed,finally");
            } finally {
                this.rebalanceLock.unlock();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Forgets the previous rebalance snapshot and all registered partitions. */
        public void resetState() {
            // The three collections are independent of each other.
            this.oldPartitionsPerTopicMap.clear();
            this.oldConsumersPerTopicMap.clear();
            this.topicRegistry.clear();
        }

        /**
         * Rebuilds the fetch requests from the topic registry and restarts the fetch runners.
         *
         * <p>Steps: reset the fetch state; add one fetch request per registered partition
         * whose broker can be resolved in {@code cluster}; connect to brokers that were not
         * in {@code oldBrokerSet}; close connections to brokers that are no longer used;
         * remember the new broker set and start the fetch runners.
         *
         * @param cluster cluster view used to resolve a broker for each partition
         */
        protected void updateFetchRunner(Cluster cluster) throws Exception {
            this.fetchManager.resetFetchState();
            final Set<Broker> currentBrokers = new HashSet<Broker>();

            for (Map.Entry<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> topicEntry : this.topicRegistry.entrySet()) {
                final String topic = topicEntry.getKey();
                for (Map.Entry<Partition, TopicPartitionRegInfo> partitionEntry : topicEntry.getValue().entrySet()) {
                    final Partition partition = partitionEntry.getKey();
                    final TopicPartitionRegInfo regInfo = partitionEntry.getValue();
                    final Broker target = cluster.getBrokerRandom(partition.getBrokerId());
                    if (target == null) {
                        // No broker resolved for this partition: no fetch request is created.
                        continue;
                    }
                    currentBrokers.add(target);
                    this.fetchManager.addFetchRequest(new FetchRequest(target, 0L, regInfo, this.topicSubcriberRegistry.get(topic).getMaxSize()));
                }
            }

            // Open connections to brokers that were not used before.
            for (Broker added : currentBrokers) {
                if (this.oldBrokerSet.contains(added)) {
                    continue;
                }
                try {
                    ConsumerZooKeeper.this.remotingClient.connect(added.getZKString(), this);
                    ConsumerZooKeeper.this.remotingClient.awaitReadyInterrupt(added.getZKString());
                    ConsumerZooKeeper.log.warn("Connect to " + added.getZKString());
                } catch (NotifyRemotingException connectFailure) {
                    ConsumerZooKeeper.log.error("Connect to " + added.getZKString() + " failed", connectFailure);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
            }

            // Close connections to brokers that are no longer used.
            for (Broker removed : this.oldBrokerSet) {
                if (currentBrokers.contains(removed)) {
                    continue;
                }
                try {
                    ConsumerZooKeeper.this.remotingClient.close(removed.getZKString(), this, false);
                    ConsumerZooKeeper.log.warn("Closed " + removed.getZKString());
                } catch (NotifyRemotingException closeFailure) {
                    ConsumerZooKeeper.log.error("Close " + removed.getZKString() + " failed", closeFailure);
                }
            }

            ConsumerZooKeeper.log.info("Starting fetch runners");
            this.oldBrokerSet = currentBrokers;
            this.fetchManager.startFetchRunner();
        }

        /**
         * Performs one rebalance attempt for this consumer.
         *
         * <p>Topics whose partition list or consumer list differs from the previous
         * successful rebalance are "relevant". If none is relevant, the fetch runners are
         * only restarted when the cluster view changed. Otherwise fetching is stopped,
         * offsets are committed, partitions no longer assigned to this consumer are
         * released, newly assigned ones are claimed, and fetching is restarted.
         *
         * @return {@code true} when the attempt completed; {@code false} when a partition
         *         could not be claimed or a ZooKeeper node was missing, in which case the
         *         caller is expected to retry
         * @throws Exception on any other failure
         */
        boolean rebalance() throws Exception {
            final Map<String, String> myConsumerPerTopic = getConsumerPerTopic(this.consumerIdString);
            final Cluster cluster = ConsumerZooKeeper.this.metaZookeeper.getCluster();
            try {
                final Map<String, List<String>> consumersPerTopic = getConsumersPerTopic(this.group);
                final Map<String, List<String>> partitionsPerTopic = getPartitionStringsForTopics(myConsumerPerTopic);
                final Map<String, String> relevantTopics = getRelevantTopicMap(myConsumerPerTopic, partitionsPerTopic, this.oldPartitionsPerTopicMap, consumersPerTopic, this.oldConsumersPerTopicMap);
                if (relevantTopics.size() <= 0) {
                    if (!checkClusterChange(cluster)) {
                        ConsumerZooKeeper.log.info("Consumer " + this.consumerIdString + " with " + consumersPerTopic + " doesn't need to be rebalanced.");
                        return true;
                    }
                    // Assignment is unchanged but brokers changed: only rebuild the fetch requests.
                    ConsumerZooKeeper.log.info("Stopping fetch runners,maybe master or slave changed");
                    this.fetchManager.stopFetchRunner();
                    updateFetchRunner(cluster);
                    this.oldCluster = cluster;
                    return true;
                }
                ConsumerZooKeeper.log.info("Stopping fetch runners");
                this.fetchManager.stopFetchRunner();
                ConsumerZooKeeper.log.info("Comitting all offsets");
                commitOffsets();
                for (Map.Entry<String, String> entry : relevantTopics.entrySet()) {
                    final String topic = entry.getKey();
                    final String consumerId = entry.getValue();
                    MetaZookeeper metaZookeeper = ConsumerZooKeeper.this.metaZookeeper;
                    metaZookeeper.getClass();
                    MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(metaZookeeper, topic, this.group);
                    final List<String> consumers = consumersPerTopic.get(topic);
                    final List<String> partitions = partitionsPerTopic.get(topic);
                    if (consumers == null) {
                        ConsumerZooKeeper.log.info("Releasing partition ownerships for topic:" + topic);
                        releasePartitionOwnership(topic);
                        this.topicRegistry.remove(topic);
                        ConsumerZooKeeper.log.info("There are no consumers subscribe topic " + topic);
                    } else if (partitions == null) {
                        ConsumerZooKeeper.log.info("Releasing partition ownerships for topic:" + topic);
                        releasePartitionOwnership(topic);
                        this.topicRegistry.remove(topic);
                        ConsumerZooKeeper.log.info("There are no partitions under topic " + topic);
                    } else {
                        final List<String> assigned = this.loadBalanceStrategy.getPartitions(topic, consumerId, consumers, partitions);
                        ConcurrentHashMap<Partition, TopicPartitionRegInfo> registered = this.topicRegistry.get(topic);
                        if (registered == null) {
                            // Register the very map we keep working with, so the key set below
                            // always reflects what is stored in the registry.
                            registered = new ConcurrentHashMap<>();
                            this.topicRegistry.put(topic, registered);
                        }
                        final Set<Partition> owned = registered.keySet();
                        // Give up partitions that are no longer assigned to this consumer.
                        for (Partition partition : owned) {
                            if (!assigned.contains(partition.toString())) {
                                ConsumerZooKeeper.log.info("Releasing partition ownerships for partition:" + partition);
                                registered.remove(partition);
                                releasePartitionOwnership(topic, partition);
                            }
                        }
                        // Claim partitions that are assigned but not yet owned.
                        for (String partitionString : assigned) {
                            if (!owned.contains(new Partition(partitionString))) {
                                ConsumerZooKeeper.log.info(consumerId + " attempting to claim partition " + partitionString);
                                if (!processPartition(topicDirs, partitionString, topic, consumerId)) {
                                    return false;
                                }
                            }
                        }
                    }
                }
                updateFetchRunner(cluster);
                this.oldPartitionsPerTopicMap = partitionsPerTopic;
                this.oldConsumersPerTopicMap = consumersPerTopic;
                this.oldCluster = cluster;
                return true;
            } catch (KeeperException.NoNodeException e) {
                ConsumerZooKeeper.log.warn("maybe other consumer is rebalancing now," + e.getMessage());
                return false;
            }
        }

        /**
         * Tells whether the given cluster view differs from the one recorded by the last rebalance.
         *
         * @param cluster current cluster view
         * @return {@code true} if {@code oldCluster.equals(cluster)} is false
         */
        protected boolean checkClusterChange(Cluster cluster) {
            final boolean unchanged = this.oldCluster.equals(cluster);
            return !unchanged;
        }

        /**
         * Looks up the partition strings of every topic that appears as a key of {@code map}.
         *
         * @param map topic-keyed map; only its key set is used
         * @return the result of {@code metaZookeeper.getPartitionStringsForSubTopics} for those topics
         */
        protected Map<String, List<String>> getPartitionStringsForTopics(Map<String, String> map) {
            final Set<String> topics = map.keySet();
            return ConsumerZooKeeper.this.metaZookeeper.getPartitionStringsForSubTopics(topics);
        }

        /**
         * Tries to claim ownership of one partition by creating its ephemeral owner node,
         * then registers the partition locally.
         *
         * @param zKGroupTopicDirs directories of the topic within the consumer group
         * @param str              partition string, used as the owner node name
         * @param str2             topic
         * @param str3             consumer id written as the owner node data
         * @return {@code true} if the partition was claimed and registered; {@code false}
         *         if the owner node already exists
         * @throws Exception on any other failure
         */
        protected boolean processPartition(MetaZookeeper.ZKGroupTopicDirs zKGroupTopicDirs, String str, String str2, String str3) throws Exception {
            final String ownerPath = zKGroupTopicDirs.consumerOwnerDir + "/" + str;
            try {
                ZkUtils.createEphemeralPathExpectConflict(ConsumerZooKeeper.this.zkClient, ownerPath, str3);
            } catch (ZkNodeExistsException e) {
                // The specific conflict case must be caught on its own: a preceding broad
                // catch of Exception would make this handler unreachable.
                ConsumerZooKeeper.log.info("waiting for the partition ownership to be deleted: " + str);
                return false;
            }
            // Every other exception propagates to the caller unchanged.
            addPartitionTopicInfo(zKGroupTopicDirs, str, str2, str3);
            return true;
        }

        /**
         * Registers a claimed partition in the topic registry, using the stored offset
         * info when there is one and initialising it from the configured offset otherwise.
         *
         * @param zKGroupTopicDirs topic directories (not read here)
         * @param str              partition string
         * @param str2             topic; its map must already exist in the topic registry
         * @param str3             second argument forwarded to {@code initTopicPartitionRegInfo}
         */
        protected void addPartitionTopicInfo(MetaZookeeper.ZKGroupTopicDirs zKGroupTopicDirs, String str, String str2, String str3) {
            final Partition claimed = new Partition(str);
            final ConcurrentHashMap<Partition, TopicPartitionRegInfo> registered = this.topicRegistry.get(str2);
            final TopicPartitionRegInfo stored = loadTopicPartitionRegInfo(str2, claimed);
            final TopicPartitionRegInfo regInfo;
            if (stored != null) {
                regInfo = stored;
            } else {
                regInfo = initTopicPartitionRegInfo(str2, str3, claimed, this.consumerConfig.getOffset());
            }
            registered.put(claimed, regInfo);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Deletes the owner node of every partition held in the topic registry. */
        public void releaseAllPartitionOwnership() {
            for (Map.Entry<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> topicEntry : this.topicRegistry.entrySet()) {
                final String topic = topicEntry.getKey();
                MetaZookeeper zk = ConsumerZooKeeper.this.metaZookeeper;
                zk.getClass();
                final MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(zk, topic, this.consumerConfig.getGroup());
                for (Partition owned : topicEntry.getValue().keySet()) {
                    deleteOwnership(topicDirs.consumerOwnerDir + "/" + owned);
                }
            }
        }

        /**
         * Deletes the owner node of a single partition.
         *
         * @param str       topic
         * @param partition partition whose ownership is released
         */
        private void releasePartitionOwnership(String str, Partition partition) {
            MetaZookeeper zk = ConsumerZooKeeper.this.metaZookeeper;
            zk.getClass();
            final MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(zk, str, this.consumerConfig.getGroup());
            final String ownerPath = topicDirs.consumerOwnerDir + "/" + partition;
            deleteOwnership(ownerPath);
        }

        /**
         * Deletes an owner node; failures are logged and not propagated.
         *
         * @param str ZooKeeper path of the owner node
         */
        private void deleteOwnership(String str) {
            try {
                ZkUtils.deletePath(ConsumerZooKeeper.this.zkClient, str);
            } catch (Throwable failure) {
                // Best effort: a failed delete must not abort the caller.
                ConsumerZooKeeper.log.error("exception during releasePartitionOwnership", failure);
            }
            if (!ConsumerZooKeeper.log.isDebugEnabled()) {
                return;
            }
            ConsumerZooKeeper.log.debug("Consumer " + this.consumerIdString + " releasing " + str);
        }

        /**
         * Deletes the owner node of every partition registered for a topic; does nothing
         * when the topic has no entry in the topic registry.
         *
         * @param str topic
         */
        private void releasePartitionOwnership(String str) {
            MetaZookeeper zk = ConsumerZooKeeper.this.metaZookeeper;
            zk.getClass();
            final MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(zk, str, this.consumerConfig.getGroup());
            final ConcurrentHashMap<Partition, TopicPartitionRegInfo> registered = this.topicRegistry.get(str);
            if (registered == null) {
                return;
            }
            for (Partition owned : registered.keySet()) {
                deleteOwnership(topicDirs.consumerOwnerDir + "/" + owned);
            }
        }

        /**
         * Selects the topics whose partition list or consumer list changed.
         *
         * @param map  this consumer's topics mapped to its consumer id
         * @param map2 current partitions per topic
         * @param map3 previous partitions per topic
         * @param map4 current consumers per topic
         * @param map5 previous consumers per topic
         * @return a new map with the entries of {@code map} whose topic differs between
         *         {@code map3}/{@code map2} or between {@code map5}/{@code map4}
         */
        private Map<String, String> getRelevantTopicMap(Map<String, String> map, Map<String, List<String>> map2, Map<String, List<String>> map3, Map<String, List<String>> map4, Map<String, List<String>> map5) {
            final Map<String, String> changed = new HashMap<String, String>();
            for (Map.Entry<String, String> candidate : map.entrySet()) {
                final String topic = candidate.getKey();
                final String consumerId = candidate.getValue();
                if (listEquals(map3.get(topic), map2.get(topic)) && listEquals(map5.get(topic), map4.get(topic))) {
                    // Neither the partitions nor the consumers of this topic changed.
                    continue;
                }
                changed.put(topic, consumerId);
            }
            return changed;
        }

        /**
         * Null-safe list comparison.
         *
         * @return {@code true} if both lists are {@code null} or {@code list.equals(list2)};
         *         {@code false} if exactly one of them is {@code null}
         */
        private boolean listEquals(List<String> list, List<String> list2) {
            if (list == null || list2 == null) {
                // Equal only when both are null.
                return list == list2;
            }
            return list.equals(list2);
        }

        /**
         * Builds the topic-to-consumers map of the group from the consumer registry directory.
         *
         * @param str group name (not read; the registry directory comes from {@code dirs})
         * @return a new map from topic to the sorted list of registered consumer ids
         */
        protected Map<String, List<String>> getConsumersPerTopic(String str) throws Exception, KeeperException.NoNodeException {
            final List<String> registeredConsumers = ZkUtils.getChildren(ConsumerZooKeeper.this.zkClient, this.dirs.consumerRegistryDir);
            final Map<String, List<String>> consumersByTopic = new HashMap<String, List<String>>();
            for (String consumerId : registeredConsumers) {
                for (String topic : getTopics(consumerId)) {
                    List<String> subscribers = consumersByTopic.get(topic);
                    if (subscribers == null) {
                        subscribers = new ArrayList<String>();
                        consumersByTopic.put(topic, subscribers);
                    }
                    subscribers.add(consumerId);
                }
            }
            // Sort each consumer list in place.
            for (List<String> subscribers : consumersByTopic.values()) {
                Collections.sort(subscribers);
            }
            return consumersByTopic;
        }

        /**
         * Maps every topic registered by a consumer to that consumer's id.
         *
         * @param str consumer id
         * @return a new map from each topic of the consumer to {@code str}
         */
        public Map<String, String> getConsumerPerTopic(String str) throws Exception {
            final Map<String, String> consumerByTopic = new HashMap<String, String>();
            for (String topic : getTopics(str)) {
                consumerByTopic.put(topic, str);
            }
            return consumerByTopic;
        }

        /**
         * Reads the comma-separated topic list stored in a consumer's registry node.
         *
         * @param str consumer id (node name under the consumer registry directory)
         * @return a new list with the node data split on {@code ","}
         */
        protected List<String> getTopics(String str) throws Exception {
            final String registryPath = this.dirs.consumerRegistryDir + "/" + str;
            final String[] topicNames = ZkUtils.readData(ConsumerZooKeeper.this.zkClient, registryPath).split(",");
            final List<String> topics = new ArrayList<String>(topicNames.length);
            Collections.addAll(topics, topicNames);
            return topics;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/taobao/metamorphosis/client/consumer/ConsumerZooKeeper$ZKSessionExpireListenner.class */
    /**
     * State listener that re-registers a consumer and rebalances it when a new
     * ZooKeeper session is created. Two instances are equal when they wrap equal
     * rebalance listeners.
     */
    public class ZKSessionExpireListenner implements IZkStateListener {
        private final String consumerIdString;
        private final ZKLoadRebalanceListener loadBalancerListener;

        /**
         * @param zKLoadRebalanceListener rebalance listener of the consumer to re-register
         */
        public ZKSessionExpireListenner(ZKLoadRebalanceListener zKLoadRebalanceListener) {
            this.loadBalancerListener = zKLoadRebalanceListener;
            this.consumerIdString = zKLoadRebalanceListener.consumerIdString;
        }

        /** Resets the listener state, registers the consumer again and rebalances. */
        public void handleNewSession() throws Exception {
            ConsumerZooKeeper.log.info("ZK expired; release old broker parition ownership; re-register consumer " + this.consumerIdString);
            final ZKLoadRebalanceListener listener = this.loadBalancerListener;
            listener.resetState();
            ConsumerZooKeeper.this.registerConsumerInternal(listener);
            listener.syncedRebalance();
        }

        /** Connection state changes are ignored. */
        public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception {
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof ZKSessionExpireListenner)) {
                return false;
            }
            final ZKSessionExpireListenner other = (ZKSessionExpireListenner) obj;
            return this.loadBalancerListener.equals(other.loadBalancerListener);
        }

        @Override
        public int hashCode() {
            return this.loadBalancerListener.hashCode();
        }
    }

    /**
     * Creates the consumer-side ZooKeeper coordinator.
     *
     * @param metaZookeeper         source of cluster and directory information
     * @param remotingClientWrapper client used for broker connections
     * @param zkClient              ZooKeeper client; a {@code null} value makes
     *                              {@code registerConsumerInternal} take its non-ZooKeeper branch
     * @param zKConfig              ZooKeeper settings
     */
    public ConsumerZooKeeper(MetaZookeeper metaZookeeper, RemotingClientWrapper remotingClientWrapper, ZkClient zkClient, ZkUtils.ZKConfig zKConfig) {
        this.zkClient = zkClient;
        this.zkConfig = zKConfig;
        this.metaZookeeper = metaZookeeper;
        this.remotingClient = remotingClientWrapper;
    }

    /**
     * Commits the offsets of the consumer registered for the given fetch manager, if any.
     *
     * @param fetchManager fetch manager identifying the consumer
     */
    public void commitOffsets(FetchManager fetchManager) {
        final ZKLoadRebalanceListener listener = getBrokerConnectionListener(fetchManager);
        if (listener == null) {
            return;
        }
        listener.commitOffsets();
    }

    /**
     * Returns the rebalance listener registered for a fetch manager, waiting for its
     * registration task to finish.
     *
     * @param fetchManager fetch manager identifying the consumer
     * @return the listener, or {@code null} if none is registered or the wait was
     *         interrupted (the interrupt flag is restored in that case)
     */
    public ZKLoadRebalanceListener getBrokerConnectionListener(FetchManager fetchManager) {
        final FutureTask<ZKLoadRebalanceListener> registration = this.consumerLoadBalanceListeners.get(fetchManager);
        if (registration != null) {
            try {
                return registration.get();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException failed) {
                // Surface the registration failure to the caller.
                throw ThreadUtils.launderThrowable(failed.getCause());
            }
        }
        return null;
    }

    /**
     * Removes the consumer registered for a fetch manager: commits its offsets,
     * unsubscribes its ZooKeeper listeners, releases all partition ownerships and
     * deletes its registry node. Does nothing if no consumer is registered.
     *
     * <p>Failures are logged and not propagated. If the thread is interrupted while
     * waiting for the registration task, the interrupt flag is restored so callers
     * can still observe it.
     *
     * @param fetchManager fetch manager identifying the consumer
     */
    public void unRegisterConsumer(FetchManager fetchManager) {
        try {
            final FutureTask<ZKLoadRebalanceListener> task = this.consumerLoadBalanceListeners.remove(fetchManager);
            if (task == null) {
                return;
            }
            final ZKLoadRebalanceListener listener = task.get();
            if (listener == null) {
                return;
            }
            listener.commitOffsets();
            // Listener equality is based on the wrapped rebalance listener, so a fresh
            // instance matches the one subscribed during registration.
            this.zkClient.unsubscribeStateChanges(new ZKSessionExpireListenner(listener));
            MetaZookeeper metaZookeeper = this.metaZookeeper;
            metaZookeeper.getClass();
            MetaZookeeper.ZKGroupDirs groupDirs = new MetaZookeeper.ZKGroupDirs(metaZookeeper, listener.consumerConfig.getGroup());
            this.zkClient.unsubscribeChildChanges(groupDirs.consumerRegistryDir, listener);
            log.info("unsubscribeChildChanges:" + groupDirs.consumerRegistryDir);
            for (String topic : listener.topicSubcriberRegistry.keySet()) {
                final String topicPath = this.metaZookeeper.brokerTopicsSubPath + "/" + topic;
                this.zkClient.unsubscribeChildChanges(topicPath, listener);
                log.info("unsubscribeChildChanges:" + topicPath);
            }
            listener.releaseAllPartitionOwnership();
            ZkUtils.deletePath(this.zkClient, listener.dirs.consumerRegistryDir + "/" + listener.consumerIdString);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of clearing it.
            Thread.currentThread().interrupt();
            log.error("Interrupted when unRegisterConsumer", e);
        } catch (Exception e2) {
            log.error("Error in unRegisterConsumer,maybe error when registerConsumer", e2);
        }
    }

    /**
     * Registers a consumer for the given fetch manager and runs its registration on
     * the calling thread.
     *
     * <p>The registration is wrapped in a {@link FutureTask}; its outcome, including
     * any exception thrown during registration, is stored in that task.
     *
     * @param consumerConfig      consumer configuration
     * @param fetchManager        fetch manager identifying the consumer
     * @param concurrentHashMap   subscription info keyed by topic
     * @param offsetStorage       storage for partition offsets
     * @param loadBalanceStrategy strategy used to pick this consumer's partitions
     * @throws MetaClientException if a consumer is already registered for {@code fetchManager}
     */
    public void registerConsumer(final ConsumerConfig consumerConfig, final FetchManager fetchManager, final ConcurrentHashMap<String, SubscriberInfo> concurrentHashMap, final OffsetStorage offsetStorage, final LoadBalanceStrategy loadBalanceStrategy) throws Exception {
        final Callable<ZKLoadRebalanceListener> registration = new Callable<ZKLoadRebalanceListener>() {
            @Override // java.util.concurrent.Callable
            public ZKLoadRebalanceListener call() throws Exception {
                MetaZookeeper zk = ConsumerZooKeeper.this.metaZookeeper;
                zk.getClass();
                final MetaZookeeper.ZKGroupDirs groupDirs = new MetaZookeeper.ZKGroupDirs(zk, consumerConfig.getGroup());
                final String consumerId = consumerConfig.getGroup() + "_" + ConsumerZooKeeper.this.getConsumerUUID(consumerConfig);
                final ZKLoadRebalanceListener listener = new ZKLoadRebalanceListener(fetchManager, groupDirs, consumerId, consumerConfig, offsetStorage, concurrentHashMap, loadBalanceStrategy);
                return ConsumerZooKeeper.this.registerConsumerInternal(listener);
            }
        };
        final FutureTask<ZKLoadRebalanceListener> task = new FutureTask<>(registration);
        final FutureTask<ZKLoadRebalanceListener> existing = this.consumerLoadBalanceListeners.putIfAbsent(fetchManager, task);
        if (existing == null) {
            task.run();
            return;
        }
        throw new MetaClientException("Consumer has been already registed");
    }

    /**
     * Registers the consumer behind the given listener.
     *
     * <p>With a ZooKeeper client: creates the consumer's ephemeral registry node,
     * subscribes the listener to the group registry and to every subscribed topic's
     * broker path, subscribes a session-expiry listener and runs a rebalance.
     *
     * <p>Without a ZooKeeper client ({@code zkClient == null}): builds one fetch request
     * per subscribed topic from the configured partition, offset and server URL and
     * starts the fetch runners.
     *
     * @param zKLoadRebalanceListener listener of the consumer to register
     * @return the same listener
     */
    protected ZKLoadRebalanceListener registerConsumerInternal(ZKLoadRebalanceListener zKLoadRebalanceListener) throws UnknownHostException, InterruptedException, Exception {
        MetaZookeeper zk = this.metaZookeeper;
        zk.getClass();
        final MetaZookeeper.ZKGroupDirs groupDirs = new MetaZookeeper.ZKGroupDirs(zk, zKLoadRebalanceListener.consumerConfig.getGroup());
        final String topics = getTopicsString(zKLoadRebalanceListener.topicSubcriberRegistry);

        if (this.zkClient != null) {
            ZkUtils.createEphemeralPathExpectConflict(this.zkClient, groupDirs.consumerRegistryDir + "/" + zKLoadRebalanceListener.consumerIdString, topics);
            this.zkClient.subscribeChildChanges(groupDirs.consumerRegistryDir, zKLoadRebalanceListener);
            for (String topic : zKLoadRebalanceListener.topicSubcriberRegistry.keySet()) {
                final String topicPath = this.metaZookeeper.brokerTopicsSubPath + "/" + topic;
                ZkUtils.makeSurePersistentPathExists(this.zkClient, topicPath);
                this.zkClient.subscribeChildChanges(topicPath, zKLoadRebalanceListener);
            }
            this.zkClient.subscribeStateChanges(new ZKSessionExpireListenner(zKLoadRebalanceListener));
            zKLoadRebalanceListener.syncedRebalance();
            return zKLoadRebalanceListener;
        }

        // No ZooKeeper client: fetch straight from the configured server.
        final FetchManager fetcher = zKLoadRebalanceListener.fetchManager;
        fetcher.stopFetchRunner();
        fetcher.resetFetchState();
        for (String topic : zKLoadRebalanceListener.topicSubcriberRegistry.keySet()) {
            final SubscriberInfo subscriber = zKLoadRebalanceListener.topicSubcriberRegistry.get(topic);
            ConcurrentHashMap<Partition, TopicPartitionRegInfo> registered = zKLoadRebalanceListener.topicRegistry.get(topic);
            if (registered == null) {
                registered = new ConcurrentHashMap<>();
                zKLoadRebalanceListener.topicRegistry.put(topic, registered);
            }
            final Partition configured = new Partition(zKLoadRebalanceListener.consumerConfig.getPartition());
            final TopicPartitionRegInfo regInfo = new TopicPartitionRegInfo(topic, configured, zKLoadRebalanceListener.consumerConfig.getOffset());
            registered.put(configured, regInfo);
            fetcher.addFetchRequest(new FetchRequest(new Broker(0, zKLoadRebalanceListener.consumerConfig.getServerUrl()), 0L, regInfo, subscriber.getMaxSize()));
        }
        fetcher.startFetchRunner();
        return zKLoadRebalanceListener;
    }

    /**
     * Joins the topic names (the map's keys) with commas, in key-set iteration order.
     *
     * @param concurrentHashMap subscription info keyed by topic
     * @return the comma-separated topics, or an empty string for an empty map
     */
    private String getTopicsString(ConcurrentHashMap<String, SubscriberInfo> concurrentHashMap) {
        final StringBuilder joined = new StringBuilder();
        String separator = "";
        for (String topic : concurrentHashMap.keySet()) {
            joined.append(separator).append(topic);
            separator = ",";
        }
        return joined.toString();
    }

    /**
     * Returns the consumer id: the configured one if present, otherwise a generated
     * {@code <local address>-<current millis>-<counter>} value.
     *
     * @param consumerConfig consumer configuration
     */
    protected String getConsumerUUID(ConsumerConfig consumerConfig) throws Exception {
        if (consumerConfig.getConsumerId() != null) {
            return consumerConfig.getConsumerId();
        }
        return RemotingUtils.getLocalAddress() + "-" + System.currentTimeMillis() + "-" + this.counter.incrementAndGet();
    }

    /**
     * Switches to a new ZooKeeper client and re-registers every known consumer with it.
     * A failure for one consumer is logged and does not stop the others.
     *
     * @param zkClient the new ZooKeeper client
     */
    @Override // com.taobao.metamorphosis.client.ZkClientChangedListener
    public void onZkClientChanged(ZkClient zkClient) {
        this.zkClient = zkClient;
        for (FutureTask<ZKLoadRebalanceListener> registration : this.consumerLoadBalanceListeners.values()) {
            try {
                final ZKLoadRebalanceListener listener = registration.get();
                // Forget the partitions held through the previous client.
                listener.topicRegistry.clear();
                log.info("re-register consumer to zk,group=" + listener.consumerConfig.getGroup());
                registerConsumerInternal(listener);
            } catch (Exception failure) {
                log.error("reRegister consumer failed", failure);
            }
        }
    }
}
