package com.datasalt.pangool.utils.test;

import com.datasalt.pangool.io.ITuple;
import com.datasalt.pangool.io.Schema;
import com.datasalt.pangool.io.Tuple;
import com.datasalt.pangool.io.TupleFile;
import com.datasalt.pangool.utils.HadoopUtils;
import com.datasalt.pangool.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
import org.junit.Before;

/* loaded from: input_file:com/datasalt/pangool/utils/test/AbstractHadoopTestLibrary.class */
/**
 * Base class for tests that run Hadoop jobs against small, hand-built inputs.
 *
 * <p>Typical usage, as supported by the methods below: register input records with
 * {@link #withInput(String, Object, Object)} or {@link #withTupleInput(String, ITuple)},
 * run the job through {@link #assertRun(Job)} (which closes the pending input writers first),
 * verify results with {@link #withOutput(String, Object, Object)} or
 * {@link #withTupleOutput(String, ITuple)}, and finally remove every touched path with
 * {@link #cleanUp()}.
 *
 * <p>The configuration is obtained through {@code getConf()}, which is not defined in this
 * class (presumably inherited from {@code AbstractBaseTest}).
 */
public abstract class AbstractHadoopTestLibrary extends AbstractBaseTest {
    /** File system used for every read/write/delete; set by {@link #initHadoop()} before each test. */
    protected FileSystem fS;

    /** Cache of already-read SequenceFile outputs, keyed by the output path string. */
    protected Map<String, List<Pair<Object, Object>>> outputs = new HashMap<>();

    /**
     * Open input writers keyed by input path string. Values are either
     * {@link SequenceFile.Writer} or {@link TupleFile.Writer} instances.
     */
    protected Map<String, Object> inputs = new HashMap<>();

    /** Visitor that prints every visited tuple to standard output. */
    public static class PrintVisitor extends TupleVisitor {
        @Override
        public void onTuple(ITuple iTuple) {
            System.out.println(iTuple);
        }
    }

    /** Callback invoked once per tuple by {@link #readTuples(Path, Configuration, TupleVisitor)}. */
    public static abstract class TupleVisitor {
        /**
         * Handles one tuple.
         *
         * <p>{@code readTuples} passes the same {@code Tuple} instance on every call, so
         * implementations must not keep a reference to it across invocations.
         *
         * @param iTuple the tuple just read
         */
        public abstract void onTuple(ITuple iTuple);
    }

    /**
     * Obtains the {@link FileSystem} for the test configuration before each test.
     *
     * @throws IOException if the file system cannot be obtained
     */
    @Before
    public void initHadoop() throws IOException {
        this.fS = FileSystem.get(getConf());
    }

    /** Opens a SequenceFile writer on {@code str} for the given key and value classes. */
    private SequenceFile.Writer openWriter(String str, Class<?> cls, Class<?> cls2) throws IOException {
        return new SequenceFile.Writer(this.fS, getConf(), new Path(str), cls, cls2);
    }

    /** Opens a TupleFile writer on {@code str} for the given schema. */
    private TupleFile.Writer openTupleWriter(String str, Schema schema) throws IOException {
        return new TupleFile.Writer(this.fS, getConf(), new Path(str), schema);
    }

    /**
     * Wraps a plain Java value into the matching Hadoop {@link Writable}.
     *
     * @param obj an {@code Integer}, {@code Double}, {@code Long}, {@code String},
     *            {@code Float} or {@code Boolean}
     * @return the corresponding writable, or {@code null} when {@code obj} is of any other
     *         type (including {@code null})
     */
    public Writable writable(Object obj) {
        if (obj instanceof Integer) {
            return new IntWritable((Integer) obj);
        }
        if (obj instanceof Double) {
            return new DoubleWritable((Double) obj);
        }
        if (obj instanceof Long) {
            return new LongWritable((Long) obj);
        }
        if (obj instanceof String) {
            return new Text((String) obj);
        }
        if (obj instanceof Float) {
            return new FloatWritable((Float) obj);
        }
        if (obj instanceof Boolean) {
            return new BooleanWritable((Boolean) obj);
        }
        return null;
    }

