package com.taobao.metamorphosis.client.consumer.storage;

import com.taobao.metamorphosis.client.ZkClientChangedListener;
import com.taobao.metamorphosis.client.consumer.TopicPartitionRegInfo;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.utils.MetaZookeeper;
import com.taobao.metamorphosis.utils.ZkUtils;
import java.util.Collection;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/consumer/storage/ZkOffsetStorage.class */
public class ZkOffsetStorage implements OffsetStorage, ZkClientChangedListener {
    private volatile ZkClient zkClient;
    private final MetaZookeeper metaZookeeper;
    static final Log log = LogFactory.getLog(ZkOffsetStorage.class);

    @Override // com.taobao.metamorphosis.client.ZkClientChangedListener
    public void onZkClientChanged(ZkClient zkClient) {
        log.info("Update ZkOffsetStorage's zkClient...");
        this.zkClient = zkClient;
    }

    public ZkOffsetStorage(MetaZookeeper metaZookeeper, ZkClient zkClient) {
        this.metaZookeeper = metaZookeeper;
        this.zkClient = zkClient;
    }

    @Override // com.taobao.metamorphosis.client.consumer.storage.OffsetStorage
    public void commitOffset(String str, Collection<TopicPartitionRegInfo> collection) {
        if (this.zkClient == null || collection == null || collection.isEmpty()) {
            return;
        }
        for (TopicPartitionRegInfo topicPartitionRegInfo : collection) {
            String topic = topicPartitionRegInfo.getTopic();
            MetaZookeeper metaZookeeper = this.metaZookeeper;
            metaZookeeper.getClass();
            MetaZookeeper.ZKGroupTopicDirs zKGroupTopicDirs = new MetaZookeeper.ZKGroupTopicDirs(metaZookeeper, topic, str);
            synchronized (topicPartitionRegInfo) {
                if (topicPartitionRegInfo.isModified()) {
                    long j = topicPartitionRegInfo.getOffset().get();
                    long messageId = topicPartitionRegInfo.getMessageId();
                    topicPartitionRegInfo.setModified(false);
                    try {
                        ZkUtils.updatePersistentPath(this.zkClient, zKGroupTopicDirs.consumerOffsetDir + "/" + topicPartitionRegInfo.getPartition().toString(), messageId + "-" + j);
                    } catch (Throwable th) {
                        log.error("exception during commitOffsets", th);
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Committed offset " + j + " for topic " + topicPartitionRegInfo.getTopic());
                    }
                }
            }
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.storage.OffsetStorage
    public TopicPartitionRegInfo load(String str, String str2, Partition partition) {
        MetaZookeeper metaZookeeper = this.metaZookeeper;
        metaZookeeper.getClass();
        String readDataMaybeNull = ZkUtils.readDataMaybeNull(this.zkClient, new MetaZookeeper.ZKGroupTopicDirs(metaZookeeper, str, str2).consumerOffsetDir + "/" + partition.toString());
        if (readDataMaybeNull == null) {
            return null;
        }
        int lastIndexOf = readDataMaybeNull.lastIndexOf("-");
        if (lastIndexOf <= 0) {
            return new TopicPartitionRegInfo(str, partition, Long.parseLong(readDataMaybeNull));
        }
        return new TopicPartitionRegInfo(str, partition, Long.parseLong(readDataMaybeNull.substring(lastIndexOf + 1)), Long.parseLong(readDataMaybeNull.substring(0, lastIndexOf)));
    }

    @Override // com.taobao.metamorphosis.client.consumer.storage.OffsetStorage
    public void close() {
    }

    @Override // com.taobao.metamorphosis.client.consumer.storage.OffsetStorage
    public void initOffset(String str, String str2, Partition partition, long j) {
    }
}
