package com.ai.aif.msgframe.extend.es.utils;

import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.thread.ThreadFactoryImpl;
import com.ai.aif.msgframe.extend.es.configure.MappingAttribute;
import com.ai.aif.msgframe.extend.es.message.EsConMessage;
import com.ai.aif.msgframe.extend.es.message.EsMessage;
import com.ai.aif.msgframe.extend.es.message.EsProMessage;
import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/extend/es/utils/IndicesUtils.class */
public class IndicesUtils extends Utils {
    private static Logger LOG = LoggerFactory.getLogger(IndicesUtils.class);
    private static int NUMBER_OF_SHARDS = 3;
    private static int NUMBER_OF_REPLICAS = 2;
    private static final Map<String, String> indexMap = new ConcurrentHashMap();
    private static final Lock lock = new ReentrantLock();
    private static int defaultCoreSize = 20;
    private static int defaultMaxSize = 50;
    private static int defaultQueueSize = 2000;
    private static ThreadPoolExecutor threadPoolExecutor;

    public static boolean indexExists(String str) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(new String[]{str});
        RestHighLevelClient esConnection = EsConnectionPool.getInstance().getEsConnection();
        try {
            try {
                boolean exists = esConnection.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
                EsConnectionPool.getInstance().returnEsConnection(esConnection);
                return exists;
            } catch (IOException e) {
                LOG.error("判断索引是否存在时异常", e);
                EsConnectionPool.getInstance().returnEsConnection(esConnection);
                return false;
            }
        } catch (Throwable th) {
            EsConnectionPool.getInstance().returnEsConnection(esConnection);
            throw th;
        }
    }

    public static boolean deleteIndex(String str) {
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(str);
        RestHighLevelClient esConnection = EsConnectionPool.getInstance().getEsConnection();
        try {
            try {
                esConnection.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
                EsConnectionPool.getInstance().returnEsConnection(esConnection);
                return true;
            } catch (IOException e) {
                LOG.error("删除索引时异常", e);
                EsConnectionPool.getInstance().returnEsConnection(esConnection);
                return true;
            }
        } catch (Throwable th) {
            EsConnectionPool.getInstance().returnEsConnection(esConnection);
            throw th;
        }
    }

    public static boolean createIndexMapping(List<MappingAttribute> list, String str) throws Exception {
        String prepareMapping = prepareMapping(list);
        if (LOG.isErrorEnabled()) {
            LOG.error("创建索引" + str + ",mapping=" + prepareMapping);
        }
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(str);
        createIndexRequest.settings(Settings.builder().put("number_of_replicas", NUMBER_OF_REPLICAS).put("number_of_shards", NUMBER_OF_SHARDS));
        createIndexRequest.mapping(prepareMapping, XContentType.JSON);
        RestHighLevelClient esConnection = EsConnectionPool.getInstance().getEsConnection();
        try {
            if (!esConnection.indices().create(createIndexRequest, RequestOptions.DEFAULT).isAcknowledged()) {
                throw new Exception("索引" + str + "已存在");
            }
            EsConnectionPool.getInstance().returnEsConnection(esConnection);
            return true;
        } catch (Throwable th) {
            EsConnectionPool.getInstance().returnEsConnection(esConnection);
            throw th;
        }
    }

    public static boolean sendDoc(String str, String str2, EsMessage esMessage) {
        checkIndex(str);
        IndexRequest indexRequest = new IndexRequest(str, str2);
        indexRequest.source(JSON.toJSONString(esMessage), XContentType.JSON);
        RestHighLevelClient esConnection = EsConnectionPool.getInstance().getEsConnection();
        try {
            try {
                esConnection.index(indexRequest, RequestOptions.DEFAULT);
                EsConnectionPool.getInstance().returnEsConnection(esConnection);
                return true;
            } catch (IOException e) {
                LOG.error("添加索引文档时异常", e);
                EsConnectionPool.getInstance().returnEsConnection(esConnection);
                return true;
            }
        } catch (Throwable th) {
            EsConnectionPool.getInstance().returnEsConnection(esConnection);
            throw th;
        }
    }

    public static void sendDocAsync(EsMessage esMessage, String str, String str2, MsgFMessage msgFMessage) {
        threadPoolExecutor.execute(getAction(str + "_" + str2, esMessage, msgFMessage));
    }

    private static Runnable getAction(final String str, final EsMessage esMessage, MsgFMessage msgFMessage) {
        return new Runnable() { // from class: com.ai.aif.msgframe.extend.es.utils.IndicesUtils.2
            @Override // java.lang.Runnable
            public void run() {
                IndicesUtils.sendDoc(str, "_doc", esMessage);
                IndicesUtils.LOG.info("向" + str + "写入数据成功!");
            }
        };
    }

    public static boolean checkIndex(String str) {
        if (indexMap.containsKey(str)) {
            return true;
        }
        try {
            lock.lock();
            if (indexMap.containsKey(str)) {
                lock.unlock();
                return true;
            }
            if (indexExists(str)) {
                indexMap.put(str, str);
                lock.unlock();
                return true;
            }
            try {
                if (str.endsWith(ConstUtils.PRODUCER_TYPE)) {
                    createIndexMapping(getMappingAttribute(EsProMessage.class), str);
                } else if (str.endsWith(ConstUtils.CONSUMER_TYPE)) {
                    createIndexMapping(getMappingAttribute(EsConMessage.class), str);
                }
                indexMap.put(str, str);
            } catch (Exception e) {
                LOG.error("createIndex：" + str + "时出现异常", e);
            }
            lock.unlock();
            return true;
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    static {
        threadPoolExecutor = null;
        if (threadPoolExecutor == null) {
            synchronized (IndicesUtils.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = new ThreadPoolExecutor(defaultCoreSize, defaultMaxSize, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue(defaultQueueSize), new ThreadFactoryImpl("EsSendThread"), new RejectedExecutionHandler() { // from class: com.ai.aif.msgframe.extend.es.utils.IndicesUtils.1
                        @Override // java.util.concurrent.RejectedExecutionHandler
                        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor2) {
                            if (IndicesUtils.LOG.isErrorEnabled()) {
                                IndicesUtils.LOG.error("ActiveCount=" + threadPoolExecutor2.getActiveCount());
                                IndicesUtils.LOG.error("CorePoolSize=" + threadPoolExecutor2.getCorePoolSize());
                                IndicesUtils.LOG.error("MaximumPoolSize=" + threadPoolExecutor2.getMaximumPoolSize());
                                IndicesUtils.LOG.error("TaskCount=" + threadPoolExecutor2.getTaskCount());
                            }
                            IndicesUtils.LOG.error("异步发送ES报文失败，缓存队列可能已满，无法继缓存收消息，队列当前容量为:" + threadPoolExecutor2.getQueue().size());
                        }
                    });
                }
            }
        }
    }
}
