package eu.stratosphere.sopremo.pact;

import com.google.common.reflect.TypeToken;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.common.functions.GenericCoGrouper;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.SopremoEnvironment;
import eu.stratosphere.sopremo.serialization.SopremoRecord;
import eu.stratosphere.sopremo.type.ArrayNode;
import eu.stratosphere.sopremo.type.IJsonNode;
import eu.stratosphere.sopremo.type.IStreamNode;
import eu.stratosphere.sopremo.type.StreamNode;
import eu.stratosphere.sopremo.type.typed.TypedObjectNode;
import eu.stratosphere.util.Collector;
import java.util.Iterator;

/* loaded from: input_file:eu/stratosphere/sopremo/pact/GenericSopremoCoGroup.class */
/**
 * Base class for co-group functions that work on JSON nodes instead of raw {@link SopremoRecord}s.
 *
 * <p>The record-level {@link #coGroup(Iterator, Iterator, Collector)} callback wraps both incoming
 * record iterators in {@link RecordToJsonIterator}s, exposes them as stream nodes and delegates to
 * the abstract {@link #coGroup(IStreamNode, IStreamNode, JsonCollector)} that subclasses implement.
 *
 * <p>{@link #open(Configuration)} initializes the context, the collector and both iterators;
 * {@code coGroup} dereferences them without a null check.
 *
 * @param <LeftElem> element type of the first input
 * @param <RightElem> element type of the second input
 * @param <Out> element type handed to the {@link JsonCollector}
 */
public abstract class GenericSopremoCoGroup<LeftElem extends IJsonNode, RightElem extends IJsonNode, Out extends IJsonNode> extends AbstractFunction implements GenericCoGrouper<SopremoRecord, SopremoRecord, SopremoRecord>, SopremoFunction {
    /** Evaluation context obtained from the {@link SopremoEnvironment} in {@link #open(Configuration)}. */
    private EvaluationContext context;

    /** Collector passed to the subclass; re-configured with the runtime collector on every call. */
    private JsonCollector<Out> collector;

    /** Reusable adapter over the first input's record iterator. */
    private RecordToJsonIterator<LeftElem> cachedIterator1;

    /** Reusable adapter over the second input's record iterator. */
    private RecordToJsonIterator<RightElem> cachedIterator2;

    /** Stream view of the first input; bound to {@link #cachedIterator1} in {@link #open(Configuration)}. */
    private final StreamNode<LeftElem> leftArray = new StreamNode<>();

    /** Stream view of the second input; bound to {@link #cachedIterator2} in {@link #open(Configuration)}. */
    private final StreamNode<RightElem> rightArray = new StreamNode<>();

    /**
     * Record-level entry point: points the cached iterators at the given inputs and delegates to
     * {@link #coGroup(IStreamNode, IStreamNode, JsonCollector)}.
     *
     * <p>When trace logging is enabled, an {@link ArrayNode} is built from each stream, both are
     * logged, and the subclass receives fresh stream nodes over those array nodes' iterators
     * instead of the original streams.
     *
     * @param it records of the first input
     * @param it2 records of the second input
     * @param collector runtime collector that receives the emitted records
     * @throws RuntimeException whatever the subclass implementation throws; it is logged together
     *         with its stack trace and then rethrown unchanged
     */
    public void coGroup(Iterator<SopremoRecord> it, Iterator<SopremoRecord> it2, Collector<SopremoRecord> collector) {
        this.collector.configure(collector);
        this.cachedIterator1.setIterator(it);
        this.cachedIterator2.setIterator(it2);
        try {
            if (SopremoUtil.LOG.isTraceEnabled()) {
                // NOTE(review): raw types kept from the original; ArrayNode's generic signature is
                // not visible from this file, so parameterizing it here would be a guess.
                ArrayNode leftSnapshot = new ArrayNode(this.leftArray);
                ArrayNode rightSnapshot = new ArrayNode(this.rightArray);
                SopremoUtil.LOG.trace(String.format("%s %s/%s", getContext().getOperatorDescription(), leftSnapshot, rightSnapshot));
                coGroup(new StreamNode(leftSnapshot.iterator()), new StreamNode(rightSnapshot.iterator()), this.collector);
            } else {
                coGroup(this.leftArray, this.rightArray, this.collector);
            }
        } catch (RuntimeException e) {
            // Pass the exception as the throwable argument as well, so the log keeps the stack
            // trace and cause chain rather than only e.toString() embedded in the message.
            SopremoUtil.LOG.error(String.format("Error occurred @ %s with %s/%s: %s", getContext().getOperatorDescription(), this.leftArray, this.rightArray, e), e);
            throw e;
        }
    }

    /**
     * Not supported by this class.
     *
     * @throws UnsupportedOperationException always
     */
    public void combineFirst(Iterator<SopremoRecord> it, Collector<SopremoRecord> collector) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this class.
     *
     * @throws UnsupportedOperationException always
     */
    public void combineSecond(Iterator<SopremoRecord> it, Collector<SopremoRecord> collector) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the evaluation context set in {@link #open(Configuration)}.
     *
     * @return the context, or {@code null} if {@code open} has not been called yet
     */
    @Override
    public final EvaluationContext getContext() {
        return this.context;
    }

    /**
     * Prepares this function for execution: publishes the configuration to the
     * {@link SopremoEnvironment}, fetches the evaluation context, creates the collector, applies
     * the transferred state, and creates one record-to-JSON iterator per input.
     *
     * <p>An input gets a {@link TypedRecordToJsonIterator} when a typed node was resolved for its
     * type argument, otherwise an {@link UntypedRecordToJsonIterator}.
     *
     * @param configuration the configuration supplied for this function
     * @throws Exception declared by the method signature; no exception is thrown explicitly here
     */
    public void open(Configuration configuration) throws Exception {
        SopremoEnvironment.getInstance().setConfiguration(configuration);
        this.context = SopremoEnvironment.getInstance().getEvaluationContext();
        this.collector = new JsonCollector<>(this.context);
        SopremoUtil.configureWithTransferredState(this, GenericSopremoCoGroup.class, configuration);

        // Resolve the actual type arguments the concrete subclass bound to this class.
        TypedObjectNode[] typedNodes = SopremoUtil.getTypedNodes(TypeToken.of(getClass()).getSupertype(GenericSopremoCoGroup.class));
        this.cachedIterator1 = typedNodes[0] == null ? new UntypedRecordToJsonIterator<>() : new TypedRecordToJsonIterator<>(typedNodes[0]);
        this.cachedIterator2 = typedNodes[1] == null ? new UntypedRecordToJsonIterator<>() : new TypedRecordToJsonIterator<>(typedNodes[1]);

        this.leftArray.setNodeIterator(this.cachedIterator1);
        this.rightArray.setNodeIterator(this.cachedIterator2);
    }

    /**
     * Node-level co-group implemented by subclasses.
     *
     * @param iStreamNode stream of the first input's elements
     * @param iStreamNode2 stream of the second input's elements
     * @param jsonCollector collector for the results
     */
    protected abstract void coGroup(IStreamNode<LeftElem> iStreamNode, IStreamNode<RightElem> iStreamNode2, JsonCollector<Out> jsonCollector);
}
