package com.spotify.scio.transforms;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/scio/transforms/BatchDoFn.class */
public class BatchDoFn<InputT> extends DoFn<InputT, Iterable<InputT>> {
    private static final Logger LOG = LoggerFactory.getLogger(BatchDoFn.class);
    public static final int DEFAULT_MAX_LIVE_WINDOWS = 10;
    private final long maxWeight;
    private final SerializableFunction<InputT, Long> weigher;
    private final int maxLiveWindows;
    private transient Map<BoundedWindow, Buffer<InputT>> buffers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/spotify/scio/transforms/BatchDoFn$Buffer.class */
    public static class Buffer<InputT> {
        private final List<InputT> elements = new ArrayList();
        private long weight = 0;

        Buffer() {
        }

        public List<InputT> getElements() {
            return this.elements;
        }

        public long getWeight() {
            return this.weight;
        }

        public void add(InputT inputt, long j) {
            this.elements.add(inputt);
            this.weight += j;
        }
    }

    public BatchDoFn(long j, SerializableFunction<InputT, Long> serializableFunction) {
        this(j, serializableFunction, 10);
    }

    public BatchDoFn(long j, SerializableFunction<InputT, Long> serializableFunction, int i) {
        this.maxWeight = j;
        this.weigher = serializableFunction;
        this.maxLiveWindows = i;
    }

    @DoFn.Setup
    public void setup() {
        this.buffers = new HashMap();
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.Element InputT inputt, BoundedWindow boundedWindow, DoFn.OutputReceiver<Iterable<InputT>> outputReceiver) {
        LOG.debug("*** BATCH *** Add element for window {} ", boundedWindow);
        Buffer<InputT> computeIfAbsent = this.buffers.computeIfAbsent(boundedWindow, boundedWindow2 -> {
            return new Buffer();
        });
        computeIfAbsent.add(inputt, ((Long) this.weigher.apply(inputt)).longValue());
        if (computeIfAbsent.getWeight() >= this.maxWeight) {
            LOG.debug("*** END OF BATCH *** for window {}", boundedWindow);
            flushBatch(boundedWindow, outputReceiver);
        } else if (this.buffers.size() > this.maxLiveWindows) {
            BoundedWindow boundedWindow3 = (BoundedWindow) ((Map.Entry) Collections.max(this.buffers.entrySet(), Comparator.comparingLong(entry -> {
                return ((Buffer) entry.getValue()).getWeight();
            }))).getKey();
            LOG.debug("*** END OF BATCH *** for window {}", boundedWindow3);
            flushBatch(boundedWindow3, outputReceiver);
        }
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<InputT, Iterable<InputT>>.FinishBundleContext finishBundleContext) {
        for (Map.Entry<BoundedWindow, Buffer<InputT>> entry : this.buffers.entrySet()) {
            BoundedWindow key = entry.getKey();
            finishBundleContext.output(entry.getValue().getElements(), key.maxTimestamp(), key);
        }
        this.buffers.clear();
    }

    private void flushBatch(BoundedWindow boundedWindow, DoFn.OutputReceiver<Iterable<InputT>> outputReceiver) {
        outputReceiver.outputWithTimestamp(((Buffer) this.buffers.remove(boundedWindow)).elements, boundedWindow.maxTimestamp());
    }
}
