package cz.o2.proxima.server;

import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.server.metrics.Metrics;
import cz.o2.proxima.storage.StorageFilter;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.transform.ElementWiseTransformation;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/server/TransformationObserver.class */
/**
 * A {@link LogObserver} that applies an {@link ElementWiseTransformation} to every observed
 * {@link StreamElement} accepted by the configured {@link StorageFilter} and writes the
 * transformation outputs back through {@code IngestServer.ingestRequest}.
 *
 * <p>The offset of an input element is confirmed only after every output produced for it has
 * been acknowledged with status 200; any other status or any exception fails the offset.
 */
public class TransformationObserver implements LogObserver {
    private static final Logger log = LoggerFactory.getLogger(TransformationObserver.class);

    /**
     * Poison value stored in the pending-writes counter once processing of an element failed.
     * It is far enough from zero that neither adding the number of produced outputs nor any
     * realistic number of decrements can bring the counter back to zero, so a failed element
     * can never be confirmed afterwards.
     */
    private static final int FAILED = Integer.MIN_VALUE / 2;

    private final RepositoryFactory repoFactory;
    private final ElementWiseTransformation transformation;
    private final StorageFilter filter;
    private final String name;

    /** Lazily created by {@link #direct()}; transient, so it is rebuilt from {@code repoFactory}. */
    private transient DirectDataOperator direct;

    /**
     * Creates the observer.
     *
     * @param directDataOperator operator whose repository is captured as a factory; the operator
     *     itself is not retained
     * @param str name of the transformation, used in log messages and metrics
     * @param elementWiseTransformation transformation applied to elements that pass the filter
     * @param storageFilter filter deciding whether an element is transformed or skipped
     */
    public TransformationObserver(DirectDataOperator directDataOperator, String str, ElementWiseTransformation elementWiseTransformation, StorageFilter storageFilter) {
        this.repoFactory = directDataOperator.getRepository().asFactory();
        this.name = str;
        this.transformation = elementWiseTransformation;
        this.filter = storageFilter;
    }

    /**
     * Logs the failure including its cause and hands over to {@code Utils.die}.
     *
     * @param th the error reported by the observed log
     * @return always {@code false}
     */
    public boolean onError(Throwable th) {
        // Record the cause first; the message passed to Utils.die does not carry it.
        log.error("Failed to transform using {}. Bailing out.", this.transformation, th);
        Utils.die(String.format("Failed to transform using %s. Bailing out.", this.transformation));
        return false;
    }

    /**
     * Reports the consumer watermark and then either transforms the element or, when the filter
     * rejects it, confirms it right away.
     *
     * @param streamElement the observed element
     * @param onNextContext context used to read the watermark and to confirm or fail the offset
     * @return always {@code true}
     */
    public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
        Metrics.reportConsumerWatermark(this.name, onNextContext.getWatermark());
        if (!this.filter.apply(streamElement)) {
            log.debug("Transformation {}: skipping transformation of {} by filter", this.name, streamElement);
            onNextContext.confirm();
            return true;
        }
        doTransform(onNextContext, streamElement);
        return true;
    }

    /**
     * Reports the consumer watermark while no elements are arriving.
     *
     * @param onIdleContext context providing the current watermark
     */
    public void onIdle(LogObserver.OnIdleContext onIdleContext) {
        Metrics.reportConsumerWatermark(this.name, onIdleContext.getWatermark());
    }

    /**
     * Applies the transformation to {@code input} and confirms {@code committer} once all
     * produced outputs have been written successfully.
     *
     * <p>The counter starts at zero, every successful write decrements it and the number of
     * outputs returned by the transformation is added at the end. Whichever of these steps
     * brings the counter to exactly zero issues the confirmation, so the order in which write
     * callbacks and the return of {@code apply} happen does not matter.
     */
    private void doTransform(LogObserver.OffsetCommitter committer, StreamElement input) {
        AtomicInteger pending = new AtomicInteger(0);
        try {
            int produced = this.transformation.apply(
                    input, transformed -> writeTransformed(pending, committer, transformed));
            if (pending.addAndGet(produced) == 0) {
                committer.confirm();
            }
        } catch (Exception e) {
            pending.set(FAILED);
            committer.fail(e);
        }
    }

    /**
     * Writes a single transformed element and updates {@code pending} from the write callback.
     * Any non-200 status or exception poisons the counter and fails {@code committer}.
     */
    private void writeTransformed(AtomicInteger pending, LogObserver.OffsetCommitter committer, StreamElement transformed) {
        try {
            log.debug("Transformation {}: writing transformed element {}", this.name, transformed);
            IngestServer.ingestRequest(direct(), transformed, transformed.getUuid(), status -> {
                if (status.getStatus() != 200) {
                    // Poison instead of -1: -1 could be turned back into 0 by the final addAndGet.
                    pending.set(FAILED);
                    committer.fail(new RuntimeException(String.format("Received invalid status %d:%s", status.getStatus(), status.getStatusMessage())));
                } else if (pending.decrementAndGet() == 0) {
                    committer.confirm();
                }
            });
        } catch (Exception e) {
            pending.set(FAILED);
            committer.fail(e);
        }
    }

    /**
     * Returns the {@link DirectDataOperator}, creating it from the repository factory on first
     * use. No synchronization is performed here.
     */
    private DirectDataOperator direct() {
        if (this.direct == null) {
            this.direct = this.repoFactory.apply().getOrCreateOperator(DirectDataOperator.class);
        }
        return this.direct;
    }
}
