package com.ai.aif.msgframe.consumer.mq.activemq;

import com.ai.aif.msgframe.common.ConsumerModel;
import com.ai.aif.msgframe.common.exception.ConsumerException;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.io.transport.JdkSerialize;
import com.ai.aif.msgframe.common.model.impl.BrokerModel;
import com.ai.aif.msgframe.common.resources.ActiveMQResources;
import com.ai.aif.msgframe.common.route.impl.DestinationInfo;
import com.ai.aif.msgframe.common.thread.ThreadFactoryImpl;
import com.ai.aif.msgframe.common.util.MessageCovertUtil;
import com.ai.aif.msgframe.consumer.facade.IConsumerProcessor;
import com.ai.aif.msgframe.consumer.mq.AConsumerProviderModel;
import com.ai.aif.msgframe.consumer.mq.PullConsumerScheduleService;
import com.asiainfo.msgframe.Subscribes;
import java.io.Serializable;
import java.rmi.RemoteException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.transport.TransportListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ai/aif/msgframe/consumer/mq/activemq/ActiveMQConsumerModel.class */
public class ActiveMQConsumerModel extends AConsumerProviderModel implements ConsumerModel {
    private MessageConsumer consumer;
    private Session session;
    private Subscribes.Subscribe scribe;
    protected final AtomicBoolean started;
    private PullConsumerScheduleService taskSchedule;
    private Connection conn;
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private static final Logger log = LoggerFactory.getLogger(ActiveMQConsumerModel.class);
    private static final Map<ActiveMQConnection, TransportListener> transportListeners = new ConcurrentHashMap();
    private static Object lock = new Object();

    public ActiveMQConsumerModel(BrokerModel brokerModel, DestinationInfo destinationInfo) throws Exception {
        super(brokerModel, destinationInfo);
        this.started = new AtomicBoolean(false);
        this.taskSchedule = null;
        this.conn = null;
        this.scheduledThreadPoolExecutor = null;
    }

    public void pushSubscribe(String str, Subscribes.Subscribe subscribe, final String... strArr) {
        if (this.started.compareAndSet(false, true)) {
            try {
                init();
            } catch (Exception e) {
                log.error("添加activemq生产者与broker的连接监听失败", e);
            }
            this.scribe = subscribe;
            try {
                final MessageConsumer consumer = getConsumer();
                consumer.setMessageListener(new MessageListener() { // from class: com.ai.aif.msgframe.consumer.mq.activemq.ActiveMQConsumerModel.1
                    Session listenerSession;

                    {
                        this.listenerSession = ActiveMQConsumerModel.this.session;
                    }

                    public void onMessage(Message message) {
                        String str2 = null;
                        try {
                            if (consumer instanceof ActiveMQMessageConsumer) {
                                str2 = consumer.getConsumerId().toString();
                            }
                            Object processMsg = ActiveMQConsumerModel.this.processMsg(MessageCovertUtil.transActiveMQMessage(message), str2, (String) null, strArr);
                            if (ActiveMQConsumerModel.this.scribe.getTransaction() && this.listenerSession.getTransacted()) {
                                if (null == processMsg || !(processMsg instanceof Boolean) || ((Boolean) processMsg).booleanValue()) {
                                    this.listenerSession.commit();
                                } else {
                                    this.listenerSession.rollback();
                                }
                            }
                        } catch (Exception e2) {
                            ActiveMQConsumerModel.log.error("处理消息失败，失败原因" + ActiveMQConsumerModel.this, e2);
                            try {
                                if (ActiveMQConsumerModel.this.scribe.getTransaction() && this.listenerSession.getTransacted()) {
                                    this.listenerSession.rollback();
                                }
                            } catch (JMSException e3) {
                                ActiveMQConsumerModel.log.error("处理消息失败，回滚异常,consumer={}", ActiveMQConsumerModel.this, e3);
                            }
                        }
                    }
                });
            } catch (Exception e2) {
                log.error("订阅消息失败：" + this, e2);
            }
        }
    }

