package kr.jm.metric.input.publisher;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import kr.jm.metric.config.input.ChunkType;
import kr.jm.metric.config.output.ElasticsearchOutputConfig;
import kr.jm.metric.data.Transfer;
import kr.jm.metric.input.InputInterface;
import kr.jm.utils.JMOptional;
import kr.jm.utils.JMString;
import kr.jm.utils.exception.JMException;
import kr.jm.utils.flow.publisher.BulkSubmissionPublisher;
import kr.jm.utils.flow.publisher.JMPublisherInterface;
import kr.jm.utils.helper.JMJson;
import kr.jm.utils.helper.JMLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/metric/input/publisher/InputPublisher.class */
/**
 * Feeds data produced by an {@link InputInterface} into a
 * {@link BulkSubmissionPublisher} of {@link Transfer} items.
 *
 * <p>Each incoming transfer is handed to a chunk consumer chosen at construction time
 * from the configured {@link ChunkType}:
 * <ul>
 *   <li>{@code LINES} - the payload is split on {@code JMString.LINE_SEPARATOR} and every
 *       piece is submitted as its own transfer;</li>
 *   <li>{@code JSON_LIST} - the payload is parsed as a list and every element is
 *       re-serialized to a JSON string and submitted as its own transfer;</li>
 *   <li>any other value (including {@code NONE}) - the transfer is submitted unchanged.</li>
 * </ul>
 */
