package com.ai.aif.msgframe.consumer.mq.rabbitmq;

import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.util.MessageCovertUtil;
import com.ai.aif.msgframe.consumer.facade.IConsumerProcessor;
import com.ai.aif.msgframe.consumer.mq.AConsumerProviderModel;
import com.ai.aif.msgframe.consumer.mq.PullConsumerScheduleService;
import com.asiainfo.msgframe.Subscribes;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/mq/rabbitmq/RabbitMQPullConsumerScheduleService.class */
/**
 * Pull-mode consumer task for RabbitMQ.
 *
 * <p>The task created by {@link #createTask()} declares a durable direct exchange and a queue,
 * binds them, and then polls the queue with {@code basicGet} in an endless loop, handing every
 * received message to the model's {@code processMsg}.
 */
public class RabbitMQPullConsumerScheduleService extends PullConsumerScheduleService {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQPullConsumerScheduleService.class);

    /** Prefix prepended to the subject name to build the exchange name. */
    private static final String EXCHANGE_PREFIX = "ipuExchange";

    /** Prefix prepended to the subject name to build the routing key. */
    private static final String ROUTING_KEY_PREFIX = "ipuRoutingKey";

    /** Exchange type used for the declared exchange. */
    private static final String EXCHANGE_TYPE = "direct";

    /** Pause between retries while the channel reports itself as closed. */
    private static final long CLOSED_CHANNEL_BACKOFF_MILLIS = 1000L;

    private final Channel channel;

    /**
     * Creates the pull service.
     *
     * @param channel                channel used for declaring, polling and acknowledging
     * @param aConsumerProviderModel forwarded unchanged to the superclass constructor
     * @param str                    forwarded unchanged to the superclass constructor
     * @param subscribe              forwarded unchanged to the superclass constructor
     */
    public RabbitMQPullConsumerScheduleService(Channel channel, AConsumerProviderModel aConsumerProviderModel, String str, Subscribes.Subscribe subscribe) {
        super(aConsumerProviderModel, str, subscribe);
        this.channel = channel;
    }

    /**
     * Builds the polling task.
     *
     * <p>An {@link IOException} raised while declaring the exchange/queue/binding is logged and
     * ends the task. Failures inside the polling loop are logged and the loop continues.
     *
     * @return a runnable that sets up the topology and then polls until stopped
     */
    protected Runnable createTask() {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    pollLoop(declareTopology());
                } catch (IOException e) {
                    log.error("处理消息失败", e);
                }
            }
        };
    }

    /**
     * Declares the exchange and the queue and binds them together.
     *
     * @return the name of the queue to poll
     * @throws IOException if any of the declare/bind calls fails
     */
    private String declareTopology() throws IOException {
        String queue = getModel().getRealQueue();
        String exchange = EXCHANGE_PREFIX + getModel().getDestinationInfo().getSubjectName();
        channel.exchangeDeclare(exchange, EXCHANGE_TYPE, true);
        // Flags: durable=true, exclusive=false, autoDelete=true, no extra arguments.
        channel.queueDeclare(queue, true, false, true, null);
        channel.queueBind(queue, exchange, ROUTING_KEY_PREFIX + getModel().getDestinationInfo().getSubjectName());
        return queue;
    }

    /**
     * Polls the queue forever, logging and surviving every per-message failure.
     *
     * <p>When a failure coincides with a closed channel, the loop pauses before retrying; without
     * the pause every retry is expected to fail at once and the error log would be flooded. The
     * loop only returns when the thread is interrupted during such a pause.
     *
     * @param queue name of the queue to poll
     */
    private void pollLoop(String queue) {
        while (true) {
            try {
                pullAndProcess(queue);
            } catch (Exception e) {
                log.error("处理消息失败，失败原因" + this, e);
                if (!channel.isOpen() && !pauseBeforeRetry()) {
                    log.warn("RabbitMQ pull task interrupted while the channel was closed, stopping: {}", this);
                    return;
                }
            }
        }
    }

    /**
     * Fetches at most one message and dispatches it.
     *
     * @param queue name of the queue to poll
     * @throws Exception whatever the channel, the converter or the processors throw
     */
    private void pullAndProcess(String queue) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("开始拉取数据," + getModel().toString());
        }
        GetResponse response = channel.basicGet(queue, false);
        if (response == null) {
            // Nothing available right now; the caller polls again immediately.
            return;
        }
        byte[] body = response.getBody();
        if (log.isDebugEnabled()) {
            // Explicit charset keeps the diagnostic output independent of the platform default.
            log.debug(new String(body, StandardCharsets.UTF_8));
        }
        MsgFMessage message = MessageCovertUtil.transRabbitMQMessage(body);
        // NOTE(review): the message is acknowledged before processing, so a processing failure
        // does not lead to redelivery — confirm this at-most-once behaviour is intended.
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        IConsumerProcessor[] processors = getConsumerProcessor();
        if (processors == null || processors.length <= 0) {
            getModel().processMsg(message, (String) null, (String) null, getSubclass());
        } else {
            getModel().processMsg(message, (String) null, (String) null, processors);
        }
        if (log.isDebugEnabled()) {
            log.debug("拉取到数据并处理结束");
        }
    }

    /**
     * Sleeps for the closed-channel back-off interval.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread was interrupted
     *         (the interrupt flag is restored in that case)
     */
    private boolean pauseBeforeRetry() {
        try {
            Thread.sleep(CLOSED_CHANNEL_BACKOFF_MILLIS);
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
