package com.ai.aif.log4x.message.transport.impl;

import com.ai.aif.log4x.config.ConfigManager;
import com.ai.aif.log4x.message.disruptor.LoggingWaitStrategy;
import com.ai.aif.log4x.message.disruptor.MessageEvent;
import com.ai.aif.log4x.message.format.Message;
import com.ai.aif.log4x.message.producer.IProducer;
import com.ai.aif.log4x.message.producer.impl.KafkaProducer;
import com.ai.aif.log4x.message.producer.impl.MsgFrameProducer;
import com.ai.aif.log4x.message.transport.MessageSender;
import com.ai.aif.log4x.util.StringUtils;
import com.ai.aif.log4x.util.TraceConstants;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/* loaded from: input_file:com/ai/aif/log4x/message/transport/impl/DisruptorMessageSenderImpl.class */
public class DisruptorMessageSenderImpl implements MessageSender {
    protected Disruptor<MessageEvent> disruptor;
    private ExecutorService executor;
    private IProducer fileProducer;
    private IProducer traceProducer;
    private IProducer metricProducer;
    private IProducer msgTraceProducer;
    private IProducer msgMetricProducer;
    private List<Message> list;
    private static volatile boolean isInit = false;
    private int BUFFER_SIZE = 4096;
    private int batchSize = 1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/ai/aif/log4x/message/transport/impl/DisruptorMessageSenderImpl$MessageEventHandler.class */
    public class MessageEventHandler implements EventHandler<MessageEvent> {
        MessageEventHandler() {
        }

        public void onEvent(MessageEvent messageEvent, long j, boolean z) throws Exception {
            String appender = ConfigManager.getInstance().getTraceConfig().getAppender();
            if (StringUtils.equals(TraceConstants.APPENDER_FILE, appender)) {
                messageEvent.getMessage().setMsgTime(System.currentTimeMillis());
                DisruptorMessageSenderImpl.this.fileProducer.produce(messageEvent.getMessage());
                return;
            }
            if (StringUtils.equals(TraceConstants.APPENDER_KAFKA, appender)) {
                if (!messageEvent.getMessage().matchType(TraceConstants.MSG_TYPE_TRACE)) {
                    if (messageEvent.getMessage().matchType(TraceConstants.MSG_TYPE_METRIC)) {
                        DisruptorMessageSenderImpl.this.metricProducer.produce(messageEvent.getMessage());
                        return;
                    }
                    return;
                }
                DisruptorMessageSenderImpl.this.list.add(messageEvent.getMessage());
                try {
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                }
                if (DisruptorMessageSenderImpl.this.list.size() == DisruptorMessageSenderImpl.this.batchSize) {
                    DisruptorMessageSenderImpl.this.traceProducer.produce(DisruptorMessageSenderImpl.this.list);
                    return;
                }
                return;
            }
            if (!messageEvent.getMessage().matchType(TraceConstants.MSG_TYPE_TRACE)) {
                if (messageEvent.getMessage().matchType(TraceConstants.MSG_TYPE_METRIC)) {
                    DisruptorMessageSenderImpl.this.msgMetricProducer.produce(messageEvent.getMessage());
                    return;
                }
                return;
            }
            DisruptorMessageSenderImpl.this.list.add(messageEvent.getMessage());
            try {
            } catch (Exception e2) {
                e2.printStackTrace();
            } finally {
            }
            if (DisruptorMessageSenderImpl.this.list.size() == DisruptorMessageSenderImpl.this.batchSize) {
                DisruptorMessageSenderImpl.this.msgTraceProducer.produce(DisruptorMessageSenderImpl.this.list);
            }
        }
    }

    public DisruptorMessageSenderImpl() {
        initialize();
    }

    @Override // com.ai.aif.log4x.message.transport.MessageSender
    public void initialize() {
        if (isInit) {
            return;
        }
        this.traceProducer = new KafkaProducer(ConfigManager.getInstance().getTraceConfig().getTraceTopic());
        this.metricProducer = new KafkaProducer(ConfigManager.getInstance().getTraceConfig().getMetricTopic());
        this.msgTraceProducer = new MsgFrameProducer(ConfigManager.getInstance().getTraceConfig().getTraceTopic());
        this.msgMetricProducer = new MsgFrameProducer(ConfigManager.getInstance().getTraceConfig().getMetricTopic());
        this.list = new ArrayList(this.batchSize);
        this.executor = Executors.newCachedThreadPool();
        this.disruptor = new Disruptor<>(MessageEvent.EVENT_FACTORY, this.BUFFER_SIZE, this.executor, ProducerType.MULTI, new LoggingWaitStrategy(5));
        this.disruptor.handleEventsWith(new EventHandler[]{new MessageEventHandler()});
        this.disruptor.start();
        isInit = true;
    }

    @Override // com.ai.aif.log4x.message.transport.MessageSender
    public void send(Message message) {
        RingBuffer ringBuffer = this.disruptor.getRingBuffer();
        if (ringBuffer.remainingCapacity() <= 0) {
            System.out.println("current ringbuffer avaliable capacity is full");
            return;
        }
        try {
            long tryNext = ringBuffer.tryNext();
            ((MessageEvent) ringBuffer.get(tryNext)).setMessage(message);
            ringBuffer.publish(tryNext);
        } catch (InsufficientCapacityException e) {
            e.printStackTrace();
        }
    }

    @Override // com.ai.aif.log4x.message.transport.MessageSender
    public void shutdown() {
        this.disruptor.shutdown();
        this.executor.shutdown();
    }
}
