package com.ai.bss.terminalSubscribePush.service.impl;

import com.ai.bss.infrastructure.kafka.KafkaProducerConfig;
import com.ai.bss.infrastructure.util.RedisCacheUtil;
import com.ai.bss.terminal.event.model.APISubscriber;
import com.ai.bss.terminal.event.repository.APISubscriberRepository;
import com.ai.bss.terminalSubscribePush.dto.KafkaParametersDto;
import com.ai.bss.terminalSubscribePush.service.SubscribeSendService;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/ai/bss/terminalSubscribePush/service/impl/SubscribeSendServiceImpl.class */
public class SubscribeSendServiceImpl implements SubscribeSendService {
    private static final Logger log;

    @Autowired
    APISubscriberRepository apiSubscriberRepository;

    @Resource
    private RedisCacheUtil redisCacheUtil;

    @Value("${mqtt.uri:tcp://8.130.50.1:6883}")
    private String mqttUri;

    @Value("${subscribe.send.failure.times}")
    private int pushFailureTimes;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<String, Channel> rabbitChannelMap = new ConcurrentHashMap(16);
    private final Map<String, DefaultMQProducer> rocketProducerMap = new ConcurrentHashMap(16);
    private final Map<String, MqttClient> mqttClientMap = new ConcurrentHashMap(16);
    private Map<String, KafkaTemplate> kafkaTemplateMap = new HashMap();

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public void subscribeSendTerminalInfo() {
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public void subscribeSendTerminalFault() {
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public void subscribeSendDeviceStatus() {
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public String sendMsgToUrl(String str, String str2, String str3) {
        StringEntity stringEntity = new StringEntity(str3, "utf-8");
        RequestConfig build = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(50000).setSocketTimeout(5000).build();
        HttpClientBuilder.create().build();
        HttpPost httpPost = new HttpPost(str2);
        httpPost.setHeader("token", "1122");
        httpPost.setConfig(build);
        httpPost.setEntity(stringEntity);
        return httpSend(httpPost, str);
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public String sendMsgToUrlWithHead(String str, String str2, String str3, JSONObject jSONObject) {
        StringEntity stringEntity = new StringEntity(str3, "utf-8");
        stringEntity.setContentType(ContentType.APPLICATION_JSON.toString());
        RequestConfig build = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(50000).setSocketTimeout(5000).build();
        HttpPost httpPost = new HttpPost(str2);
        if (jSONObject != null) {
            jSONObject.forEach((str4, obj) -> {
                httpPost.setHeader(str4, (String) obj);
            });
        }
        httpPost.setConfig(build);
        httpPost.setEntity(stringEntity);
        return httpSend(httpPost, str);
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public void clearPushFailureTimes() throws Exception {
        List findAll = this.apiSubscriberRepository.findAll();
        if (findAll != null) {
            Iterator it = findAll.iterator();
            while (it.hasNext()) {
                this.redisCacheUtil.cleanCacheByIterator("PushFailureTimes_" + ((APISubscriber) it.next()).getSubscriberId());
            }
        }
        log.info("清除推送失败次数缓存");
    }

    private String httpSend(HttpPost httpPost, String str) {
        String str2 = "";
        CloseableHttpClient build = HttpClientBuilder.create().build();
        int i = 0;
        try {
            if (this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str) != null) {
                i = ((Integer) this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str)).intValue();
            }
        } catch (Exception e) {
            log.error("获取缓存订阅推送失败次数出错", e);
        }
        try {
            if (i < this.pushFailureTimes) {
                HttpResponse execute = build.execute(httpPost);
                log.error("It's successful httpResponse======= :" + execute.getEntity().getContent());
                str2 = execute.getStatusLine().getStatusCode() + "";
                this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, 0);
            } else {
                log.error("连续推送失败次数超过" + this.pushFailureTimes + "次，停止推送，清理缓存后可以恢复推送。");
            }
        } catch (IOException e2) {
            try {
                this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, Integer.valueOf(i + 1));
            } catch (Exception e3) {
                log.error("缓存订阅推送失败次数出错", e3);
            }
            log.error("订阅序号(" + str + ")第" + i + "次 http请求出错!", e2);
        } catch (Exception e4) {
            log.error("缓存订阅推送失败次数出错", e4);
        }
        log.error("httpResponse======= :" + str2);
        return str2;
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public Boolean sendForKafka(String str, String str2, String str3, String str4) {
        KafkaTemplate kafkaTemplate = this.kafkaTemplateMap.get(str2);
        if (kafkaTemplate == null) {
            kafkaTemplate = new KafkaProducerConfig(str2).kafkaTemplate();
            this.kafkaTemplateMap.put(str2, kafkaTemplate);
        }
        int i = 0;
        try {
            if (this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str) != null) {
                i = ((Integer) this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str)).intValue();
            }
        } catch (Exception e) {
            log.error("获取缓存订阅推送失败次数出错", e);
        }
        try {
            if (i < this.pushFailureTimes) {
                Object obj = kafkaTemplate.send(str3, str4).get();
                kafkaTemplate.flush();
                log.info("It's successful send msg to Kafka:" + str2 + "服务器地址以及topic:" + str3 + " 内容：" + str4 + " 结果" + obj);
                this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, 0);
            } else {
                log.error("连续推送失败次数超过" + this.pushFailureTimes + "次，停止推送，清理缓存后可以恢复推送。");
            }
            return true;
        } catch (InterruptedException e2) {
            try {
                this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, Integer.valueOf(i + 1));
            } catch (Exception e3) {
                log.error("缓存订阅推送失败次数出错", e3);
            }
            log.error("订阅序号(" + str + ")第" + i + "次 KAFKA请求出错!", e2);
            return false;
        } catch (Exception e4) {
            log.error(e4.getMessage());
            return false;
        }
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public Boolean sendForRabbitMq(String str, String str2, Integer num, String str3, String str4, String str5, String str6, String str7, String str8) {
        Channel channel = this.rabbitChannelMap.get(str2 + num);
        if (channel == null) {
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost(str2);
                connectionFactory.setPort(num.intValue());
                connectionFactory.setUsername(str3);
                connectionFactory.setPassword(str4);
                connectionFactory.setVirtualHost(str5);
                channel = connectionFactory.newConnection("myProducer").createChannel();
                channel.confirmSelect();
                this.rabbitChannelMap.put(str2 + num, channel);
            } catch (Exception e) {
                log.error("北向推送RabbitMQ连接错误", e);
            }
        }
        int i = 0;
        try {
            if (this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str) != null) {
                i = ((Integer) this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str)).intValue();
            }
        } catch (Exception e2) {
            log.error("获取缓存订阅推送失败次数出错", e2);
        }
        try {
            if (i < this.pushFailureTimes) {
                channel.basicPublish(str6, str7, (AMQP.BasicProperties) null, str8.getBytes(StandardCharsets.UTF_8));
                if (channel.waitForConfirms()) {
                    log.error("It's successful send msg to RabbitMQ:" + str2 + ":" + num + "exchange:" + str6 + "routingKey:" + str7 + " 内容：" + str8);
                    this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, 0);
                } else {
                    try {
                        this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, Integer.valueOf(i + 1));
                    } catch (Exception e3) {
                        log.error("缓存订阅推送失败次数出错", e3);
                    }
                }
            } else {
                log.error("连续推送失败次数超过" + this.pushFailureTimes + "次，停止推送，清理缓存后可以恢复推送。");
            }
            return true;
        } catch (Exception e4) {
            try {
                this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, Integer.valueOf(i + 1));
            } catch (Exception e5) {
                log.error("缓存订阅推送失败次数出错", e5);
            }
            log.error("订阅序号(" + str + ")第" + i + "次 RabbitMQ请求出错!", e4);
            return false;
        }
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public Boolean sendForRocketMq(String str, String str2, String str3, String str4, String str5, String str6) {
        DefaultMQProducer defaultMQProducer = this.rocketProducerMap.get(str2);
        if (defaultMQProducer == null) {
            defaultMQProducer = new DefaultMQProducer(str3);
            defaultMQProducer.setNamesrvAddr(str2);
            try {
                defaultMQProducer.start();
                this.rocketProducerMap.put(str2, defaultMQProducer);
            } catch (MQClientException e) {
                log.error("RocketMQ生产者开始错误", e);
            }
        }
        int i = 0;
        try {
            try {
                if (this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str) != null) {
                    i = ((Integer) this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str)).intValue();
                }
            } catch (Exception e2) {
                log.error("获取缓存订阅推送失败次数出错", e2);
            }
            sendRocket(str, str2, str3, str4, str5, str6, defaultMQProducer, Integer.valueOf(i));
            return true;
        } catch (Exception e3) {
            try {
                this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, Integer.valueOf(i + 1));
            } catch (Exception e4) {
                log.error("缓存订阅推送失败次数出错", e4);
            }
            log.error("订阅序号(" + str + ")第" + i + "次 RocketMQ请求出错!", e3);
            return false;
        }
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public Boolean sendForMqtt(String str, String str2, String str3, String str4, String str5, String str6) {
        MqttClient mqttClient = this.mqttClientMap.get(str2 + ":" + str5);
        try {
            if (mqttClient != null) {
                try {
                } catch (MqttException e) {
                    log.error("创建Mqtt客户端失败", e);
                }
                if (mqttClient.isConnected()) {
                    sendMqtt(str, str5, str6, mqttClient);
                    return true;
                }
            }
            sendMqtt(str, str5, str6, mqttClient);
            return true;
        } catch (Exception e2) {
            log.error("发送Mqtt消息错误", e2);
            return false;
        }
        mqttClient = new MqttClient(this.mqttUri, str2, new MemoryPersistence());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setUserName(str3);
        mqttConnectOptions.setPassword(str4.toCharArray());
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(30);
        mqttClient.connect(mqttConnectOptions);
        this.mqttClientMap.put(str2 + ":" + str5, mqttClient);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendMqtt(final String str, final String str2, final String str3, final MqttClient mqttClient) throws MqttException {
        Integer num = 0;
        try {
            if (this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str) != null) {
                num = (Integer) this.redisCacheUtil.getValueByKey("PushFailureTimes_" + str);
            }
        } catch (Exception e) {
            log.error("获取缓存订阅推送失败次数出错", e);
        }
        try {
            if (!$assertionsDisabled && mqttClient == null) {
                throw new AssertionError();
            }
            MqttTopic topic = mqttClient.getTopic(str2);
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(2);
            mqttMessage.setRetained(false);
            mqttMessage.setPayload(str3.getBytes(StandardCharsets.UTF_8));
            MqttDeliveryToken publish = topic.publish(mqttMessage);
            final Integer num2 = num;
            publish.setActionCallback(new IMqttActionListener() { // from class: com.ai.bss.terminalSubscribePush.service.impl.SubscribeSendServiceImpl.1
                public void onSuccess(IMqttToken iMqttToken) {
                    try {
                        SubscribeSendServiceImpl.this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, 0);
                    } catch (Exception e2) {
                        SubscribeSendServiceImpl.log.error("缓存订阅推送失败次数出错", e2);
                    }
                    SubscribeSendServiceImpl.log.info("发送Mqtt消息在第{}次成功", num2);
                }

                public void onFailure(IMqttToken iMqttToken, Throwable th) {
                    try {
                        SubscribeSendServiceImpl.this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, Integer.valueOf(num2.intValue() + 1));
                    } catch (Exception e2) {
                        SubscribeSendServiceImpl.log.error("缓存订阅推送失败次数出错", e2);
                    }
                    SubscribeSendServiceImpl.log.error("发送mqtt消息错误第{}次错误", num2);
                    SubscribeSendServiceImpl.this.sendMqtt(str, str2, str3, mqttClient);
                }
            });
        } catch (Exception e2) {
            log.error("发送消息错误", e2);
            throw e2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendRocket(final String str, final String str2, final String str3, final String str4, final String str5, final String str6, final DefaultMQProducer defaultMQProducer, final Integer num) throws MQClientException, RemotingException, InterruptedException {
        if (num.intValue() < this.pushFailureTimes) {
            defaultMQProducer.send(new Message(str4, str5, str6.getBytes(StandardCharsets.UTF_8)), new SendCallback() { // from class: com.ai.bss.terminalSubscribePush.service.impl.SubscribeSendServiceImpl.2
                public void onSuccess(SendResult sendResult) {
                    SubscribeSendServiceImpl.log.warn("推送Rocket第" + SubscribeSendServiceImpl.this.pushFailureTimes + "次成功");
                    SubscribeSendServiceImpl.log.error("It's successful send msg to RocketMQ:" + str2 + "groupName:" + str3 + "topic:" + str4 + "tag:" + str5 + " 内容：" + str6);
                    SubscribeSendServiceImpl.this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, 0);
                }

                public void onException(Throwable th) {
                    try {
                        SubscribeSendServiceImpl.log.warn("推送Rocket第" + SubscribeSendServiceImpl.this.pushFailureTimes + "次失败");
                        SubscribeSendServiceImpl.this.redisCacheUtil.setValueByKey("PushFailureTimes_" + str, Integer.valueOf(num.intValue() + 1));
                        SubscribeSendServiceImpl.this.sendRocket(str, str2, str3, str4, str5, str6, defaultMQProducer, num);
                    } catch (Exception e) {
                        SubscribeSendServiceImpl.log.error("缓存订阅推送失败次数出错", e);
                    }
                }
            });
        } else {
            log.error("连续推送失败次数超过" + this.pushFailureTimes + "次，停止推送，清理缓存后可以恢复推送。");
        }
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public APISubscriber findByParamsAndApiNameAndUrl(String str, String str2, String str3, String str4) {
        return this.apiSubscriberRepository.findByTargetValueAndApiNameAndUrlAndSubscriberDataType(str, str2, str3, str4);
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public ArrayList<APISubscriber> findByParamsAndApiName(String str, String str2) {
        return this.apiSubscriberRepository.findByResourceIdAndApiName(str, str2);
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public Long findSubscriberAlarmByIds(String str, String str2) {
        ArrayList findByTargetValue = this.apiSubscriberRepository.findByTargetValue(str2);
        findByTargetValue.addAll(this.apiSubscriberRepository.findByTargetValue(str));
        ArrayList arrayList = new ArrayList();
        Iterator it = findByTargetValue.iterator();
        while (it.hasNext()) {
            APISubscriber aPISubscriber = (APISubscriber) it.next();
            if ("4".equals(aPISubscriber.getSubscriberDataType())) {
                arrayList.add(aPISubscriber);
            }
        }
        if (arrayList.size() == 0) {
            return null;
        }
        return ((APISubscriber) arrayList.get(0)).getSubscriberId();
    }

    @Override // com.ai.bss.terminalSubscribePush.service.SubscribeSendService
    public APISubscriber findByParamsAndApiNameAndKafkaServersAndKafkaTopic(String str, String str2, String str3, String str4, String str5) {
        KafkaParametersDto kafkaParametersDto = new KafkaParametersDto();
        kafkaParametersDto.setKafkaServers(str3);
        kafkaParametersDto.setKafkaTopic(str4);
        return this.apiSubscriberRepository.findByTargetValueAndApiNameAndParamsAndSubscriberDataType(str, str2, kafkaParametersDto.toJSONString(), str5);
    }

    static {
        $assertionsDisabled = !SubscribeSendServiceImpl.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(SubscribeSendServiceImpl.class);
    }
}
