package cz.o2.proxima.tools.io;

import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.OnlineAttributeWriter;
import cz.o2.proxima.storage.StreamElement;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.Writer;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/tools/io/DirectAttributeSink.class */
public class DirectAttributeSink implements DataSink<StreamElement> {
    private static final Logger log = LoggerFactory.getLogger(DirectAttributeSink.class);
    private final Repository repo;
    private final AtomicInteger unclosedWriters;
    private final UnaryFunction<StreamElement, StreamElement> transformFn;

    public static DataSink<StreamElement> of(Repository repository) {
        return new DirectAttributeSink(repository);
    }

    public static DataSink<StreamElement> of(Repository repository, UnaryFunction<StreamElement, StreamElement> unaryFunction) {
        return new DirectAttributeSink(repository, unaryFunction);
    }

    private DirectAttributeSink(Repository repository) {
        this(repository, UnaryFunction.identity());
    }

    private DirectAttributeSink(Repository repository, UnaryFunction<StreamElement, StreamElement> unaryFunction) {
        this.unclosedWriters = new AtomicInteger();
        this.repo = repository;
        this.transformFn = unaryFunction;
    }

    public Writer<StreamElement> openWriter(int i) {
        this.unclosedWriters.incrementAndGet();
        return new Writer<StreamElement>() { // from class: cz.o2.proxima.tools.io.DirectAttributeSink.1
            public void write(StreamElement streamElement) throws IOException {
                OnlineAttributeWriter onlineAttributeWriter = (OnlineAttributeWriter) DirectAttributeSink.this.repo.getWriter(streamElement.getAttributeDescriptor()).orElseThrow(() -> {
                    return new IllegalStateException("Missing writer for " + streamElement.getAttributeDescriptor());
                });
                StreamElement streamElement2 = (StreamElement) DirectAttributeSink.this.transformFn.apply(streamElement);
                AtomicInteger atomicInteger = new AtomicInteger();
                AtomicReference atomicReference = new AtomicReference();
                atomicReference.set(() -> {
                    onlineAttributeWriter.write(streamElement2, (z, th) -> {
                        if (z) {
                            return;
                        }
                        if (atomicInteger.incrementAndGet() >= 3) {
                            throw new IllegalStateException(th);
                        }
                        DirectAttributeSink.log.warn("Failed to write {}, retries {}, retrying.", new Object[]{streamElement2, Integer.valueOf(atomicInteger.get()), th});
                        ((Runnable) atomicReference.get()).run();
                    });
                });
                ((Runnable) atomicReference.get()).run();
            }

            public void commit() throws IOException {
            }

            public void close() throws IOException {
                if (DirectAttributeSink.this.unclosedWriters.decrementAndGet() == 0) {
                    DirectAttributeSink.this.repo.close();
                }
            }
        };
    }

    public void commit() throws IOException {
    }

    public void rollback() throws IOException {
    }
}
