package org.eclipse.moquette.spi.impl;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.moquette.proto.messages.AbstractMessage;
import org.eclipse.moquette.proto.messages.ConnAckMessage;
import org.eclipse.moquette.proto.messages.ConnectMessage;
import org.eclipse.moquette.proto.messages.DisconnectMessage;
import org.eclipse.moquette.proto.messages.PubAckMessage;
import org.eclipse.moquette.proto.messages.PubCompMessage;
import org.eclipse.moquette.proto.messages.PubRecMessage;
import org.eclipse.moquette.proto.messages.PubRelMessage;
import org.eclipse.moquette.proto.messages.PublishMessage;
import org.eclipse.moquette.proto.messages.SubAckMessage;
import org.eclipse.moquette.proto.messages.SubscribeMessage;
import org.eclipse.moquette.proto.messages.UnsubAckMessage;
import org.eclipse.moquette.proto.messages.UnsubscribeMessage;
import org.eclipse.moquette.server.ConnectionDescriptor;
import org.eclipse.moquette.server.ServerChannel;
import org.eclipse.moquette.server.netty.NettyChannel;
import org.eclipse.moquette.spi.IMatchingCondition;
import org.eclipse.moquette.spi.IMessagesStore;
import org.eclipse.moquette.spi.ISessionsStore;
import org.eclipse.moquette.spi.impl.events.LostConnectionEvent;
import org.eclipse.moquette.spi.impl.events.OutputMessagingEvent;
import org.eclipse.moquette.spi.impl.events.PubAckEvent;
import org.eclipse.moquette.spi.impl.events.PublishEvent;
import org.eclipse.moquette.spi.impl.security.IAuthenticator;
import org.eclipse.moquette.spi.impl.security.IAuthorizator;
import org.eclipse.moquette.spi.impl.subscriptions.Subscription;
import org.eclipse.moquette.spi.impl.subscriptions.SubscriptionsStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes.dex */
public class ProtocolProcessor implements EventHandler<ValueEvent> {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolProcessor.class);
    /** When false, CONNECT packets lacking a username or a password are refused; assigned in init(). */
    private boolean allowAnonymous;
    /** Validates username/password pairs during CONNECT; assigned in init(). */
    private IAuthenticator m_authenticator;
    /** Consulted for per-topic read (delivery) and write (publish) permission; assigned in init(). */
    private IAuthorizator m_authorizator;
    /** Single-thread executor backing the output disruptor; created in init(). */
    private ExecutorService m_executor;
    /** Store for retained, in-flight, QoS 2 and per-session queued messages; assigned in init(). */
    private IMessagesStore m_messagesStore;
    /** Ring buffer on which outbound messages are published; obtained from the disruptor in init(). */
    private RingBuffer<ValueEvent> m_ringBuffer;
    /** Persistent per-client session/subscription store; assigned in init(). */
    private ISessionsStore m_sessionsStore;
    /** Subscription tree used for topic matching; assigned in init(). */
    private SubscriptionsStore subscriptions;
    /** Currently connected clients, keyed by client ID. */
    private Map<String, ConnectionDescriptor> m_clientIDs = new HashMap<String, ConnectionDescriptor>();
    /** Will messages registered at CONNECT time, keyed by client ID. */
    private Map<String, WillMessage> m_willStore = new HashMap<String, WillMessage>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes.dex */
    /**
     * Immutable holder for the "will" a client registers at CONNECT time:
     * destination topic, payload, retain flag and QoS.
     */
    public static final class WillMessage {
        private final String topic;
        private final ByteBuffer payload;
        private final boolean retained;
        private final AbstractMessage.QOSType qos;

        /**
         * @param willTopic   topic the will is published on
         * @param willPayload payload of the will
         * @param willRetain  whether the will carries the retain flag
         * @param willQos     QoS the will is published with
         */
        public WillMessage(String willTopic, ByteBuffer willPayload, boolean willRetain, AbstractMessage.QOSType willQos) {
            this.qos = willQos;
            this.retained = willRetain;
            this.payload = willPayload;
            this.topic = willTopic;
        }

        /** @return the topic the will is published on */
        public String getTopic() {
            return topic;
        }

        /** @return the will payload, exactly as passed to the constructor (not copied) */
        public ByteBuffer getPayload() {
            return payload;
        }

        /** @return true when the will carries the retain flag */
        public boolean isRetained() {
            return retained;
        }

        /** @return the QoS the will is published with */
        public AbstractMessage.QOSType getQos() {
            return qos;
        }
    }

    /**
     * Drops all per-session state held for a client: its persisted
     * subscriptions, its entries in the subscription tree and its
     * session-queued messages.
     *
     * @param clientID the client whose session state is discarded
     */
    private void cleanSession(String clientID) {
        LOG.info("cleaning old saved subscriptions for client <{}>", clientID);
        m_sessionsStore.wipeSubscriptions(clientID);
        subscriptions.removeForClient(clientID);
        m_messagesStore.dropMessagesInSession(clientID);
    }

    /**
     * Hands an outbound event to the output ring buffer: claims the next
     * sequence, stores the event in that slot and publishes the sequence so
     * that {@link #onEvent} picks it up.
     *
     * @param outboundEvent the channel/message pair to deliver
     */
    private void disruptorPublish(OutputMessagingEvent outboundEvent) {
        LOG.debug("disruptorPublish publishing event on output {}", outboundEvent);
        final long sequence = m_ringBuffer.next();
        final ValueEvent slot = m_ringBuffer.get(sequence);
        slot.setEvent(outboundEvent);
        m_ringBuffer.publish(sequence);
    }

    /**
     * Handles an already-authorized PUBLISH according to its QoS.
     * <ul>
     *   <li>QoS 0: forwarded to the matching subscribers.</li>
     *   <li>QoS 1: tracked as in-flight while forwarding, then acknowledged with PUBACK.</li>
     *   <li>QoS 2: persisted under the key clientID+messageID and answered with PUBREC;
     *       forwarding happens later, in {@link #processPubRel}.</li>
     * </ul>
     * Afterwards, a message with the retain flag either clears the retained
     * entry for its topic (QoS 0) or is stored as retained (any other QoS).
     *
     * @param clientID       ID of the publishing client
     * @param publishMessage the received PUBLISH
     */
    private void executePublish(String clientID, PublishMessage publishMessage) {
        final String topic = publishMessage.getTopicName();
        final AbstractMessage.QOSType qos = publishMessage.getQos();
        final ByteBuffer body = publishMessage.getPayload();
        final boolean retain = publishMessage.isRetainFlag();
        final Integer messageID = publishMessage.getMessageID();
        LOG.info("PUBLISH from clientID <{}> on topic <{}> with QoS {}", clientID, topic, qos);

        final PublishEvent event = new PublishEvent(clientID, publishMessage);
        final boolean atMostOnce = qos == AbstractMessage.QOSType.MOST_ONE;
        if (atMostOnce) {
            forward2Subscribers(event);
        } else if (qos == AbstractMessage.QOSType.LEAST_ONE) {
            final int packetId = messageID.intValue();
            m_messagesStore.addInFlight(event, clientID, packetId);
            forward2Subscribers(event);
            m_messagesStore.cleanInFlight(clientID, packetId);
            sendPubAck(new PubAckEvent(packetId, clientID));
            LOG.debug("replying with PubAck to MSG ID {}", messageID);
        } else if (qos == AbstractMessage.QOSType.EXACTLY_ONCE) {
            final String storeKey = String.format("%s%d", clientID, messageID);
            m_messagesStore.persistQoS2Message(storeKey, event);
            sendPubRec(clientID, messageID.intValue());
        }

        if (!retain) {
            return;
        }
        if (atMostOnce) {
            m_messagesStore.cleanRetained(topic);
        } else {
            m_messagesStore.storeRetained(topic, body, qos);
        }
    }

    /**
     * Rejects a CONNECT: writes a CONNACK with return code 4 and then closes
     * the channel.
     *
     * @param channel the channel of the rejected client
     */
    private void failedCredentials(ServerChannel channel) {
        final ConnAckMessage refusal = new ConnAckMessage();
        refusal.setReturnCode((byte) 4);
        channel.write(refusal);
        channel.close(false);
    }

    /**
     * Publishes a client's will to the matching subscribers. A fresh packet ID
     * is requested from the message store unless the will is QoS 0, in which
     * case no ID is attached.
     *
     * @param willMessage the will to publish
     * @param clientID    ID of the client that registered the will
     */
    private void forwardPublishWill(WillMessage willMessage, String clientID) {
        final AbstractMessage.QOSType willQos = willMessage.getQos();
        Integer packetId = null;
        if (willQos != AbstractMessage.QOSType.MOST_ONE) {
            packetId = Integer.valueOf(m_messagesStore.nextPacketID(clientID));
        }
        final PublishEvent willEvent = new PublishEvent(
                willMessage.getTopic(),
                willQos,
                willMessage.getPayload(),
                willMessage.isRetained(),
                clientID,
                packetId);
        forward2Subscribers(willEvent);
    }

    /**
     * Delivers every message queued for a client while it was offline, removing
     * each one from the session queue right after it has been handed to
     * {@link #sendPublish}.
     *
     * <p>Queued events always carry a non-null message ID (forward2Subscribers
     * falls back to 0 when storing them), even when their QoS is 0. Because
     * sendPublish rejects a QoS 0 message that has a packet identifier, QoS 0
     * events are replayed without one; the stored ID is still used to remove the
     * event from the queue.
     *
     * @param str ID of the client that has just reconnected
     */
    private void republishStoredInSession(String str) {
        LOG.trace("republishStoredInSession for client <{}>", str);
        List<PublishEvent> storedEvents = this.m_messagesStore.listMessagesInSession(str);
        if (storedEvents.isEmpty()) {
            LOG.info("No stored messages for client <{}>", str);
            return;
        }
        LOG.info("republishing stored messages to client <{}>", str);
        for (PublishEvent stored : storedEvents) {
            AbstractMessage.QOSType qos = stored.getQos();
            Integer storedId = stored.getMessageID();
            // QoS 0 must not carry a packet identifier, otherwise sendPublish throws.
            Integer packetId = qos == AbstractMessage.QOSType.MOST_ONE ? null : storedId;
            sendPublish(stored.getClientID(), stored.getTopic(), qos, stored.getMessage(), false, packetId);
            this.m_messagesStore.removeMessageInSession(str, storedId);
        }
    }

    /**
     * Sends a PUBACK for the given event to the publishing client through the
     * output ring buffer.
     *
     * <p>Delivery is best effort: if the client is no longer registered (or any
     * other runtime failure occurs) the problem is logged with the client and
     * message IDs and the method returns normally. {@link Error}s are not
     * intercepted.
     *
     * @param pubAckEvent carries the client ID and the message ID to acknowledge
     */
    private void sendPubAck(PubAckEvent pubAckEvent) {
        LOG.trace("sendPubAck invoked");
        String clientID = pubAckEvent.getClientID();
        PubAckMessage pubAckMessage = new PubAckMessage();
        pubAckMessage.setMessageID(Integer.valueOf(pubAckEvent.getMessageId()));
        try {
            if (this.m_clientIDs == null) {
                throw new RuntimeException("Internal bad error, found m_clientIDs to null while it should be initialized, somewhere it's overwritten!!");
            }
            LOG.debug("clientIDs are {}", this.m_clientIDs);
            // Single lookup: the descriptor is both checked and used.
            ConnectionDescriptor descriptor = this.m_clientIDs.get(clientID);
            if (descriptor == null) {
                throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client %s in cache %s", clientID, this.m_clientIDs));
            }
            disruptorPublish(new OutputMessagingEvent(descriptor.getSession(), pubAckMessage));
        } catch (RuntimeException e) {
            // Deliberately non-fatal, but leave a usable trace instead of a null message.
            LOG.error(String.format("Unable to send PUBACK for message ID %s to client <%s>", pubAckMessage.getMessageID(), clientID), e);
        }
    }

    /**
     * Queues a PUBCOMP carrying the given message ID for delivery to the client.
     * The client must be present in the connected-clients map.
     *
     * @param clientID  recipient of the PUBCOMP
     * @param messageID packet identifier being completed
     */
    private void sendPubComp(String clientID, int messageID) {
        LOG.debug("PUB <--PUBCOMP-- SRV sendPubComp invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(messageID));
        final PubCompMessage completion = new PubCompMessage();
        completion.setMessageID(Integer.valueOf(messageID));
        final ServerChannel channel = m_clientIDs.get(clientID).getSession();
        disruptorPublish(new OutputMessagingEvent(channel, completion));
    }

    /**
     * Queues a PUBREC carrying the given message ID for delivery to the client.
     * The client must be present in the connected-clients map.
     *
     * @param clientID  recipient of the PUBREC
     * @param messageID packet identifier being acknowledged
     */
    private void sendPubRec(String clientID, int messageID) {
        LOG.trace("PUB <--PUBREC-- SRV sendPubRec invoked for clientID {} with messageID {}", clientID, Integer.valueOf(messageID));
        final PubRecMessage received = new PubRecMessage();
        received.setMessageID(Integer.valueOf(messageID));
        final ServerChannel channel = m_clientIDs.get(clientID).getSession();
        disruptorPublish(new OutputMessagingEvent(channel, received));
    }

    /**
     * Registers one subscription and immediately delivers the retained messages
     * whose topic matches the filter.
     *
     * <p>The subscription is recorded in both the session store and the
     * subscription tree. Each matching retained message is sent with the retain
     * flag set; a new packet ID is drawn for it unless it is QoS 0.
     *
     * @param subscription the subscription to register
     * @param topicFilter  the topic filter retained messages are matched against
     * @return false if the subscription fails validation (nothing is registered),
     *         true otherwise
     */
    private boolean subscribeSingleTopic(Subscription subscription, final String topicFilter) {
        LOG.info("<{}> subscribed to topic <{}> with QoS {}", subscription.getClientId(), topicFilter, AbstractMessage.QOSType.formatQoS(subscription.getRequestedQos()));
        if (!SubscriptionsStore.validate(subscription)) {
            return false;
        }
        m_sessionsStore.addNewSubscription(subscription);
        subscriptions.add(subscription);

        final IMatchingCondition matchesFilter = new IMatchingCondition() {
            @Override
            public boolean match(String candidateTopic) {
                return SubscriptionsStore.matchTopics(candidateTopic, topicFilter);
            }
        };
        for (IMessagesStore.StoredMessage retained : m_messagesStore.searchMatching(matchesFilter)) {
            LOG.debug("send publish message for topic {}", topicFilter);
            Integer packetId = null;
            if (retained.getQos() != AbstractMessage.QOSType.MOST_ONE) {
                packetId = Integer.valueOf(m_messagesStore.nextPacketID(subscription.getClientId()));
            }
            sendPublish(subscription.getClientId(), retained.getTopic(), retained.getQos(), retained.getPayload(), true, packetId);
        }
        return true;
    }

    /**
     * Delivers a published event to every subscription matching its topic.
     *
     * <p>For each match the effective QoS is the lower of the publish QoS and
     * the subscription's requested QoS. Then:
     * <ul>
     *   <li>QoS 0 to an active subscription: sent without a packet ID.</li>
     *   <li>Clean-session or active subscription otherwise: QoS 2 events are first
     *       recorded as in-flight; if the subscription is active the message is
     *       sent with a fresh packet ID.</li>
     *   <li>Inactive, non-clean subscription: the event is queued in the session
     *       for later delivery.</li>
     * </ul>
     * Each subscriber receives its own duplicate of the payload buffer.
     *
     * <p>When the incoming event has no message ID, 0 is used both for the
     * stored/in-flight copy and for the in-flight key, so a missing ID no longer
     * causes a NullPointerException on the QoS 2 path.
     *
     * @param publishEvent the event to fan out
     */
    void forward2Subscribers(PublishEvent publishEvent) {
        final String topic = publishEvent.getTopic();
        final AbstractMessage.QOSType publishQos = publishEvent.getQos();
        final ByteBuffer message = publishEvent.getMessage();
        final boolean retain = publishEvent.isRetain();
        final Integer messageID = publishEvent.getMessageID();
        // One consistent fallback for every place that needs a concrete ID.
        final int storedMessageId = messageID != null ? messageID.intValue() : 0;

        LOG.debug("forward2Subscribers republishing to existing subscribers that matches the topic {}", topic);
        if (LOG.isDebugEnabled()) {
            LOG.debug("content <{}>", DebugUtils.payload2Str(message));
            LOG.debug("subscription tree {}", this.subscriptions.dumpTree());
        }

        for (Subscription subscription : this.subscriptions.matches(topic)) {
            // Downgrade to the subscriber's requested QoS when it is lower.
            AbstractMessage.QOSType qos = publishQos;
            if (qos.ordinal() > subscription.getRequestedQos().ordinal()) {
                qos = subscription.getRequestedQos();
            }
            final String clientId = subscription.getClientId();
            LOG.debug("Broker republishing to client <{}> topic <{}> qos <{}>, active {}", clientId, subscription.getTopicFilter(), qos, Boolean.valueOf(subscription.isActive()));

            // Independent position/limit per subscriber over the same content.
            final ByteBuffer payloadCopy = message.duplicate();

            if (qos == AbstractMessage.QOSType.MOST_ONE && subscription.isActive()) {
                sendPublish(clientId, topic, qos, payloadCopy, false, null);
            } else if (subscription.isCleanSession() || subscription.isActive()) {
                if (qos == AbstractMessage.QOSType.EXACTLY_ONCE) {
                    PublishEvent inFlight = new PublishEvent(topic, qos, payloadCopy, retain, clientId, Integer.valueOf(storedMessageId));
                    this.m_messagesStore.addInFlight(inFlight, clientId, storedMessageId);
                }
                if (subscription.isActive()) {
                    sendPublish(clientId, topic, qos, payloadCopy, false, Integer.valueOf(this.m_messagesStore.nextPacketID(clientId)));
                }
            } else {
                PublishEvent forLater = new PublishEvent(topic, qos, payloadCopy, retain, clientId, Integer.valueOf(storedMessageId));
                this.m_messagesStore.storePublishForFuture(forLater);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires the collaborators into this processor and starts the output
     * disruptor (ring size 32768, single-thread executor) with this instance as
     * its event handler.
     *
     * @param subscriptionsStore subscription tree used for topic matching
     * @param messagesStore      message store
     * @param sessionsStore      session store
     * @param authenticator      credentials checker used on CONNECT
     * @param anonymousAllowed   whether CONNECTs without credentials are accepted
     * @param authorizator       topic read/write permission checker
     */
    public void init(SubscriptionsStore subscriptionsStore, IMessagesStore messagesStore, ISessionsStore sessionsStore, IAuthenticator authenticator, boolean anonymousAllowed, IAuthorizator authorizator) {
        LOG.debug("subscription tree on init {}", subscriptionsStore.dumpTree());

        // Collaborators.
        subscriptions = subscriptionsStore;
        m_messagesStore = messagesStore;
        m_sessionsStore = sessionsStore;
        m_authenticator = authenticator;
        m_authorizator = authorizator;
        allowAnonymous = anonymousAllowed;

        // Output pipeline.
        m_executor = Executors.newFixedThreadPool(1);
        Disruptor outputDisruptor = new Disruptor(ValueEvent.EVENT_FACTORY, 32768, m_executor);
        outputDisruptor.handleEventsWith(this);
        outputDisruptor.start();
        m_ringBuffer = outputDisruptor.getRingBuffer();
    }

    /**
     * Disruptor callback: writes the message carried by the slot to its target
     * channel. The slot's event reference is always cleared afterwards, even if
     * the write fails.
     *
     * @param valueEvent the ring-buffer slot holding an OutputMessagingEvent
     * @param sequence   ring-buffer sequence of the slot (unused)
     * @param endOfBatch whether this is the last event of the batch (unused)
     */
    @Override
    public void onEvent(ValueEvent valueEvent, long sequence, boolean endOfBatch) throws Exception {
        try {
            final OutputMessagingEvent outbound = (OutputMessagingEvent) valueEvent.getEvent();
            LOG.debug("Output event, sending {}", outbound.getMessage());
            outbound.getChannel().write(outbound.getMessage());
        } finally {
            // Release the reference so the slot does not pin the message.
            valueEvent.setEvent(null);
        }
    }

    /**
     * Handles a CONNECT packet.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Reject protocol versions other than 3 and 4 (CONNACK code 1, close).</li>
     *   <li>Reject a null or empty client ID (CONNACK code 2, close). The channel is
     *       now closed here as on every other rejection path; MQTT 3.1.1 requires
     *       the connection to be closed after an identifier-rejected CONNACK.</li>
     *   <li>Authenticate: with the user flag set, the username/password pair is
     *       checked (a missing password is only tolerated when anonymous access is
     *       allowed); without it, anonymous access must be allowed. Failures get
     *       CONNACK code 4 and the channel is closed.</li>
     *   <li>If the client ID is already connected, the old channel is closed
     *       (wiping its session first when it was a clean session).</li>
     *   <li>Register the connection, set channel attributes and idle timeout
     *       (1.5 x keep-alive), store the will if present, activate the client's
     *       subscriptions and wipe the session when clean session is requested.</li>
     *   <li>Reply with CONNACK code 0 (session-present set for a non-clean session
     *       known to the session store), then replay queued messages for
     *       non-clean sessions.</li>
     * </ol>
     *
     * @param serverChannel  channel the CONNECT arrived on
     * @param connectMessage the decoded CONNECT
     */
    @MQTTMessage(message = ConnectMessage.class)
    void processConnect(ServerChannel serverChannel, ConnectMessage connectMessage) {
        final String clientID = connectMessage.getClientID();
        LOG.debug("CONNECT for client <{}>", clientID);

        if (connectMessage.getProcotolVersion() != 3 && connectMessage.getProcotolVersion() != 4) {
            ConnAckMessage badProto = new ConnAckMessage();
            badProto.setReturnCode((byte) 1);
            LOG.warn("processConnect sent bad proto ConnAck");
            serverChannel.write(badProto);
            serverChannel.close(false);
            return;
        }

        if (clientID == null || clientID.length() == 0) {
            ConnAckMessage idRejected = new ConnAckMessage();
            idRejected.setReturnCode((byte) 2);
            serverChannel.write(idRejected);
            // Same handling as the other rejection paths: do not leave the connection open.
            serverChannel.close(false);
            return;
        }

        if (connectMessage.isUserFlag()) {
            String password = null;
            if (connectMessage.isPasswordFlag()) {
                password = connectMessage.getPassword();
            } else if (!this.allowAnonymous) {
                failedCredentials(serverChannel);
                return;
            }
            if (!this.m_authenticator.checkValid(connectMessage.getUsername(), password)) {
                failedCredentials(serverChannel);
                return;
            }
            serverChannel.setAttribute(NettyChannel.ATTR_KEY_USERNAME, connectMessage.getUsername());
        } else if (!this.allowAnonymous) {
            failedCredentials(serverChannel);
            return;
        }

        // A second connection with the same client ID evicts the first one.
        if (this.m_clientIDs.containsKey(clientID)) {
            LOG.info("Found an existing connection with same client ID <{}>, forcing to close", clientID);
            ServerChannel previous = this.m_clientIDs.get(clientID).getSession();
            if (((Boolean) previous.getAttribute(NettyChannel.ATTR_KEY_CLEANSESSION)).booleanValue()) {
                cleanSession(clientID);
            }
            previous.close(false);
            LOG.debug("Existing connection with same client ID <{}>, forced to close", clientID);
        }

        this.m_clientIDs.put(clientID, new ConnectionDescriptor(clientID, serverChannel, connectMessage.isCleanSession()));
        int keepAlive = connectMessage.getKeepAlive();
        LOG.debug("Connect with keepAlive {} s", Integer.valueOf(keepAlive));
        serverChannel.setAttribute(NettyChannel.ATTR_KEY_KEEPALIVE, Integer.valueOf(keepAlive));
        serverChannel.setAttribute(NettyChannel.ATTR_KEY_CLEANSESSION, Boolean.valueOf(connectMessage.isCleanSession()));
        serverChannel.setAttribute(NettyChannel.ATTR_KEY_CLIENTID, clientID);
        LOG.debug("Connect create session <{}>", serverChannel);
        serverChannel.setIdleTime(Math.round(keepAlive * 1.5f));

        if (connectMessage.isWillFlag()) {
            AbstractMessage.QOSType willQos = AbstractMessage.QOSType.values()[connectMessage.getWillQos()];
            byte[] willBytes = connectMessage.getWillMessage();
            // Copy the bytes into a buffer of their own, flipped so it is ready for reading.
            ByteBuffer willPayload = (ByteBuffer) ByteBuffer.allocate(willBytes.length).put(willBytes).flip();
            this.m_willStore.put(clientID, new WillMessage(connectMessage.getWillTopic(), willPayload, connectMessage.isWillRetain(), willQos));
        }

        this.subscriptions.activate(clientID);
        if (connectMessage.isCleanSession()) {
            cleanSession(clientID);
        }

        ConnAckMessage accepted = new ConnAckMessage();
        accepted.setReturnCode((byte) 0);
        if (!connectMessage.isCleanSession() && this.m_sessionsStore.contains(clientID)) {
            accepted.setSessionPresent(true);
        }
        serverChannel.write(accepted);

        LOG.info("Create persistent session for clientID <{}>", clientID);
        this.m_sessionsStore.addNewSubscription(Subscription.createEmptySubscription(clientID, true));
        LOG.info("Connected client ID <{}> with clean session {}", clientID, Boolean.valueOf(connectMessage.isCleanSession()));
        if (connectMessage.isCleanSession()) {
            return;
        }
        republishStoredInSession(clientID);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reacts to an unexpectedly dropped connection: unregisters the client,
     * deactivates its subscriptions and, if it registered a will, publishes the
     * will and then forgets it.
     *
     * @param lostConnectionEvent identifies the client whose connection was lost
     */
    public void processConnectionLost(LostConnectionEvent lostConnectionEvent) {
        final String clientID = lostConnectionEvent.clientID;

        final ConnectionDescriptor dropped = m_clientIDs.remove(clientID);
        if (dropped != null) {
            subscriptions.deactivate(clientID);
            LOG.info("Lost connection with client <{}>", clientID);
        }

        if (!m_willStore.containsKey(clientID)) {
            return;
        }
        forwardPublishWill(m_willStore.get(clientID), clientID);
        m_willStore.remove(clientID);
    }

    /**
     * Handles a DISCONNECT: wipes the session when it was a clean session,
     * unregisters the client, closes the channel, deactivates its subscriptions
     * and discards its will without publishing it.
     *
     * @param serverChannel     channel the DISCONNECT arrived on
     * @param disconnectMessage the decoded DISCONNECT (not inspected)
     */
    @MQTTMessage(message = DisconnectMessage.class)
    void processDisconnect(ServerChannel serverChannel, DisconnectMessage disconnectMessage) throws InterruptedException {
        final String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        final Boolean cleanSessionAttr = (Boolean) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLEANSESSION);
        final boolean cleanSession = cleanSessionAttr.booleanValue();

        if (cleanSession) {
            cleanSession(clientID);
        }
        m_clientIDs.remove(clientID);
        serverChannel.close(true);
        subscriptions.deactivate(clientID);
        m_willStore.remove(clientID);

        LOG.info("DISCONNECT client <{}> with clean session {}", clientID, Boolean.valueOf(cleanSession));
    }

    /**
     * Handles a PUBACK from a subscriber by removing the acknowledged message
     * from that client's session queue.
     *
     * @param serverChannel channel the PUBACK arrived on
     * @param pubAckMessage the decoded PUBACK
     */
    @MQTTMessage(message = PubAckMessage.class)
    void processPubAck(ServerChannel serverChannel, PubAckMessage pubAckMessage) {
        final String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        final int ackedId = pubAckMessage.getMessageID().intValue();
        m_messagesStore.removeMessageInSession(clientID, Integer.valueOf(ackedId));
    }

    /**
     * Handles a PUBCOMP from a subscriber by clearing the corresponding
     * in-flight entry for that client.
     *
     * @param serverChannel  channel the PUBCOMP arrived on
     * @param pubCompMessage the decoded PUBCOMP
     */
    @MQTTMessage(message = PubCompMessage.class)
    void processPubComp(ServerChannel serverChannel, PubCompMessage pubCompMessage) {
        final String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        final int completedId = pubCompMessage.getMessageID().intValue();
        LOG.debug("\t\tSRV <--PUBCOMP-- SUB processPubComp invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(completedId));
        m_messagesStore.cleanInFlight(clientID, completedId);
    }

    /**
     * Handles a PUBREC from a subscriber by answering, directly on the same
     * channel, with a PUBREL that carries the same message ID.
     *
     * @param serverChannel channel the PUBREC arrived on
     * @param pubRecMessage the decoded PUBREC
     */
    @MQTTMessage(message = PubRecMessage.class)
    void processPubRec(ServerChannel serverChannel, PubRecMessage pubRecMessage) {
        final String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        final int receivedId = pubRecMessage.getMessageID().intValue();
        LOG.debug("\t\tSRV <--PUBREC-- SUB processPubRec invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(receivedId));

        final PubRelMessage release = new PubRelMessage();
        release.setMessageID(Integer.valueOf(receivedId));
        release.setQos(AbstractMessage.QOSType.LEAST_ONE);
        serverChannel.write(release);
    }

    /**
     * Handles a PUBREL from a publisher, completing the QoS 2 exchange started
     * in executePublish: the message persisted under clientID+messageID is
     * forwarded to subscribers, removed from the QoS 2 store, stored as retained
     * when flagged so, and finally acknowledged with PUBCOMP.
     *
     * <p>If no message is stored under that key (for example a re-sent PUBREL
     * whose message was already released), nothing is forwarded but the PUBCOMP
     * is still sent, so the publisher can finish the handshake instead of the
     * broker failing with a NullPointerException.
     *
     * @param serverChannel channel the PUBREL arrived on
     * @param pubRelMessage the decoded PUBREL
     */
    @MQTTMessage(message = PubRelMessage.class)
    void processPubRel(ServerChannel serverChannel, PubRelMessage pubRelMessage) {
        String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        int messageID = pubRelMessage.getMessageID().intValue();
        LOG.debug("PUB --PUBREL--> SRV processPubRel invoked for clientID {} ad messageID {}", clientID, Integer.valueOf(messageID));

        // Same key format used when the message was persisted.
        String storeKey = String.format("%s%d", clientID, Integer.valueOf(messageID));
        PublishEvent stored = this.m_messagesStore.retrieveQoS2Message(storeKey);
        if (stored == null) {
            LOG.warn("PUBREL for unknown QoS2 message, clientID <{}> messageID {}", clientID, Integer.valueOf(messageID));
            sendPubComp(clientID, messageID);
            return;
        }

        forward2Subscribers(stored);
        this.m_messagesStore.removeQoS2Message(storeKey);
        if (stored.isRetain()) {
            this.m_messagesStore.storeRetained(stored.getTopic(), stored.getMessage(), stored.getQos());
        }
        sendPubComp(clientID, messageID);
    }

    /**
     * Handles a PUBLISH: the message is processed only if the authorizator
     * grants the sender write permission on the topic; otherwise it is dropped
     * with a debug log entry.
     *
     * @param serverChannel  channel the PUBLISH arrived on
     * @param publishMessage the decoded PUBLISH
     */
    @MQTTMessage(message = PublishMessage.class)
    void processPublish(ServerChannel serverChannel, PublishMessage publishMessage) {
        LOG.trace("PUB --PUBLISH--> SRV executePublish invoked with {}", publishMessage);
        final String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        final String topic = publishMessage.getTopicName();
        final String username = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_USERNAME);

        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.debug("topic {} doesn't have write credentials", topic);
            return;
        }
        executePublish(clientID, publishMessage);
    }

    /**
     * Handles a SUBSCRIBE: registers each requested topic filter and replies
     * with a SUBACK that lists, per filter and in request order, the requested
     * QoS or FAILURE when the subscription was not accepted.
     *
     * @param serverChannel    channel the SUBSCRIBE arrived on
     * @param subscribeMessage the decoded SUBSCRIBE
     */
    @MQTTMessage(message = SubscribeMessage.class)
    void processSubscribe(ServerChannel serverChannel, SubscribeMessage subscribeMessage) {
        final String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        final boolean cleanSession = ((Boolean) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLEANSESSION)).booleanValue();
        LOG.debug("SUBSCRIBE client <{}> packetID {}", clientID, subscribeMessage.getMessageID());

        final SubAckMessage ack = new SubAckMessage();
        ack.setMessageID(subscribeMessage.getMessageID());

        for (SubscribeMessage.Couple request : subscribeMessage.subscriptions()) {
            final AbstractMessage.QOSType requestedQos = AbstractMessage.QOSType.values()[request.getQos()];
            final Subscription candidate = new Subscription(clientID, request.getTopicFilter(), requestedQos, cleanSession);
            final boolean accepted = subscribeSingleTopic(candidate, request.getTopicFilter());
            if (accepted) {
                ack.addType(requestedQos);
            } else {
                ack.addType(AbstractMessage.QOSType.FAILURE);
            }
        }

        LOG.debug("SUBACK for packetID {}", subscribeMessage.getMessageID());
        serverChannel.write(ack);
    }

    /**
     * Handles an UNSUBSCRIBE: removes every listed topic filter for the client
     * from both the subscription tree and the session store, then replies with
     * an UNSUBACK carrying the same message ID.
     *
     * @param serverChannel      channel the UNSUBSCRIBE arrived on
     * @param unsubscribeMessage the decoded UNSUBSCRIBE
     */
    @MQTTMessage(message = UnsubscribeMessage.class)
    void processUnsubscribe(ServerChannel serverChannel, UnsubscribeMessage unsubscribeMessage) {
        final List<String> filters = unsubscribeMessage.topicFilters();
        final int packetId = unsubscribeMessage.getMessageID().intValue();
        final String clientID = (String) serverChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        LOG.debug("UNSUBSCRIBE subscription on topics {} for clientID <{}>", filters, clientID);

        for (String filter : filters) {
            subscriptions.removeSubscription(filter, clientID);
            m_sessionsStore.removeSubscription(filter, clientID);
        }

        final UnsubAckMessage ack = new UnsubAckMessage();
        ack.setMessageID(Integer.valueOf(packetId));
        LOG.info("replying with UnsubAck to MSG ID {}", Integer.valueOf(packetId));
        serverChannel.write(ack);
    }

    /**
     * Builds a PUBLISH and queues it for delivery to a connected client,
     * provided the authorizator grants that client read permission on the topic.
     *
     * @param clientId  recipient client ID; must be present in the connected-clients map
     * @param topic     topic of the message
     * @param qos       QoS to send with
     * @param payload   message payload
     * @param retained  value of the retain flag on the outgoing message
     * @param messageID packet identifier; must be null for QoS 0
     * @throws RuntimeException if a QoS 0 message is given a packet identifier, or
     *                          if no connection descriptor exists for the client
     */
    protected void sendPublish(String clientId, String topic, AbstractMessage.QOSType qos, ByteBuffer payload, boolean retained, Integer messageID) {
        LOG.debug("sendPublish invoked clientId <{}> on topic <{}> QoS {} retained {} messageID {}", clientId, topic, qos, Boolean.valueOf(retained), messageID);

        final PublishMessage outbound = new PublishMessage();
        outbound.setRetainFlag(retained);
        outbound.setTopicName(topic);
        outbound.setQos(qos);
        outbound.setPayload(payload);

        LOG.info("send publish message to <{}> on topic <{}>", clientId, topic);
        if (LOG.isDebugEnabled()) {
            LOG.debug("content <{}>", DebugUtils.payload2Str(payload));
        }

        // Only QoS 1/2 messages carry a packet identifier.
        final boolean atMostOnce = outbound.getQos() == AbstractMessage.QOSType.MOST_ONE;
        if (atMostOnce && messageID != null) {
            throw new RuntimeException("Internal bad error, trying to forwardPublish a QoS 0 message with PacketIdentifier: " + messageID);
        }
        if (!atMostOnce) {
            outbound.setMessageID(messageID);
        }

        if (m_clientIDs == null) {
            throw new RuntimeException("Internal bad error, found m_clientIDs to null while it should be initialized, somewhere it's overwritten!!");
        }
        LOG.debug("clientIDs are {}", m_clientIDs);
        final ConnectionDescriptor descriptor = m_clientIDs.get(clientId);
        if (descriptor == null) {
            throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client <%s> in cache <%s>", clientId, m_clientIDs));
        }

        final ServerChannel channel = descriptor.getSession();
        LOG.debug("Session for clientId {} is {}", clientId, channel);
        final String username = (String) channel.getAttribute(NettyChannel.ATTR_KEY_USERNAME);
        if (!m_authorizator.canRead(topic, username, clientId)) {
            LOG.debug("topic {} doesn't have read credentials", topic);
            return;
        }
        disruptorPublish(new OutputMessagingEvent(channel, outbound));
    }
}
