package com.taobao.metamorphosis.client.consumer.storage;

import com.taobao.metamorphosis.client.consumer.TopicPartitionRegInfo;
import com.taobao.metamorphosis.client.consumer.storage.JDBCUtils;
import com.taobao.metamorphosis.cluster.Partition;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import javax.sql.DataSource;

/* loaded from: input_file:com/taobao/metamorphosis/client/consumer/storage/MysqlOffsetStorage.class */
/**
 * {@link OffsetStorage} implementation that keeps consumer offsets in a database table reached
 * through a {@link DataSource}.
 *
 * <p>Rows are addressed by the columns {@code topic}, {@code partition} and {@code group_id} and
 * carry the columns {@code offset} and {@code msg_id}. The table name defaults to
 * {@link #DEFAULT_TABLE_NAME} and can be replaced with {@link #setTableName(String)}; it is
 * concatenated into the SQL text as-is, so it must be a trusted identifier.
 */
public class MysqlOffsetStorage implements OffsetStorage {
    /** Table name used when none has been configured explicitly. */
    public static final String DEFAULT_TABLE_NAME = "meta_topic_partition_group_offset";

    /** Source of JDBC connections; set to {@code null} by {@link #close()}. */
    private DataSource dataSource;

    /** Name of the table every statement of this storage is issued against. */
    private String tableName = DEFAULT_TABLE_NAME;

    /**
     * Creates a storage that obtains its connections from the given data source.
     *
     * @param dataSource data source handed to {@code JDBCUtils.getConnection} on every operation
     */
    public MysqlOffsetStorage(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * @return the table name currently used when building SQL statements
     */
    public String getTableName() {
        return this.tableName;
    }

    /**
     * Replaces the table name used when building SQL statements.
     *
     * @param str the new table name
     */
    public void setTableName(String str) {
        this.tableName = str;
    }

    /**
     * Writes the offset and message id of every modified entry to the table in one JDBC batch.
     *
     * <p>Entries whose {@code isModified()} flag is not set are skipped. A {@code null} or empty
     * collection returns immediately without requesting a connection.
     *
     * @param str        consumer group, bound to the {@code group_id} column
     * @param collection registration infos whose offsets should be persisted
     */
    @Override
    public void commitOffset(final String str, final Collection<TopicPartitionRegInfo> collection) {
        final boolean nothingToCommit = collection == null || collection.isEmpty();
        if (nothingToCommit) {
            return;
        }
        final Connection conn = JDBCUtils.getConnection(this.dataSource);
        JDBCUtils.execute(conn, new JDBCUtils.ConnectionCallback() {
            @Override
            public Object doInConnection(Connection connection) throws SQLException {
                final String sql = "update " + MysqlOffsetStorage.this.tableName + " set offset=?,msg_id=? where topic=? and  partition=?  and group_id=?";
                final PreparedStatement update = connection.prepareStatement(sql);
                JDBCUtils.execute(update, new JDBCUtils.PreparedStatementCallback() {
                    @Override
                    public Object doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException {
                        for (TopicPartitionRegInfo info : collection) {
                            // Hold the info's monitor while its offset and message id are read and
                            // its modified flag is cleared.
                            synchronized (info) {
                                if (!info.isModified()) {
                                    continue;
                                }
                                final long offset = info.getOffset().get();
                                final long msgId = info.getMessageId();
                                // The flag is cleared as soon as the values are captured, i.e.
                                // before the batch has actually been executed.
                                info.setModified(false);
                                preparedStatement.setLong(1, offset);
                                preparedStatement.setLong(2, msgId);
                                preparedStatement.setString(3, info.getTopic());
                                preparedStatement.setString(4, info.getPartition().toString());
                                preparedStatement.setString(5, str);
                                preparedStatement.addBatch();
                            }
                        }
                        preparedStatement.executeBatch();
                        return null;
                    }
                });
                return null;
            }
        });
    }

    /**
     * Drops the reference to the data source. The data source itself is not closed here.
     */
    @Override
    public void close() {
        this.dataSource = null;
    }

    /**
     * Inserts the initial row for a topic/partition/group combination.
     *
     * <p>The {@code msg_id} column of the new row is set to {@code -1}.
     *
     * @param str       topic name
     * @param str2      consumer group, stored in the {@code group_id} column
     * @param partition partition; its {@code toString()} form is stored
     * @param j         initial offset
     */
    @Override
    public void initOffset(final String str, final String str2, final Partition partition, final long j) {
        final Connection conn = JDBCUtils.getConnection(this.dataSource);
        JDBCUtils.execute(conn, new JDBCUtils.ConnectionCallback() {
            @Override
            public Object doInConnection(Connection connection) throws SQLException {
                final String sql = "insert into " + MysqlOffsetStorage.this.tableName + " (topic,partition,group_id,offset,msg_id) values(?,?,?,?,?)";
                final PreparedStatement insert = connection.prepareStatement(sql);
                JDBCUtils.execute(insert, new JDBCUtils.PreparedStatementCallback() {
                    @Override
                    public Object doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException {
                        preparedStatement.setString(1, str);
                        preparedStatement.setString(2, partition.toString());
                        preparedStatement.setString(3, str2);
                        preparedStatement.setLong(4, j);
                        preparedStatement.setLong(5, -1L);
                        preparedStatement.executeUpdate();
                        return null;
                    }
                });
                return null;
            }
        });
    }

    /**
     * Reads the stored offset and message id for a topic/partition/group combination.
     *
     * @param str       topic name
     * @param str2      consumer group, matched against the {@code group_id} column
     * @param partition partition; its {@code toString()} form is matched
     * @return a registration info built from the first matching row, or {@code null} when the
     *         query yields no row
     */
    @Override
    public TopicPartitionRegInfo load(final String str, final String str2, final Partition partition) {
        final String topic = str;
        final String group = str2;
        final Connection conn = JDBCUtils.getConnection(this.dataSource);
        final Object loaded = JDBCUtils.execute(conn, new JDBCUtils.ConnectionCallback() {
            @Override
            public Object doInConnection(Connection connection) throws SQLException {
                final String sql = "select offset,msg_id from " + MysqlOffsetStorage.this.tableName + " where topic=? and partition=? and group_id=?";
                final PreparedStatement select = connection.prepareStatement(sql);
                return JDBCUtils.execute(select, new JDBCUtils.PreparedStatementCallback() {
                    @Override
                    public Object doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException {
                        preparedStatement.setString(1, topic);
                        preparedStatement.setString(2, partition.toString());
                        preparedStatement.setString(3, group);
                        final ResultSet rows = preparedStatement.executeQuery();
                        return JDBCUtils.execute(rows, new JDBCUtils.ResultSetCallback() {
                            @Override
                            public Object doInResultSet(ResultSet resultSet) throws SQLException {
                                if (resultSet.next()) {
                                    final long offset = resultSet.getLong(1);
                                    final long msgId = resultSet.getLong(2);
                                    return new TopicPartitionRegInfo(topic, partition, offset, msgId);
                                }
                                return null;
                            }
                        });
                    }
                });
            }
        });
        return (TopicPartitionRegInfo) loaded;
    }
}
