package com.couchbase.connect.kafka;

import com.couchbase.client.dcp.state.PartitionState;
import com.couchbase.client.dcp.state.SessionState;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.connect.kafka.converter.Converter;
import com.couchbase.connect.kafka.dcp.Event;
import com.couchbase.connect.kafka.dcp.Snapshot;
import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.util.Version;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSourceTask.class */
public class CouchbaseSourceTask extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSourceTask.class);
    private static final long MAX_TIMEOUT = 10000;
    private CouchbaseSourceConnectorConfig config;
    private Map<String, String> configProperties;
    private CouchbaseReader couchbaseReader;
    private BlockingQueue<Event> queue;
    private String topic;
    private String bucket;
    private volatile boolean running;
    private Filter filter;
    private Converter converter;
    private int batchSizeMax;

    private static byte[] bufToBytes(ByteBuf byteBuf) {
        byte[] bArr = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bArr);
        return bArr;
    }

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        try {
            this.configProperties = map;
            this.config = new CouchbaseSourceTaskConfig(this.configProperties);
            this.filter = createFilter(this.config.getString(CouchbaseSourceConnectorConfig.EVENT_FILTER_CLASS_CONFIG));
            this.converter = createConverter(this.config.getString(CouchbaseSourceConnectorConfig.DCP_MESSAGE_CONVERTER_CLASS_CONFIG));
            this.topic = this.config.getString(CouchbaseSourceConnectorConfig.TOPIC_NAME_CONFIG);
            this.bucket = this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_BUCKET_CONFIG);
            String username = this.config.getUsername();
            String value = this.config.getPassword(CouchbaseSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG).value();
            List list = this.config.getList(CouchbaseSourceConnectorConfig.CONNECTION_CLUSTER_ADDRESS_CONFIG);
            boolean booleanValue = this.config.getBoolean(CouchbaseSourceConnectorConfig.USE_SNAPSHOTS_CONFIG).booleanValue();
            boolean booleanValue2 = this.config.getBoolean(CouchbaseSourceConnectorConfig.CONNECTION_SSL_ENABLED_CONFIG).booleanValue();
            String string = this.config.getString(CouchbaseSourceConnectorConfig.CONNECTION_SSL_KEYSTORE_LOCATION_CONFIG);
            String value2 = this.config.getPassword(CouchbaseSourceConnectorConfig.CONNECTION_SSL_KEYSTORE_PASSWORD_CONFIG).value();
            this.batchSizeMax = this.config.getInt(CouchbaseSourceConnectorConfig.BATCH_SIZE_MAX_CONFIG).intValue();
            long longValue = this.config.getLong(CouchbaseSourceConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG).longValue();
            List list2 = this.config.getList(CouchbaseSourceTaskConfig.PARTITIONS_CONFIG);
            Short[] shArr = new Short[list2.size()];
            ArrayList<Map> arrayList = new ArrayList(1);
            for (int i = 0; i < list2.size(); i++) {
                shArr[i] = Short.valueOf(Short.parseShort((String) list2.get(i)));
                HashMap hashMap = new HashMap(2);
                hashMap.put("bucket", this.bucket);
                hashMap.put("partition", shArr[i].toString());
                arrayList.add(hashMap);
            }
            Map offsets = this.context.offsetStorageReader().offsets(arrayList);
            SessionState sessionState = new SessionState();
            for (Map map2 : arrayList) {
                Map map3 = (Map) offsets.get(map2);
                Short valueOf = Short.valueOf(Short.parseShort((String) map2.get("partition")));
                PartitionState partitionState = new PartitionState();
                long j = 0;
                if (map3 != null && map3.containsKey("bySeqno")) {
                    j = ((Long) map3.get("bySeqno")).longValue();
                }
                partitionState.setStartSeqno(j);
                partitionState.setEndSeqno(-1L);
                partitionState.setSnapshotStartSeqno(j);
                partitionState.setSnapshotEndSeqno(j);
                sessionState.set(valueOf.shortValue(), partitionState);
            }
            this.running = true;
            this.queue = new LinkedBlockingQueue();
            this.couchbaseReader = new CouchbaseReader(list, this.bucket, username, value, longValue, this.queue, shArr, sessionState, booleanValue, booleanValue2, string, value2);
            this.couchbaseReader.start();
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSourceTask due to configuration error", e);
        }
    }

    private Converter createConverter(String str) {
        try {
            return (Converter) Utils.newInstance(str, Converter.class);
        } catch (ClassNotFoundException e) {
            throw new ConnectException("Couldn't create message converter", e);
        }
    }

    private Filter createFilter(String str) {
        if (str == null || "".equals(str)) {
            return null;
        }
        try {
            return (Filter) Utils.newInstance(str, Filter.class);
        } catch (ClassNotFoundException e) {
            throw new ConnectException("Couldn't create filter in CouchbaseSourceTask due to an error", e);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        LinkedList linkedList = new LinkedList();
        int i = this.batchSizeMax;
        while (this.running) {
            Event poll = this.queue.poll(100L, TimeUnit.MILLISECONDS);
            if (poll != null) {
                for (ByteBuf byteBuf : poll) {
                    if (this.filter == null || this.filter.pass(byteBuf)) {
                        SourceRecord convert = convert(byteBuf);
                        if (convert != null) {
                            linkedList.add(convert);
                        }
                    }
                }
                poll.ack();
                Iterator<ByteBuf> it = poll.iterator();
                while (it.hasNext()) {
                    it.next().release();
                }
                i--;
            }
            if (!linkedList.isEmpty() && (i == 0 || poll == null || (poll instanceof Snapshot))) {
                LOGGER.info("Poll returns {} result(s)", Integer.valueOf(linkedList.size()));
                return linkedList;
            }
        }
        return linkedList;
    }

    public SourceRecord convert(ByteBuf byteBuf) {
        return this.converter.convert(byteBuf, this.bucket, this.topic);
    }

    public void stop() {
        this.running = false;
        this.couchbaseReader.shutdown();
        try {
            this.couchbaseReader.join(10000L);
        } catch (InterruptedException e) {
        }
    }
}
