package com.ai.aif.log4x.message.producer.impl;

import com.ai.aif.log4x.config.ConfigManager;
import com.ai.aif.log4x.config.TraceConfig;
import com.ai.aif.log4x.file.FixedSizeRollingFileWriter;
import com.ai.aif.log4x.file.IFileWriter;
import com.ai.aif.log4x.file.TimeRollingFileWriter;
import com.ai.aif.log4x.kafka.Producer;
import com.ai.aif.log4x.message.format.Message;
import com.ai.aif.log4x.message.producer.IMessageListener;
import com.ai.aif.log4x.message.producer.IProducer;
import com.ai.aif.log4x.util.TraceConstants;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/* loaded from: input_file:com/ai/aif/log4x/message/producer/impl/FailoverKafkaProducer.class */
public class FailoverKafkaProducer implements IProducer {
    /** Local file sink that receives payloads while Kafka delivery is failing; assigned by initFileWriter(). */
    private IFileWriter fileWriter;
    /** Topic that both regular payloads and status pings are sent to. */
    private String topic;
    private final TraceConfig traceConfig = ConfigManager.getInstance().getTraceConfig();
    /** Payload of the probe message; the listener recognises a single-element batch equal to this string as a ping. */
    private String statusPingStr = "[]";
    /**
     * Current delivery target. Written by the KafkaStatusChecker thread and the message listener and read by
     * send() on caller threads, hence volatile so a mode switch becomes visible to the other threads.
     */
    private volatile Mode mode = Mode.Kafka;
    /**
     * Result of the latest status ping. Written by the message listener and polled in a loop by the
     * KafkaStatusChecker thread, hence volatile so the polling loop is guaranteed to observe the update.
     */
    private volatile KafkaAckStatus kafkaAckStatus = KafkaAckStatus.NO_RESPONSE;
    /** Back-off schedule between failed pings, in seconds (multiplied by 1000 before sleeping). */
    private int[] kafkaPingSleepTimes = {1, 5, 10, 30, 60, 120};
    /** Number of failed messages (counted while in Kafka mode) that triggers the switch to file mode. */
    private int threshold = Integer.valueOf(this.traceConfig.getConfigStr(TraceConstants.Config.FAILOVER_MSG_THRESHOLD, "10")).intValue();
    private final Producer<String, String> kafkaProducer = new Producer<>(new ProducerConfig(this.traceConfig.getKafkaProperties()), new ModeManager());

    /* loaded from: input_file:com/ai/aif/log4x/message/producer/impl/FailoverKafkaProducer$KafkaAckStatus.class */
    /**
     * Outcome of the most recent status ping sent to Kafka, as recorded by the message listener
     * and polled by KafkaStatusChecker.
     */
    private enum KafkaAckStatus {
        /** No ping outcome recorded yet; the initial value, and the value set when a KafkaStatusChecker is constructed. */
        NO_RESPONSE,
        /** Recorded by the listener's onFail when the failed batch is exactly the status ping. */
        FAIL,
        /** Recorded by the listener's onSuccess when the delivered batch is exactly the status ping. */
        SUCCESS
    }

    /* loaded from: input_file:com/ai/aif/log4x/message/producer/impl/FailoverKafkaProducer$KafkaStatusChecker.class */
    /**
     * Background thread that probes Kafka with the status ping and switches the producer back to
     * Kafka mode once a ping is acknowledged.
     *
     * <p>The thread sends one ping on start and then polls {@code kafkaAckStatus}:
     * <ul>
     *   <li>{@code SUCCESS}: set mode to Kafka, flush the file writer and terminate;</li>
     *   <li>{@code FAIL}: send another ping, keep mode at File and wait according to the
     *       {@code kafkaPingSleepTimes} back-off schedule;</li>
     *   <li>otherwise (no response yet): wait 100 ms and poll again.</li>
     * </ul>
     * The loop only ends on {@code SUCCESS}; previously every branch fell out of the switch into the
     * "back to Kafka" code, so the checker re-enabled Kafka after a single pass regardless of the ping result.
     */
    private class KafkaStatusChecker extends Thread {
        /** Polling interval, in milliseconds, while no ping outcome has been recorded. */
        private static final long POLL_INTERVAL_MILLIS = 100L;

        /** Number of failed pings observed by this checker; selects the back-off delay. */
        private int failedTimes;

