package com.ai.aif.msgframe.consumer.mq.rabbitmq;

import com.ai.aif.msgframe.common.ConsumerModel;
import com.ai.aif.msgframe.common.exception.ConsumerException;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.model.impl.BrokerModel;
import com.ai.aif.msgframe.common.route.impl.DestinationInfo;
import com.ai.aif.msgframe.common.util.MessageCovertUtil;
import com.ai.aif.msgframe.consumer.facade.IConsumerProcessor;
import com.ai.aif.msgframe.consumer.mq.AConsumerProviderModel;
import com.ai.aif.msgframe.consumer.mq.rabbitmq.api.RabbitMQResources;
import com.asiainfo.msgframe.Subscribes;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/mq/rabbitmq/RabbitMQConsumerModel.class */
public class RabbitMQConsumerModel extends AConsumerProviderModel implements ConsumerModel {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumerModel.class);
    private static final RabbitMQResources RESOURCES = RabbitMQResources.getInstance();

    public RabbitMQConsumerModel(BrokerModel brokerModel, DestinationInfo destinationInfo) {
        super(brokerModel, destinationInfo);
    }

    public void pullSubscribe(String str, Subscribes.Subscribe subscribe, String... strArr) {
        try {
            getChannel(subscribe, str).queueDeclare(getRealQueue(), true, false, true, (Map) null);
            RabbitMQPullConsumerScheduleService rabbitMQPullConsumerScheduleService = new RabbitMQPullConsumerScheduleService(getChannel(subscribe, str), this, str, subscribe);
            rabbitMQPullConsumerScheduleService.setSubclass(strArr);
            rabbitMQPullConsumerScheduleService.start();
        } catch (Exception e) {
            log.error("订阅消息失败：" + this, e);
        }
    }

    public void pushSubscribe(String str, Subscribes.Subscribe subscribe, final String... strArr) {
        try {
            Channel channel = getChannel(subscribe, str);
            channel.queueDeclare(getRealQueue(), true, false, true, (Map) null);
            if (getDestinationInfo().isQueueType()) {
                channel.basicConsume(getRealQueue(), true, new DefaultConsumer(channel) { // from class: com.ai.aif.msgframe.consumer.mq.rabbitmq.RabbitMQConsumerModel.1
                    public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                        RabbitMQConsumerModel.this.processMsg(MessageCovertUtil.transRabbitMQMessage(bArr), (String) null, (String) null, strArr);
                    }
                });
            } else {
                String queue = channel.queueDeclare().getQueue();
                channel.queueBind(queue, getRealQueue(), "");
                channel.basicConsume(queue, true, new DefaultConsumer(channel) { // from class: com.ai.aif.msgframe.consumer.mq.rabbitmq.RabbitMQConsumerModel.2
                    public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                        RabbitMQConsumerModel.this.processMsg(MessageCovertUtil.transRabbitMQMessage(bArr), (String) null, (String) null, strArr);
                    }
                });
            }
        } catch (Exception e) {
            log.error("RabbitMQ消费者启动异常", e);
        }
    }

    public Serializable receive(String str, int i) throws MsgFrameClientException, ConsumerException {
        return null;
    }

    private Channel getChannel(Subscribes.Subscribe subscribe, String str) throws Exception {
        Channel createChannel = RabbitMQResources.getInstance().getConnection(this).createChannel();
        if (!getDestinationInfo().isQueueType()) {
            createChannel.exchangeDeclare(getRealQueue(), "fanout", true, false, (Map) null);
        }
        return createChannel;
    }

    public void pullSubscribe(String str, Subscribes.Subscribe subscribe, IConsumerProcessor... iConsumerProcessorArr) {
        try {
            getChannel(subscribe, str).queueDeclare(getRealQueue(), true, false, true, (Map) null);
            RabbitMQPullConsumerScheduleService rabbitMQPullConsumerScheduleService = new RabbitMQPullConsumerScheduleService(getChannel(subscribe, str), this, str, subscribe);
            rabbitMQPullConsumerScheduleService.setConsumerProcessor(iConsumerProcessorArr);
            rabbitMQPullConsumerScheduleService.start();
        } catch (Exception e) {
            log.error("订阅消息失败：" + this, e);
        }
    }

    public void pushSubscribe(String str, Subscribes.Subscribe subscribe, final IConsumerProcessor... iConsumerProcessorArr) {
        try {
            Channel channel = getChannel(subscribe, str);
            channel.queueDeclare(getRealQueue(), true, false, true, (Map) null);
            if (getDestinationInfo().isQueueType()) {
                channel.basicConsume(getRealQueue(), true, new DefaultConsumer(channel) { // from class: com.ai.aif.msgframe.consumer.mq.rabbitmq.RabbitMQConsumerModel.3
                    public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                        RabbitMQConsumerModel.this.processMsg(MessageCovertUtil.transRabbitMQMessage(bArr), (String) null, (String) null, iConsumerProcessorArr);
                    }
                });
            } else {
                String queue = channel.queueDeclare().getQueue();
                channel.queueBind(queue, getRealQueue(), "");
                channel.basicConsume(queue, true, new DefaultConsumer(channel) { // from class: com.ai.aif.msgframe.consumer.mq.rabbitmq.RabbitMQConsumerModel.4
                    public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                        RabbitMQConsumerModel.this.processMsg(MessageCovertUtil.transRabbitMQMessage(bArr), (String) null, (String) null, iConsumerProcessorArr);
                    }
                });
            }
        } catch (Exception e) {
            log.error("RabbitMQ消费者启动异常", e);
        }
    }

    public void unsubscribe(String str, String str2) throws MsgFrameClientException {
        throw new MsgFrameClientException("RabbitMQ暂不支持消费订阅定制功能");
    }
}
