package datafu.hourglass.mapreduce;

import datafu.hourglass.fs.PathUtils;
import datafu.hourglass.model.Accumulator;
import datafu.hourglass.schemas.PartitionPreservingSchemas;
import java.io.IOException;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

/* loaded from: input_file:datafu/hourglass/mapreduce/PartitioningReducer.class */
public class PartitioningReducer extends ObjectReducer implements Serializable {
    private transient AvroMultipleOutputs _multipleOutputs;
    private transient Map<Long, String> _timeToNamedOutput;
    private PartitionPreservingSchemas _schemas;
    private Accumulator<GenericRecord, GenericRecord> accumulator;

    @Override // datafu.hourglass.mapreduce.ObjectReducer
    public void reduce(Object obj, Iterable<Object> iterable, ReduceContext<Object, Object, Object, Object> reduceContext) throws IOException, InterruptedException {
        GenericRecord genericRecord;
        Accumulator<GenericRecord, GenericRecord> accumulator = getAccumulator();
        if (accumulator == null) {
            throw new RuntimeException("No accumulator set for reducer!");
        }
        accumulator.cleanup();
        GenericRecord genericRecord2 = (GenericRecord) ((AvroKey) obj).datum();
        Long l = (Long) genericRecord2.get("time");
        GenericRecord genericRecord3 = (GenericRecord) genericRecord2.get("value");
        long j = 0;
        Iterator<Object> it = iterable.iterator();
        while (it.hasNext()) {
            accumulator.accumulate((GenericRecord) ((AvroValue) it.next()).datum());
            j++;
        }
        if (j <= 0 || (genericRecord = accumulator.getFinal()) == null) {
            return;
        }
        GenericData.Record record = new GenericData.Record(getSchemas().getReduceOutputSchema());
        record.put("key", genericRecord3);
        record.put("value", genericRecord);
        String namedOutput = getNamedOutput(l);
        if (this._multipleOutputs == null) {
            throw new RuntimeException("No multiple outputs set");
        }
        this._multipleOutputs.write(namedOutput, new AvroKey(record), (AvroValue) null);
    }

    @Override // datafu.hourglass.mapreduce.ObjectProcessor
    public void setContext(TaskInputOutputContext<Object, Object, Object, Object> taskInputOutputContext) {
        super.setContext(taskInputOutputContext);
        this._multipleOutputs = new AvroMultipleOutputs(taskInputOutputContext);
    }

    public void setAccumulator(Accumulator<GenericRecord, GenericRecord> accumulator) {
        this.accumulator = accumulator;
    }

    public Accumulator<GenericRecord, GenericRecord> getAccumulator() {
        return this.accumulator;
    }

    public void setSchemas(PartitionPreservingSchemas partitionPreservingSchemas) {
        this._schemas = partitionPreservingSchemas;
    }

    public PartitionPreservingSchemas getSchemas() {
        return this._schemas;
    }

    @Override // datafu.hourglass.mapreduce.ObjectProcessor
    public void close() throws IOException, InterruptedException {
        super.close();
        if (this._multipleOutputs != null) {
            this._multipleOutputs.close();
            this._multipleOutputs = null;
        }
    }

    private String getNamedOutput(Long l) {
        if (this._timeToNamedOutput == null) {
            this._timeToNamedOutput = new HashMap();
        }
        String str = this._timeToNamedOutput.get(l);
        if (str == null) {
            str = PathUtils.datedPathFormat.format(new Date(l.longValue()));
            this._timeToNamedOutput.put(l, str);
        }
        return str;
    }
}