public class InputPublisher implements TransferSubmissionPublisherInterface<String>, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(InputPublisher.class);
    private final BulkSubmissionPublisher<Transfer<String>> transferBulkSubmissionPublisher;
    protected String inputId;
    protected final InputInterface input;
    private final Consumer<Transfer<String>> chunkConsumer;

    /**
     * Creates a publisher that forwards every transfer unchanged ({@code ChunkType.NONE}).
     *
     * @param bulkSubmissionPublisher publisher that receives the transfers
     * @param inputInterface          input source; also supplies the input id
     */
    public InputPublisher(BulkSubmissionPublisher<Transfer<String>> bulkSubmissionPublisher, InputInterface inputInterface) {
        this(bulkSubmissionPublisher, inputInterface, ChunkType.NONE);
    }

    /**
     * Creates a publisher that splits incoming payloads according to {@code chunkType}.
     *
     * @param bulkSubmissionPublisher publisher that receives the (possibly split) transfers
     * @param inputInterface          input source; also supplies the input id
     * @param chunkType               how to split each payload; {@code null} is treated as
     *                                {@code ChunkType.NONE}
     */
    public InputPublisher(BulkSubmissionPublisher<Transfer<String>> bulkSubmissionPublisher, InputInterface inputInterface, ChunkType chunkType) {
        this.transferBulkSubmissionPublisher = bulkSubmissionPublisher;
        this.inputId = inputInterface.getInputId();
        this.input = inputInterface;
        this.chunkConsumer = buildChunkConsumer(Optional.ofNullable(chunkType).orElse(ChunkType.NONE));
        // The raw (possibly null) chunkType is logged on purpose, so the log shows what the caller passed.
        JMLog.info(log, "InputPublisher", new Object[]{this.inputId, inputInterface, chunkType});
    }

    /**
     * Selects the consumer that turns one incoming transfer into publisher submissions.
     *
     * @param chunkType non-null chunk type
     * @return consumer applied to every incoming transfer
     */
    private Consumer<Transfer<String>> buildChunkConsumer(ChunkType chunkType) {
        switch (chunkType) {
            case LINES:
                return buildTransferConsumer(data -> Arrays.stream(data.split(JMString.LINE_SEPARATOR)));
            case JSON_LIST:
                return buildTransferConsumer(data -> {
                    JMJson jmJson = JMJson.getInstance();
                    return jmJson.toList(data).stream().map(jmJson::toJsonString);
                });
            default:
                // No chunking: pass each transfer straight through. The receiver is bound
                // here, so a null publisher fails at construction time rather than on first use.
                return this.transferBulkSubmissionPublisher::submitSingle;
        }
    }

    /**
     * Builds a consumer that splits a transfer's payload with {@code function}, wraps each
     * piece in a new transfer derived from the original via {@code newWith}, and submits
     * all pieces to the publisher in a single call.
     *
     * @param function splits one payload string into a stream of chunk strings
     * @return consumer performing the split-and-submit
     */
    private Consumer<Transfer<String>> buildTransferConsumer(Function<String, Stream<String>> function) {
        return transfer -> {
            // Generic array creation is impossible, so the raw Transfer[] produced by toArray
            // is narrowed here; every element comes from transfer.newWith(String).
            @SuppressWarnings("unchecked")
            Transfer<String>[] chunks = JMOptional.getOptional(transfer.getData())
                    .stream()
                    .flatMap(function)
                    .map(transfer::newWith)
                    .toArray(Transfer[]::new);
            this.transferBulkSubmissionPublisher.submit(chunks);
        };
    }

    /**
     * Starts the underlying input, routing everything it produces through the chunk consumer.
     *
     * @return this publisher, for chaining
     */
    public InputPublisher start() {
        JMLog.info(log, "start", new Object[]{this.inputId});
        this.input.start(this.chunkConsumer);
        return this;
    }

    /**
     * Closes the input and then the publisher.
     *
     * <p>Each resource is closed in its own try block so that a failure while closing the
     * input does not prevent the publisher from being closed. Failures are passed to
     * {@code JMException.handleException}.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        JMLog.info(log, "close", new Object[]{this.inputId});
        try {
            this.input.close();
        } catch (Exception e) {
            JMException.handleException(log, e, "close", new Object[0]);
        }
        try {
            this.transferBulkSubmissionPublisher.close();
        } catch (Exception e) {
            JMException.handleException(log, e, "close", new Object[0]);
        }
    }

    /**
     * Registers the given subscribers on the underlying publisher.
     *
     * @param subscriberArr subscribers to attach
     * @return this publisher, for chaining
     */
    @SafeVarargs
    public final InputPublisher subscribeWith(Flow.Subscriber<List<Transfer<String>>>... subscriberArr) {
        this.transferBulkSubmissionPublisher.subscribeWith(subscriberArr);
        return this;
    }

    /**
     * Registers the given consumers on the underlying publisher.
     *
     * @param consumerArr consumers to attach
     * @return this publisher, for chaining
     */
    @SafeVarargs
    public final InputPublisher consumeWith(Consumer<List<Transfer<String>>>... consumerArr) {
        this.transferBulkSubmissionPublisher.consumeWith(consumerArr);
        return this;
    }

    /**
     * Submits a single string under this publisher's input id.
     *
     * <p>Unlike {@link #testInput(List)}, this variant does not flush the publisher.
     *
     * @param str data to submit
     */
    public void testInput(String str) {
        // submit(String, List) is not declared in this class; presumably inherited from
        // TransferSubmissionPublisherInterface - verify against that interface.
        submit(this.inputId, List.of(str));
    }

    /**
     * Submits the given strings under this publisher's input id, then flushes the publisher.
     *
     * @param list data to submit
     */
    public void testInput(List<String> list) {
        submit(this.inputId, list);
        this.transferBulkSubmissionPublisher.flush();
    }

    /**
     * Subscribes {@code subscriber} to the underlying publisher.
     *
     * @param subscriber subscriber to attach
     */
    public void subscribe(Flow.Subscriber<? super List<Transfer<String>>> subscriber) {
        this.transferBulkSubmissionPublisher.subscribe(subscriber);
    }

    /**
     * Passes every transfer in {@code list} through the chunk consumer.
     *
     * @param list transfers to submit
     * @return the number of transfers in {@code list} (not the number of chunks produced)
     */
    @Override // kr.jm.metric.input.publisher.TransferSubmissionPublisherInterface
    public int submit(List<Transfer<String>> list) {
        list.forEach(this.chunkConsumer);
        return list.size();
    }

    /**
     * @return the id of the input this publisher reads from
     */
    public String getInputId() {
        return this.inputId;
    }
}
