package kr.jm.metric.input.publisher;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.stream.Stream;
import kr.jm.metric.config.input.InputConfigInterface;
import kr.jm.metric.data.Transfer;
import kr.jm.metric.input.InputInterface;
import kr.jm.utils.exception.JMExceptionManager;
import kr.jm.utils.flow.publisher.BulkSubmissionPublisher;
import kr.jm.utils.flow.publisher.JMPublisherInterface;
import kr.jm.utils.flow.publisher.JMSubmissionPublisherInterface;
import kr.jm.utils.helper.JMLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/metric/input/publisher/InputPublisher.class */
/**
 * Couples an {@link InputInterface} with a {@link BulkSubmissionPublisher}: every
 * {@code Transfer<String>} handed over by the input is submitted to the publisher via
 * {@code submitSingle}, and subscribers/consumers registered on this object are
 * registered on that same publisher and receive {@code List<Transfer<String>>} items.
 *
 * <p>Closing this object closes both the input and the publisher.
 */
public class InputPublisher implements JMSubmissionPublisherInterface<List<Transfer<String>>>, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(InputPublisher.class);

    /** Publisher that all submit/subscribe/consume calls of this class delegate to. */
    private final BulkSubmissionPublisher<Transfer<String>> submissionPublisher;
    /** Identifier used for logging and as the first constructor argument of test transfers. */
    protected String inputId;
    /** Input whose data is forwarded to the publisher once {@link #start()} is called. */
    protected InputInterface input;

    /**
     * Builds the publisher chain, the input id and the input from the given configuration.
     *
     * @param inputConfigInterface configuration supplying waiting millis, queue size limit,
     *                             bulk size, flush interval seconds, input id and the input
     */
    public InputPublisher(InputConfigInterface inputConfigInterface) {
        this(
                new StringTransferWaitingBulkSubmissionPublisher(
                        new StringTransferWaitingSubmissionPublisher(
                                inputConfigInterface.getWaitingMillis().longValue(),
                                inputConfigInterface.getQueueSizeLimit().intValue()),
                        inputConfigInterface.getBulkSize().intValue(),
                        inputConfigInterface.getFlushIntervalSeconds().intValue()),
                inputConfigInterface.getInputId(),
                inputConfigInterface.buildInput());
    }

    /**
     * Creates an instance from already-built collaborators.
     *
     * @param bulkSubmissionPublisher publisher that receives the transfers
     * @param str                     the input id
     * @param inputInterface          the input to read from
     */
    public InputPublisher(BulkSubmissionPublisher<Transfer<String>> bulkSubmissionPublisher, String str, InputInterface inputInterface) {
        this.submissionPublisher = bulkSubmissionPublisher;
        this.inputId = str;
        this.input = inputInterface;
        JMLog.info(log, "InputPublisher", new Object[]{str, inputInterface});
    }

    /**
     * Starts the input, handing it the publisher's {@code submitSingle} as the callback.
     *
     * @return this instance, for chaining
     */
    public InputPublisher start() {
        JMLog.info(log, "start", new Object[]{this.inputId});
        // A bound method reference null-checks its receiver when it is created, so no
        // separate Objects.requireNonNull call is needed here.
        this.input.start(this.submissionPublisher::submitSingle);
        return this;
    }

    /**
     * Closes the input and then the publisher. Each close is attempted independently so
     * that a failure while closing the input cannot leave the publisher open; failures
     * are logged and not rethrown.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        JMLog.info(log, "close", new Object[]{this.inputId});
        try {
            this.input.close();
        } catch (Exception e) {
            JMExceptionManager.logException(log, e, "close", new Object[0]);
        }
        try {
            this.submissionPublisher.close();
        } catch (Exception e) {
            JMExceptionManager.logException(log, e, "close", new Object[0]);
        }
    }

    /**
     * Registers the given subscribers on the underlying publisher.
     *
     * @param subscriberArr subscribers to register
     * @return this instance, for chaining
     */
    public InputPublisher subscribeWith(Flow.Subscriber<List<Transfer<String>>>... subscriberArr) {
        this.submissionPublisher.subscribeWith(subscriberArr);
        return this;
    }

    /**
     * Registers the given consumers on the underlying publisher.
     *
     * @param consumerArr consumers to register
     * @return this instance, for chaining
     */
    public InputPublisher consumeWith(Consumer<List<Transfer<String>>>... consumerArr) {
        this.submissionPublisher.consumeWith(consumerArr);
        return this;
    }

    /**
     * Submits a single string as test input.
     *
     * @param str the data to submit
     */
    public void testInput(String str) {
        testInput(Stream.of(str));
    }

    /**
     * Submits every element of the list as test input.
     *
     * @param list the data to submit
     */
    public void testInput(List<String> list) {
        testInput(list.stream());
    }

    /**
     * Wraps each element of the stream in a {@code Transfer} built from this input id and
     * the element, submits each one via {@code submitSingle}, then calls {@code flush()}
     * on the publisher.
     *
     * @param stream the data to submit
     */
    public void testInput(Stream<String> stream) {
        Stream<Transfer<String>> transfers = stream.map(data -> new Transfer<String>(this.inputId, data));
        transfers.forEach(this.submissionPublisher::submitSingle);
        this.submissionPublisher.flush();
    }

    /**
     * Delegates to the underlying publisher's {@code submit}.
     *
     * @param list the transfers to submit
     * @return whatever the underlying publisher's {@code submit} returns
     */
    public int submit(List<Transfer<String>> list) {
        return this.submissionPublisher.submit(list);
    }

    /**
     * Delegates to the underlying publisher's {@code subscribe}.
     *
     * @param subscriber the subscriber to register
     */
    public void subscribe(Flow.Subscriber<? super List<Transfer<String>>> subscriber) {
        this.submissionPublisher.subscribe(subscriber);
    }

    /**
     * @return the input id given at construction
     */
    public String getInputId() {
        return this.inputId;
    }

    /*
     * NOTE(review): the two methods below look like compiler-generated bridge methods that
     * the decompiler renamed to avoid a name collision; they are kept so the visible
     * public surface is unchanged. Confirm whether they can be dropped.
     */

    /* renamed from: consumeWith, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ JMPublisherInterface m21consumeWith(Consumer[] consumerArr) {
        return consumeWith((Consumer<List<Transfer<String>>>[]) consumerArr);
    }

    /* renamed from: subscribeWith, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ JMPublisherInterface m22subscribeWith(Flow.Subscriber[] subscriberArr) {
        return subscribeWith((Flow.Subscriber<List<Transfer<String>>>[]) subscriberArr);
    }
}
