package com.couchbase.kafka;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseCore;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.deps.com.lmax.disruptor.EventHandler;
import com.couchbase.client.deps.com.lmax.disruptor.ExceptionHandler;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import com.couchbase.client.deps.com.lmax.disruptor.dsl.Disruptor;
import com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;
import com.couchbase.kafka.filter.Filter;
import com.couchbase.kafka.state.ConnectorState;
import com.couchbase.kafka.state.StateSerializer;
import com.couchbase.kafka.state.StreamState;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.cluster.Broker;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import scala.collection.Iterator;

/* loaded from: input_file:com/couchbase/kafka/CouchbaseKafkaConnector.class */
/**
 * Connects a Couchbase bucket to a Kafka topic.
 *
 * <p>On construction the connector instantiates the configured {@link Filter} and
 * {@link StateSerializer}, starts a disruptor whose events are handled by a
 * {@link KafkaWriter}, and connects a {@link CouchbaseReader} that is handed the
 * disruptor's ring buffer. Instances are obtained through the static {@code create}
 * factories; calling {@link #run()} (or {@link #run(ConnectorState, ConnectorState)})
 * delegates to the reader.
 */
public class CouchbaseKafkaConnector implements Runnable {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(CouchbaseKafkaConnector.class);
    private static final DCPEventFactory DCP_EVENT_FACTORY = new DCPEventFactory();

    /** Session timeout handed to the {@link ZkClient} used for broker discovery (milliseconds per ZkClient's API). */
    private static final int ZK_SESSION_TIMEOUT_MS = 4000;
    /** Connection timeout handed to the {@link ZkClient} used for broker discovery (milliseconds per ZkClient's API). */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 6000;

    private final ClusterFacade core;
    private final ExecutorService disruptorExecutor;
    private final Disruptor<DCPEvent> disruptor;
    private final RingBuffer<DCPEvent> dcpRingBuffer;
    private final KafkaWriter kafkaWriter;
    private final Producer<String, DCPEvent> producer;
    private final CouchbaseReader couchbaseReader;
    private final Filter filter;
    private final StateSerializer stateSerializer;
    private final CouchbaseKafkaEnvironment environment;

    /**
     * Wires up the whole pipeline. Use one of the {@code create} factories instead of
     * calling this directly.
     *
     * @param couchbaseNodes    Couchbase node addresses passed to the {@link CouchbaseReader}
     * @param couchbaseBucket   bucket name passed to the {@link CouchbaseReader}
     * @param couchbasePassword bucket password passed to the {@link CouchbaseReader}
     * @param kafkaZookeeper    ZooKeeper address used to discover the Kafka brokers
     * @param kafkaTopic        topic passed to the {@link KafkaWriter}
     * @param environment       environment supplying class names, buffer size and serializers
     * @throws IllegalArgumentException if the configured filter or state serializer class
     *                                  cannot be instantiated
     */
    private CouchbaseKafkaConnector(List<String> couchbaseNodes, String couchbaseBucket, String couchbasePassword,
                                    String kafkaZookeeper, String kafkaTopic,
                                    CouchbaseKafkaEnvironment environment) {
        this.filter = instantiateFilter(environment);
        this.stateSerializer = instantiateStateSerializer(environment);
        this.environment = environment;
        this.core = new CouchbaseCore(environment);

        this.disruptorExecutor = Executors.newFixedThreadPool(2, new DefaultThreadFactory("cb-kafka", true));
        this.disruptor = new Disruptor<>(DCP_EVENT_FACTORY, environment.kafkaEventBufferSize(), this.disruptorExecutor);
        // Handler failures are logged only; none of the callbacks rethrows.
        this.disruptor.handleExceptionsWith(new ExceptionHandler() {
            @Override
            public void handleEventException(Throwable cause, long sequence, Object event) {
                LOGGER.warn("Exception while Handling DCP Events {}", event, cause);
            }

            @Override
            public void handleOnStartException(Throwable cause) {
                LOGGER.warn("Exception while Starting DCP RingBuffer", cause);
            }

            @Override
            public void handleOnShutdownException(Throwable cause) {
                LOGGER.info("Exception while shutting down DCP RingBuffer", cause);
            }
        });

        Properties producerProperties = new Properties();
        producerProperties.put("metadata.broker.list", lookupBrokerList(kafkaZookeeper));
        producerProperties.put("serializer.class", environment.kafkaValueSerializerClass());
        producerProperties.put("key.serializer.class", environment.kafkaKeySerializerClass());
        this.producer = new Producer<>(new ProducerConfig(producerProperties));

        this.kafkaWriter = new KafkaWriter(kafkaTopic, environment, this.producer, this.filter);
        this.disruptor.handleEventsWith(new EventHandler[]{this.kafkaWriter});
        this.disruptor.start();
        this.dcpRingBuffer = this.disruptor.getRingBuffer();

        this.couchbaseReader = new CouchbaseReader(couchbaseNodes, couchbaseBucket, couchbasePassword, this.core,
                environment, this.dcpRingBuffer, this.stateSerializer);
        this.couchbaseReader.connect();
    }

