package com.couchbase.kafka;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseMessage;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.OpenBucketRequest;
import com.couchbase.client.core.message.cluster.OpenBucketResponse;
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.core.message.cluster.SeedNodesResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.deps.com.lmax.disruptor.EventTranslatorTwoArg;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import com.couchbase.kafka.state.ConnectorState;
import com.couchbase.kafka.state.StateSerializer;
import com.couchbase.kafka.state.StreamState;
import com.couchbase.kafka.state.StreamStateUpdatedEvent;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/kafka/CouchbaseReader.class */
/**
 * Reads DCP (database change protocol) messages from a Couchbase bucket and publishes
 * them into a disruptor {@link RingBuffer} of {@link DCPEvent}s.
 *
 * <p>Typical usage is {@link #connect()} followed by {@link #currentState()} and/or
 * {@link #run(ConnectorState, ConnectorState)}. The latter two require an open
 * connection and throw {@link IllegalStateException} otherwise.
 */
public class CouchbaseReader {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(CouchbaseReader.class);

    /** Copies the DCP message and the connection it arrived on into a ring buffer slot. */
    private static final EventTranslatorTwoArg<DCPEvent, DCPConnection, CouchbaseMessage> TRANSLATOR =
            new EventTranslatorTwoArg<DCPEvent, DCPConnection, CouchbaseMessage>() {
                @Override
                public void translateTo(DCPEvent event, long sequence, DCPConnection dcpConnection, CouchbaseMessage message) {
                    event.setMessage(message);
                    event.setConnection(dcpConnection);
                }
            };

    private final ClusterFacade core;
    private final RingBuffer<DCPEvent> dcpRingBuffer;
    private final List<String> nodes;
    private final String bucket;
    private final String password;
    private final StateSerializer stateSerializer;
    private final String connectionName;
    private final CouchbaseKafkaEnvironment environment;
    /** Set by {@link #connect(long, TimeUnit)}; {@code null} until then. */
    private DCPConnection connection;

    /**
     * Creates a reader that takes the node list, bucket name and password from the environment.
     *
     * @param clusterFacade the core facade used to send requests to the cluster.
     * @param couchbaseKafkaEnvironment the environment supplying the connection settings.
     * @param ringBuffer the ring buffer that receives every DCP message.
     * @param stateSerializer the serializer invoked whenever the tracked stream state changes.
     */
    public CouchbaseReader(ClusterFacade clusterFacade, CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, RingBuffer<DCPEvent> ringBuffer, StateSerializer stateSerializer) {
        this(couchbaseKafkaEnvironment.couchbaseNodes(), couchbaseKafkaEnvironment.couchbaseBucket(), couchbaseKafkaEnvironment.couchbasePassword(), clusterFacade, couchbaseKafkaEnvironment, ringBuffer, stateSerializer);
    }

    /**
     * Creates a reader with explicit connection settings.
     *
     * @param list the seed nodes sent to the cluster on {@link #connect()}.
     * @param str the name of the bucket to open.
     * @param str2 the password of the bucket.
     * @param clusterFacade the core facade used to send requests to the cluster.
     * @param couchbaseKafkaEnvironment the environment; supplies the event buffer size used by {@code run}.
     * @param ringBuffer the ring buffer that receives every DCP message.
     * @param stateSerializer the serializer invoked whenever the tracked stream state changes.
     */
    public CouchbaseReader(List<String> list, String str, String str2, ClusterFacade clusterFacade, CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, RingBuffer<DCPEvent> ringBuffer, StateSerializer stateSerializer) {
        this.core = clusterFacade;
        this.dcpRingBuffer = ringBuffer;
        this.nodes = list;
        this.bucket = str;
        this.password = str2;
        this.stateSerializer = stateSerializer;
        this.connectionName = "CouchbaseKafka(" + hashCode() + ")";
        this.environment = couchbaseKafkaEnvironment;
    }

    /**
     * Opens the DCP connection, waiting at most two seconds for the whole handshake.
     */
    public void connect() {
        connect(2L, TimeUnit.SECONDS);
    }

    /**
     * Sends the seed nodes, opens the bucket and then opens a DCP connection, blocking
     * until the connection is available or the timeout elapses.
     *
     * @param j the maximum time to wait, applied through the Rx {@code timeout} operator.
     * @param timeUnit the unit of {@code j}.
     */
    public void connect(long j, TimeUnit timeUnit) {
        // The typed local gives the generic send() call a target type; as the head of a
        // call chain its response type could not be inferred as SeedNodesResponse.
        Observable<SeedNodesResponse> seeded = this.core.send(new SeedNodesRequest(this.nodes));

        OpenConnectionResponse opened = seeded
                .flatMap(new Func1<SeedNodesResponse, Observable<OpenBucketResponse>>() {
                    @Override
                    public Observable<OpenBucketResponse> call(SeedNodesResponse ignoredResponse) {
                        return CouchbaseReader.this.core.send(new OpenBucketRequest(CouchbaseReader.this.bucket, CouchbaseReader.this.password));
                    }
                })
                .flatMap(new Func1<OpenBucketResponse, Observable<OpenConnectionResponse>>() {
                    @Override
                    public Observable<OpenConnectionResponse> call(OpenBucketResponse ignoredResponse) {
                        return CouchbaseReader.this.core.send(new OpenConnectionRequest(CouchbaseReader.this.connectionName, CouchbaseReader.this.bucket));
                    }
                })
                .timeout(j, timeUnit)
                .toBlocking()
                .single();

        this.connection = opened.connection();
    }