    /**
     * Runs {@code job} and asserts that it finished successfully.
     *
     * <p>Before running, the job's output path is removed via
     * {@code HadoopUtils.deleteIfExists} and every registered input writer is closed so the
     * job can read the inputs.
     *
     * @param job the fully configured job to execute
     * @throws IOException            on file system or job submission failure
     * @throws InterruptedException   if waiting for the job is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public void assertRun(Job job) throws IOException, InterruptedException, ClassNotFoundException {
        HadoopUtils.deleteIfExists(FileSystem.get(job.getConfiguration()), FileOutputFormat.getOutputPath(job));
        for (Object writer : this.inputs.values()) {
            if (writer instanceof SequenceFile.Writer) {
                ((SequenceFile.Writer) writer).close();
            } else if (writer instanceof TupleFile.Writer) {
                ((TupleFile.Writer) writer).close();
            }
        }
        job.waitForCompletion(true);
        Assert.assertTrue(job.isSuccessful());
    }

    /**
     * Deletes every registered input path and every output location that was read.
     *
     * <p>An output path whose string contains {@code "-0000"} is treated as a part file and
     * its parent directory is deleted instead.
     *
     * @throws IOException if a deletion fails
     */
    public void cleanUp() throws IOException {
        for (String inputPath : this.inputs.keySet()) {
            trash(inputPath);
        }
        for (String outputPath : this.outputs.keySet()) {
            Path target = new Path(outputPath);
            if (target.toString().contains("-0000")) {
                // Part file: remove the whole output directory that contains it.
                target = target.getParent();
            }
            trash(target.toString());
        }
    }

    /**
     * Removes each given path through {@code HadoopUtils.deleteIfExists}.
     *
     * @param strArr path strings to remove
     * @throws IOException if a deletion fails
     */
    protected void trash(String... strArr) throws IOException {
        for (String str : strArr) {
            HadoopUtils.deleteIfExists(this.fS, new Path(str));
        }
    }

    /** Returns {@code str + "/part-r-00000"}. */
    protected String firstReducerOutput(String str) {
        return str + "/part-r-00000";
    }

    /** Returns {@code str + "/part-m-00000"}. */
    protected String firstMapOutput(String str) {
        return str + "/part-m-00000";
    }

    /** Returns {@code str + "/" + str2 + "-r-00000"}. */
    protected String firstReducerMultiOutput(String str, String str2) {
        return str + "/" + str2 + "-r-00000";
    }

    /** Returns {@code str + "/" + str2 + "-m-00000"}. */
    protected String firstMapperMultiOutput(String str, String str2) {
        return str + "/" + str2 + "-m-00000";
    }

    /**
     * Appends a key/value record to the SequenceFile input at {@code str}.
     *
     * <p>The writer is created on first use with the runtime classes of {@code obj} and
     * {@code obj2}, and is reused for later calls with the same path.
     *
     * @param str  input path
     * @param obj  record key
     * @param obj2 record value
     * @return this instance, for chaining
     * @throws IOException        if the writer cannot be opened or the append fails
     * @throws ClassCastException if {@code str} was already registered as a tuple input
     */
    public AbstractHadoopTestLibrary withInput(String str, Object obj, Object obj2) throws IOException {
        SequenceFile.Writer writer = (SequenceFile.Writer) this.inputs.get(str);
        if (writer == null) {
            writer = openWriter(str, obj.getClass(), obj2.getClass());
            this.inputs.put(str, writer);
        }
        writer.append(obj, obj2);
        return this;
    }

    /**
     * Appends a record with key {@code obj} and a {@link NullWritable} value to the
     * SequenceFile input at {@code str}.
     *
     * @param str input path
     * @param obj record key
     * @return this instance, for chaining
     * @throws IOException if the writer cannot be opened or the append fails
     */
    public AbstractHadoopTestLibrary withInput(String str, Object obj) throws IOException {
        return withInput(str, obj, NullWritable.get());
    }

    /**
     * Appends a tuple to the TupleFile input at {@code str}.
     *
     * <p>The writer is created on first use with the schema of {@code iTuple} and reused for
     * later calls with the same path.
     *
     * @param str    input path
     * @param iTuple tuple to append
     * @return this instance, for chaining
     * @throws IOException        if the writer cannot be opened or the append fails
     * @throws ClassCastException if {@code str} was already registered as a SequenceFile input
     */
    public AbstractHadoopTestLibrary withTupleInput(String str, ITuple iTuple) throws IOException {
        TupleFile.Writer writer = (TupleFile.Writer) this.inputs.get(str);
        if (writer == null) {
            writer = openTupleWriter(str, iTuple.getSchema());
            this.inputs.put(str, writer);
        }
        writer.append(iTuple);
        return this;
    }

