package org.apache.nifi.processor.util.put;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;

/* loaded from: input_file:org/apache/nifi/processor/util/put/AbstractPutEventProcessor.class */
public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactoryProcessor {
    /** Destination host; defaults to {@code localhost}. */
    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
            .name("Hostname")
            .description("Destination hostname or IP address")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("localhost")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Destination port; required and has no default. */
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Port")
            .description("Destination port number")
            .required(true)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Requested socket send buffer size; defaults to {@code 1 MB}. */
    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
            .name("Max Size of Socket Send Buffer")
            .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("1 MB")
            .required(true)
            .build();

    /** How long an unused connection is kept open; defaults to {@code 15 seconds}. */
    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor.Builder()
            .name("Idle Connection Expiration")
            .description("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
            .required(true)
            .defaultValue("15 seconds")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Allowable {@link #PROTOCOL} value selecting TCP. */
    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");

    /** Allowable {@link #PROTOCOL} value selecting UDP. */
    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");

    /** Transport protocol selector; defaults to {@link #TCP_VALUE}. */
    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor.Builder()
            .name("Protocol")
            .description("The protocol for communication.")
            .required(true)
            .allowableValues(new AllowableValue[] {TCP_VALUE, UDP_VALUE})
            .defaultValue(TCP_VALUE.getValue())
            .build();

    /** Optional delimiter used to split one FlowFile into several messages. */
    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
            .name("Message Delimiter")
            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the contents of the FlowFile will be split on this delimiter and each section sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' relationship.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Character set of the outgoing data; defaults to {@code UTF-8}. */
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("Character Set")
            .description("Specifies the character set of the data being sent.")
            .required(true)
            .defaultValue("UTF-8")
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Connect/communication timeout; defaults to {@code 10 seconds}. */
    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
            .name("Timeout")
            .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
            .required(false)
            .defaultValue("10 seconds")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Optional delimiter appended to each outgoing message; see {@link #getOutgoingMessageDelimiter}. */
    public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
            .name("Outgoing Message Delimiter")
            .description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Whether each FlowFile is sent on its own connection; defaults to {@code false}. */
    public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
            .name("Connection Per FlowFile")
            .description("Specifies whether to send each FlowFile's content on an individual connection.")
            .required(true)
            .defaultValue("false")
            .allowableValues(new String[] {"true", "false"})
            .build();

    /** Optional SSL context service; when set, its context is handed to the sender factory. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description("Specifies the SSL Context Service to enable TLS socket communication")
            .required(false)
            .identifiesControllerService(SSLContextService.class)
            .build();

    /** Relationship for FlowFiles that were sent successfully. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
            .build();

    /** Relationship for FlowFiles that could not be sent. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that failed to send to the destination are sent out this relationship.")
            .build();

    /** Unmodifiable relationship set built by {@code init}. */
    private Set<Relationship> relationships;

    /** Unmodifiable property descriptor list built by {@code init}. */
    private List<PropertyDescriptor> descriptors;

    /** Transit URI assigned in {@code onScheduled}; used in provenance SEND events. */
    protected volatile String transitUri;

    /** Sender created in {@code onScheduled} and closed in {@code closeSenders}. */
    protected EventSender<T> eventSender;

    /** Batches whose ranges have all been reported and that await session completion. */
    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();

    /** Batches that are still waiting for range reports. */
    protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<>());

    /* loaded from: input_file:org/apache/nifi/processor/util/put/AbstractPutEventProcessor$FlowFileMessageBatch.class */
    protected class FlowFileMessageBatch {
        private final ProcessSession session;
        private final FlowFile flowFile;
        private Exception lastFailureReason;
        private final long startTime = System.nanoTime();
        private final List<Range> successfulRanges = new ArrayList();
        private final List<Range> failedRanges = new ArrayList();
        private long numMessages = -1;
        private long completeTime = 0;
        private boolean canceled = false;

        public FlowFileMessageBatch(ProcessSession processSession, FlowFile flowFile) {
            this.session = processSession;
            this.flowFile = flowFile;
        }

        public synchronized void cancelOrComplete() {
            if (isComplete()) {
                completeSession();
                return;
            }
            this.canceled = true;
            this.session.rollback();
            this.successfulRanges.clear();
            this.failedRanges.clear();
        }

        public synchronized void addSuccessfulRange(long j, long j2) {
            if (this.canceled) {
                return;
            }
            this.successfulRanges.add(new Range(j, j2));
            if (isComplete()) {
                AbstractPutEventProcessor.this.activeBatches.remove(this);
                AbstractPutEventProcessor.this.completeBatches.add(this);
                this.completeTime = System.nanoTime();
            }
        }

        public synchronized void addFailedRange(long j, long j2, Exception exc) {
            if (this.canceled) {
                return;
            }
            this.failedRanges.add(new Range(j, j2));
            this.lastFailureReason = exc;
            if (isComplete()) {
                AbstractPutEventProcessor.this.activeBatches.remove(this);
                AbstractPutEventProcessor.this.completeBatches.add(this);
                this.completeTime = System.nanoTime();
            }
        }

        private boolean isComplete() {
            return !this.canceled && this.numMessages > -1 && ((long) (this.successfulRanges.size() + this.failedRanges.size())) >= this.numMessages;
        }

        public synchronized void setNumMessages(long j) {
            this.numMessages = j;
            if (isComplete()) {
                AbstractPutEventProcessor.this.activeBatches.remove(this);
                AbstractPutEventProcessor.this.completeBatches.add(this);
                this.completeTime = System.nanoTime();
            }
        }

        private void transferRanges(List<Range> list, Relationship relationship) {
            Collections.sort(list, new Comparator<Range>() { // from class: org.apache.nifi.processor.util.put.AbstractPutEventProcessor.FlowFileMessageBatch.1
                @Override // java.util.Comparator
                public int compare(Range range, Range range2) {
                    return Long.compare(range.getStart(), range2.getStart());
                }
            });
            int i = 0;
            while (i < list.size()) {
                Range range = list.get(i);
                int i2 = 1;
                while (i + 1 < list.size()) {
                    Range range2 = list.get(i + 1);
                    if (range2.getStart() != range.getEnd()) {
                        break;
                    }
                    range = new Range(range.getStart(), range2.getEnd());
                    i2++;
                    i++;
                }
                FlowFile clone = this.session.clone(this.flowFile, range.getStart(), range.getEnd() - range.getStart());
                if (relationship == AbstractPutEventProcessor.REL_SUCCESS) {
                    this.session.getProvenanceReporter().send(clone, AbstractPutEventProcessor.this.transitUri, "Sent " + i2 + " messages");
                    this.session.transfer(clone, relationship);
                } else {
                    this.session.transfer(this.session.penalize(clone), relationship);
                }
                i++;
            }
        }

        public synchronized void completeSession() {
            if (this.canceled) {
                return;
            }
            if (this.successfulRanges.isEmpty() && this.failedRanges.isEmpty()) {
                AbstractPutEventProcessor.this.getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[]{this.flowFile});
                this.session.transfer(this.flowFile, AbstractPutEventProcessor.REL_SUCCESS);
                this.session.commitAsync();
                return;
            }
            if (this.successfulRanges.isEmpty()) {
                AbstractPutEventProcessor.this.getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[]{this.flowFile, this.lastFailureReason});
                this.session.transfer(this.session.penalize(this.flowFile), AbstractPutEventProcessor.REL_FAILURE);
                this.session.commitAsync();
            } else {
                if (this.failedRanges.isEmpty()) {
                    long millis = TimeUnit.NANOSECONDS.toMillis(this.completeTime - this.startTime);
                    this.session.getProvenanceReporter().send(this.flowFile, AbstractPutEventProcessor.this.transitUri, "Sent " + this.successfulRanges.size() + " messages;", millis);
                    this.session.transfer(this.flowFile, AbstractPutEventProcessor.REL_SUCCESS);
                    AbstractPutEventProcessor.this.getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[]{Integer.valueOf(this.successfulRanges.size()), this.flowFile, Long.valueOf(millis)});
                    this.session.commitAsync();
                    return;
                }
                transferRanges(this.successfulRanges, AbstractPutEventProcessor.REL_SUCCESS);
                transferRanges(this.failedRanges, AbstractPutEventProcessor.REL_FAILURE);
                this.session.remove(this.flowFile);
                AbstractPutEventProcessor.this.getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}", new Object[]{Integer.valueOf(this.successfulRanges.size()), Integer.valueOf(this.failedRanges.size()), this.lastFailureReason});
                this.session.commitAsync();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/nifi/processor/util/put/AbstractPutEventProcessor$Range.class */
    /** Immutable pair of {@code long} offsets, {@code start} and {@code end}. */
    public static class Range {
        private final long start;
        private final long end;

        /**
         * @param j the start offset
         * @param j2 the end offset
         */
        public Range(long j, long j2) {
            start = j;
            end = j2;
        }

        /** @return the start offset */
        public long getStart() {
            return start;
        }

        /** @return the end offset */
        public long getEnd() {
            return end;
        }

        /** @return a string of the form {@code Range[start-end]} */
        @Override
        public String toString() {
            return new StringBuilder("Range[")
                    .append(start)
                    .append('-')
                    .append(end)
                    .append(']')
                    .toString();
        }
    }

    /**
     * Builds the unmodifiable property descriptor list and relationship set.
     *
     * <p>The common descriptors come first, followed by whatever
     * {@link #getAdditionalProperties()} supplies. The relationships are success and
     * failure plus whatever {@link #getAdditionalRelationships()} supplies.
     *
     * @param processorInitializationContext initialization context (not read here)
     */
    protected void init(ProcessorInitializationContext processorInitializationContext) {
        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
        Collections.addAll(supportedProperties, HOSTNAME, PORT, MAX_SOCKET_SEND_BUFFER_SIZE, IDLE_EXPIRATION, TIMEOUT);
        supportedProperties.addAll(getAdditionalProperties());
        this.descriptors = Collections.unmodifiableList(supportedProperties);

        final Set<Relationship> supportedRelationships = new HashSet<>();
        Collections.addAll(supportedRelationships, REL_SUCCESS, REL_FAILURE);
        supportedRelationships.addAll(getAdditionalRelationships());
        this.relationships = Collections.unmodifiableSet(supportedRelationships);
    }

    /**
     * Extension hook: relationships to register in addition to success and failure.
     *
     * @return an empty list in this base implementation
     */
    protected List<Relationship> getAdditionalRelationships() {
        final List<Relationship> none = Collections.emptyList();
        return none;
    }

    /**
     * Extension hook: property descriptors to register after the common ones.
     *
     * @return an empty list in this base implementation
     */
    protected List<PropertyDescriptor> getAdditionalProperties() {
        final List<PropertyDescriptor> none = Collections.emptyList();
        return none;
    }

    /**
     * @return the unmodifiable relationship set assembled by {@code init}
     */
    public final Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable property descriptor list assembled by {@code init}
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Creates the event sender and then computes the transit URI for this run.
     *
     * @param processContext context supplying the configured property values
     * @throws IOException declared for overriding implementations
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        final EventSender<T> sender = getEventSender(processContext);
        this.eventSender = sender;

        final String uri = createTransitUri(processContext);
        this.transitUri = uri;
    }

    /**
     * Closes the event sender if one was created.
     *
     * @throws Exception if closing the sender fails
     */
    @OnStopped
    public void closeSenders() throws Exception {
        if (this.eventSender == null) {
            return;
        }
        this.eventSender.close();
    }

    /**
     * Builds the transit URI as {@code protocol://host:port} from the configured
     * protocol, hostname and port.
     *
     * @param processContext context supplying the property values
     * @return the transit URI string
     */
    protected String createTransitUri(ProcessContext processContext) {
        final String protocol = getProtocol(processContext);
        final String host = processContext.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
        final String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
        return protocol + "://" + host + ":" + port;
    }

    /**
     * Creates and configures the {@link EventSender} used to transmit events.
     *
     * <p>The factory comes from {@link #getNettyEventSenderFactory(String, int, String)}
     * and is configured with the thread name prefix, worker thread and connection
     * limits (both set to the maximum concurrent tasks), socket send buffer size,
     * single-event-per-connection flag, a zero shutdown quiet period, the
     * communication and idle timeouts and, when set, the SSL context.
     *
     * @param processContext context supplying the configured property values
     * @return the configured event sender
     */
    protected EventSender<T> getEventSender(ProcessContext processContext) {
        final String hostname = processContext.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
        final int port = processContext.getProperty(PORT).evaluateAttributeExpressions().asInteger().intValue();
        final String protocol = getProtocol(processContext);

        // An unset property (null raw value) means "do not open a connection per FlowFile".
        final PropertyValue connectionPerFlowFile = processContext.getProperty(CONNECTION_PER_FLOWFILE);
        final boolean singleEventPerConnection =
                connectionPerFlowFile.getValue() != null && connectionPerFlowFile.asBoolean().booleanValue();

        final NettyEventSenderFactory<T> factory = getNettyEventSenderFactory(hostname, port, protocol);
        factory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
        factory.setWorkerThreads(processContext.getMaxConcurrentTasks());
        factory.setMaxConnections(processContext.getMaxConcurrentTasks());
        factory.setSocketSendBufferSize(Integer.valueOf(processContext.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue()));
        factory.setSingleEventPerConnection(singleEventPerConnection);
        factory.setShutdownQuietPeriod(Duration.ZERO);

        // Keep the time periods as long values: narrowing to int would wrap large
        // millisecond values into a negative Duration.
        final long timeoutMillis = processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).longValue();
        final long idleTimeoutSeconds = processContext.getProperty(IDLE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).longValue();
        factory.setTimeout(Duration.ofMillis(timeoutMillis));
        factory.setIdleTimeout(Duration.ofSeconds(idleTimeoutSeconds));

        final PropertyValue sslContextServiceProperty = processContext.getProperty(SSL_CONTEXT_SERVICE);
        if (sslContextServiceProperty.isSet()) {
            final SSLContextService sslContextService = sslContextServiceProperty.asControllerService(SSLContextService.class);
            factory.setSslContext(sslContextService.createContext());
        }

        return factory.getEventSender();
    }

    /**
     * Resolves the outgoing message delimiter for a FlowFile, converting the literal
     * two-character sequences {@code \n}, {@code \r} and {@code \t} into the
     * corresponding control characters.
     *
     * @param processContext context supplying the property value
     * @param flowFile FlowFile whose attributes are used for expression evaluation
     * @return the unescaped delimiter, or {@code null} when the property has no value
     */
    protected String getOutgoingMessageDelimiter(ProcessContext processContext, FlowFile flowFile) {
        final String configured = processContext.getProperty(OUTGOING_MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
        if (configured == null) {
            return null;
        }
        final String withNewlines = configured.replace("\\n", "\n");
        final String withCarriageReturns = withNewlines.replace("\\r", "\r");
        return withCarriageReturns.replace("\\t", "\t");
    }

    /**
     * Reads the configured value of the {@link #PROTOCOL} property.
     *
     * @param processContext context supplying the property value
     * @return the raw protocol value
     */
    protected String getProtocol(ProcessContext processContext) {
        final PropertyValue protocolProperty = processContext.getProperty(PROTOCOL);
        return protocolProperty.getValue();
    }

    /**
     * Supplies the sender factory that {@code getEventSender} goes on to configure.
     *
     * @param str the evaluated destination hostname
     * @param i the evaluated destination port
     * @param str2 the protocol value returned by {@code getProtocol}
     * @return the factory used to create the event sender
     */
    protected abstract NettyEventSenderFactory<T> getNettyEventSenderFactory(String str, int i, String str2);
}
