package org.apache.nifi.controller.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.cassandra.CassandraSessionProviderService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Provides a DistributedMapCache client that is based on Apache Cassandra.")
@Tags({"map", "cache", "distributed", "cassandra"})
/* loaded from: input_file:org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.class */
public class CassandraDistributedMapCache extends AbstractControllerService implements DistributedMapCacheClient {
    public static final PropertyDescriptor SESSION_PROVIDER = new PropertyDescriptor.Builder().name("cassandra-dmc-session-provider").displayName("Session Provider").description("The client service that will configure the cassandra client connection.").required(true).identifiesControllerService(CassandraSessionProviderService.class).build();
    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("cassandra-dmc-table-name").displayName("Table Name").description("The name of the table where the cache will be stored.").required(true).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor KEY_FIELD_NAME = new PropertyDescriptor.Builder().name("cassandra-dmc-key-field-name").displayName("Key Field Name").description("The name of the field that acts as the unique key. (The CQL type should be \"blob\")").required(true).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor VALUE_FIELD_NAME = new PropertyDescriptor.Builder().name("cassandra-dmc-value-field-name").displayName("Value Field Name").description("The name of the field that will store the value. (The CQL type should be \"blob\")").required(true).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder().name("cassandra-dmc-ttl").displayName("TTL").description("If configured, this will set a TTL (Time to Live) for each row inserted into the table so that old cache items expire after a certain period of time.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(false).build();
    public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(SESSION_PROVIDER, TABLE_NAME, KEY_FIELD_NAME, VALUE_FIELD_NAME, TTL));
    private CassandraSessionProviderService sessionProviderService;
    private String tableName;
    private String keyField;
    private String valueField;
    private Long ttl;
    private Session session;
    private PreparedStatement deleteStatement;
    private PreparedStatement existsStatement;
    private PreparedStatement fetchStatement;
    private PreparedStatement insertStatement;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @OnEnabled
    public void onEnabled(ConfigurationContext configurationContext) {
        this.sessionProviderService = configurationContext.getProperty(SESSION_PROVIDER).asControllerService(CassandraSessionProviderService.class);
        this.tableName = configurationContext.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
        this.keyField = configurationContext.getProperty(KEY_FIELD_NAME).evaluateAttributeExpressions().getValue();
        this.valueField = configurationContext.getProperty(VALUE_FIELD_NAME).evaluateAttributeExpressions().getValue();
        if (configurationContext.getProperty(TTL).isSet()) {
            this.ttl = configurationContext.getProperty(TTL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
        }
        this.session = this.sessionProviderService.getCassandraSession();
        this.deleteStatement = this.session.prepare(QueryUtils.createDeleteStatement(this.keyField, this.tableName));
        this.existsStatement = this.session.prepare(QueryUtils.createExistsQuery(this.keyField, this.tableName));
        this.fetchStatement = this.session.prepare(QueryUtils.createFetchQuery(this.keyField, this.valueField, this.tableName));
        this.insertStatement = this.session.prepare(QueryUtils.createInsertStatement(this.keyField, this.valueField, this.tableName, this.ttl));
    }

    @OnDisabled
    public void onDisabled() {
        this.session = null;
        this.deleteStatement = null;
        this.existsStatement = null;
        this.fetchStatement = null;
        this.insertStatement = null;
    }

    public <K, V> boolean putIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
        if (containsKey(k, serializer)) {
            return false;
        }
        put(k, v, serializer, serializer2);
        return true;
    }

    public <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer2, Deserializer<V> deserializer) throws IOException {
        V v2 = (V) get(k, serializer, deserializer);
        if (putIfAbsent(k, v, serializer, serializer2)) {
            return null;
        }
        return v2;
    }

    public <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
        byte[] serializeKey = serializeKey(k, serializer);
        BoundStatement bind = this.existsStatement.bind();
        bind.setBytes(0, ByteBuffer.wrap(serializeKey));
        Iterator it = this.session.execute(bind).iterator();
        return it.hasNext() && ((Row) it.next()).getLong("exist_count") > 0;
    }

    public <K, V> void put(K k, V v, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
        BoundStatement bind = this.insertStatement.bind();
        bind.setBytes(0, ByteBuffer.wrap(serializeKey(k, serializer)));
        bind.setBytes(1, ByteBuffer.wrap(serializeValue(v, serializer2)));
        this.session.execute(bind);
    }

    public <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
        BoundStatement bind = this.fetchStatement.bind();
        bind.setBytes(0, ByteBuffer.wrap(serializeKey(k, serializer)));
        Iterator it = this.session.execute(bind).iterator();
        if (it.hasNext()) {
            return (V) deserializer.deserialize(((Row) it.next()).getBytes(this.valueField).array());
        }
        return null;
    }

    public void close() throws IOException {
    }

    public <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
        BoundStatement bind = this.deleteStatement.bind();
        bind.setBytes(0, ByteBuffer.wrap(serializeKey(k, serializer)));
        this.session.execute(bind);
        return true;
    }

    public long removeByPattern(String str) throws IOException {
        throw new UnsupportedOperationException();
    }

    private <K> byte[] serializeKey(K k, Serializer<K> serializer) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        serializer.serialize(k, byteArrayOutputStream);
        byteArrayOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }

    private <V> byte[] serializeValue(V v, Serializer<V> serializer) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        serializer.serialize(v, byteArrayOutputStream);
        byteArrayOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }
}
