package edu.cmu.graphchi.hadoop;

import edu.cmu.graphchi.ChiLogger;
import edu.cmu.graphchi.preprocessing.EdgeProcessor;
import edu.cmu.graphchi.preprocessing.FastSharder;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.tools.pigstats.PigStatusReporter;

/* loaded from: input_file:edu/cmu/graphchi/hadoop/PigGraphChiBase.class */
/**
 * Base class for Pig load functions that run a GraphChi computation inside a Pig job.
 *
 * <p>Only the mapper that receives split index 0 does any work: a background thread reads the
 * edges from the configured location through {@code HDFSGraphLoader}, feeds them to the
 * {@link FastSharder} returned by {@link #createSharder(String, int)}, shards the graph and then
 * calls {@link #runGraphChi()}. While that thread is running, {@link #getNext()} blocks and keeps
 * reporting progress; afterwards it serves tuples from {@link #getNextResult(TupleFactory)}.
 * Mappers with any other split index produce no tuples.
 *
 * <p>Threading: {@code ready}, {@code status} and {@code failure} are shared between the worker
 * thread, the status-reporter thread and the thread calling {@link #getNext()}, so they are
 * {@code volatile}.
 */
public abstract class PigGraphChiBase extends LoadFunc implements LoadMetadata {
    private static final Logger logger = ChiLogger.getLogger("pig-graphchi-base");

    /** Milliseconds to sleep between two progress/status reports. */
    private static final long REPORT_INTERVAL_MS = 5000L;

    /** Number of edges read between two updates of the status string. */
    private static final long EDGE_STATUS_INTERVAL = 100000L;

    /** Input location handed to {@link #setLocation(String, Job)}. */
    private String location;

    /** Job handed to {@link #setLocation(String, Job)}; stored but not read within this class. */
    private Job job;

    /** True only on the mapper that actually runs GraphChi (split index 0). */
    private boolean activeNode = false;

    /** Set by the worker thread once it has finished, successfully or not. */
    private volatile boolean ready = false;

    /** Human-readable progress description, published through the Pig status reporter. */
    private volatile String status = "initializing";

    /** Whatever the worker thread died of, or {@code null} if it has not failed. */
    private volatile Throwable failure;

    /**
     * Returns the schema string of the concrete load function.
     * NOTE(review): not referenced anywhere in this class; presumably consumed by subclasses.
     */
    protected abstract String getSchemaString();

    /** Provides no schema: always returns {@code null}. */
    public ResourceSchema getSchema(String str, Job job) throws IOException {
        return null;
    }

    /** Provides no statistics: always returns {@code null}. */
    public ResourceStatistics getStatistics(String str, Job job) throws IOException {
        return null;
    }

    /** Provides no partition keys: always returns {@code null}. */
    public String[] getPartitionKeys(String str, Job job) throws IOException {
        return null;
    }

    /** Ignores the partition filter. */
    public void setPartitionFilter(Expression expression) throws IOException {
    }

    /** Returns a new {@link PigTextInputFormat}. */
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    /** Returns the number of shards passed to {@link #createSharder(String, int)}. */
    protected abstract int getNumShards();

    /**
     * Returns the graph name passed to {@link #createSharder(String, int)}.
     * Declared public here; the decompiler noted that it widened the access from protected.
     */
    public String getGraphName() {
        return "pigudfgraph";
    }

    /**
     * Registers {@code str} as the input path of {@code job} and remembers both values.
     *
     * @param str input location
     * @param job job whose input paths are set
     */
    public void setLocation(String str, Job job) throws IOException {
        logger.info("Set HDFS location for GraphChi Pig: " + str);
        PigTextInputFormat.setInputPaths(job, str);
        this.location = str;
        this.job = job;
    }

    /** Replaces the status string that the reporter thread and {@link #getNext()} publish. */
    public void setStatusString(String str) {
        this.status = str;
    }

    /** Runs the GraphChi computation; called on the worker thread after sharding. */
    protected abstract void runGraphChi() throws Exception;

    /**
     * Creates the sharder that receives the edges read from the input.
     *
     * @param str graph name, taken from {@link #getGraphName()}
     * @param i   number of shards, taken from {@link #getNumShards()}
     */
    protected abstract FastSharder createSharder(String str, int i) throws IOException;

