package com.couchbase.client.dcp.buffer;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.dcp.buffer.DcpOps;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

/* loaded from: input_file:com/couchbase/client/dcp/buffer/PersistencePollingHandler.class */
public class PersistencePollingHandler extends ChannelInboundHandlerAdapter {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(PersistencePollingHandler.class);
    private final ClientEnvironment env;
    private final ConfigProvider configProvider;
    private final DcpOps dcpOps;
    private final PersistedSeqnos persistedSeqnos;
    private Subscription configSubscription;
    private int activeGroupId;

    public PersistencePollingHandler(ClientEnvironment clientEnvironment, ConfigProvider configProvider, DcpRequestDispatcher dcpRequestDispatcher) {
        this.env = (ClientEnvironment) Objects.requireNonNull(clientEnvironment);
        this.configProvider = (ConfigProvider) Objects.requireNonNull(configProvider);
        this.persistedSeqnos = (PersistedSeqnos) Objects.requireNonNull(clientEnvironment.persistedSeqnos());
        this.dcpOps = new DcpOpsImpl(dcpRequestDispatcher);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        if (this.configSubscription != null) {
            this.configSubscription.unsubscribe();
        }
        this.activeGroupId++;
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        this.configSubscription = this.configProvider.configs().observeOn(Schedulers.from(channelHandlerContext.executor())).subscribe(couchbaseBucketConfig -> {
            reconfigure(channelHandlerContext, couchbaseBucketConfig);
        });
    }

    private void reconfigure(ChannelHandlerContext channelHandlerContext, CouchbaseBucketConfig couchbaseBucketConfig) {
        LOGGER.debug("Reconfiguring persistence pollers.");
        int i = this.activeGroupId + 1;
        this.activeGroupId = i;
        this.persistedSeqnos.reset(couchbaseBucketConfig);
        LOGGER.debug("Starting persistence polling group {}", Integer.valueOf(i));
        try {
            BucketConfigHelper bucketConfigHelper = new BucketConfigHelper(couchbaseBucketConfig, this.env.sslEnabled());
            for (PartitionInstance partitionInstance : bucketConfigHelper.getAbsentPartitionInstances()) {
                LOGGER.debug("Partition instance {} is absent, will assume all seqnos persisted.", partitionInstance);
                this.persistedSeqnos.markAsAbsent(partitionInstance);
            }
            InetSocketAddress inetSocketAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
            List<PartitionInstance> hostedPartitions = bucketConfigHelper.getHostedPartitions(inetSocketAddress);
            LOGGER.debug("Node {} hosts partitions {}", inetSocketAddress, hostedPartitions);
            for (PartitionInstance partitionInstance2 : hostedPartitions) {
                this.dcpOps.getFailoverLog(partitionInstance2.partition()).subscribe(failoverLogResponse -> {
                    observeAndRepeat(channelHandlerContext, partitionInstance2, failoverLogResponse.getCurrentVbuuid(), i);
                }, th -> {
                    LOGGER.warn("Failed to fetch failover log for {}. Closing channel.", partitionInstance2, th);
                    channelHandlerContext.close();
                });
            }
        } catch (Throwable th2) {
            LOGGER.error("Failed to reconfigure persistence poller; closing channel.", th2);
            channelHandlerContext.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleObserveAndRepeat(ChannelHandlerContext channelHandlerContext, PartitionInstance partitionInstance, long j, int i, int i2) {
        if (i2 < 1) {
            throw new IllegalArgumentException("Interval multiplier must be > 0");
        }
        try {
            channelHandlerContext.executor().schedule(() -> {
                observeAndRepeat(channelHandlerContext, partitionInstance, j, i);
            }, this.env.persistencePollingIntervalMillis() * i2, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            LOGGER.error("Failed to schedule observeSeqno. Closing channel.", th);
            channelHandlerContext.close();
        }
    }

    private void observeAndRepeat(final ChannelHandlerContext channelHandlerContext, final PartitionInstance partitionInstance, final long j, final int i) {
        if (this.env.streamEventBuffer().hasBufferedEvents(partitionInstance.partition())) {
            this.dcpOps.observeSeqno(partitionInstance.partition(), j).subscribe(new SingleSubscriber<ObserveSeqnoResponse>() { // from class: com.couchbase.client.dcp.buffer.PersistencePollingHandler.1
                public void onSuccess(ObserveSeqnoResponse observeSeqnoResponse) {
                    try {
                        if (PersistencePollingHandler.this.activeGroupId != i) {
                            PersistencePollingHandler.LOGGER.debug("Polling group {} is no longer active; stopping polling for ", Integer.valueOf(i), partitionInstance);
                            return;
                        }
                        long vbuuid = observeSeqnoResponse.vbuuid();
                        PersistencePollingHandler.this.env.streamEventBuffer().onSeqnoPersisted(observeSeqnoResponse.vbid(), PersistencePollingHandler.this.persistedSeqnos.update(partitionInstance, vbuuid, observeSeqnoResponse.persistSeqno()));
                        PersistencePollingHandler.this.scheduleObserveAndRepeat(channelHandlerContext, partitionInstance, vbuuid, i, 1);
                    } catch (Throwable th) {
                        PersistencePollingHandler.LOGGER.error("Fatal error. Closing channel.", th);
                        channelHandlerContext.close();
                    }
                }

                /* JADX WARN: Multi-variable type inference failed */
                public void onError(Throwable th) {
                    if (PersistencePollingHandler.this.activeGroupId != i || (th instanceof NotConnectedException)) {
                        PersistencePollingHandler.LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", Integer.valueOf(i), partitionInstance);
                        return;
                    }
                    if (!(th instanceof DcpOps.BadResponseStatusException)) {
                        PersistencePollingHandler.LOGGER.error("observeSeqno failed. Closing channel.", th);
                        channelHandlerContext.close();
                        return;
                    }
                    DcpOps.BadResponseStatusException badResponseStatusException = (DcpOps.BadResponseStatusException) th;
                    if (badResponseStatusException.status().isTemporary()) {
                        PersistencePollingHandler.LOGGER.debug("observeSeqno failed with status code " + badResponseStatusException.status() + " ; will retry after an extended delay.");
                        PersistencePollingHandler.this.scheduleObserveAndRepeat(channelHandlerContext, partitionInstance, j, i, 10);
                    } else {
                        PersistencePollingHandler.LOGGER.error("observeSeqno failed with status code " + badResponseStatusException.status() + " ; Closing channel.");
                        channelHandlerContext.close();
                    }
                }
            });
        } else {
            LOGGER.trace("No buffered events; skipping observeSeqno for partition instance {}", partitionInstance);
            scheduleObserveAndRepeat(channelHandlerContext, partitionInstance, j, i, 1);
        }
    }
}
