package com.ai.aif.msgframe.consumer.mq.activemq;

import com.ai.aif.msgframe.common.message.MsgFMessage;
import com.ai.aif.msgframe.common.util.MessageCovertUtil;
import com.ai.aif.msgframe.consumer.facade.IConsumerProcessor;
import com.ai.aif.msgframe.consumer.mq.AConsumerProviderModel;
import com.ai.aif.msgframe.consumer.mq.PullConsumerScheduleService;
import com.asiainfo.msgframe.Subscribes;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/mq/activemq/ActiveMQPullConsumerScheduleService.class */
public class ActiveMQPullConsumerScheduleService extends PullConsumerScheduleService {
    private static final Logger log = LoggerFactory.getLogger(ActiveMQPullConsumerScheduleService.class);
    private MessageConsumer messageConsumer;
    private Session pullSession;

    public ActiveMQPullConsumerScheduleService(MessageConsumer messageConsumer, Session session, AConsumerProviderModel aConsumerProviderModel, String str, Subscribes.Subscribe subscribe) {
        super(aConsumerProviderModel, str, subscribe);
        this.messageConsumer = messageConsumer;
        this.pullSession = session;
    }

    @Override // com.ai.aif.msgframe.consumer.mq.PullConsumerScheduleService
    public Runnable createTask() {
        return new Runnable() { // from class: com.ai.aif.msgframe.consumer.mq.activemq.ActiveMQPullConsumerScheduleService.1
            @Override // java.lang.Runnable
            public void run() {
                Object processMsg;
                while (ActiveMQPullConsumerScheduleService.this.messageConsumer != null) {
                    try {
                        if (ActiveMQPullConsumerScheduleService.log.isDebugEnabled()) {
                            ActiveMQPullConsumerScheduleService.log.debug("开始拉取数据," + ActiveMQPullConsumerScheduleService.this.getModel().toString());
                        }
                        Message receive = ActiveMQPullConsumerScheduleService.this.messageConsumer.receive(5000L);
                        if (receive != null) {
                            MsgFMessage transActiveMQMessage = MessageCovertUtil.transActiveMQMessage(receive);
                            IConsumerProcessor[] consumerProcessor = ActiveMQPullConsumerScheduleService.this.getConsumerProcessor();
                            Object obj = null;
                            String str = null;
                            if (ActiveMQPullConsumerScheduleService.this.messageConsumer instanceof ActiveMQMessageConsumer) {
                                str = ActiveMQPullConsumerScheduleService.this.messageConsumer.getConsumerId().toString();
                            }
                            if (null != consumerProcessor) {
                                try {
                                    try {
                                    } catch (Exception e) {
                                        ActiveMQPullConsumerScheduleService.log.error("消费业务处理异常", e);
                                        if (ActiveMQPullConsumerScheduleService.this.scribe.getTransaction() && ActiveMQPullConsumerScheduleService.this.pullSession.getTransacted()) {
                                            if (0 == 0 || !(obj instanceof Boolean) || ((Boolean) null).booleanValue()) {
                                                ActiveMQPullConsumerScheduleService.this.pullSession.commit();
                                            } else {
                                                ActiveMQPullConsumerScheduleService.this.pullSession.rollback();
                                            }
                                        }
                                    }
                                    if (consumerProcessor.length > 0) {
                                        processMsg = ActiveMQPullConsumerScheduleService.this.getModel().processMsg(transActiveMQMessage, str, (String) null, consumerProcessor);
                                        if (ActiveMQPullConsumerScheduleService.this.scribe.getTransaction() && ActiveMQPullConsumerScheduleService.this.pullSession.getTransacted()) {
                                            if (null == processMsg && (processMsg instanceof Boolean) && !((Boolean) processMsg).booleanValue()) {
                                                ActiveMQPullConsumerScheduleService.this.pullSession.rollback();
                                            } else {
                                                ActiveMQPullConsumerScheduleService.this.pullSession.commit();
                                            }
                                        }
                                    }
                                } catch (Throwable th) {
                                    if (ActiveMQPullConsumerScheduleService.this.scribe.getTransaction() && ActiveMQPullConsumerScheduleService.this.pullSession.getTransacted()) {
                                        if (0 == 0 || !(obj instanceof Boolean) || ((Boolean) null).booleanValue()) {
                                            ActiveMQPullConsumerScheduleService.this.pullSession.commit();
                                        } else {
                                            ActiveMQPullConsumerScheduleService.this.pullSession.rollback();
                                        }
                                    }
                                    throw th;
                                    break;
                                }
                            }
                            processMsg = ActiveMQPullConsumerScheduleService.this.getModel().processMsg(transActiveMQMessage, str, (String) null, ActiveMQPullConsumerScheduleService.this.getSubclass());
                            if (ActiveMQPullConsumerScheduleService.this.scribe.getTransaction()) {
                                if (null == processMsg) {
                                }
                                ActiveMQPullConsumerScheduleService.this.pullSession.commit();
                            }
                        } else if (ActiveMQPullConsumerScheduleService.log.isDebugEnabled()) {
                            ActiveMQPullConsumerScheduleService.log.debug("等待5秒后没有拉取到数据");
                        }
                    } catch (Exception e2) {
                        ActiveMQPullConsumerScheduleService.log.error("消费异常", e2);
                        if (e2 instanceof IllegalStateException) {
                            ActiveMQPullConsumerScheduleService.log.error("关闭消费者成功");
                            return;
                        }
                    }
                }
            }
        };
    }
}
