package edu.cmu.graphchi.walks;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
import edu.cmu.graphchi.ChiLogger;
import edu.cmu.graphchi.ChiVertex;
import edu.cmu.graphchi.GraphChiContext;
import edu.cmu.graphchi.engine.VertexInterval;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/* loaded from: input_file:edu/cmu/graphchi/walks/DrunkardDriver.class */
public abstract class DrunkardDriver<VertexDataType, EdgeDataType> implements GrabbedBucketConsumer {
    private WalkSnapshot curWalkSnapshot;
    protected final DrunkardJob job;
    protected static Logger logger = ChiLogger.getLogger("drunkard-driver");
    WalkUpdateFunction<VertexDataType, EdgeDataType> callback;
    protected LinkedBlockingQueue<BucketsToSend> bucketQueue = new LinkedBlockingQueue<>();
    protected AtomicBoolean finished = new AtomicBoolean(false);
    protected AtomicLong pendingWalksToSubmit = new AtomicLong(0);
    private final Timer purgeTimer = Metrics.defaultRegistry().newTimer(DrunkardMobEngine.class, "purge-localwalks", TimeUnit.SECONDS, TimeUnit.MINUTES);
    private ArrayList<LocalWalkBuffer> localBuffers = new ArrayList<>();
    private Thread dumperThread = new Thread(createDumperThread());

    /* JADX INFO: Access modifiers changed from: package-private */
    public DrunkardDriver(DrunkardJob drunkardJob, WalkUpdateFunction<VertexDataType, EdgeDataType> walkUpdateFunction) {
        this.job = drunkardJob;
        this.callback = walkUpdateFunction;
        this.dumperThread.start();
    }

    protected abstract DumperThread createDumperThread();

    public DrunkardJob getJob() {
        return this.job;
    }

    protected abstract DrunkardContext createDrunkardContext(int i, GraphChiContext graphChiContext, LocalWalkBuffer localWalkBuffer);

    public void update(ChiVertex<VertexDataType, EdgeDataType> chiVertex, GraphChiContext graphChiContext, LocalWalkBuffer localWalkBuffer) {
        while (this.pendingWalksToSubmit.get() > this.job.getWalkManager().getTotalWalks() / 40) {
            try {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                }
            } catch (RemoteException e2) {
                throw new RuntimeException((Throwable) e2);
            }
        }
        boolean z = graphChiContext.getIteration() == 0;
        WalkArray walksAtVertex = this.curWalkSnapshot.getWalksAtVertex(chiVertex.getId(), true);
        this.curWalkSnapshot.clear(chiVertex.getId());
        if (z && this.job.getWalkManager().isSource(chiVertex.getId())) {
            this.job.getCompanion().setAvoidList(this.job.getWalkManager().getVertexSourceIdx(chiVertex.getId()), this.callback.getNotTrackedVertices(chiVertex));
        }
        if (walksAtVertex == null || walksAtVertex.size() == 0) {
            return;
        }
        this.callback.processWalksAtVertex(walksAtVertex, chiVertex, createDrunkardContext(chiVertex.getId(), graphChiContext, localWalkBuffer), localWalkBuffer.random);
    }

    public void initWalks() throws RemoteException {
        this.job.getWalkManager().initializeWalks();
        this.job.getCompanion().setSources(this.job.getWalkManager().getSources());
    }

    public void beginIteration(GraphChiContext graphChiContext) {
        if (graphChiContext.getIteration() == 0) {
            graphChiContext.getScheduler().removeAllTasks();
            this.job.getWalkManager().populateSchedulerWithSources(graphChiContext.getScheduler());
        }
    }

    public void endIteration(GraphChiContext graphChiContext) {
    }

    public void spinUntilFinish() {
        this.finished.set(true);
        while (this.bucketQueue.size() > 0) {
            try {
                System.out.println("Waiting ..." + this.bucketQueue.size());
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            this.dumperThread.join();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addLocalBuffer(LocalWalkBuffer localWalkBuffer) {
        this.localBuffers.add(localWalkBuffer);
    }

    public void beginSubInterval(GraphChiContext graphChiContext, VertexInterval vertexInterval) {
        long currentTimeMillis = System.currentTimeMillis();
        this.curWalkSnapshot = this.job.getWalkManager().grabSnapshot(vertexInterval.getFirstVertex(), vertexInterval.getLastVertex());
        logger.info("Grab snapshot took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
        while (this.localBuffers.size() > 0) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
            logger.fine("Waiting for purge to finish...");
        }
    }

    public void endSubInterval(GraphChiContext graphChiContext, VertexInterval vertexInterval) {
        this.curWalkSnapshot.restoreUngrabbed();
        this.curWalkSnapshot = null;
        synchronized (this.localBuffers) {
            TimerContext time = this.purgeTimer.time();
            Iterator<LocalWalkBuffer> it = this.localBuffers.iterator();
            while (it.hasNext()) {
                it.next().purge(this.job.getWalkManager());
            }
            this.localBuffers.clear();
            time.stop();
        }
    }

    public void beginInterval(GraphChiContext graphChiContext, VertexInterval vertexInterval) {
        long totalWalks = this.job.getWalkManager().getTotalWalks();
        long numOfActiveWalks = this.job.getWalkManager().getNumOfActiveWalks();
        System.out.println("=====================================");
        System.out.println("Active walks: " + numOfActiveWalks + ", initialized=" + totalWalks);
        System.out.println("=====================================");
        this.job.getWalkManager().populateSchedulerForInterval(graphChiContext.getScheduler(), vertexInterval);
        this.job.getWalkManager().setBucketConsumer(this);
    }

    public void endInterval(GraphChiContext graphChiContext, VertexInterval vertexInterval) {
    }

    @Override // edu.cmu.graphchi.walks.GrabbedBucketConsumer
    public void consume(int i, WalkArray walkArray, int i2) {
        try {
            this.pendingWalksToSubmit.addAndGet(i2);
            this.bucketQueue.put(new BucketsToSend(i, walkArray, i2));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
