package com.asiainfo.common.exception.core.custom.schemehandle.async;

import com.asiainfo.common.exception.config.cache.ExceCacheFactory;
import com.asiainfo.common.exception.core.ThrowableBean;
import com.asiainfo.common.exception.core.helpers.ExcePropUtil;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import net.sf.json.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/asiainfo/common/exception/core/custom/schemehandle/async/KafkaExceSchemeHandle.class */
public class KafkaExceSchemeHandle {
    private static final transient Log log = LogFactory.getLog(KafkaExceSchemeHandle.class);
    private ConsumerConnector consumer;
    private String topic;
    private Map<String, Integer> topicCountMap;
    private ExecutorService executor;

    /* loaded from: input_file:com/asiainfo/common/exception/core/custom/schemehandle/async/KafkaExceSchemeHandle$ExceConsumer.class */
    class ExceConsumer implements Runnable {
        private String content;

        public ExceConsumer(String str) {
            this.content = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.content == null) {
                return;
            }
            ThrowableBean throwableBean = (ThrowableBean) JSONObject.toBean(JSONObject.fromObject(this.content), ThrowableBean.class);
            long schemeId = throwableBean.getSchemeId();
            if (KafkaExceSchemeHandle.log.isDebugEnabled()) {
                KafkaExceSchemeHandle.log.debug("【异常框架】【异步执行模块】【KafkaExceSchemeHandle】异常处理ID为【" + schemeId + "】");
            }
            try {
                if (ExceCacheFactory.getExceScheme(schemeId) != null) {
                    ExceCacheFactory.getExceScheme(throwableBean.getSchemeId()).getScheme().execute(throwableBean);
                } else if (KafkaExceSchemeHandle.log.isDebugEnabled()) {
                    KafkaExceSchemeHandle.log.debug("【异常框架】【异步执行模块】【KafkaExceSchemeHandle】根据异常处理ID【" + schemeId + "】获取异常执行实现类失败");
                }
            } catch (Exception e) {
                KafkaExceSchemeHandle.log.error("【异常框架】【异步执行模块】【KafkaExceSchemeHandle】根据异常处理ID【" + schemeId + "】获取异常执行实现类异常：" + e);
            }
        }
    }

    public KafkaExceSchemeHandle() {
        Properties property = ExcePropUtil.getProperty();
        this.topic = property.getProperty("ai.exception.kafka.topic");
        if (log.isDebugEnabled()) {
            log.debug("【异常框架】【异步执行模块】【KafkaExceSchemeHandle】kafka消费者Topic：" + this.topic);
        }
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(property));
        if (log.isDebugEnabled()) {
            log.debug("【异常框架】【异步执行模块】【KafkaExceSchemeHandle】kafka消费者对象：" + this.consumer);
        }
        this.topicCountMap = new HashMap(1);
        this.topicCountMap.put(this.topic, 1);
        this.executor = Executors.newFixedThreadPool(Integer.parseInt(property.getProperty("exception.kafka.topic.threadNum")));
    }

    public void shutdown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    public void run() {
        ConsumerIterator it = ((KafkaStream) ((List) this.consumer.createMessageStreams(this.topicCountMap).get(this.topic)).get(0)).iterator();
        while (it.hasNext()) {
            try {
                String str = new String((byte[]) it.next().message(), "UTF-8");
                if (log.isDebugEnabled()) {
                    log.debug("【异常框架】【异步执行模块】【KafkaExceSchemeHandle】从kafka服务器上拉取的数据Content为【" + str + "】");
                }
                this.executor.submit(new ExceConsumer(str));
            } catch (UnsupportedEncodingException e) {
                log.error("【异常框架】【异步执行模块】【KafkaExceSchemeHandle】从kafka服务器上拉取的数据编码格式不支持");
                return;
            }
        }
    }
}
