package com.ai.ipu.count.consumer;

import com.ai.ipu.basic.log.ILogger;
import com.ai.ipu.basic.log.IpuLoggerFactory;
import com.ai.ipu.count.kafka.KafkaManager;
import com.ai.ipu.count.util.CountConstant;
import com.ai.ipu.count.util.TimeUtil;
import com.ai.ipu.database.conn.SqlSessionManager;
import com.ai.ipu.database.dao.impl.SqlDao;
import io.netty.util.CharsetUtil;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.commons.lang.StringUtils;

/* loaded from: input_file:com/ai/ipu/count/consumer/KafkaConsumer.class */
/**
 * Runnable that consumes messages from a single Kafka topic and inserts every
 * accepted message as one row into a database table.
 *
 * <p>The log type passed to the constructor is used as an index into the arrays
 * returned by {@code KafkaConsumerConfig.getTopics()} and
 * {@code KafkaConsumerConfig.getTables()}. A message is accepted when:
 * <ul>
 *   <li>it contains {@code CountConstant.MESSAGE_SPLIT_STRING},</li>
 *   <li>the text before the first separator parses to this consumer's log type, and</li>
 *   <li>it splits into as many fields as the message style configured for the topic.</li>
 * </ul>
 * Rejected messages are logged and skipped.
 */
public class KafkaConsumer implements Runnable {
    protected final transient ILogger logger = IpuLoggerFactory.createLogger(KafkaConsumer.class);
    private final ConsumerConnector consumer = KafkaManager.createConsumerConnector();
    private final short logType;
    private final String topic;
    private final String tableName;

    /**
     * Creates a consumer for the given log type.
     *
     * <p>The topic and table are looked up defensively: an out-of-range log type
     * leaves them {@code null} instead of throwing here, so that the mismatch is
     * reported by {@link #checkConsumerConfig()} when {@link #run()} starts.
     *
     * @param s log type, used as index into the configured topic and table arrays
     */
    public KafkaConsumer(short s) {
        this.logType = s;
        this.topic = elementAt(KafkaConsumerConfig.getTopics(), s);
        this.tableName = elementAt(KafkaConsumerConfig.getTables(), s);
    }

    /** Returns {@code values[index]}, or {@code null} when the array is null or the index is out of range. */
    private static String elementAt(String[] values, int index) {
        if (values == null || index < 0 || index >= values.length) {
            return null;
        }
        return values[index];
    }

    /**
     * Validates the configuration, then reads messages from the first stream of the
     * topic until the iterator reports no more messages. The connector is shut down
     * on every exit path, including a failed configuration check or an exception.
     */
    @Override
    public void run() {
        try {
            if (!checkConsumerConfig()) {
                return;
            }
            // Request a single stream for the topic.
            HashMap<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put(this.topic, Integer.valueOf(1));
            List<KafkaStream<byte[], byte[]>> streams =
                    this.consumer.createMessageStreams(topicCount).get(this.topic);
            ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
            while (it.hasNext()) {
                handleMessage(new String(it.next().message(), CharsetUtil.UTF_8));
            }
        } finally {
            this.logger.debug("消费者服务关闭");
            this.consumer.shutdown();
        }
    }

    /**
     * Validates one decoded message and, if it is accepted, stores it.
     * Invalid messages are logged and dropped so that the consume loop continues.
     *
     * @param message the message payload decoded as UTF-8
     */
    private void handleMessage(String message) {
        this.logger.debug("receive：" + message);
        int headerEnd = message.indexOf(CountConstant.MESSAGE_SPLIT_STRING);
        if (headerEnd < 0) {
            this.logger.error("接收到的消息格式错误，处理下一条消息");
            return;
        }
        int messageType;
        try {
            messageType = Integer.parseInt(message.substring(0, headerEnd));
        } catch (NumberFormatException e) {
            // A non-numeric type prefix is a malformed message, not a reason to stop consuming.
            this.logger.error("接收到的消息格式错误，处理下一条消息", e);
            return;
        }
        if (messageType != this.logType) {
            this.logger.error("接收到的消息格式不是当前需要处理的格式，处理下一条消息");
            return;
        }
        // NOTE(review): split() interprets the separator as a regular expression while
        // indexOf() above treats it literally; confirm MESSAGE_SPLIT_STRING contains no
        // regex metacharacters. split() also drops trailing empty fields.
        String[] values = message.split(CountConstant.MESSAGE_SPLIT_STRING);
        String[] columns = KafkaConsumerConfig.getMessageStyle(this.topic)
                .split(CountConstant.MESSAGE_SPLIT_STRING);
        if (columns.length != values.length) {
            this.logger.error("接收到的消息内容和配置文件中的配置不相符，处理下一条消息");
            return;
        }
        insertRow(columns, values);
    }

    /**
     * Builds an insert statement with one named placeholder per configured column plus
     * {@code create_time}, executes it and commits. On any exception the open sessions
     * are rolled back and the error is logged.
     *
     * @param columns column names from the configured message style
     * @param values  message fields, positionally matching {@code columns}
     */
    // Raw HashMap is kept because the parameter type of SqlDao.executeUpdate is not visible here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void insertRow(String[] columns, String[] values) {
        StringBuilder columnSql = new StringBuilder("insert into ").append(this.tableName).append(" (");
        StringBuilder valueSql = new StringBuilder(" values (");
        HashMap params = new HashMap();
        for (int i = 0; i < columns.length; i++) {
            // The column list keeps the configured text; placeholder and parameter key use the trimmed name.
            String key = columns[i].trim();
            columnSql.append(columns[i]).append(",");
            valueSql.append("#{").append(key).append("},");
            params.put(key, values[i]);
        }
        columnSql.append("create_time").append(")");
        valueSql.append("#{create_time})");
        params.put("create_time", TimeUtil.getYyyyMmDdHhMmSs(new Date()));
        String sql = columnSql.append(valueSql).toString();
        try {
            SqlDao sqlDao = new SqlDao(KafkaConsumerConfig.getDataSource());
            this.logger.debug("sql=" + sql);
            this.logger.debug("result=" + sqlDao.executeUpdate(sql, params));
            SqlSessionManager.commitAll();
        } catch (Exception e) {
            SqlSessionManager.rollbackAll();
            this.logger.error(e.getMessage(), e);
        }
    }

    /**
     * Checks that the log type indexes both the topic and table arrays, that both arrays
     * have the same length, and that a message style and table name are configured.
     * The range checks run first so that the topic is known to be valid before it is
     * used to look up the message style.
     *
     * @return {@code true} when the configuration is usable; otherwise {@code false}
     *         after logging the reason
     */
    private boolean checkConsumerConfig() {
        String[] topics = KafkaConsumerConfig.getTopics();
        String[] tables = KafkaConsumerConfig.getTables();
        if (topics == null || this.logType < 0 || this.logType >= topics.length) {
            this.logger.error("日志类别与topic列表不相符，请检查配置文件consumer");
            return false;
        }
        if (tables == null || this.logType >= tables.length) {
            this.logger.error("日志类别与数据库表列表不相符，请检查配置文件consumer");
            return false;
        }
        if (topics.length != tables.length) {
            this.logger.error("topic列表和数据库表列表不相符，请检查配置文件consumer");
            return false;
        }
        if (StringUtils.isEmpty(KafkaConsumerConfig.getMessageStyle(this.topic))) {
            this.logger.error("消息格式为空，请检查配置文件consumer");
            return false;
        }
        if (StringUtils.isEmpty(this.tableName)) {
            this.logger.error("消息对应的数据库表为空，请检查配置文件consumer");
            return false;
        }
        return true;
    }
}