    /**
     * Creates a connector from a default environment with DCP enabled.
     *
     * @return a connected connector
     */
    public static CouchbaseKafkaConnector create() {
        DefaultCouchbaseKafkaEnvironment.Builder builder = DefaultCouchbaseKafkaEnvironment.builder();
        builder.dcpEnabled(true);
        return create(builder.m5build());
    }

    /**
     * Creates a connector taking nodes, bucket, password, ZooKeeper address and topic
     * from the given environment.
     *
     * @param couchbaseKafkaEnvironment environment to read all settings from
     * @return a connected connector
     */
    public static CouchbaseKafkaConnector create(CouchbaseKafkaEnvironment couchbaseKafkaEnvironment) {
        return create(
                couchbaseKafkaEnvironment.couchbaseNodes(),
                couchbaseKafkaEnvironment.couchbaseBucket(),
                couchbaseKafkaEnvironment.couchbasePassword(),
                couchbaseKafkaEnvironment.kafkaZookeeperAddress(),
                couchbaseKafkaEnvironment.kafkaTopic(),
                couchbaseKafkaEnvironment);
    }

    /**
     * Creates a connector from explicit connection settings and an environment.
     *
     * @param couchbaseNodes    Couchbase node addresses
     * @param couchbaseBucket   bucket name
     * @param couchbasePassword bucket password
     * @param kafkaZookeeper    ZooKeeper address used to discover the Kafka brokers
     * @param kafkaTopic        Kafka topic
     * @param environment       environment supplying the remaining settings
     * @return a connected connector
     * @throws IllegalArgumentException if the configured filter or state serializer class
     *                                  cannot be instantiated
     */
    public static CouchbaseKafkaConnector create(List<String> couchbaseNodes, String couchbaseBucket,
                                                 String couchbasePassword, String kafkaZookeeper,
                                                 String kafkaTopic, CouchbaseKafkaEnvironment environment) {
        return new CouchbaseKafkaConnector(couchbaseNodes, couchbaseBucket, couchbasePassword, kafkaZookeeper,
                kafkaTopic, environment);
    }

    /**
     * Creates a connector for a single Couchbase node, building an environment with DCP
     * enabled from the given settings.
     *
     * @param couchbaseNode     address of the Couchbase node
     * @param couchbaseBucket   bucket name
     * @param couchbasePassword bucket password
     * @param kafkaZookeeper    ZooKeeper address used to discover the Kafka brokers
     * @param kafkaTopic        Kafka topic
     * @return a connected connector
     */
    public static CouchbaseKafkaConnector create(String couchbaseNode, String couchbaseBucket,
                                                 String couchbasePassword, String kafkaZookeeper,
                                                 String kafkaTopic) {
        DefaultCouchbaseKafkaEnvironment.Builder builder = DefaultCouchbaseKafkaEnvironment.builder();
        builder.couchbaseNodes(Collections.singletonList(couchbaseNode))
                .couchbasePassword(couchbasePassword)
                .couchbaseBucket(couchbaseBucket)
                .kafkaZookeeperAddress(kafkaZookeeper)
                .kafkaTopic(kafkaTopic)
                .dcpEnabled(true);
        return create(builder.m5build());
    }

    /**
     * Returns the reader's current state.
     *
     * @param partitions partitions to include; when empty, the reader's state object is
     *                   returned as is
     * @return the reader's state, or a new state holding only the entries looked up for
     *         the requested partitions
     */
    public ConnectorState currentState(int... partitions) {
        ConnectorState readerState = this.couchbaseReader.currentState();
        if (partitions.length == 0) {
            return readerState;
        }
        ConnectorState selected = new ConnectorState();
        for (int partition : partitions) {
            // ConnectorState is keyed by short, hence the narrowing cast.
            selected.put(readerState.get((short) partition));
        }
        return selected;
    }

