package kr.jm.utils.flow.processor;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Function;
import kr.jm.utils.exception.JMException;
import kr.jm.utils.flow.publisher.JMSubmissionPublisher;
import kr.jm.utils.flow.subscriber.JMSubscriber;
import kr.jm.utils.flow.subscriber.JMSubscriberBuilder;
import kr.jm.utils.helper.JMLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/utils/flow/processor/JMProcessor.class */
public class JMProcessor<T, R> implements JMProcessorInterface<T, R>, AutoCloseable {
    private Function<T, R> transformFunction;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private JMSubmissionPublisher<R> outputPublisher = new JMSubmissionPublisher<>();
    private JMSubscriber<T> inputSubscriber = JMSubscriberBuilder.build(this::process);

    public JMProcessor(Function<T, R> function) {
        this.transformFunction = function;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void process(T t) {
        try {
            Optional ofNullable = Optional.ofNullable(this.transformFunction.apply(t));
            JMSubmissionPublisher<R> jMSubmissionPublisher = this.outputPublisher;
            Objects.requireNonNull(jMSubmissionPublisher);
            ofNullable.ifPresent(jMSubmissionPublisher::submit);
        } catch (Exception e) {
            JMException.handleException(this.log, e, "process", t);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        JMLog.info(this.log, "onSubscribe", subscription);
        this.inputSubscriber.onSubscribe(subscription);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        JMLog.debug(this.log, "onNext", t);
        this.inputSubscriber.onNext(t);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        this.inputSubscriber.onError(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        JMLog.info(this.log, "onComplete", new Object[0]);
        this.inputSubscriber.onComplete();
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        JMLog.info(this.log, "subscribe", subscriber);
        this.outputPublisher.subscribe(subscriber);
    }

    public void close() {
        JMLog.info(this.log, "close", new Object[0]);
        this.outputPublisher.close();
    }
}