    /**
     * Starts the GraphChi computation if this mapper received split index 0.
     *
     * <p>Mappers with a higher split index are redundant: they only set a status message and
     * return, so {@link #getNext()} yields no tuples for them and no background thread is left
     * running. On the active mapper a daemon thread reports progress until the worker thread,
     * which loads, shards and runs the graph, has finished.
     *
     * @param recordReader unused
     * @param pigSplit     split assigned to this mapper; supplies the split index and configuration
     */
    public void prepareToRead(RecordReader recordReader, final PigSplit pigSplit) throws IOException {
        printSplitDiagnostics(pigSplit);

        if (pigSplit.getSplitIndex() > 0) {
            PigStatusReporter.getInstance().setStatus("Redundant GraphChi-mapper - will die");
            logger.info("Split index > 0 -- this mapper is redundant and will produce no tuples"
                    + " (expected, not an error).");
            return;
        }
        this.activeNode = true;

        Thread reporterThread = new Thread(new Runnable() {
            @Override
            public void run() {
                PigGraphChiBase.this.reportProgressUntilReady();
            }
        }, "graphchi-status-reporter");
        // Purely auxiliary: it must never be the reason the task JVM stays alive.
        reporterThread.setDaemon(true);
        reporterThread.start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                PigGraphChiBase.this.loadShardAndRun(pigSplit);
            }
        }, "graphchi-worker").start();
    }

    /**
     * Prints details of the split to standard output. This is diagnostic only, so a failure
     * here is logged and does not prevent the computation from starting.
     */
    private void printSplitDiagnostics(PigSplit pigSplit) {
        try {
            int index = 0;
            for (String host : pigSplit.getLocations()) {
                System.out.println(index + "Split : " + host);
                index++;
            }
            System.out.println("Num paths: " + pigSplit.getNumPaths());
            System.out.println("" + pigSplit.getConf());
            System.out.println("split index " + pigSplit.getSplitIndex());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to whoever runs this thread.
                Thread.currentThread().interrupt();
            }
            logger.log(Level.WARNING, "Could not print split diagnostics", e);
        }
    }

    /** Reports progress and the current status to Pig until the worker thread is done. */
    private void reportProgressUntilReady() {
        while (!this.ready) {
            PigStatusReporter.getInstance().progress();
            PigStatusReporter.getInstance().setStatus("GraphChi running (0): " + getStatusString());
            try {
                Thread.sleep(REPORT_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Worker body: reads the edges, shards them and runs GraphChi. Any failure is recorded in
     * {@code failure} so that {@link #getNext()} can surface it, and {@code ready} is always set
     * so that no waiting thread spins forever.
     */
    private void loadShardAndRun(PigSplit pigSplit) {
        try {
            setStatusString("Preprocessing: reading data from HDFS: " + this.location);
            final FastSharder sharder = createSharder(getGraphName(), getNumShards());
            new HDFSGraphLoader(this.location, new EdgeProcessor<Float>() {
                long counter = 0;

                @Override
                public Float receiveEdge(int from, int to, String token) {
                    try {
                        sharder.addEdge(from, to, token);
                        this.counter++;
                        if (this.counter % EDGE_STATUS_INTERVAL == 0) {
                            PigGraphChiBase.this.setStatusString("Preprocessing, read " + this.counter + " edges");
                        }
                        return null;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }).load(pigSplit.getConf());
            setStatusString("Sharding...");
            sharder.process();
            logger.info("Starting to run GraphChi");
            setStatusString("Start GraphChi engine");
            runGraphChi();
            logger.info("Ready.");
        } catch (Throwable t) {
            // Throwable, not Exception: an Error must also reach getNext() instead of vanishing.
            logger.log(Level.SEVERE, "GraphChi computation failed", t);
            this.failure = t;
        } finally {
            // Written after 'failure' so a reader that sees ready == true also sees the failure.
            this.ready = true;
        }
    }

    /** Returns the current status string. */
    protected String getStatusString() {
        return this.status;
    }

    /**
     * Returns the next result tuple of the finished computation.
     * NOTE(review): presumably returns {@code null} when the results are exhausted, as Pig
     * expects from {@code getNext()} -- confirm against the subclasses.
     */
    protected abstract Tuple getNextResult(TupleFactory tupleFactory) throws ExecException;

    /**
     * Returns the next tuple, blocking until the GraphChi computation has finished.
     *
     * @return {@code null} on a mapper that does not run GraphChi; otherwise whatever
     *         {@link #getNextResult(TupleFactory)} returns
     * @throws IOException if the computation failed, or if the calling thread is interrupted
     *                     while waiting for it
     */
    public Tuple getNext() throws IOException {
        if (!this.activeNode) {
            return null;
        }
        while (!this.ready) {
            logger.info("GraphChi-Java running: waiting for graphchi-engine to finish: " + getStatusString());
            PigStatusReporter.getInstance().setStatus(getStatusString());
            PigStatusReporter.getInstance().progress();
            try {
                Thread.sleep(REPORT_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for the GraphChi engine to finish", e);
            }
        }
        Throwable cause = this.failure;
        if (cause != null) {
            // Do not serve results from a computation that did not complete.
            throw new IOException("GraphChi computation failed", cause);
        }
        return getNextResult(TupleFactory.getInstance());
    }
}
