package com.ai.aif.msgframe.producer.mq.kafka.api;

import com.ai.aif.msgframe.common.CompletionListener;
import com.ai.aif.msgframe.common.IMsgForTxProducerInner;
import com.ai.aif.msgframe.common.exception.MsgFrameClientException;
import com.ai.aif.msgframe.common.message.MsgFMessageTX;
import com.ai.aif.msgframe.common.message.SendMode;
import com.ai.aif.msgframe.common.util.StringUtils;
import com.ai.aif.msgframe.producer.mq.BaseProducer;
import com.ai.aif.msgframe.producer.mq.kafka.KafkaProducerModel;
import com.ai.ipu.msgframe.util.MessageCovertUtil;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;

/* loaded from: input_file:com/ai/aif/msgframe/producer/mq/kafka/api/KafkaMsgForTxProducer.class */
public class KafkaMsgForTxProducer extends BaseProducer<KafkaProducerModel, MsgFMessageTX> implements IMsgForTxProducerInner {
    private static final ThreadLocal<ConcurrentHashMap<String, Producer>> threadLocalSession = new ThreadLocal<>();

    public KafkaMsgForTxProducer(KafkaProducerModel kafkaProducerModel) {
        super(kafkaProducerModel);
    }

    @Override // com.ai.aif.msgframe.producer.mq.BaseProducer
    public void send0(MsgFMessageTX msgFMessageTX, CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        if (StringUtils.isNotBlank(msgFMessageTX.getSendMode()) && SendMode.ONEWAY.getValue().equals(msgFMessageTX.getSendMode())) {
            throw new MsgFrameClientException("send exception:Oneway delivery mode for kafka does not support temporarily");
        }
        try {
            getProducer(msgFMessageTX).send(MessageCovertUtil.transKafkaMessage(getModelInfo().getSubject(), msgFMessageTX));
            if (null != completionListener) {
                completionListener.onCompletion(msgFMessageTX);
            }
        } catch (KafkaException e) {
            if (null != completionListener) {
                completionListener.onException(msgFMessageTX, e);
            }
            throw new RemoteException("kafka网络异常", e);
        }
    }

    public boolean commit() throws MsgFrameClientException {
        try {
            try {
                if (threadLocalSession.get() != null) {
                    Iterator<Map.Entry<String, Producer>> it = threadLocalSession.get().entrySet().iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().getValue().commitTransaction();
                        } catch (Throwable th) {
                            th.printStackTrace();
                        }
                    }
                }
                return true;
            } catch (Throwable th2) {
                th2.printStackTrace();
                clearResources();
                return true;
            }
        } finally {
            clearResources();
        }
    }

    public boolean rollback() throws MsgFrameClientException {
        try {
            try {
                if (threadLocalSession.get() != null) {
                    Iterator<Map.Entry<String, Producer>> it = threadLocalSession.get().entrySet().iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().getValue().abortTransaction();
                        } catch (Throwable th) {
                            th.printStackTrace();
                        }
                    }
                }
                return true;
            } catch (Throwable th2) {
                th2.printStackTrace();
                clearResources();
                return true;
            }
        } finally {
            clearResources();
        }
    }

    public boolean isUsed() {
        return false;
    }

    public void setUsed(boolean z) {
    }

    public List<MsgFMessageTX> getMessage(String str) throws Exception {
        return new ArrayList();
    }

    private Producer getProducer(MsgFMessageTX msgFMessageTX) throws RemoteException {
        String subject = msgFMessageTX.getSubject();
        if (null == threadLocalSession.get()) {
            threadLocalSession.set(new ConcurrentHashMap<>());
        }
        if (null == threadLocalSession.get().get(subject)) {
            try {
                Producer<String, byte[]> producerTxByCache = KafkaResources.getInstance().getProducerTxByCache(getModelInfo());
                producerTxByCache.initTransactions();
                producerTxByCache.beginTransaction();
                threadLocalSession.get().put(subject, producerTxByCache);
            } catch (Throwable th) {
                throw new RemoteException("kafka网络错误,创建生产者异常" + getModelInfo(), th);
            }
        }
        return threadLocalSession.get().get(subject);
    }

    private void clearResources() {
        setUsed(Boolean.FALSE.booleanValue());
        try {
            try {
                if (threadLocalSession.get() != null) {
                    Iterator<Map.Entry<String, Producer>> it = threadLocalSession.get().entrySet().iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().getValue().close();
                        } catch (Throwable th) {
                            th.printStackTrace();
                        }
                    }
                }
                threadLocalSession.remove();
            } catch (Throwable th2) {
                th2.printStackTrace();
                threadLocalSession.remove();
            }
        } catch (Throwable th3) {
            threadLocalSession.remove();
            throw th3;
        }
    }

    public /* bridge */ /* synthetic */ void send(MsgFMessageTX msgFMessageTX, CompletionListener completionListener) throws MsgFrameClientException, RemoteException {
        super.send((KafkaMsgForTxProducer) msgFMessageTX, completionListener);
    }
}
