package cz.o2.proxima.tools.io;

import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
import cz.o2.proxima.client.IngestClient;
import cz.o2.proxima.proto.service.Rpc;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.Writer;
import cz.seznam.euphoria.core.client.util.Triple;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/tools/io/AttributeSink.class */
/**
 * A {@link DataSink} that forwards every written element to an ingest
 * endpoint through an {@link IngestClient}.
 *
 * <p>Each element is a {@link Triple} of {@code (key, value, stamp)}. It is
 * converted into an {@link Rpc.Ingest} message for the configured entity and
 * attribute and handed to the client. The client is created lazily on first
 * use and closed on {@link #commit()} or {@link #rollback()}.
 */
public class AttributeSink implements DataSink<Triple<String, byte[], Long>> {
    private static final Logger log = LoggerFactory.getLogger(AttributeSink.class);

    /** Status code treated as a successful ingest; anything else is logged. */
    private static final int STATUS_OK = 200;

    private final String host;
    private final int port;
    private final EntityDescriptor entityDesc;
    private final AttributeDescriptor<?> desc;

    /** Lazily created client; {@code null} until first use and again after close. */
    @Nullable
    private transient IngestClient client = null;

    /**
     * Creates a sink writing a single attribute of a single entity.
     *
     * @param host host passed to {@link IngestClient#create}
     * @param port port passed to {@link IngestClient#create}
     * @param entityDescriptor entity whose name is set on each ingest message
     * @param attributeDescriptor attribute whose name is set on each ingest message
     */
    public AttributeSink(String host, int port, EntityDescriptor entityDescriptor, AttributeDescriptor<?> attributeDescriptor) {
        this.host = host;
        this.port = port;
        this.entityDesc = entityDescriptor;
        this.desc = attributeDescriptor;
    }

    /**
     * Opens a writer that sends each element through the shared client.
     *
     * @param partition partition index; not used by this sink
     * @return writer whose {@code commit} and {@code close} are no-ops
     */
    public Writer<Triple<String, byte[], Long>> openWriter(int partition) {
        return new Writer<Triple<String, byte[], Long>>() {
            public void write(Triple<String, byte[], Long> triple) throws IOException {
                Rpc.Ingest ingest = Rpc.Ingest.newBuilder()
                        .setEntity(AttributeSink.this.entityDesc.getName())
                        .setAttribute(AttributeSink.this.desc.getName())
                        .setKey(triple.getFirst())
                        .setUuid(UUID.randomUUID().toString())
                        .setValue(ByteString.copyFrom(triple.getSecond()))
                        .setStamp(triple.getThird())
                        .build();
                // The send is not awaited here: the callback only logs
                // non-OK statuses, it does not propagate them to the caller.
                AttributeSink.this.client().send(ingest, status -> {
                    if (status.getStatus() != STATUS_OK) {
                        AttributeSink.log.warn(
                                "Failed to send ingest {}: {} {}",
                                TextFormat.shortDebugString(ingest),
                                status.getStatus(),
                                status.getStatusMessage());
                    }
                });
            }

            public void commit() throws IOException {
                // nothing to do per writer; the client is closed by the sink
            }

            public void close() throws IOException {
                // nothing to do per writer; the client is closed by the sink
            }
        };
    }

    /**
     * Closes the client, if one was created.
     *
     * @throws IOException declared by the sink contract
     */
    public void commit() throws IOException {
        closeClient();
    }

    /**
     * Closes the client, if one was created.
     *
     * @throws IOException declared by the sink contract
     */
    public void rollback() throws IOException {
        closeClient();
    }

    /**
     * Returns the ingest client, creating it on first use (or after a
     * previous close).
     *
     * @return the client connected to the configured host and port
     */
    public IngestClient client() {
        if (this.client == null) {
            this.client = IngestClient.create(this.host, this.port);
        }
        return this.client;
    }

    /**
     * Closes and forgets the current client. Safe to call when no client was
     * ever created, e.g. when the sink is committed without any write.
     */
    private void closeClient() throws IOException {
        IngestClient current = this.client;
        if (current != null) {
            // Drop the reference first so a later client() call creates a
            // fresh client instead of handing out the closed one.
            this.client = null;
            current.close();
        }
    }
}
