package com.spotify.scio.transforms;

import com.google.common.cache.Cache;
import com.spotify.scio.transforms.BaseAsyncLookupDoFn;
import com.spotify.scio.transforms.FutureHandlers;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncBatchLookupDoFn.class */
public abstract class BaseAsyncBatchLookupDoFn<Input, BatchRequest, BatchResponse, Output, ClientType, FutureType, TryWrapper> extends DoFnWithResource<Input, KV<Input, TryWrapper>, Pair<ClientType, Cache<String, Output>>> implements FutureHandlers.Base<FutureType, BatchResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncBatchLookupDoFn.class);
    private final int batchSize;
    private final SerializableFunction<List<Input>, BatchRequest> batchRequestFn;
    private final SerializableFunction<BatchResponse, List<Pair<String, Output>>> batchResponseFn;
    private final SerializableFunction<Input, String> idExtractorFn;
    private final int maxPendingRequests;
    private final BaseAsyncLookupDoFn.CacheSupplier<String, Output> cacheSupplier;
    private final Semaphore semaphore;
    private final ConcurrentMap<UUID, FutureType> futures;
    private final ConcurrentMap<String, List<Triple<Input, Instant, BoundedWindow>>> inputs;
    private final Queue<Input> batch;
    private final ConcurrentLinkedQueue<Pair<UUID, List<BaseAsyncBatchLookupDoFn<Input, BatchRequest, BatchResponse, Output, ClientType, FutureType, TryWrapper>.Result>>> results;
    private long inputCount;
    private long outputCount;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncBatchLookupDoFn$Result.class */
    public class Result {
        private Input input;
        private TryWrapper output;
        private Instant timestamp;
        private BoundedWindow window;

        Result(Input input, TryWrapper trywrapper, Instant instant, BoundedWindow boundedWindow) {
            this.input = input;
            this.output = trywrapper;
            this.timestamp = instant;
            this.window = boundedWindow;
        }
    }

    public BaseAsyncBatchLookupDoFn(int i, SerializableFunction<List<Input>, BatchRequest> serializableFunction, SerializableFunction<BatchResponse, List<Pair<String, Output>>> serializableFunction2, SerializableFunction<Input, String> serializableFunction3, int i2) {
        this(i, serializableFunction, serializableFunction2, serializableFunction3, i2, new BaseAsyncLookupDoFn.NoOpCacheSupplier());
    }

    public BaseAsyncBatchLookupDoFn(int i, SerializableFunction<List<Input>, BatchRequest> serializableFunction, SerializableFunction<BatchResponse, List<Pair<String, Output>>> serializableFunction2, SerializableFunction<Input, String> serializableFunction3, int i2, BaseAsyncLookupDoFn.CacheSupplier<String, Output> cacheSupplier) {
        this.futures = new ConcurrentHashMap();
        this.inputs = new ConcurrentHashMap();
        this.batch = new ArrayDeque();
        this.results = new ConcurrentLinkedQueue<>();
        this.batchSize = i;
        this.batchRequestFn = serializableFunction;
        this.batchResponseFn = serializableFunction2;
        this.idExtractorFn = serializableFunction3;
        this.maxPendingRequests = i2;
        this.semaphore = new Semaphore(i2);
        this.cacheSupplier = cacheSupplier;
    }

    protected abstract ClientType newClient();

    public abstract FutureType asyncLookup(ClientType clienttype, BatchRequest batchrequest);

    public abstract TryWrapper success(Output output);

    public abstract TryWrapper failure(Throwable th);

    @Override // com.spotify.scio.transforms.DoFnWithResource
    public Pair<ClientType, Cache<String, Output>> createResource() {
        return Pair.of(newClient(), this.cacheSupplier.get());
    }

    @Override // com.spotify.scio.transforms.DoFnWithResource
    public void closeResource(Pair<ClientType, Cache<String, Output>> pair) throws Exception {
        Object left = pair.getLeft();
        if (left instanceof AutoCloseable) {
            ((AutoCloseable) left).close();
        }
    }

    public ClientType getResourceClient() {
        return (ClientType) getResource().getLeft();
    }

    public Cache<String, Output> getResourceCache() {
        return (Cache) getResource().getRight();
    }

    @DoFn.StartBundle
    public void startBundle(DoFn<Input, KV<Input, TryWrapper>>.StartBundleContext startBundleContext) {
        this.futures.clear();
        this.results.clear();
        this.inputs.clear();
        this.batch.clear();
        this.inputCount = 0L;
        this.outputCount = 0L;
        this.semaphore.drainPermits();
        this.semaphore.release(this.maxPendingRequests);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @DoFn.ProcessElement
    public void processElement(@DoFn.Element Input input, @DoFn.Timestamp Instant instant, DoFn.OutputReceiver<KV<Input, TryWrapper>> outputReceiver, BoundedWindow boundedWindow) {
        this.inputCount++;
        flush(result -> {
            outputReceiver.output(KV.of(result.input, result.output));
        });
        Cache resourceCache = getResourceCache();
        try {
            String str = (String) this.idExtractorFn.apply(input);
            Objects.requireNonNull(str, "idExtractorFn returned null");
            Object ifPresent = resourceCache.getIfPresent(str);
            if (ifPresent != null) {
                outputReceiver.output(KV.of(input, success(ifPresent)));
                this.outputCount++;
            } else {
                this.inputs.compute(str, (str2, list) -> {
                    if (list == null) {
                        list = new LinkedList();
                        this.batch.add(input);
                    }
                    list.add(Triple.of(input, instant, boundedWindow));
                    return list;
                });
            }
            if (this.batch.size() >= this.batchSize) {
                createRequest();
            }
        } catch (InterruptedException e) {
            LOG.error("Failed to acquire semaphore", e);
            throw new RuntimeException("Failed to acquire semaphore", e);
        } catch (Exception e2) {
            LOG.error("Failed to process element", e2);
            throw e2;
        }
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<Input, KV<Input, TryWrapper>>.FinishBundleContext finishBundleContext) {
        try {
            if (!this.batch.isEmpty()) {
                createRequest();
            }
            if (!this.futures.isEmpty()) {
                waitForFutures(this.futures.values());
            }
            flush(result -> {
                finishBundleContext.output(KV.of(result.input, result.output), result.timestamp, result.window);
            });
            Preconditions.checkState(this.inputCount == this.outputCount, "Expected requestCount == responseCount, but %s != %s", this.inputCount, this.outputCount);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Failed to process futures", e);
            throw new RuntimeException("Failed to process futures", e);
        } catch (ExecutionException e2) {
            LOG.error("Failed to process futures", e2);
            throw new RuntimeException("Failed to process futures", e2);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void createRequest() throws InterruptedException {
        Object resourceClient = getResourceClient();
        Cache resourceCache = getResourceCache();
        UUID randomUUID = UUID.randomUUID();
        ArrayList arrayList = new ArrayList(this.batch);
        Object apply = this.batchRequestFn.apply(arrayList);
        this.semaphore.acquire();
        Object asyncLookup = asyncLookup(resourceClient, apply);
        handleCache(asyncLookup, resourceCache);
        this.futures.put(randomUUID, handleOutput(handleSemaphore(asyncLookup), arrayList, randomUUID));
        this.batch.clear();
    }

    private FutureType handleOutput(FutureType futuretype, List<Input> list, UUID uuid) {
        return addCallback(futuretype, obj -> {
            ((List) this.batchResponseFn.apply(obj)).forEach(pair -> {
                String str = (String) pair.getLeft();
                Object right = pair.getRight();
                List<Triple<Input, Instant, BoundedWindow>> remove = this.inputs.remove(str);
                if (remove == null) {
                    LOG.error("The ID '{}' received in the gRPC batch response does not match any IDs extracted via the idExtractorFn for the requested  batch sent to the gRPC endpoint. Please ensure that the IDs returned from the gRPC endpoints match the IDs extracted using the providedidExtractorFn for the same input.", str);
                } else {
                    this.results.add(Pair.of(uuid, (List) remove.stream().map(triple -> {
                        return new Result(triple.getLeft(), success(right), (Instant) triple.getMiddle(), (BoundedWindow) triple.getRight());
                    }).collect(Collectors.toList())));
                }
            });
            return null;
        }, th -> {
            list.forEach(obj2 -> {
                this.results.add(Pair.of(uuid, (List) this.inputs.remove((String) this.idExtractorFn.apply(obj2)).stream().map(triple -> {
                    return new Result(triple.getLeft(), failure(th), (Instant) triple.getMiddle(), (BoundedWindow) triple.getRight());
                }).collect(Collectors.toList())));
            });
            return null;
        });
    }

    private FutureType handleSemaphore(FutureType futuretype) {
        return addCallback(futuretype, obj -> {
            this.semaphore.release();
            return null;
        }, th -> {
            this.semaphore.release();
            return null;
        });
    }

    private FutureType handleCache(FutureType futuretype, Cache<String, Output> cache) {
        return addCallback(futuretype, obj -> {
            ((List) this.batchResponseFn.apply(obj)).forEach(pair -> {
                cache.put((String) pair.getLeft(), pair.getRight());
            });
            return null;
        }, th -> {
            return null;
        });
    }

    private void flush(Consumer<BaseAsyncBatchLookupDoFn<Input, BatchRequest, BatchResponse, Output, ClientType, FutureType, TryWrapper>.Result> consumer) {
        Pair<UUID, List<BaseAsyncBatchLookupDoFn<Input, BatchRequest, BatchResponse, Output, ClientType, FutureType, TryWrapper>.Result>> poll = this.results.poll();
        while (true) {
            Pair<UUID, List<BaseAsyncBatchLookupDoFn<Input, BatchRequest, BatchResponse, Output, ClientType, FutureType, TryWrapper>.Result>> pair = poll;
            if (pair == null) {
                return;
            }
            UUID uuid = (UUID) pair.getKey();
            ((List) pair.getValue()).forEach(consumer);
            this.outputCount += r0.size();
            this.futures.remove(uuid);
            poll = this.results.poll();
        }
    }
}
