/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.message.DcpBufferAckRequest;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSetVbucketStateMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.Channel;

/**
 * {@link ChannelFlowController} that counts acknowledged bytes for a single
 * {@link Channel} and writes a {@link DcpBufferAckRequest} to that channel once
 * the running total reaches a configured watermark.
 *
 * <p>When buffer acknowledgement is disabled in the {@link DcpControl} settings,
 * both {@code ack} methods return without doing anything.
 *
 * <p>The running counter is only read and modified while holding this
 * instance's monitor.
 */
public class ChannelFlowControllerImpl
implements ChannelFlowController {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(ChannelFlowControllerImpl.class);

    /** Channel that buffer acknowledgements are written to. */
    private final Channel channel;

    /** Whether buffer acknowledgement is enabled for this connection. */
    private final boolean needsBufferAck;

    /** Absolute number of bytes at which an acknowledgement is sent; 0 when disabled. */
    private final int bufferAckWatermark;

    /** Bytes acknowledged locally since the last acknowledgement threshold was reached. */
    private int bufferAckCounter;

    /**
     * Creates a flow controller for the given channel.
     *
     * <p>If buffer acknowledgement is enabled, the absolute watermark is derived
     * by taking the environment's watermark value as a percentage of the
     * configured connection buffer size, rounded to the nearest integer.
     *
     * @param channel     the channel acknowledgements are written to
     * @param environment source of the DCP control settings and the watermark percentage
     * @throws NumberFormatException if buffer acknowledgement is enabled and the
     *                               configured connection buffer size is not a parseable integer
     */
    public ChannelFlowControllerImpl(Channel channel, ClientEnvironment environment) {
        this.channel = channel;
        this.needsBufferAck = environment.dcpControl().bufferAckEnabled();

        int absoluteWatermark = 0;
        if (this.needsBufferAck) {
            int watermarkPercent = environment.bufferAckWatermark();
            String configuredSize = environment.dcpControl().get(DcpControl.Names.CONNECTION_BUFFER_SIZE);
            int connectionBufferSize = Integer.parseInt(configuredSize);
            absoluteWatermark = (int) Math.round(connectionBufferSize / 100.0 * watermarkPercent);
            LOGGER.debug("BufferAckWatermark absolute is {}", absoluteWatermark);
        }
        this.bufferAckWatermark = absoluteWatermark;
        this.bufferAckCounter = 0;
    }

    /**
     * Acknowledges the readable bytes of {@code message} if buffer
     * acknowledgement is enabled and the message is one of the counted types.
     *
     * @param message the received message; its {@code readableBytes()} is the amount acknowledged
     */
    @Override
    public void ack(ByteBuf message) {
        if (!this.needsBufferAck || !ChannelFlowControllerImpl.countsTowardsAck(message)) {
            return;
        }
        this.ack(message.readableBytes());
    }

    /**
     * Adds {@code numBytes} to the running counter and, once the counter has
     * reached the watermark, flushes an acknowledgement and resets the counter.
     * Does nothing when buffer acknowledgement is disabled.
     *
     * @param numBytes number of bytes to add to the running counter
     */
    @Override
    public void ack(int numBytes) {
        if (!this.needsBufferAck) {
            return;
        }
        synchronized (this) {
            this.bufferAckCounter += numBytes;
            LOGGER.trace("BufferAckCounter is now {}", this.bufferAckCounter);
            if (this.bufferAckCounter >= this.bufferAckWatermark) {
                this.flushAckAndReset();
            }
            LOGGER.trace("Acknowledging {} bytes against connection {}.", numBytes, this.channel.remoteAddress());
        }
    }

    /**
     * Tells whether {@code message} is one of the message types whose bytes are
     * counted: set-vbucket-state, snapshot marker, stream end, mutation,
     * deletion or expiration. The checks run in that order and stop at the
     * first match.
     */
    private static boolean countsTowardsAck(ByteBuf message) {
        return DcpSetVbucketStateMessage.is(message)
            || DcpSnapshotMarkerRequest.is(message)
            || DcpStreamEndMessage.is(message)
            || DcpMutationMessage.is(message)
            || DcpDeletionMessage.is(message)
            || DcpExpirationMessage.is(message);
    }

    /**
     * Writes a buffer acknowledgement carrying the current counter value if the
     * channel is active, then resets the counter to zero in either case.
     * Must only be called while holding this instance's monitor.
     */
    private void flushAckAndReset() {
        if (!this.channel.isActive()) {
            LOGGER.trace("Skipping flow control ACK because channel is no longer active.");
        } else {
            LOGGER.trace("BufferAckWatermark reached on {}, acking now against the server.", this.channel.remoteAddress());
            ByteBuf ackRequest = this.channel.alloc().buffer();
            DcpBufferAckRequest.init(ackRequest);
            DcpBufferAckRequest.ackBytes(ackRequest, this.bufferAckCounter);
            this.channel.writeAndFlush(ackRequest);
        }
        // The counter is cleared even when the channel is inactive and nothing was sent.
        this.bufferAckCounter = 0;
    }
}

