package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.core.lang.backport.java.util.Objects;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.buffer.DcpRequestDispatcher;
import com.couchbase.client.dcp.conductor.DcpChannelControlHandler;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpNoopRequest;
import com.couchbase.client.dcp.message.DcpNoopResponse;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleState;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleStateEvent;
import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.deps.io.netty.util.concurrent.EventExecutor;
import com.couchbase.client.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.deps.io.netty.util.concurrent.ImmediateEventExecutor;
import com.couchbase.client.deps.io.netty.util.concurrent.Promise;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

/* loaded from: input_file:com/couchbase/client/dcp/transport/netty/DcpMessageHandler.class */
/**
 * Inbound channel handler that sits between the wire and the DCP client.
 *
 * <p>It has two jobs:
 * <ul>
 *   <li>Frames whose first byte is not {@link #MAGIC_RESPONSE} are treated as server-initiated
 *       messages and are routed to the data event handler, the control handler, answered with a
 *       no-op response, or logged and dropped.</li>
 *   <li>Frames whose first byte is {@link #MAGIC_RESPONSE} are matched, in FIFO order, against
 *       the requests previously issued through {@link #sendRequest(ByteBuf)} and complete the
 *       corresponding promise.</li>
 * </ul>
 *
 * <p>{@link #sendRequest(ByteBuf)} may be called from any thread; it hops onto the channel's
 * executor when necessary. The request queue and opaque counter are not synchronized and are
 * only mutated by {@link #unsafeSendRequest} (which enforces the event loop) and by the channel
 * callbacks of this handler.
 */