    /**
     * Collects the mutation tokens reported by the connection into a new
     * {@link ConnectorState}, one {@link StreamState} per token. Blocks until the
     * underlying observable completes.
     *
     * @return the state assembled from the connection's current mutation tokens.
     * @throws IllegalStateException if {@link #connect()} has not been called.
     */
    public ConnectorState currentState() {
        return requireConnection().getCurrentState()
                .collect(new Func0<ConnectorState>() {
                    // Must be named call(): it implements Func0's only abstract method.
                    @Override
                    public ConnectorState call() {
                        return new ConnectorState();
                    }
                }, new Action2<ConnectorState, MutationToken>() {
                    @Override
                    public void call(ConnectorState accumulator, MutationToken token) {
                        accumulator.put(new StreamState(token));
                    }
                })
                .toBlocking()
                .single();
    }

    /**
     * Adds one stream per partition, from the sequence number in {@code connectorState}
     * to the one in {@code connectorState2}, then forwards every DCP message from the
     * connection into the ring buffer. A clone of {@code connectorState} tracks progress,
     * and each of its update events is passed to the state serializer. The call blocks
     * until the message stream terminates.
     *
     * @param connectorState the state to start from (FROM).
     * @param connectorState2 the state to stop at (TO).
     * @throws IllegalArgumentException if both states do not cover the same partitions.
     * @throws IllegalStateException if {@link #connect()} has not been called.
     */
    public void run(ConnectorState connectorState, final ConnectorState connectorState2) {
        if (!Arrays.equals(connectorState.partitions(), connectorState2.partitions())) {
            throw new IllegalArgumentException("partitions in FROM state do not match partitions in TO state");
        }
        // Fail fast with a descriptive error instead of an NPE raised inside the Rx pipeline.
        requireConnection();

        final ConnectorState progress = connectorState.m8clone();
        progress.updates().subscribe(new Action1<StreamStateUpdatedEvent>() {
            @Override
            public void call(StreamStateUpdatedEvent event) {
                CouchbaseReader.this.stateSerializer.dump(event.connectorState(), event.partition());
            }
        });

        Observable<ResponseStatus> streamsAdded = Observable.from(connectorState)
                .flatMap(new Func1<StreamState, Observable<ResponseStatus>>() {
                    @Override
                    public Observable<ResponseStatus> call(StreamState begin) {
                        StreamState end = connectorState2.get(begin.partition());
                        // The begin/end sequence numbers are passed twice: the last four
                        // arguments of addStream all come from the same two values.
                        return CouchbaseReader.this.connection.addStream(begin.partition(), begin.vbucketUUID(), begin.sequenceNumber(), end.sequenceNumber(), begin.sequenceNumber(), end.sequenceNumber());
                    }
                });

        // toList() delays the switch to the message subject until every addStream has completed.
        Observable<DCPRequest> messages = streamsAdded.toList()
                .flatMap(new Func1<List<ResponseStatus>, Observable<DCPRequest>>() {
                    @Override
                    public Observable<DCPRequest> call(List<ResponseStatus> ignoredStatuses) {
                        return CouchbaseReader.this.connection.subject();
                    }
                });

        messages.onBackpressureBuffer(this.environment.kafkaEventBufferSize())
                .toBlocking()
                .forEach(new Action1<DCPRequest>() {
                    @Override
                    public void call(DCPRequest request) {
                        recordProgress(progress, request);
                        CouchbaseReader.this.dcpRingBuffer.publishEvent(CouchbaseReader.TRANSLATOR, CouchbaseReader.this.connection, request);
                    }
                });
    }

    /** Updates the tracked state from a snapshot marker, removal or mutation; other messages are ignored. */
    private static void recordProgress(ConnectorState progress, DCPRequest request) {
        if (request instanceof SnapshotMarkerMessage) {
            SnapshotMarkerMessage marker = (SnapshotMarkerMessage) request;
            progress.update(marker.partition(), marker.endSequenceNumber());
        } else if (request instanceof RemoveMessage) {
            RemoveMessage removal = (RemoveMessage) request;
            progress.update(removal.partition(), removal.bySequenceNumber());
        } else if (request instanceof MutationMessage) {
            MutationMessage mutation = (MutationMessage) request;
            progress.update(mutation.partition(), mutation.bySequenceNumber());
        }
    }

    /** Returns the open connection, or throws if {@link #connect()} has not populated it yet. */
    private DCPConnection requireConnection() {
        DCPConnection current = this.connection;
        if (current == null) {
            throw new IllegalStateException("connect() must be called before using the DCP connection");
        }
        return current;
    }
}
