/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.buffer;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.buffer.DcpOps;
import com.couchbase.client.dcp.buffer.DcpOpsImpl;
import com.couchbase.client.dcp.buffer.DcpRequestDispatcher;
import com.couchbase.client.dcp.buffer.ObserveSeqnoResponse;
import com.couchbase.client.dcp.buffer.PartitionInstance;
import com.couchbase.client.dcp.buffer.PersistedSeqnos;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

public class PersistencePollingHandler
extends ChannelInboundHandlerAdapter {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(PersistencePollingHandler.class);
    private final ClientEnvironment env;
    private final ConfigProvider configProvider;
    private final DcpOps dcpOps;
    private final PersistedSeqnos persistedSeqnos;
    private final AtomicBoolean loggedClosureWarning = new AtomicBoolean();
    private Subscription configSubscription;
    private int activeGroupId;

    public PersistencePollingHandler(ClientEnvironment env, ConfigProvider configProvider, DcpRequestDispatcher dispatcher) {
        this.env = Objects.requireNonNull(env);
        this.configProvider = Objects.requireNonNull(configProvider);
        this.persistedSeqnos = Objects.requireNonNull(env.persistedSeqnos());
        this.dcpOps = new DcpOpsImpl(dispatcher);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (this.configSubscription != null) {
            this.configSubscription.unsubscribe();
        }
        ++this.activeGroupId;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.configSubscription = this.configProvider.configs().observeOn(Schedulers.from((Executor)ctx.executor())).subscribe(bucketConfig -> this.reconfigure(ctx, (DcpBucketConfig)bucketConfig));
    }

    private void reconfigure(ChannelHandlerContext ctx, DcpBucketConfig bucketConfig) {
        LOGGER.debug("Reconfiguring persistence pollers.");
        int groupId = ++this.activeGroupId;
        this.persistedSeqnos.reset(bucketConfig);
        LOGGER.debug("Starting persistence polling group {}", (Object)groupId);
        try {
            for (PartitionInstance absentInstance : bucketConfig.getAbsentPartitionInstances()) {
                LOGGER.debug("Partition instance {} is absent, will assume all seqnos persisted.", (Object)absentInstance);
                this.persistedSeqnos.markAsAbsent(absentInstance);
            }
            InetSocketAddress nodeAddress = (InetSocketAddress)ctx.channel().remoteAddress();
            List<PartitionInstance> partitions = bucketConfig.getHostedPartitions(nodeAddress);
            LOGGER.debug("Node {} hosts partitions {}", (Object)nodeAddress, partitions);
            Iterator<PartitionInstance> iterator = partitions.iterator();
            while (iterator.hasNext()) {
                PartitionInstance partitionInstance;
                PartitionInstance pas = partitionInstance = iterator.next();
                this.dcpOps.getFailoverLog(partitionInstance.partition()).subscribe(failoverLog -> {
                    long vbuuid = failoverLog.getCurrentVbuuid();
                    this.observeAndRepeat(ctx, pas, vbuuid, groupId);
                }, throwable -> this.logWarningAndClose(ctx, "Failed to fetch failover log for {}.", pas, throwable));
            }
        }
        catch (Throwable t) {
            this.logWarningAndClose(ctx, "Failed to reconfigure persistence poller.", t);
        }
    }

    private void scheduleObserveAndRepeat(ChannelHandlerContext ctx, PartitionInstance partitionInstance, long vbuuid, int groupId, int intervalMultiplier) {
        if (intervalMultiplier < 1) {
            throw new IllegalArgumentException("Interval multiplier must be > 0");
        }
        try {
            ctx.executor().schedule(() -> this.observeAndRepeat(ctx, partitionInstance, vbuuid, groupId), this.env.persistencePollingIntervalMillis() * (long)intervalMultiplier, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t) {
            this.logWarningAndClose(ctx, "Failed to schedule observeSeqno.", t);
        }
    }

    private void observeAndRepeat(final ChannelHandlerContext ctx, final PartitionInstance partitionInstance, final long vbuuid, final int groupId) {
        if (!this.env.streamEventBuffer().hasBufferedEvents(partitionInstance.partition())) {
            LOGGER.trace("No buffered events; skipping observeSeqno for partition instance {}", (Object)partitionInstance);
            this.scheduleObserveAndRepeat(ctx, partitionInstance, vbuuid, groupId, 1);
            return;
        }
        this.dcpOps.observeSeqno(partitionInstance.partition(), vbuuid).subscribe((SingleSubscriber)new SingleSubscriber<ObserveSeqnoResponse>(){

            public void onSuccess(ObserveSeqnoResponse observeSeqnoResponse) {
                try {
                    if (PersistencePollingHandler.this.activeGroupId != groupId) {
                        LOGGER.debug("Polling group {} is no longer active; stopping polling for ", (Object)groupId, (Object)partitionInstance);
                        return;
                    }
                    long newVbuuid = observeSeqnoResponse.vbuuid();
                    long minSeqnoPersistedEverywhere = PersistencePollingHandler.this.persistedSeqnos.update(partitionInstance, newVbuuid, observeSeqnoResponse.persistSeqno());
                    PersistencePollingHandler.this.env.streamEventBuffer().onSeqnoPersisted(observeSeqnoResponse.vbid(), minSeqnoPersistedEverywhere);
                    PersistencePollingHandler.this.scheduleObserveAndRepeat(ctx, partitionInstance, newVbuuid, groupId, 1);
                }
                catch (Throwable t) {
                    PersistencePollingHandler.this.logWarningAndClose(ctx, "Fatal error in observeAndRepeat handling observeSeqno response.", new Object[]{t});
                }
            }

            public void onError(Throwable t) {
                if (PersistencePollingHandler.this.activeGroupId != groupId || t instanceof NotConnectedException) {
                    LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", (Object)groupId, (Object)partitionInstance);
                    return;
                }
                if (t instanceof DcpOps.BadResponseStatusException) {
                    DcpOps.BadResponseStatusException e = (DcpOps.BadResponseStatusException)((Object)t);
                    if (e.status().isTemporary()) {
                        LOGGER.debug("observeSeqno failed with status code " + e.status() + " ; will retry after an extended delay.");
                        PersistencePollingHandler.this.scheduleObserveAndRepeat(ctx, partitionInstance, vbuuid, groupId, 10);
                    } else {
                        PersistencePollingHandler.this.logWarningAndClose(ctx, "observeSeqno failed with status code " + e.status(), new Object[0]);
                    }
                } else {
                    PersistencePollingHandler.this.logWarningAndClose(ctx, "observeSeqno failed.", new Object[]{t});
                }
            }
        });
    }

    private void logWarningAndClose(ChannelHandlerContext ctx, String msg, Object ... params) {
        if (this.loggedClosureWarning.compareAndSet(false, true)) {
            LOGGER.warn("Closing channel; " + msg, params);
            ctx.close();
        } else {
            LOGGER.trace("Closing channel; " + msg, params);
        }
    }
}