        public KafkaStatusChecker() {
            super("KafkaStatusChecker");
            this.failedTimes = 0;
            // Discard any outcome left over from an earlier checker run.
            FailoverKafkaProducer.this.kafkaAckStatus = KafkaAckStatus.NO_RESPONSE;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            sendPing();
            while (true) {
                switch (FailoverKafkaProducer.this.kafkaAckStatus) {
                    case SUCCESS:
                        // Ping acknowledged: resume Kafka delivery and flush what was written to the file.
                        FailoverKafkaProducer.this.mode = Mode.Kafka;
                        FailoverKafkaProducer.this.fileWriter.flush();
                        return;
                    case FAIL:
                        sendPing();
                        FailoverKafkaProducer.this.mode = Mode.File;
                        this.failedTimes++;
                        backOff();
                        break;
                    default:
                        try {
                            Thread.sleep(POLL_INTERVAL_MILLIS);
                        } catch (InterruptedException e2) {
                            // Best effort: keep probing, since nothing else would restore Kafka mode.
                            e2.printStackTrace();
                        }
                        break;
                }
            }
        }

        /** Sends the status ping message to the producer's topic. */
        private void sendPing() {
            FailoverKafkaProducer.this.kafkaProducer.send(
                    new KeyedMessage<String, String>(FailoverKafkaProducer.this.topic, FailoverKafkaProducer.this.statusPingStr));
        }