public class DcpMessageHandler extends ChannelInboundHandlerAdapter implements DcpRequestDispatcher {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DcpMessageHandler.class);

    /** Value of the first byte (0x81) that marks an inbound frame as a response to one of our requests. */
    private static final byte MAGIC_RESPONSE = (byte) 0x81;

    private final DataEventHandler dataEventHandler;
    private final DcpChannelControlHandler controlHandler;
    private final ChannelFlowController flowController;

    /** Context of the active channel, or {@code null} while the channel is not active. */
    private volatile ChannelHandlerContext volatileContext;

    /** Opaque assigned to the next outgoing request; incremented per request and wraps on overflow. */
    private int nextOpaque = Integer.MIN_VALUE;

    /** Requests written to the channel whose responses have not arrived yet, oldest first. */
    private final Queue<OutstandingRequest> outstandingRequests = new ArrayDeque<OutstandingRequest>();

    /**
     * Pairs the opaque written into a request with the promise to complete when the
     * response carrying the same opaque arrives.
     */
    public static class OutstandingRequest {
        private final int opaque;
        private final Promise<DcpResponse> promise;

        private OutstandingRequest(int i, Promise<DcpResponse> promise) {
            this.opaque = i;
            this.promise = Objects.requireNonNull(promise);
        }
    }

    /**
     * Creates a handler for the given channel.
     *
     * @param channel channel handed to the flow controller created for this handler
     * @param clientEnvironment supplies the data event handler and flow controller configuration
     * @param dcpChannelControlHandler receives stream-end and snapshot-marker messages
     */
    public DcpMessageHandler(Channel channel, ClientEnvironment clientEnvironment, DcpChannelControlHandler dcpChannelControlHandler) {
        this.dataEventHandler = clientEnvironment.dataEventHandler();
        this.controlHandler = dcpChannelControlHandler;
        this.flowController = new ChannelFlowControllerImpl(channel, clientEnvironment);
    }

    /** Publishes the context so {@link #sendRequest(ByteBuf)} can start issuing requests. */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.volatileContext = channelHandlerContext;
        super.channelActive(channelHandlerContext);
    }

    /**
     * Clears the published context and fails every request still waiting for a response
     * with a {@link NotConnectedException}.
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.volatileContext = null;

        final NotConnectedException cause =
                new NotConnectedException("Channel became inactive while awaiting response.");
        for (OutstandingRequest pending : this.outstandingRequests) {
            try {
                pending.promise.setFailure(cause);
            } catch (Throwable t) {
                // Keep going so one misbehaving promise cannot strand the remaining ones.
                LOGGER.error("Failed to set promise failure", t);
            }
        }
        this.outstandingRequests.clear();

        super.channelInactive(channelHandlerContext);
    }

    /**
     * Assigns an opaque to the request, writes it to the channel and returns a future that is
     * completed when the matching response arrives.
     *
     * <p>If the channel is not active, or the request cannot be handed to the channel's executor,
     * the buffer is released and the returned future is failed.
     *
     * @param byteBuf the encoded request; ownership passes to this method
     * @return a future for the response
     */
    @Override
    public Future<DcpResponse> sendRequest(final ByteBuf byteBuf) {
        final ChannelHandlerContext ctx = this.volatileContext;
        if (ctx == null) {
            ReferenceCountUtil.safeRelease(byteBuf);
            return ImmediateEventExecutor.INSTANCE.newFailedFuture(
                    new NotConnectedException("Failed to issue request; channel is not active."));
        }

        final EventExecutor executor = ctx.executor();
        final Promise<DcpResponse> promise = executor.newPromise();

        if (executor.inEventLoop()) {
            unsafeSendRequest(ctx, byteBuf, promise);
            return promise;
        }

        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    // Re-read the context: the channel may have gone inactive before this task ran.
                    final ChannelHandlerContext current = DcpMessageHandler.this.volatileContext;
                    if (current == null) {
                        ReferenceCountUtil.safeRelease(byteBuf);
                        promise.setFailure(new NotConnectedException("Failed to issue request; channel is not active."));
                        return;
                    }
                    DcpMessageHandler.this.unsafeSendRequest(current, byteBuf, promise);
                }
            });
        } catch (Throwable t) {
            // The task was not accepted, so nothing else will consume the buffer.
            ReferenceCountUtil.safeRelease(byteBuf);
            promise.setFailure(t);
        }
        return promise;
    }

    /**
     * Stamps the next opaque onto the request, writes it and enqueues the promise.
     * Any failure while doing so fails the promise instead of propagating.
     *
     * @param channelHandlerContext context used to write the request
     * @param byteBuf the encoded request
     * @param promise completed later by the matching response, or failed here on error
     * @throws IllegalStateException if called from a thread other than the context's event loop
     */
    public void unsafeSendRequest(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, Promise<DcpResponse> promise) {
        if (!channelHandlerContext.executor().inEventLoop()) {
            throw new IllegalStateException("Must not be called outside event loop");
        }
        try {
            final int opaque = this.nextOpaque++;
            MessageUtil.setOpaque(opaque, byteBuf);
            // The write outcome is not observed here, hence the void promise.
            channelHandlerContext.writeAndFlush(byteBuf, channelHandlerContext.voidPromise());
            this.outstandingRequests.add(new OutstandingRequest(opaque, promise));
        } catch (Throwable t) {
            promise.setFailure(t);
        }
    }

    /**
     * Dispatches an inbound frame: responses are matched against outstanding requests,
     * everything else is handled as a server-initiated message.
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        final ByteBuf message = (ByteBuf) obj;
        if (message.getByte(0) == MAGIC_RESPONSE) {
            handleResponse(channelHandlerContext, message);
        } else {
            handleRequest(channelHandlerContext, message);
        }
    }

    /** Closes the channel when a reader-idle event arrives; forwards every other user event. */
    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        final boolean readerIdle = obj instanceof IdleStateEvent
                && ((IdleStateEvent) obj).state() == IdleState.READER_IDLE;
        if (readerIdle) {
            LOGGER.warn("Closing dead connection.");
            channelHandlerContext.close();
            return;
        }
        super.userEventTriggered(channelHandlerContext, obj);
    }

    /**
     * Completes the oldest outstanding request with the given response. If there is no
     * outstanding request, or the opaques differ, the connection is closed.
     */
    private void handleResponse(ChannelHandlerContext ctx, ByteBuf message) {
        final OutstandingRequest expected = this.outstandingRequests.poll();
        final int actualOpaque = MessageUtil.getOpaque(message);

        if (expected != null && actualOpaque == expected.opaque) {
            expected.promise.setSuccess(new DcpResponse(message));
            return;
        }

        try {
            LOGGER.error("Unexpected response with opaque {} (expected {}); closing connection",
                    Integer.valueOf(actualOpaque),
                    expected == null ? "none" : Integer.valueOf(expected.opaque));
            if (expected != null) {
                // This request has already left the queue, so channelInactive() can no longer
                // fail it; without this its future would never complete.
                expected.promise.tryFailure(new IllegalStateException(
                        "Response opaque did not match this request; connection is being closed."));
            }
        } finally {
            // Nobody takes ownership of the stray response, so release it here.
            ReferenceCountUtil.safeRelease(message);
            ctx.close();
        }
    }

    /**
     * Routes a server-initiated message. Data and control messages are handed to their handlers
     * together with the flow controller; a no-op request is answered with a no-op response that
     * echoes its opaque; anything else is logged and ignored.
     */
    private void handleRequest(ChannelHandlerContext ctx, ByteBuf message) {
        if (isDataMessage(message)) {
            // Not released here; presumably the handler takes over the buffer — confirm against DataEventHandler.
            this.dataEventHandler.onEvent(this.flowController, message);
            return;
        }
        if (isControlMessage(message)) {
            // Same hand-off as above, to the control handler.
            this.controlHandler.onEvent(this.flowController, message);
            return;
        }

        try {
            if (DcpNoopRequest.is(message)) {
                final ByteBuf response = ctx.alloc().buffer();
                DcpNoopResponse.init(response);
                MessageUtil.setOpaque(MessageUtil.getOpaque(message), response);
                ctx.writeAndFlush(response);
            } else {
                LOGGER.warn("Unknown DCP Message, ignoring. \n{}", MessageUtil.humanize(message));
            }
        } finally {
            // Release even when replying or logging throws; otherwise the inbound buffer leaks.
            message.release();
        }
    }

    /** Returns whether the message is a stream end or a snapshot marker request. */
    private static boolean isControlMessage(ByteBuf byteBuf) {
        return DcpStreamEndMessage.is(byteBuf) || DcpSnapshotMarkerRequest.is(byteBuf);
    }

    /** Returns whether the message is a mutation, deletion or expiration. */
    private static boolean isDataMessage(ByteBuf byteBuf) {
        return DcpMutationMessage.is(byteBuf) || DcpDeletionMessage.is(byteBuf) || DcpExpirationMessage.is(byteBuf);
    }
}