    /**
     * Asserts that the SequenceFile output at {@code str} contains a record with key
     * {@code obj} and a {@link NullWritable} value.
     *
     * @param str output file path
     * @param obj expected key
     * @throws IOException    if the output cannot be read
     * @throws AssertionError if the output is empty or the record is missing
     */
    public void withOutput(String str, Object obj) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
        withOutput(str, obj, NullWritable.get());
    }

    /**
     * Reads every tuple of the TupleFile at {@code path} and hands it to {@code tupleVisitor}.
     *
     * <p>A single {@link Tuple} instance is reused for all records. The reader is always
     * closed, even when reading or the visitor fails.
     *
     * @param path          file to read
     * @param configuration configuration used to resolve the file system and open the reader
     * @param tupleVisitor  callback invoked once per tuple
     * @throws IOException if the file cannot be opened or read
     */
    public static void readTuples(Path path, Configuration configuration, TupleVisitor tupleVisitor) throws IOException, InterruptedException {
        TupleFile.Reader reader = new TupleFile.Reader(FileSystem.get(path.toUri(), configuration), configuration, path);
        try {
            Tuple tuple = new Tuple(reader.getSchema());
            while (reader.next(tuple)) {
                tupleVisitor.onTuple(tuple);
            }
        } finally {
            // Release the underlying stream even if reading or the visitor threw.
            reader.close();
        }
    }

    /**
     * Asserts that the TupleFile output at {@code str} contains a tuple equal to {@code iTuple}.
     *
     * <p>On a miss in a non-empty file, the file is read a second time to print every entry to
     * standard error before failing.
     *
     * @param str    output file path
     * @param iTuple expected tuple
     * @throws IOException    if the output cannot be read
     * @throws AssertionError if the output is empty or the tuple is not present
     */
    public void withTupleOutput(String str, final ITuple iTuple) throws IOException, InterruptedException {
        final AtomicBoolean found = new AtomicBoolean(false);
        final AtomicInteger total = new AtomicInteger(0);
        readTuples(new Path(str), new Configuration(), new TupleVisitor() {
            @Override
            public void onTuple(ITuple candidate) {
                total.incrementAndGet();
                if (candidate.equals(iTuple)) {
                    found.set(true);
                }
            }
        });
        if (found.get()) {
            return;
        }
        if (total.get() == 0) {
            throw new AssertionError("Empty output " + str);
        }
        System.err.println("Not found in output. Tuple: " + iTuple);
        readTuples(new Path(str), new Configuration(), new TupleVisitor() {
            @Override
            public void onTuple(ITuple candidate) {
                System.err.println("Output entry -> Tuple: " + candidate);
            }
        });
        throw new AssertionError("Not found in output -> Tuple: " + iTuple);
    }

    /**
     * Asserts that the SequenceFile output at {@code str} contains the given key/value record.
     *
     * <p>On a miss in a non-empty file, every entry is printed to standard error before failing.
     *
     * @param str  output file path
     * @param obj  expected key
     * @param obj2 expected value
     * @throws IOException    if the output cannot be read
     * @throws AssertionError if the output is empty or the record is not present
     */
    public void withOutput(String str, Object obj, Object obj2) throws IOException {
        List<Pair<Object, Object>> entries = ensureOutput(str);
        for (Pair<Object, Object> entry : entries) {
            if (entry.getFirst().equals(obj) && entry.getSecond().equals(obj2)) {
                return;
            }
        }
        if (entries.isEmpty()) {
            throw new AssertionError("Empty output " + str);
        }
        System.err.println("Not found in output. KEY: " + obj + ", VALUE: " + obj2);
        for (Pair<Object, Object> entry : entries) {
            System.err.println("Output entry -> KEY: " + entry.getFirst() + ", VALUE: " + entry.getSecond());
        }
        throw new AssertionError("Not found in output -> KEY: " + obj + ", VALUE: " + obj2);
    }

    /**
     * Returns all key/value records of the SequenceFile at {@code str}, reading the file on
     * the first call and serving later calls for the same path from {@link #outputs}.
     *
     * <p>A fresh key and value instance is created for each record so the returned pairs do
     * not share state. The reader is always closed, even when reading fails; in that case
     * nothing is cached.
     *
     * @param str output file path
     * @return the records of the file, in file order
     * @throws IOException if the file cannot be opened or read
     */
    public List<Pair<Object, Object>> ensureOutput(String str) throws IOException {
        List<Pair<Object, Object>> cached = this.outputs.get(str);
        if (cached != null) {
            return cached;
        }
        List<Pair<Object, Object>> entries = new ArrayList<>();
        SequenceFile.Reader reader = new SequenceFile.Reader(this.fS, new Path(str), getConf());
        try {
            while (true) {
                Object key = ReflectionUtils.newInstance(reader.getKeyClass(), getConf());
                Object value = ReflectionUtils.newInstance(reader.getValueClass(), getConf());
                // next(Object) yields null once the end of the file is reached.
                if (reader.next(key) == null) {
                    break;
                }
                entries.add(new Pair<>(key, reader.getCurrentValue(value)));
            }
        } finally {
            // Release the underlying stream even if reading or instantiation threw.
            reader.close();
        }
        this.outputs.put(str, entries);
        return entries;
    }

    /**
     * Prints every key/value record of the SequenceFile at {@code str} to standard output.
     *
     * @param str output file path
     * @throws IOException if the output cannot be read
     */
    public void dumpOutput(String str) throws IOException {
        for (Pair<Object, Object> entry : ensureOutput(str)) {
            System.out.println("KEY: " + entry.getFirst() + ", VALUE: " + entry.getSecond());
        }
    }
}
