package org.dihedron.activities.engine;

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.dihedron.activities.exceptions.ActivityException;
import org.dihedron.activities.exceptions.TimedOutException;
import org.dihedron.activities.types.ActivityData;
import org.dihedron.activities.types.Scalar;
import org.dihedron.activities.types.Vector;
import org.dihedron.commons.TypedVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/dihedron/activities/engine/ParallelEngine.class */
public abstract class ParallelEngine implements ActivityEngine {
    private static final Logger logger = LoggerFactory.getLogger(ParallelEngine.class);
    private BlockingQueue<Integer> queue = new LinkedBlockingQueue();
    private WaitMode mode = WaitMode.WAIT_FOR_ALL;
    private long timeout = Long.MAX_VALUE;
    private TimeUnit unit = TimeUnit.DAYS;

    public ParallelEngine setWaitMode(WaitMode waitMode, long j, TimeUnit timeUnit) {
        this.mode = waitMode;
        this.timeout = j;
        this.unit = timeUnit;
        return this;
    }

    @Override // org.dihedron.activities.engine.ActivityEngine
    public Vector execute(TypedVector<ActivityInfo> typedVector) throws ActivityException {
        if (typedVector == null || typedVector.size() <= 0) {
            logger.warn("no activities to execute");
            return null;
        }
        logger.trace("executing {} activities...", Integer.valueOf(typedVector.size()));
        int i = 0;
        TypedVector<Future<ActivityData>> typedVector2 = new TypedVector<>();
        Iterator<ActivityInfo> it = typedVector.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            typedVector2.add(submit(new ActivityCallable(i2, this.queue, it.next())));
        }
        return wait(typedVector2);
    }

    public Vector wait(TypedVector<Future<ActivityData>> typedVector) throws ActivityException {
        Vector vector = new Vector();
        vector.setSize(typedVector.size());
        try {
            int size = typedVector.size();
            logger.trace("waiting for {} activities to complete...", Integer.valueOf(size));
            long j = this.timeout;
            do {
                int i = size;
                size--;
                if (i <= 0 || j <= 0) {
                    logger.debug("all activities complete");
                    return vector;
                }
                long currentTimeMillis = System.currentTimeMillis();
                Integer poll = this.queue.poll(j, this.unit);
                j -= this.unit.convert(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
                if (poll == null || j < 0) {
                    logger.trace("no activity completed within timeout, exiting");
                    throw new TimedOutException("Timeout expired before all activities completed");
                }
                logger.debug("activity '{}' complete (count: {}, queue: {})", new Object[]{poll, Integer.valueOf(size), Integer.valueOf(this.queue.size())});
                ActivityData activityData = typedVector.get(poll.intValue()).get();
                if (activityData instanceof Scalar) {
                    vector.set(poll.intValue(), ((Scalar) activityData).get());
                } else {
                    vector.set(poll.intValue(), activityData);
                }
            } while (this.mode != WaitMode.WAIT_FOR_ANY);
            return vector;
        } catch (InterruptedException e) {
            logger.error("operation interrupted", e);
            throw new ActivityException("Operation interrupted");
        } catch (ExecutionException e2) {
            logger.error("error executing asynchronous activity", e2);
            throw new ActivityException("Error executing asynchronous activity", e2);
        }
    }

    protected abstract Future<ActivityData> submit(ActivityCallable activityCallable);

    @Override // org.dihedron.activities.engine.ActivityEngine
    public /* bridge */ /* synthetic */ ActivityData execute(TypedVector typedVector) throws ActivityException {
        return execute((TypedVector<ActivityInfo>) typedVector);
    }
}