    public void pullSubscribe(String str, final Subscribes.Subscribe subscribe, final String... strArr) {
        if (this.started.compareAndSet(false, true)) {
            try {
                init();
            } catch (Exception e) {
                log.error("添加activemq生产者与broker的连接监听失败", e);
            }
            this.scribe = subscribe;
            if (!super.getUrl().contains("mqtt://")) {
                try {
                    this.taskSchedule = new ActiveMQPullConsumerScheduleService(getConsumer(), this.session, this, str, this.scribe);
                    this.taskSchedule.setSubclass(strArr);
                    this.taskSchedule.start();
                    return;
                } catch (RemoteException e2) {
                    log.error("订阅消息失败：" + this, e2);
                    return;
                }
            }
            if (this.scheduledThreadPoolExecutor == null) {
                this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ThreadFactoryImpl("PullMsgThread-" + getDestinationInfo().getSubjectName()));
                for (int i = 0; i < 1; i++) {
                    this.scheduledThreadPoolExecutor.schedule(new Runnable() { // from class: com.ai.aif.msgframe.consumer.mq.activemq.ActiveMQConsumerModel.2
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                MqttClient mQTTConnection = ActiveMQResources.getInstance().getMQTTConnection(ActiveMQConsumerModel.super.getClusterName(), ActiveMQConsumerModel.super.getUrl(), ActiveMQConsumerModel.super.getBroker());
                                mQTTConnection.setCallback(new MqttCallback() { // from class: com.ai.aif.msgframe.consumer.mq.activemq.ActiveMQConsumerModel.2.1
                                    public void connectionLost(Throwable th) {
                                    }

                                    public void messageArrived(String str2, MqttMessage mqttMessage) throws Exception {
                                        String consumerId = ActiveMQConsumerModel.this.scribe.getConsumerId();
                                        try {
                                            ActiveMQConsumerModel.this.processMsg(new JdkSerialize().deSerialize(mqttMessage.getPayload()), consumerId, (String) null, strArr);
                                        } catch (Exception e3) {
                                            ActiveMQConsumerModel.log.error("消费业务处理异常", e3);
                                        }
                                    }

                                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                                    }
                                });
                                mQTTConnection.subscribeWithResponse(subscribe.getSubDestination()).waitForCompletion();
                            } catch (Exception e3) {
                                e3.printStackTrace();
                            }
                        }
                    }, 5000L, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    public Serializable receive(String str, int i) throws MsgFrameClientException, ConsumerException {
        MessageConsumer messageConsumer = null;
        try {
            try {
                MessageConsumer replyConsumer = getReplyConsumer(str);
                Message receive = replyConsumer.receive(i);
                if (receive == null) {
                    throw new MsgFrameClientException("同步消息：" + str + "请求超时，超时时间：" + i);
                }
                Serializable msg = MessageCovertUtil.transActiveMQMessage(receive).getMsg();
                if (replyConsumer != null) {
                    try {
                        replyConsumer.close();
                    } catch (JMSException e) {
                        throw new MsgFrameClientException("ActiveMQ发生错误", e);
                    }
                }
                return msg;
            } catch (Throwable th) {
                if (0 != 0) {
                    try {
                        messageConsumer.close();
                    } catch (JMSException e2) {
                        throw new MsgFrameClientException("ActiveMQ发生错误", e2);
                    }
                }
                throw th;
            }
        } catch (JMSException e3) {
            throw new MsgFrameClientException("ActiveMQ发生错误", e3);
        }
    }

    private MessageConsumer getReplyConsumer(String str) throws JMSException, MsgFrameClientException {
        if (isQueueType()) {
            return this.conn.createSession(getScribe().getTransaction(), 1).createConsumer(this.conn.createSession(getScribe().getTransaction(), 1).createQueue(getRealQueue() + "_reply"), "msg_message_id = '" + str + "'");
        }
        throw new MsgFrameClientException("主题不支持同步消息" + this);
    }

    private MessageConsumer getConsumer() throws RemoteException {
        if (this.consumer == null) {
            try {
                this.session = this.conn.createSession(getScribe().getTransaction(), 1);
                String realQueue = getRealQueue();
                if (getDestinationInfo().getModel().getDestination().getOrder()) {
                    realQueue = realQueue + "?consumer.exclusive=true";
                }
                this.consumer = this.session.createConsumer(getDestinationInfo().isQueueType() ? this.session.createQueue(realQueue) : this.session.createTopic(realQueue));
            } catch (JMSException e) {
                throw new RemoteException("activemq网络异常" + this, e);
            }
        }
        return this.consumer;
    }

    private void init() throws Exception {
        if (getUrl().contains("mqtt://")) {
            return;
        }
        this.conn = ActiveMQResources.getInstance().getConnection(getClusterName(), getUrl(), getBroker());
        if (transportListeners.containsKey(this.conn) || !(this.conn instanceof ActiveMQConnection)) {
            return;
        }
        ActiveMQConnection activeMQConnection = this.conn;
        MsgTransportListenerImpl msgTransportListenerImpl = new MsgTransportListenerImpl(this);
        activeMQConnection.addTransportListener(msgTransportListenerImpl);
        transportListeners.put(activeMQConnection, msgTransportListenerImpl);
    }

    public Subscribes.Subscribe getScribe() {
        return this.scribe;
    }

    public void setScribe(Subscribes.Subscribe subscribe) {
        this.scribe = subscribe;
    }

    public void pullSubscribe(String str, Subscribes.Subscribe subscribe, IConsumerProcessor... iConsumerProcessorArr) {
        if (this.started.compareAndSet(false, true)) {
            try {
                init();
            } catch (Exception e) {
                log.error("添加activemq生产者与broker的连接监听失败", e);
            }
            this.scribe = subscribe;
            try {
                this.taskSchedule = new ActiveMQPullConsumerScheduleService(getConsumer(), this.session, this, str, this.scribe);
                this.taskSchedule.setConsumerProcessor(iConsumerProcessorArr);
                this.taskSchedule.start();
            } catch (RemoteException e2) {
                log.error("订阅消息失败：" + this, e2);
            }
        }
    }

    public void pushSubscribe(String str, Subscribes.Subscribe subscribe, final IConsumerProcessor... iConsumerProcessorArr) {
        if (this.started.compareAndSet(false, true)) {
            try {
                init();
            } catch (Exception e) {
                log.error("添加activemq生产者与broker的连接监听失败", e);
            }
            this.scribe = subscribe;
            try {
                final MessageConsumer consumer = getConsumer();
                consumer.setMessageListener(new MessageListener() { // from class: com.ai.aif.msgframe.consumer.mq.activemq.ActiveMQConsumerModel.3
                    Session listenerSession;

                    {
                        this.listenerSession = ActiveMQConsumerModel.this.session;
                    }

                    public void onMessage(Message message) {
                        String str2 = null;
                        try {
                            if (consumer instanceof ActiveMQMessageConsumer) {
                                str2 = consumer.getConsumerId().toString();
                            }
                            Object processMsg = ActiveMQConsumerModel.this.processMsg(MessageCovertUtil.transActiveMQMessage(message), str2, (String) null, iConsumerProcessorArr);
                            if (ActiveMQConsumerModel.this.scribe.getTransaction() && this.listenerSession.getTransacted()) {
                                if (null == processMsg || !(processMsg instanceof Boolean) || ((Boolean) processMsg).booleanValue()) {
                                    this.listenerSession.commit();
                                } else {
                                    this.listenerSession.rollback();
                                }
                            }
                        } catch (Exception e2) {
                            ActiveMQConsumerModel.log.error("处理消息失败，失败原因" + ActiveMQConsumerModel.this, e2);
                            try {
                                if (ActiveMQConsumerModel.this.scribe.getTransaction() && this.listenerSession.getTransacted()) {
                                    this.listenerSession.rollback();
                                }
                            } catch (JMSException e3) {
                                ActiveMQConsumerModel.log.error("处理消息失败，回滚异常,consumer={}", ActiveMQConsumerModel.this, e3);
                            }
                        }
                    }
                });
            } catch (Exception e2) {
                log.error("订阅消息失败：" + this, e2);
            }
        }
    }

    public void unsubscribe(String str, String str2) throws MsgFrameClientException {
        try {
            if (this.consumer != null && this.session != null) {
                synchronized (lock) {
                    if (this.taskSchedule != null) {
                        this.taskSchedule.shutdown();
                        this.taskSchedule = null;
                    }
                    if (this.consumer != null && this.session != null) {
                        this.consumer.close();
                        this.session.close();
                        this.consumer = null;
                        this.session = null;
                    }
                }
            }
        } catch (JMSException e) {
            log.error("消费者停止消费出现异常", e);
        }
    }
}
