package com.spotify.scio.transforms;

import com.spotify.scio.transforms.FutureHandlers;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncDoFn.class */
public abstract class BaseAsyncDoFn<InputT, OutputT, ResourceT, FutureT> extends DoFnWithResource<InputT, OutputT, ResourceT> implements FutureHandlers.Base<FutureT, OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncDoFn.class);
    private final ConcurrentMap<UUID, FutureT> futures = new ConcurrentHashMap();
    private final ConcurrentLinkedQueue<BaseAsyncDoFn<InputT, OutputT, ResourceT, FutureT>.Result> results = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/scio/transforms/BaseAsyncDoFn$Result.class */
    public class Result {
        private OutputT output;
        private UUID futureUuid;
        private Instant timestamp;
        private BoundedWindow window;

        Result(OutputT outputt, UUID uuid, Instant instant, BoundedWindow boundedWindow) {
            this.output = outputt;
            this.timestamp = instant;
            this.futureUuid = uuid;
            this.window = boundedWindow;
        }
    }

    public abstract FutureT processElement(InputT inputt);

    @DoFn.StartBundle
    public void startBundle(DoFn<InputT, OutputT>.StartBundleContext startBundleContext) {
        this.futures.clear();
        this.results.clear();
        this.errors.clear();
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext) {
        if (!this.futures.isEmpty()) {
            try {
                waitForFutures(this.futures.values());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Failed to process futures", e);
                throw new RuntimeException("Failed to process futures", e);
            } catch (ExecutionException e2) {
                LOG.error("Failed to process futures", e2);
                throw new RuntimeException("Failed to process futures", e2);
            }
        }
        flush(finishBundleContext);
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.Element InputT inputt, @DoFn.Timestamp Instant instant, DoFn.OutputReceiver<OutputT> outputReceiver, BoundedWindow boundedWindow) {
        flush(outputReceiver);
        this.futures.computeIfAbsent(UUID.randomUUID(), uuid -> {
            return addCallback(processElement(inputt), obj -> {
                this.results.add(new Result(obj, uuid, instant, boundedWindow));
                return null;
            }, th -> {
                this.errors.add(th);
                return null;
            });
        });
    }

    private void flush(DoFn.OutputReceiver<OutputT> outputReceiver) {
        if (!this.errors.isEmpty()) {
            RuntimeException runtimeException = new RuntimeException("Failed to process futures");
            Throwable poll = this.errors.poll();
            while (true) {
                Throwable th = poll;
                if (th == null) {
                    break;
                }
                runtimeException.addSuppressed(th);
                poll = this.errors.poll();
            }
            throw runtimeException;
        }
        BaseAsyncDoFn<InputT, OutputT, ResourceT, FutureT>.Result poll2 = this.results.poll();
        while (true) {
            BaseAsyncDoFn<InputT, OutputT, ResourceT, FutureT>.Result result = poll2;
            if (result == null) {
                return;
            }
            outputReceiver.output(((Result) result).output);
            this.futures.remove(((Result) result).futureUuid);
            poll2 = this.results.poll();
        }
    }

    private void flush(DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext) {
        if (!this.errors.isEmpty()) {
            RuntimeException runtimeException = new RuntimeException("Failed to process futures");
            Throwable poll = this.errors.poll();
            while (true) {
                Throwable th = poll;
                if (th == null) {
                    break;
                }
                runtimeException.addSuppressed(th);
                poll = this.errors.poll();
            }
            throw runtimeException;
        }
        BaseAsyncDoFn<InputT, OutputT, ResourceT, FutureT>.Result poll2 = this.results.poll();
        while (true) {
            BaseAsyncDoFn<InputT, OutputT, ResourceT, FutureT>.Result result = poll2;
            if (result == null) {
                return;
            }
            finishBundleContext.output(((Result) result).output, ((Result) result).timestamp, ((Result) result).window);
            this.futures.remove(((Result) result).futureUuid);
            poll2 = this.results.poll();
        }
    }
}
