package com.ai.ipu.push.server.action.b;

import com.ai.ipu.basic.util.IpuUtility;
import com.ai.ipu.push.server.action.IPushAction;
import com.ai.ipu.push.server.action.PushActionManager;
import com.ai.ipu.push.server.mqtt.entity.stable.TopicClientEntity;
import com.ai.ipu.push.server.mqtt.entity.standard.TopicChannelEntity;
import com.ai.ipu.push.server.mqtt.manager.IMqttServerManager;
import com.ai.ipu.push.server.mqtt.manager.MqttServerManagerFactory;
import com.ai.ipu.push.server.mqtt.manager.StableMqttServerManager;
import com.ai.ipu.push.server.mqtt.manager.StandardMqttServerManager;
import com.ai.ipu.push.server.util.IpUtil;
import com.ai.ipu.push.server.util.NettyAttrUtil;
import com.ai.ipu.push.server.util.NettyMqttUtil;
import com.ailk.common.data.IData;
import com.ailk.common.data.impl.DataMap;
import io.netty.channel.Channel;
import java.util.Iterator;

/* compiled from: PublishByTopicAction.java */
/* loaded from: input_file:com/ai/ipu/push/server/action/b/g.class */
public class g implements IPushAction {
    @Override // com.ai.ipu.push.server.action.IPushAction
    public IData doAction(IData iData) {
        final String string = iData.getString("msg");
        final String string2 = iData.getString("topic");
        DataMap dataMap = new DataMap();
        final IMqttServerManager mqttServerManager = MqttServerManagerFactory.getMqttServerManager();
        if (mqttServerManager instanceof StandardMqttServerManager) {
            TopicChannelEntity topicChannelEntity = ((StandardMqttServerManager) mqttServerManager).getTopicChannelEntity(string2);
            if (topicChannelEntity == null) {
                dataMap.put("msg", "无推送主题");
                return dataMap;
            }
            Iterator it = topicChannelEntity.getChannels().iterator();
            while (it.hasNext()) {
                final Channel channel = (Channel) it.next();
                PushActionManager.executeActionByThreadPool(new Runnable() { // from class: com.ai.ipu.push.server.action.b.g.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            NettyMqttUtil.publishMessage(channel, NettyMqttUtil.createMqttPublishMessage(string2, IpUtil.getRemotePort(channel), string, mqttServerManager.getClientMappedEntity(NettyAttrUtil.takeClientId(channel)).getQosMap().get(string2)));
                        } catch (Exception e) {
                            IpuUtility.error("推送消息给topic的任务失败:" + e.getMessage());
                        }
                    }
                });
            }
        } else if (mqttServerManager instanceof StableMqttServerManager) {
            final TopicClientEntity topicClientEntity = ((StableMqttServerManager) mqttServerManager).getTopicClientEntity(string2);
            if (topicClientEntity == null) {
                dataMap.put("msg", "无推送主题");
                return dataMap;
            }
            for (final String str : topicClientEntity.getClientIds()) {
                PushActionManager.executeActionByThreadPool(new Runnable() { // from class: com.ai.ipu.push.server.action.b.g.2
                    @Override // java.lang.Runnable
                    public void run() {
                        Channel channel2 = ((StableMqttServerManager) mqttServerManager).getChannel(str);
                        try {
                            NettyMqttUtil.publishMessage(channel2, NettyMqttUtil.createMqttPublishMessage(string2, IpUtil.getRemotePort(channel2), string, topicClientEntity.getQosMap().get(str)));
                        } catch (Exception e) {
                            IpuUtility.error("推送消息给topic的任务失败:" + e.getMessage());
                        }
                    }
                });
            }
        }
        dataMap.put("msg", "推送消息给指定主题的任务成功");
        return dataMap;
    }
}
