package com.spotify.scio.transforms;

import com.spotify.scio.util.RemoteFileUtil;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/scio/transforms/FileDownloadDoFn.class */
public class FileDownloadDoFn<OutputT> extends DoFn<URI, OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(FileDownloadDoFn.class);
    private final List<FileDownloadDoFn<OutputT>.Element> batch;
    private final RemoteFileUtil remoteFileUtil;
    private final SerializableFunction<Path, OutputT> fn;
    private final int batchSize;
    private final boolean keep;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/scio/transforms/FileDownloadDoFn$Element.class */
    public class Element {
        private URI uri;
        private Instant timestamp;
        private BoundedWindow window;

        Element(URI uri, Instant instant, BoundedWindow boundedWindow) {
            this.uri = uri;
            this.timestamp = instant;
            this.window = boundedWindow;
        }
    }

    public FileDownloadDoFn(RemoteFileUtil remoteFileUtil, SerializableFunction<Path, OutputT> serializableFunction) {
        this(remoteFileUtil, serializableFunction, 1, false);
    }

    public FileDownloadDoFn(RemoteFileUtil remoteFileUtil, SerializableFunction<Path, OutputT> serializableFunction, int i, boolean z) {
        this.remoteFileUtil = remoteFileUtil;
        this.fn = serializableFunction;
        this.batch = new ArrayList();
        this.batchSize = i;
        this.keep = z;
    }

    @DoFn.StartBundle
    public void startBundle(DoFn<URI, OutputT>.StartBundleContext startBundleContext) {
        this.batch.clear();
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.Element URI uri, @DoFn.Timestamp Instant instant, DoFn.OutputReceiver<OutputT> outputReceiver, BoundedWindow boundedWindow) {
        this.batch.add(new Element(uri, instant, boundedWindow));
        if (this.batch.size() >= this.batchSize) {
            processBatch(outputReceiver);
        }
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<URI, OutputT>.FinishBundleContext finishBundleContext) {
        processBatch(finishBundleContext);
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("Batch Size", Integer.valueOf(this.batchSize))).add(DisplayData.item("Keep Downloaded Files", Boolean.valueOf(this.keep)));
    }

    private void processBatch(DoFn.OutputReceiver<OutputT> outputReceiver) {
        if (this.batch.isEmpty()) {
            return;
        }
        LOG.info("Processing batch of {}", Integer.valueOf(this.batch.size()));
        List<URI> list = (List) this.batch.stream().map(element -> {
            return element.uri;
        }).collect(Collectors.toList());
        Stream<Path> stream = this.remoteFileUtil.download(list).stream();
        SerializableFunction<Path, OutputT> serializableFunction = this.fn;
        Objects.requireNonNull(serializableFunction);
        Stream<R> map = stream.map((v1) -> {
            return r1.apply(v1);
        });
        Objects.requireNonNull(outputReceiver);
        map.forEach(outputReceiver::output);
        if (!this.keep) {
            LOG.info("Deleting batch of {}", Integer.valueOf(this.batch.size()));
            this.remoteFileUtil.delete(list);
        }
        this.batch.clear();
    }

    private void processBatch(DoFn<URI, OutputT>.FinishBundleContext finishBundleContext) {
        if (this.batch.isEmpty()) {
            return;
        }
        LOG.info("Processing batch of {}", Integer.valueOf(this.batch.size()));
        List<URI> list = (List) this.batch.stream().map(element -> {
            return element.uri;
        }).collect(Collectors.toList());
        Stream<Path> stream = this.remoteFileUtil.download(list).stream();
        SerializableFunction<Path, OutputT> serializableFunction = this.fn;
        Objects.requireNonNull(serializableFunction);
        Iterator it = ((List) stream.map((v1) -> {
            return r1.apply(v1);
        }).collect(Collectors.toList())).iterator();
        Iterator<FileDownloadDoFn<OutputT>.Element> it2 = this.batch.iterator();
        while (it.hasNext() && it2.hasNext()) {
            FileDownloadDoFn<OutputT>.Element next = it2.next();
            finishBundleContext.output(it.next(), ((Element) next).timestamp, ((Element) next).window);
        }
        if (!this.keep) {
            LOG.info("Deleting batch of {}", Integer.valueOf(this.batch.size()));
            this.remoteFileUtil.delete(list);
        }
        this.batch.clear();
    }
}
