package eu.stratosphere.sopremo.pact;

import com.google.common.reflect.TypeToken;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.common.functions.GenericReducer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.SopremoEnvironment;
import eu.stratosphere.sopremo.serialization.SopremoRecord;
import eu.stratosphere.sopremo.type.ArrayNode;
import eu.stratosphere.sopremo.type.IJsonNode;
import eu.stratosphere.sopremo.type.IStreamNode;
import eu.stratosphere.sopremo.type.StreamNode;
import eu.stratosphere.sopremo.type.typed.TypedObjectNode;
import eu.stratosphere.util.Collector;
import java.util.Iterator;

/* loaded from: input_file:eu/stratosphere/sopremo/pact/GenericSopremoReduce.class */
public abstract class GenericSopremoReduce<Elem extends IJsonNode, Out extends IJsonNode> extends AbstractFunction implements GenericReducer<SopremoRecord, SopremoRecord>, SopremoFunction {
    private EvaluationContext context;
    private JsonCollector<Out> collector;
    private RecordToJsonIterator<? extends Elem> iterator;
    private final StreamNode<Elem> array = new StreamNode<>();

    public void combine(Iterator<SopremoRecord> it, Collector<SopremoRecord> collector) throws Exception {
        this.collector.configure(collector);
        this.iterator.setIterator(it);
        try {
            if (SopremoUtil.LOG.isTraceEnabled()) {
                ArrayNode arrayNode = new ArrayNode(this.array);
                SopremoUtil.LOG.trace(String.format("%s %s", getContext().getOperatorDescription(), arrayNode));
                combine(arrayNode, this.collector);
            } else {
                combine(this.array, this.collector);
            }
        } catch (RuntimeException e) {
            SopremoUtil.LOG.error(String.format("Error occurred @ %s with %s: %s", getContext().getOperatorDescription(), this.array, e));
            throw e;
        }
    }

    @Override // eu.stratosphere.sopremo.pact.SopremoFunction
    public final EvaluationContext getContext() {
        return this.context;
    }

    public void open(Configuration configuration) {
        SopremoEnvironment.getInstance().setConfiguration(configuration);
        this.context = SopremoEnvironment.getInstance().getEvaluationContext();
        TypedObjectNode typedObjectNode = SopremoUtil.getTypedNodes(TypeToken.of(getClass()).getSupertype(GenericSopremoReduce.class))[0];
        this.iterator = typedObjectNode == null ? new UntypedRecordToJsonIterator<>() : new TypedRecordToJsonIterator<>(typedObjectNode);
        this.collector = new JsonCollector<>(this.context);
        SopremoUtil.configureWithTransferredState(this, GenericSopremoReduce.class, configuration);
        this.array.setNodeIterator(this.iterator);
    }

    public void reduce(Iterator<SopremoRecord> it, Collector<SopremoRecord> collector) {
        this.collector.configure(collector);
        this.iterator.setIterator(it);
        try {
            if (SopremoUtil.LOG.isTraceEnabled()) {
                ArrayNode arrayNode = new ArrayNode(this.array);
                SopremoUtil.LOG.trace(String.format("%s %s", getContext().getOperatorDescription(), arrayNode));
                reduce(arrayNode, this.collector);
            } else {
                reduce(this.array, this.collector);
            }
        } catch (RuntimeException e) {
            SopremoUtil.LOG.error(String.format("Error occurred @ %s with %s: %s", getContext().getOperatorDescription(), this.array, e));
            throw e;
        }
    }

    protected void combine(IStreamNode<Elem> iStreamNode, JsonCollector<Out> jsonCollector) {
        reduce(iStreamNode, jsonCollector);
    }

    protected abstract void reduce(IStreamNode<Elem> iStreamNode, JsonCollector<Out> jsonCollector);
}
