/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.event.CouchbaseEvent;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.core.state.NotConnectedException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.dcp.buffer.DcpBucketConfig;
import com.couchbase.client.dcp.conductor.ConfigProvider;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.conductor.HttpStreamingConfigProvider;
import com.couchbase.client.dcp.conductor.NotMyVbucketException;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.FailedToAddNodeEvent;
import com.couchbase.client.dcp.events.FailedToMovePartitionEvent;
import com.couchbase.client.dcp.events.FailedToRemoveNodeEvent;
import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.dcp.util.retry.RetryBuilder;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.util.internal.ConcurrentSet;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
import rx.Single;
import rx.Subscription;
import rx.functions.Action4;
import rx.functions.Func1;

public class Conductor {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Conductor.class);
    private final ConfigProvider configProvider;
    private final Set<DcpChannel> channels = new ConcurrentSet();
    private volatile boolean stopped = true;
    private final ClientEnvironment env;
    private final AtomicReference<DcpBucketConfig> currentConfig = new AtomicReference();
    private final boolean ownsConfigProvider;
    private final SessionState sessionState = new SessionState();

    public Conductor(ClientEnvironment env, ConfigProvider cp) {
        this.env = env;
        this.configProvider = cp == null ? new HttpStreamingConfigProvider(env) : cp;
        this.ownsConfigProvider = cp == null;
        this.configProvider.configs().forEach(config -> {
            LOGGER.trace("Applying new configuration, new rev is {}.", (Object)config.rev());
            this.currentConfig.set((DcpBucketConfig)config);
            this.reconfigure((DcpBucketConfig)config);
        });
    }

    public SessionState sessionState() {
        return this.sessionState;
    }

    public Completable connect() {
        this.stopped = false;
        Completable atLeastOneConfig = this.configProvider.configs().filter(config -> config.numberOfPartitions() != 0).first().toCompletable().timeout(this.env.bootstrapTimeout(), TimeUnit.SECONDS).doOnError(throwable -> LOGGER.warn("Did not receive initial configuration from provider."));
        return this.configProvider.start().timeout(this.env.connectTimeout(), TimeUnit.SECONDS).doOnError(throwable -> LOGGER.warn("Cannot connect configuration provider.")).concatWith(atLeastOneConfig);
    }

    ConfigProvider configProvider() {
        return this.configProvider;
    }

    public boolean disconnected() {
        if (!this.configProvider.isState((Enum)LifecycleState.DISCONNECTED)) {
            return false;
        }
        for (DcpChannel channel : this.channels) {
            if (channel.isState((Enum)LifecycleState.DISCONNECTED)) continue;
            return false;
        }
        return true;
    }

    public Completable stop() {
        LOGGER.debug("Instructed to shutdown.");
        this.stopped = true;
        Completable channelShutdown = Observable.from(this.channels).flatMapCompletable(DcpChannel::disconnect).toCompletable();
        if (this.ownsConfigProvider) {
            channelShutdown = channelShutdown.andThen(this.configProvider.stop());
        }
        return channelShutdown.doOnCompleted(() -> LOGGER.info("Shutdown complete."));
    }

    public int numberOfPartitions() {
        return this.currentConfig.get().numberOfPartitions();
    }

    public Observable<ByteBuf> getSeqnos() {
        return Observable.from(this.channels).flatMap(this::getSeqnosForChannel);
    }

    private Observable<ByteBuf> getSeqnosForChannel(DcpChannel channel) {
        return Observable.just((Object)((Object)channel)).flatMapSingle(DcpChannel::getSeqnos).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", (Object)channel))).build());
    }

    public Single<ByteBuf> getFailoverLog(short partition) {
        return Observable.just((Object)partition).map(ignored -> this.masterChannelByPartition(partition)).flatMapSingle(channel -> channel.getFailoverLog(partition)).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", (Object)partition))).build()).toSingle();
    }

    public Completable startStreamForPartition(short partition, long vbuuid, long startSeqno, long endSeqno, long snapshotStartSeqno, long snapshotEndSeqno) {
        return Observable.just((Object)partition).map(ignored -> this.masterChannelByPartition(partition)).flatMapCompletable(channel -> channel.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno)).retryWhen((Func1)RetryBuilder.anyOf(NotConnectedException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", (Object)partition))).build()).toCompletable();
    }

    public Completable stopStreamForPartition(short partition) {
        if (this.streamIsOpen(partition)) {
            DcpChannel channel = this.masterChannelByPartition(partition);
            return channel.closeStream(partition);
        }
        return Completable.complete();
    }

    public boolean streamIsOpen(short partition) {
        DcpChannel channel = this.masterChannelByPartition(partition);
        return channel.streamIsOpen(partition);
    }

    private DcpChannel masterChannelByPartition(short partition) {
        InetSocketAddress address = this.currentConfig.get().getMasterNodeKvAddress(partition).toAddress();
        for (DcpChannel ch : this.channels) {
            if (!ch.address().equals(address)) continue;
            return ch;
        }
        throw new IllegalStateException("No DcpChannel found for partition " + partition);
    }

    private void reconfigure(DcpBucketConfig configHelper) {
        boolean onlyConnectToPrimaryPartition = !this.env.persistencePollingEnabled();
        List<NodeInfo> nodes = configHelper.getDataNodes(onlyConnectToPrimaryPartition);
        Map<InetSocketAddress, DcpChannel> existingChannelsByAddress = this.channels.stream().collect(Collectors.toMap(DcpChannel::address, c -> c));
        Set nodeAddresses = nodes.stream().map(configHelper::getAddress).collect(Collectors.toSet());
        for (InetSocketAddress inetSocketAddress : nodeAddresses) {
            if (existingChannelsByAddress.containsKey(inetSocketAddress)) continue;
            this.add(inetSocketAddress);
        }
        for (Map.Entry entry : existingChannelsByAddress.entrySet()) {
            if (nodeAddresses.contains(entry.getKey())) continue;
            this.remove((DcpChannel)((Object)entry.getValue()));
        }
    }

    private void add(final InetSocketAddress node) {
        LOGGER.debug("Adding DCP Channel against {}", (Object)node);
        DcpChannel channel = new DcpChannel(node, this.env, this);
        if (!this.channels.add(channel)) {
            throw new IllegalStateException("Tried to add duplicate channel: " + RedactableArgument.system((Object)((Object)channel)));
        }
        channel.connect().retryWhen((Func1)RetryBuilder.anyMatches((Func1<Throwable, Boolean>)((Func1)t -> !this.stopped)).max(this.env.dcpChannelsReconnectMaxAttempts()).delay(this.env.dcpChannelsReconnectDelay()).doOnRetry((Action4<Integer, Throwable, Long, TimeUnit>)((Action4)(retry, cause, delay, delayUnit) -> LOGGER.debug("Rescheduling Node reconnect for DCP channel {}", (Object)node))).build()).subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.debug("Completed Node connect for DCP channel {}", (Object)node);
            }

            public void onError(Throwable e) {
                LOGGER.warn("Got error during connect (maybe retried) for node {}", (Object)RedactableArgument.system((Object)node), (Object)e);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish((CouchbaseEvent)new FailedToAddNodeEvent(node, e));
                }
            }

            public void onSubscribe(Subscription d) {
            }
        });
    }

    private void remove(final DcpChannel node) {
        if (!this.channels.remove((Object)node)) {
            throw new IllegalStateException("Tried to remove unknown channel: " + RedactableArgument.system((Object)((Object)node)));
        }
        LOGGER.debug("Removing DCP Channel against {}", (Object)node);
        for (short partition = 0; partition < node.streamIsOpen.length(); partition = (short)(partition + 1)) {
            if (!node.streamIsOpen(partition)) continue;
            this.maybeMovePartition(partition);
        }
        node.disconnect().subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.debug("Channel remove notified as complete for {}", (Object)node.address());
            }

            public void onError(Throwable e) {
                LOGGER.warn("Got error during Node removal for node {}", (Object)RedactableArgument.system((Object)node.address()), (Object)e);
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish((CouchbaseEvent)new FailedToRemoveNodeEvent(node.address(), e));
                }
            }

            public void onSubscribe(Subscription d) {
            }
        });
    }

    void maybeMovePartition(final short partition) {
        Observable.timer((long)50L, (TimeUnit)TimeUnit.MILLISECONDS).filter(ignored -> {
            PartitionState ps = this.sessionState.get(partition);
            boolean desiredSeqnoReached = ps.isAtEnd();
            if (desiredSeqnoReached) {
                LOGGER.debug("Reached desired high seqno {} for vbucket {}, not reopening stream.", (Object)ps.getEndSeqno(), (Object)partition);
            }
            return !desiredSeqnoReached;
        }).flatMapCompletable(ignored -> {
            PartitionState ps = this.sessionState.get(partition);
            return this.startStreamForPartition(partition, ps.getLastUuid(), ps.getStartSeqno(), ps.getEndSeqno(), ps.getSnapshotStartSeqno(), ps.getSnapshotEndSeqno()).retryWhen((Func1)RetryBuilder.anyOf(NotMyVbucketException.class).max(Integer.MAX_VALUE).delay(Delay.fixed((long)200L, (TimeUnit)TimeUnit.MILLISECONDS)).build());
        }).toCompletable().subscribe(new CompletableSubscriber(){

            public void onCompleted() {
                LOGGER.trace("Completed Partition Move for partition {}", (Object)partition);
            }

            public void onError(Throwable e) {
                if (e instanceof RollbackException) {
                    LOGGER.warn("Rollback during Partition Move for partition {}", (Object)partition);
                } else {
                    LOGGER.warn("Error during Partition Move for partition {}", (Object)partition, (Object)e);
                }
                if (Conductor.this.env.eventBus() != null) {
                    Conductor.this.env.eventBus().publish((CouchbaseEvent)new FailedToMovePartitionEvent(partition, e));
                }
            }

            public void onSubscribe(Subscription d) {
                LOGGER.debug("Subscribing for Partition Move for partition {}", (Object)partition);
            }
        });
    }
}

