package com.couchbase.connect.kafka;

import com.couchbase.client.core.time.Delay;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.util.retry.RetryBuilder;
import com.couchbase.connect.kafka.util.Version;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSinkTask.class */
public class CouchbaseSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSinkTask.class);
    private Map<String, String> configProperties;
    private CouchbaseSinkTaskConfig config;
    private Bucket bucket;
    private CouchbaseCluster cluster;
    private JsonConverter converter;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        try {
            this.configProperties = map;
            this.config = new CouchbaseSinkTaskConfig(this.configProperties);
            List list = this.config.getList(CouchbaseSourceConnectorConfig.CONNECTION_CLUSTER_ADDRESS_CONFIG);
            String string = this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_BUCKET_CONFIG);
            String username = this.config.getUsername();
            String value = this.config.getPassword(CouchbaseSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG).value();
            boolean booleanValue = this.config.getBoolean(CouchbaseSourceConnectorConfig.CONNECTION_SSL_ENABLED_CONFIG).booleanValue();
            String string2 = this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_SSL_KEYSTORE_LOCATION_CONFIG);
            this.cluster = CouchbaseCluster.create(DefaultCouchbaseEnvironment.builder().sslEnabled(booleanValue).sslKeystoreFile(string2).sslKeystorePassword(this.config.getPassword(CouchbaseSourceConnectorConfig.CONNECTION_SSL_KEYSTORE_PASSWORD_CONFIG).value()).connectTimeout(this.config.getLong(CouchbaseSourceConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG).longValue()).build(), list);
            this.cluster.authenticate(username, value);
            this.bucket = this.cluster.openBucket(string);
            this.converter = new JsonConverter();
            this.converter.configure(Collections.singletonMap("schemas.enable", false), false);
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSinkTask due to configuration error", e);
        }
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        SinkRecord next = collection.iterator().next();
        LOGGER.trace("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the Couchbase...", new Object[]{Integer.valueOf(collection.size()), next.topic(), next.kafkaPartition(), Long.valueOf(next.kafkaOffset())});
        Observable.from(collection).flatMap(new Func1<SinkRecord, Observable<Document>>() { // from class: com.couchbase.connect.kafka.CouchbaseSinkTask.1
            public Observable<Document> call(SinkRecord sinkRecord) {
                return CouchbaseSinkTask.this.bucket.async().upsert(CouchbaseSinkTask.this.convert(sinkRecord));
            }
        }).retryWhen(RetryBuilder.anyOf(new Class[]{RuntimeException.class}).delay(Delay.exponential(TimeUnit.SECONDS, 5L)).max(5).build()).toBlocking().last();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Document convert(SinkRecord sinkRecord) {
        String format;
        byte[] bArr;
        Object key = sinkRecord.key();
        Schema keySchema = sinkRecord.keySchema();
        if (key == null || !keySchema.type().isPrimitive()) {
            format = String.format("%s/%d/%d", sinkRecord.topic(), sinkRecord.kafkaPartition(), Long.valueOf(sinkRecord.kafkaOffset()));
        } else if (sinkRecord.keySchema().type() == Schema.Type.BYTES) {
            if (key instanceof ByteBuffer) {
                ByteBuffer slice = ((ByteBuffer) key).slice();
                bArr = new byte[slice.remaining()];
                slice.get(bArr);
            } else {
                bArr = (byte[]) key;
            }
            format = new String(bArr, CharsetUtil.UTF_8);
        } else {
            format = sinkRecord.key().toString();
        }
        return RawJsonDocument.create(format, new String(this.converter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value()), CharsetUtil.UTF_8));
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void stop() {
        this.cluster.disconnect();
    }
}