        /**
         * Sleeps for the delay that matches the current failure count. Once the schedule is
         * exhausted the last delay is reused and the file writer is flushed after each wait.
         */
        private void backOff() {
            int[] delays = FailoverKafkaProducer.this.kafkaPingSleepTimes;
            boolean scheduleExhausted = this.failedTimes > delays.length;
            // Clamp the index so the lookup stays inside the schedule for any failure count.
            int index = Math.max(0, Math.min(this.failedTimes, delays.length) - 1);
            try {
                Thread.sleep(delays[index] * 1000L);
                if (scheduleExhausted) {
                    FailoverKafkaProducer.this.fileWriter.flush();
                }
            } catch (InterruptedException e) {
                // Best effort: an interrupted wait is reported and probing continues.
                System.out.println("[Error] KafkaStatusChecker sleep failed." + e.getMessage());
                e.printStackTrace();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ai/aif/log4x/message/producer/impl/FailoverKafkaProducer$Mode.class */
    /** Delivery target that send() uses for each payload. */
    public enum Mode {
        /** Payloads are handed to the Kafka producer. */
        Kafka,
        /** Payloads are written to the local file writer instead of Kafka. */
        File
    }

    /* loaded from: input_file:com/ai/aif/log4x/message/producer/impl/FailoverKafkaProducer$ModeManager.class */
    /**
     * Listener registered with the Kafka producer. It records the outcome of status pings and,
     * for regular payloads that failed, saves them to the file writer and switches the producer
     * to file mode once enough failures have accumulated.
     */
    private class ModeManager implements IMessageListener {
        /** Failed regular messages counted while in Kafka mode; reset to zero when failover is triggered. */
        private int errorTimes;

        private ModeManager() {
        }

        /**
         * Records a successful ping; successful regular batches need no bookkeeping.
         *
         * @param list the messages that were delivered, may be {@code null}
         */
        @Override // com.ai.aif.log4x.message.producer.IMessageListener
        public void onSuccess(List<String> list) {
            if (FailoverKafkaProducer.this.traceConfig.isDebugEnabled()) {
                System.out.println("[Info] send message to kafka successfully.");
            }
            if (isStatusPing(list)) {
                FailoverKafkaProducer.this.kafkaAckStatus = KafkaAckStatus.SUCCESS;
            }
        }

        /**
         * Records a failed ping, or saves a failed regular batch to the local file and starts the
         * failover once the accumulated failure count reaches the threshold.
         *
         * @param list the messages that could not be delivered; {@code null} is ignored
         * @param th   the failure reported by the producer
         */
        @Override // com.ai.aif.log4x.message.producer.IMessageListener
        public void onFail(List<String> list, Throwable th) {
            if (list == null) {
                return;
            }
            if (isStatusPing(list)) {
                FailoverKafkaProducer.this.kafkaAckStatus = KafkaAckStatus.FAIL;
                return;
            }
            // Failures reported while already in file mode do not count towards the threshold.
            if (FailoverKafkaProducer.this.mode == Mode.Kafka) {
                this.errorTimes += list.size();
            }
            System.out.println("[Info] send to kafka failed, save the data into local file." + th.getMessage());
            try {
                for (String payload : list) {
                    FailoverKafkaProducer.this.fileWriter.write(payload);
                }
            } catch (IOException writeError) {
                System.out.println("Write data to file failed for " + writeError.getMessage());
            }
            if (this.errorTimes < FailoverKafkaProducer.this.threshold) {
                return;
            }
            FailoverKafkaProducer.this.fileWriter.flush();
            FailoverKafkaProducer.this.mode = Mode.File;
            new KafkaStatusChecker().start();
            this.errorTimes = 0;
            System.out.println("[Info] change write mode from kafka to file.");
        }

        /** Tells whether the batch consists of exactly one message equal to the status ping payload. */
        private boolean isStatusPing(List<String> batch) {
            return batch != null
                    && batch.size() == 1
                    && FailoverKafkaProducer.this.statusPingStr.equals(batch.get(0));
        }
    }

    /**
     * Creates a producer for the given topic and sets up the local file writer used for failover.
     *
     * @param str the topic that payloads and status pings are sent to
     */
    public FailoverKafkaProducer(String str) {
        this.topic = str;
        // The file writer must exist before any send or listener callback can fall back to it.
        initFileWriter();
    }

    /**
     * Serializes a single message and delivers it through the current mode.
     *
     * @param message the message to deliver; {@code null} is serialized as an empty string
     */
    @Override // com.ai.aif.log4x.message.producer.IProducer
    public void produce(Message message) {
        String payload = parseMessage(message);
        send(payload);
    }

    /**
     * Serializes a batch of messages into one payload and delivers it through the current mode.
     *
     * @param list the messages to deliver; {@code null} is serialized as an empty string
     */
    @Override // com.ai.aif.log4x.message.producer.IProducer
    public void produce(List<Message> list) {
        String payload = parseMessage(list);
        send(payload);
    }

    /**
     * Creates the failover file writer from configuration.
     *
     * <p>The class named by {@code FAILOVER_WRITER} is looked up; when it is not configured, cannot
     * be loaded, or is {@link FixedSizeRollingFileWriter}, a fixed-size rolling writer is created.
     * Any other loadable class selects a {@link TimeRollingFileWriter}.
     */
    private void initFileWriter() {
        String writerClassName = ConfigManager.getInstance().getTraceConfig().getConfigStr(TraceConstants.Config.FAILOVER_WRITER);
        Class<?> writerClass = null;
        if (writerClassName != null) {
            try {
                writerClass = Class.forName(writerClassName);
            } catch (ClassNotFoundException notFound) {
                System.out.println("[ Warn] The class for file writer : [" + writerClassName + "] is not found, use " + FixedSizeRollingFileWriter.class + " instead.");
            }
        }

        boolean useTimeRolling = writerClass != null && writerClass != FixedSizeRollingFileWriter.class;
        if (useTimeRolling) {
            String timeOwner = TimeRollingFileWriter.class.getName();
            this.fileWriter = new TimeRollingFileWriter(
                    this.traceConfig.requiredConfig(TraceConstants.Config.FAILOVER_FILENAME, timeOwner),
                    this.traceConfig.requiredConfig(TraceConstants.Config.FAILOVER_DATE_PATTERN, timeOwner),
                    this.traceConfig.requiredConfig(TraceConstants.Config.FAILOVER_FILE_PATTERN, timeOwner));
            return;
        }

        // Default: roll by size, with the size limit taken from configuration (10485760 when unset).
        String fixedOwner = FixedSizeRollingFileWriter.class.getName();
        this.fileWriter = new FixedSizeRollingFileWriter(
                Integer.valueOf(this.traceConfig.getConfigStr(TraceConstants.Config.FAILOVER_FILE_THRESHOLD, 10485760)).intValue(),
                this.traceConfig.requiredConfig(TraceConstants.Config.FAILOVER_FILENAME, fixedOwner),
                this.traceConfig.requiredConfig(TraceConstants.Config.FAILOVER_FILE_PATTERN, fixedOwner));
    }

    /**
     * Delivers one serialized payload to Kafka or, when not in Kafka mode, to the local file.
     *
     * @param str the serialized payload
     * @throws RuntimeException wrapping the {@link IOException} when the file write fails
     */
    private void send(String str) {
        if (this.traceConfig.isDebugEnabled()) {
            System.out.println("Current mode is " + this.mode);
        }
        if (this.mode == Mode.Kafka) {
            this.kafkaProducer.send(new KeyedMessage<>(this.topic, str));
            return;
        }
        try {
            this.fileWriter.write(str);
        } catch (IOException writeError) {
            System.out.println("Write data to file failed for " + writeError.getMessage());
            throw new RuntimeException(writeError);
        }
    }

    /**
     * Joins the string forms of the messages into one bracketed, comma-separated payload.
     *
     * @param list the messages to serialize
     * @return {@code ""} for a {@code null} list, otherwise {@code "[m1,m2,...]"} ({@code "[]"} when empty)
     */
    private String parseMessage(List<Message> list) {
        if (list == null) {
            return "";
        }
        StringBuilder joined = new StringBuilder("[");
        String separator = "";
        for (Message item : list) {
            joined.append(separator).append(item.toString());
            separator = ",";
        }
        return joined.append("]").toString();
    }

    /**
     * Wraps the string form of a single message in brackets.
     *
     * @param message the message to serialize
     * @return {@code ""} for a {@code null} message, otherwise {@code "[" + message + "]"}
     */
    private String parseMessage(Message message) {
        if (message == null) {
            return "";
        }
        return "[" + message.toString() + "]";
    }
}