    /**
     * Returns {@link #currentState(int...)} with every sequence number replaced by {@code 0}.
     *
     * @param partitions partitions to include; empty selects the reader's full state
     * @return a new state with sequence number {@code 0} for each stream
     */
    public ConnectorState startState(int... partitions) {
        return overrideSequenceNumber(currentState(partitions), 0L);
    }

    /**
     * Returns {@link #currentState(int...)} with every sequence number replaced by {@code -1}.
     *
     * @param partitions partitions to include; empty selects the reader's full state
     * @return a new state with sequence number {@code -1} for each stream
     */
    public ConnectorState endState(int... partitions) {
        return overrideSequenceNumber(currentState(partitions), -1L);
    }

    /**
     * Asks the configured {@link StateSerializer} to load state, passing it
     * {@link #startState(int...)} for the given partitions.
     *
     * @param partitions partitions to include; empty selects the reader's full state
     * @return whatever the state serializer returns
     */
    public ConnectorState loadState(int... partitions) {
        return this.stateSerializer.load(startState(partitions));
    }

    /**
     * Runs the reader from {@link #startState(int...)} to {@link #endState(int...)},
     * both taken without a partition selection.
     */
    @Override // java.lang.Runnable
    public void run() {
        run(startState(new int[0]), endState(new int[0]));
    }

    /**
     * Runs the reader between the two given states.
     *
     * @param connectorState  state to start from
     * @param connectorState2 state to run to
     */
    public void run(ConnectorState connectorState, ConnectorState connectorState2) {
        this.couchbaseReader.run(connectorState, connectorState2);
    }

    /** Instantiates the configured filter class through its no-argument constructor. */
    private static Filter instantiateFilter(CouchbaseKafkaEnvironment environment) {
        try {
            return (Filter) Class.forName(environment.kafkaFilterClass()).newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot initialize filter class:" + environment.kafkaFilterClass(), e);
        }
    }

    /** Instantiates the configured state serializer through its (CouchbaseKafkaEnvironment) constructor. */
    private static StateSerializer instantiateStateSerializer(CouchbaseKafkaEnvironment environment) {
        try {
            return (StateSerializer) Class.forName(environment.couchbaseStateSerializerClass())
                    .getDeclaredConstructor(CouchbaseKafkaEnvironment.class)
                    .newInstance(environment);
        } catch (ReflectiveOperationException e) {
            // Report the serializer class here; the filter class name would point at the wrong setting.
            throw new IllegalArgumentException(
                    "Cannot initialize serializer class:" + environment.couchbaseStateSerializerClass(), e);
        }
    }

    /**
     * Builds the comma-separated {@code host:port} list of all brokers registered in ZooKeeper.
     * The ZooKeeper client is only needed for this lookup, so it is closed before returning.
     */
    private static String lookupBrokerList(String zookeeperAddress) {
        ZkClient zkClient = new ZkClient(zookeeperAddress, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS,
                ZKStringSerializer$.MODULE$);
        try {
            List<String> brokers = new ArrayList<String>();
            Iterator<?> it = ZkUtils.getAllBrokersInCluster(zkClient).iterator();
            while (it.hasNext()) {
                Broker broker = (Broker) it.next();
                brokers.add(broker.host() + ":" + broker.port());
            }
            return joinNodes(brokers);
        } finally {
            zkClient.close();
        }
    }

    /** Joins the given entries with commas; an empty list yields an empty string. */
    private static String joinNodes(List<String> list) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (String node : list) {
            if (first) {
                first = false;
            } else {
                joined.append(",");
            }
            joined.append(node);
        }
        return joined.toString();
    }

    /** Copies every stream of the given state into a new state, replacing its sequence number. */
    private static ConnectorState overrideSequenceNumber(ConnectorState connectorState, long sequenceNumber) {
        ConnectorState overridden = new ConnectorState();
        java.util.Iterator<StreamState> streams = connectorState.iterator();
        while (streams.hasNext()) {
            StreamState stream = streams.next();
            overridden.put(new StreamState(stream.partition(), stream.vbucketUUID(), sequenceNumber));
        }
        return overridden;
    }
}
