package com.spotify.scio.transforms;

import com.google.common.cache.AbstractCache;
import com.google.common.cache.Cache;
import com.spotify.scio.transforms.FutureHandlers;
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.CheckForNull;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncLookupDoFn.class */
public abstract class BaseAsyncLookupDoFn<A, B, C, F, T> extends DoFnWithResource<A, KV<A, T>, Pair<C, Cache<A, B>>> implements FutureHandlers.Base<F, B> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncLookupDoFn.class);
    private final boolean deduplicate;
    private final CacheSupplier<A, B> cacheSupplier;
    private final int maxPendingRequests;
    private final Semaphore semaphore;
    private final ConcurrentMap<UUID, F> futures;
    private final ConcurrentMap<A, F> inFlightRequests;
    private final ConcurrentLinkedQueue<Pair<UUID, BaseAsyncLookupDoFn<A, B, C, F, T>.Result>> results;
    private long inputCount;
    private long outputCount;

    @FunctionalInterface
    /* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncLookupDoFn$CacheSupplier.class */
    public interface CacheSupplier<K, V> extends Supplier<Cache<K, V>>, Serializable {
    }

    /* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncLookupDoFn$NoOpCacheSupplier.class */
    public static class NoOpCacheSupplier<K, V> implements CacheSupplier<K, V> {
        @Override // java.util.function.Supplier
        public Cache<K, V> get() {
            return new AbstractCache<K, V>() { // from class: com.spotify.scio.transforms.BaseAsyncLookupDoFn.NoOpCacheSupplier.1
                public void put(K k, V v) {
                }

                @CheckForNull
                public V getIfPresent(Object obj) {
                    return null;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncLookupDoFn$Result.class */
    public class Result {
        private A input;
        private T output;
        private Instant timestamp;
        private BoundedWindow window;

        Result(A a, T t, Instant instant, BoundedWindow boundedWindow) {
            this.input = a;
            this.output = t;
            this.timestamp = instant;
            this.window = boundedWindow;
        }
    }

    /* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncLookupDoFn$Try.class */
    public static class Try<A> implements Serializable {
        private final boolean isSuccess;
        private final A value;
        private final Throwable exception;

        public Try(A a) {
            this.isSuccess = true;
            this.value = a;
            this.exception = null;
        }

        public Try(Throwable th) {
            Preconditions.checkNotNull(th, "exception must not be null");
            this.isSuccess = false;
            this.value = null;
            this.exception = th;
        }

        public A get() {
            return this.value;
        }

        public Throwable getException() {
            return this.exception;
        }

        public boolean isSuccess() {
            return this.isSuccess;
        }

        public boolean isFailure() {
            return !this.isSuccess;
        }

        public int hashCode() {
            if (!this.isSuccess) {
                return this.exception.hashCode();
            }
            if (this.value == null) {
                return 0;
            }
            return this.value.hashCode();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Try)) {
                return false;
            }
            Try r0 = (Try) obj;
            return this.isSuccess == r0.isSuccess && Objects.equals(get(), r0.get()) && Objects.equals(getException(), r0.getException());
        }
    }

    protected abstract C newClient();

    public abstract F asyncLookup(C c, A a);

    public abstract T success(B b);

    public abstract T failure(Throwable th);

    public BaseAsyncLookupDoFn() {
        this(1000);
    }

    public BaseAsyncLookupDoFn(int i) {
        this(i, true, new NoOpCacheSupplier());
    }

    public BaseAsyncLookupDoFn(int i, CacheSupplier<A, B> cacheSupplier) {
        this(i, true, cacheSupplier);
    }

    public BaseAsyncLookupDoFn(int i, boolean z, CacheSupplier<A, B> cacheSupplier) {
        this.futures = new ConcurrentHashMap();
        this.inFlightRequests = new ConcurrentHashMap();
        this.results = new ConcurrentLinkedQueue<>();
        this.maxPendingRequests = i;
        this.deduplicate = z;
        this.cacheSupplier = cacheSupplier;
        this.semaphore = new Semaphore(i);
    }

    @Override // com.spotify.scio.transforms.DoFnWithResource
    public Pair<C, Cache<A, B>> createResource() {
        return Pair.of(newClient(), this.cacheSupplier.get());
    }

    @Override // com.spotify.scio.transforms.DoFnWithResource
    public void closeResource(Pair<C, Cache<A, B>> pair) throws Exception {
        Object left = pair.getLeft();
        if (left instanceof AutoCloseable) {
            ((AutoCloseable) left).close();
        }
    }

    public C getResourceClient() {
        return (C) getResource().getLeft();
    }

    public Cache<A, B> getResourceCache() {
        return (Cache) getResource().getRight();
    }

    @DoFn.StartBundle
    public void startBundle(DoFn<A, KV<A, T>>.StartBundleContext startBundleContext) {
        this.futures.clear();
        this.results.clear();
        this.inFlightRequests.clear();
        this.inputCount = 0L;
        this.outputCount = 0L;
        this.semaphore.drainPermits();
        this.semaphore.release(this.maxPendingRequests);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @DoFn.ProcessElement
    public void processElement(@DoFn.Element A a, @DoFn.Timestamp Instant instant, DoFn.OutputReceiver<KV<A, T>> outputReceiver, BoundedWindow boundedWindow) {
        this.inputCount++;
        flush(result -> {
            outputReceiver.output(KV.of(result.input, result.output));
        });
        Object resourceClient = getResourceClient();
        Cache resourceCache = getResourceCache();
        try {
            UUID randomUUID = UUID.randomUUID();
            Object ifPresent = resourceCache.getIfPresent(a);
            F f = this.inFlightRequests.get(a);
            if (ifPresent != null) {
                outputReceiver.output(KV.of(a, success(ifPresent)));
                this.outputCount++;
            } else if (f != null) {
                this.futures.put(randomUUID, handleOutput(f, a, randomUUID, instant, boundedWindow));
            } else {
                this.semaphore.acquire();
                Object asyncLookup = asyncLookup(resourceClient, a);
                handleCache(asyncLookup, a, resourceCache);
                this.futures.put(randomUUID, handleOutput(handleSemaphore(asyncLookup), a, randomUUID, instant, boundedWindow));
            }
        } catch (InterruptedException e) {
            LOG.error("Failed to acquire semaphore", e);
            throw new RuntimeException("Failed to acquire semaphore", e);
        } catch (Exception e2) {
            LOG.error("Failed to process element", e2);
            throw e2;
        }
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<A, KV<A, T>>.FinishBundleContext finishBundleContext) {
        if (!this.futures.isEmpty()) {
            try {
                waitForFutures(this.futures.values());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Failed to process futures", e);
                throw new RuntimeException("Failed to process futures", e);
            } catch (ExecutionException e2) {
                LOG.error("Failed to process futures", e2);
                throw new RuntimeException("Failed to process futures", e2);
            }
        }
        flush(result -> {
            finishBundleContext.output(KV.of(result.input, result.output), result.timestamp, result.window);
        });
        Preconditions.checkState(this.inputCount == this.outputCount, "Expected inputCount == outputCount, but %s != %s", this.inputCount, this.outputCount);
    }

    private F handleOutput(F f, A a, UUID uuid, Instant instant, BoundedWindow boundedWindow) {
        return addCallback(f, obj -> {
            this.results.add(Pair.of(uuid, new Result(a, success(obj), instant, boundedWindow)));
            return null;
        }, th -> {
            this.results.add(Pair.of(uuid, new Result(a, failure(th), instant, boundedWindow)));
            return null;
        });
    }

    private F handleSemaphore(F f) {
        return addCallback(f, obj -> {
            this.semaphore.release();
            return null;
        }, th -> {
            this.semaphore.release();
            return null;
        });
    }

    private F handleCache(F f, A a, Cache<A, B> cache) {
        boolean z = this.deduplicate && this.inFlightRequests.putIfAbsent(a, f) == null;
        return addCallback(f, obj -> {
            cache.put(a, obj);
            if (!z) {
                return null;
            }
            this.inFlightRequests.remove(a);
            return null;
        }, th -> {
            if (!z) {
                return null;
            }
            this.inFlightRequests.remove(a);
            return null;
        });
    }

    private void flush(Consumer<BaseAsyncLookupDoFn<A, B, C, F, T>.Result> consumer) {
        Pair<UUID, BaseAsyncLookupDoFn<A, B, C, F, T>.Result> poll = this.results.poll();
        while (true) {
            Pair<UUID, BaseAsyncLookupDoFn<A, B, C, F, T>.Result> pair = poll;
            if (pair == null) {
                return;
            }
            UUID uuid = (UUID) pair.getKey();
            consumer.accept((Result) pair.getValue());
            this.outputCount++;
            this.futures.remove(uuid);
            poll = this.results.poll();
        }
    }
}
