package com.couchbase.connect.kafka.handler.source;

import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.connect.kafka.converter.ConverterUtils;
import com.couchbase.connect.kafka.dcp.EventType;
import com.couchbase.connect.kafka.handler.source.CouchbaseSourceRecord;
import com.couchbase.connect.kafka.util.Schemas;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/connect/kafka/handler/source/DefaultSchemaSourceHandler.class */
/**
 * Source handler that publishes each document event as a {@link Struct}
 * conforming to {@link Schemas#VALUE_DEFAULT_SCHEMA}, keyed by the document key
 * under {@link Schemas#KEY_SCHEMA}.
 *
 * <p>{@link #buildKey} and {@link #buildValue} are {@code protected} so that
 * subclasses can replace either half of the record independently.
 */
public class DefaultSchemaSourceHandler extends SourceHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSchemaSourceHandler.class);

    /**
     * Builds a source record for the given event.
     *
     * <p>The record's topic is taken from {@code sourceHandlerParams.topic()}, then the
     * key and the value are filled in by {@link #buildKey} and {@link #buildValue}.
     *
     * @param sourceHandlerParams supplies the topic and the document event
     * @return the built record, or {@code null} when {@link #buildValue} returns
     *         {@code false} (i.e. no value could be produced for the event)
     */
    @Override // com.couchbase.connect.kafka.handler.source.SourceHandler
    public CouchbaseSourceRecord handle(SourceHandlerParams sourceHandlerParams) {
        CouchbaseSourceRecord.Builder recordBuilder = CouchbaseSourceRecord.builder();
        recordBuilder.topic(sourceHandlerParams.topic());
        buildKey(sourceHandlerParams, recordBuilder);

        boolean valuePopulated = buildValue(sourceHandlerParams, recordBuilder);
        return valuePopulated ? recordBuilder.build() : null;
    }

    /**
     * Sets the record key to the document key of the event, using {@link Schemas#KEY_SCHEMA}.
     *
     * @param sourceHandlerParams supplies the document event whose key is used
     * @param builder             record builder that receives the key
     */
    protected void buildKey(SourceHandlerParams sourceHandlerParams, CouchbaseSourceRecord.Builder builder) {
        String documentKey = sourceHandlerParams.documentEvent().key();
        builder.key(Schemas.KEY_SCHEMA, documentKey);
    }

    /**
     * Populates the record value with a {@link Struct} describing the event.
     *
     * <p>Every struct carries the bucket, partition, vBucket UUID, key, CAS and sequence
     * numbers of the event plus an {@code "event"} field of {@code "mutation"},
     * {@code "deletion"} or {@code "expiration"}. Mutations additionally carry
     * expiration, flags, lock time and the document content as bytes.
     *
     * @param sourceHandlerParams supplies the document event to convert
     * @param builder             record builder that receives the value on success
     * @return {@code true} if the value was set on {@code builder}; {@code false} if the
     *         event type was not recognized, in which case a warning is logged and
     *         {@code builder} is left without a value
     */
    protected boolean buildValue(SourceHandlerParams sourceHandlerParams, CouchbaseSourceRecord.Builder builder) {
        final DocumentEvent docEvent = sourceHandlerParams.documentEvent();
        final ByteBuf dcpMessage = docEvent.rawDcpEvent();

        final EventType type = EventType.of(dcpMessage);
        if (type == null) {
            warnUnexpectedEventType(dcpMessage);
            return false;
        }

        // Fields shared by every event type.
        final Struct value = new Struct(Schemas.VALUE_DEFAULT_SCHEMA)
                .put("bucket", docEvent.bucket())
                .put("partition", docEvent.vBucket())
                .put("vBucketUuid", docEvent.vBucketUuid())
                .put("key", docEvent.key())
                .put("cas", docEvent.cas())
                .put("bySeqno", docEvent.bySeqno())
                .put("revSeqno", docEvent.revisionSeqno());

        if (type == EventType.MUTATION) {
            putMutationFields(value, dcpMessage);
        } else if (type == EventType.DELETION) {
            value.put("event", "deletion");
        } else if (type == EventType.EXPIRATION) {
            value.put("event", "expiration");
        } else {
            // A type this handler has no mapping for: report it and produce no value.
            warnUnexpectedEventType(dcpMessage);
            return false;
        }

        builder.value(Schemas.VALUE_DEFAULT_SCHEMA, value);
        return true;
    }

    /** Marks the struct as a mutation and copies the mutation-only fields out of the raw message. */
    private static void putMutationFields(Struct value, ByteBuf dcpMessage) {
        value.put("event", "mutation");
        value.put("expiration", DcpMutationMessage.expiry(dcpMessage));
        value.put("flags", DcpMutationMessage.flags(dcpMessage));
        value.put("lockTime", DcpMutationMessage.lockTime(dcpMessage));
        value.put("content", ConverterUtils.bufToBytes(DcpMutationMessage.content(dcpMessage)));
    }

    /**
     * Logs a warning carrying the byte at index 1 of the raw message
     * (presumably the DCP opcode -- TODO confirm against the DCP client).
     */
    private static void warnUnexpectedEventType(ByteBuf dcpMessage) {
        LOGGER.warn("unexpected event type {}", dcpMessage.getByte(1));
    }
}
