package datafu.hourglass.mapreduce;

import datafu.hourglass.fs.PathUtils;
import datafu.hourglass.model.KeyValueCollector;
import datafu.hourglass.model.Mapper;
import datafu.hourglass.schemas.PartitionCollapsingSchemas;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;

/* loaded from: input_file:datafu/hourglass/mapreduce/CollapsingMapper.class */
public class CollapsingMapper extends ObjectMapper implements Serializable {
    private static Logger _log = Logger.getLogger(CollapsingMapper.class);
    private transient IdentityMapCollector _mapCollector;
    private transient TimeMapCollector _timeMapCollector;
    private boolean _reusePreviousOutput;
    private PartitionCollapsingSchemas _schemas;
    private Mapper<GenericRecord, GenericRecord, GenericRecord> _mapper;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datafu/hourglass/mapreduce/CollapsingMapper$IdentityMapCollector.class */
    public class IdentityMapCollector extends MapCollector {
        public IdentityMapCollector(PartitionCollapsingSchemas partitionCollapsingSchemas) {
            super();
        }

        @Override // datafu.hourglass.model.KeyValueCollector
        public void collect(GenericRecord genericRecord, GenericRecord genericRecord2) throws IOException, InterruptedException {
            if (genericRecord == null) {
                throw new RuntimeException("key is null");
            }
            if (genericRecord2 == null) {
                throw new RuntimeException("value is null");
            }
            getContext().write(new AvroKey(genericRecord), new AvroValue(genericRecord2));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datafu/hourglass/mapreduce/CollapsingMapper$MapCollector.class */
    public abstract class MapCollector implements KeyValueCollector<GenericRecord, GenericRecord> {
        private MapContext<Object, Object, Object, Object> context;

        private MapCollector() {
        }

        public void setContext(MapContext<Object, Object, Object, Object> mapContext) {
            this.context = mapContext;
        }

        public MapContext<Object, Object, Object, Object> getContext() {
            return this.context;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datafu/hourglass/mapreduce/CollapsingMapper$TimeMapCollector.class */
    public class TimeMapCollector extends MapCollector {
        private GenericRecord wrappedValue;
        private InputSplit lastSplit;
        private long lastTime;

        public TimeMapCollector(PartitionCollapsingSchemas partitionCollapsingSchemas) {
            super();
            this.wrappedValue = new GenericData.Record(partitionCollapsingSchemas.getDatedIntermediateValueSchema());
        }

        @Override // datafu.hourglass.model.KeyValueCollector
        public void collect(GenericRecord genericRecord, GenericRecord genericRecord2) throws IOException, InterruptedException {
            long time;
            if (genericRecord == null) {
                throw new RuntimeException("key is null");
            }
            if (genericRecord2 == null) {
                throw new RuntimeException("value is null");
            }
            if (this.lastSplit == getContext().getInputSplit()) {
                time = this.lastTime;
            } else {
                this.lastSplit = getContext().getInputSplit();
                try {
                    Method method = getContext().getInputSplit().getClass().getMethod("getInputSplit", new Class[0]);
                    method.setAccessible(true);
                    time = PathUtils.getDateForNestedDatedPath(((FileSplit) method.invoke(getContext().getInputSplit(), new Object[0])).getPath().getParent()).getTime();
                    this.lastTime = time;
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                } catch (IllegalArgumentException e2) {
                    throw new RuntimeException(e2);
                } catch (NoSuchMethodException e3) {
                    throw new RuntimeException(e3);
                } catch (SecurityException e4) {
                    throw new RuntimeException(e4);
                } catch (InvocationTargetException e5) {
                    throw new RuntimeException(e5);
                }
            }
            this.wrappedValue.put("time", Long.valueOf(time));
            this.wrappedValue.put("value", genericRecord2);
            getContext().write(new AvroKey(genericRecord), new AvroValue(this.wrappedValue));
        }
    }

    @Override // datafu.hourglass.mapreduce.ObjectMapper
    public void map(Object obj, MapContext<Object, Object, Object, Object> mapContext) throws IOException, InterruptedException {
        GenericRecord genericRecord = (GenericRecord) ((AvroKey) obj).datum();
        try {
            getMapCollector().setContext(mapContext);
            getMapper().map(genericRecord, getMapCollector());
        } catch (InterruptedException e) {
            throw new IOException(e);
        } catch (UnresolvedUnionException e2) {
            _log.error("UnresolvedUnionException on schema: " + ((GenericRecord) e2.getUnresolvedDatum()).getSchema());
            throw e2;
        }
    }

    public boolean getReuseOutput() {
        return this._reusePreviousOutput;
    }

    public void setReuseOutput(boolean z) {
        this._reusePreviousOutput = z;
    }

    @Override // datafu.hourglass.mapreduce.ObjectProcessor
    public void setContext(TaskInputOutputContext<Object, Object, Object, Object> taskInputOutputContext) {
        super.setContext(taskInputOutputContext);
        if (this._mapper instanceof Configurable) {
            this._mapper.setConf(taskInputOutputContext.getConfiguration());
        }
    }

    public Mapper<GenericRecord, GenericRecord, GenericRecord> getMapper() {
        return this._mapper;
    }

    public void setMapper(Mapper<GenericRecord, GenericRecord, GenericRecord> mapper) {
        this._mapper = mapper;
    }

    public void setSchemas(PartitionCollapsingSchemas partitionCollapsingSchemas) {
        this._schemas = partitionCollapsingSchemas;
    }

    public PartitionCollapsingSchemas getSchemas() {
        return this._schemas;
    }

    private MapCollector getMapCollector() {
        return getReuseOutput() ? getTimeMapCollector() : getIdentityMapCollector();
    }

    private TimeMapCollector getTimeMapCollector() {
        if (this._timeMapCollector == null) {
            this._timeMapCollector = new TimeMapCollector(getSchemas());
        }
        return this._timeMapCollector;
    }

    private IdentityMapCollector getIdentityMapCollector() {
        if (this._mapCollector == null) {
            this._mapCollector = new IdentityMapCollector(getSchemas());
        }
        return this._mapCollector;
    }
}
