package org.occurrent.subscription.inmemory;

import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.occurrent.filter.Filter;
import org.occurrent.retry.RetryStrategy;
import org.occurrent.subscription.OccurrentSubscriptionFilter;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.api.blocking.Subscription;
import org.occurrent.subscription.api.blocking.SubscriptionModel;
import org.occurrent.subscription.internal.ExecutorShutdown;

/* loaded from: input_file:org/occurrent/subscription/inmemory/InMemorySubscriptionModel.class */
public class InMemorySubscriptionModel implements SubscriptionModel, Consumer<Stream<CloudEvent>> {
    private final ConcurrentMap<String, InMemorySubscription> subscriptions;
    private final ConcurrentMap<String, Boolean> pausedSubscriptions;
    private final ExecutorService cloudEventDispatcher;
    private final RetryStrategy retryStrategy;
    private final Supplier<BlockingQueue<CloudEvent>> queueSupplier;
    private volatile boolean shutdown;
    private volatile boolean running;

    public InMemorySubscriptionModel() {
        this(RetryStrategy.fixed(200L));
    }

    public InMemorySubscriptionModel(RetryStrategy retryStrategy) {
        this(Executors.newCachedThreadPool(), retryStrategy);
    }

    public InMemorySubscriptionModel(ExecutorService executorService, RetryStrategy retryStrategy) {
        this(executorService, retryStrategy, LinkedBlockingQueue::new);
    }

    public InMemorySubscriptionModel(ExecutorService executorService, RetryStrategy retryStrategy, Supplier<BlockingQueue<CloudEvent>> supplier) {
        this.shutdown = false;
        this.running = true;
        if (executorService == null) {
            throw new IllegalArgumentException("cloudEventDispatcher cannot be null");
        }
        if (retryStrategy == null) {
            throw new IllegalArgumentException(RetryStrategy.class.getSimpleName() + " cannot be null");
        }
        if (supplier == null) {
            throw new IllegalArgumentException(BlockingQueue.class.getSimpleName() + " cannot be null");
        }
        this.queueSupplier = supplier;
        this.cloudEventDispatcher = executorService;
        this.retryStrategy = retryStrategy;
        this.subscriptions = new ConcurrentHashMap();
        this.pausedSubscriptions = new ConcurrentHashMap();
    }

    public synchronized Subscription subscribe(String str, SubscriptionFilter subscriptionFilter, StartAt startAt, Consumer<CloudEvent> consumer) {
        if (this.shutdown) {
            throw new IllegalStateException("Cannot subscribe when shutdown");
        }
        if (str == null) {
            throw new IllegalArgumentException("subscriptionId cannot be null");
        }
        if (consumer == null) {
            throw new IllegalArgumentException("action cannot be null");
        }
        if (this.subscriptions.containsKey(str) || this.pausedSubscriptions.containsKey(str)) {
            throw new IllegalArgumentException("Subscription " + str + " is already defined.");
        }
        if (startAt == null) {
            throw new IllegalArgumentException(StartAt.class.getSimpleName() + " cannot be null");
        }
        StartAt startAt2 = startAt.get();
        if (!startAt2.isNow() && !startAt2.isDefault()) {
            throw new IllegalArgumentException(InMemorySubscriptionModel.class.getSimpleName() + " only supports starting from 'now' and 'default' (StartAt.now() or StartAt.subscriptionModelDefault())");
        }
        InMemorySubscription inMemorySubscription = new InMemorySubscription(str, this.queueSupplier.get(), consumer, getFilter(subscriptionFilter), this.retryStrategy);
        this.subscriptions.put(str, inMemorySubscription);
        if (!this.running) {
            this.pausedSubscriptions.put(str, true);
        }
        this.cloudEventDispatcher.execute(inMemorySubscription);
        return inMemorySubscription;
    }

    public void cancelSubscription(String str) {
        this.subscriptions.remove(str);
        this.pausedSubscriptions.remove(str);
    }

    @Override // java.util.function.Consumer
    public void accept(Stream<CloudEvent> stream) {
        if (this.running) {
            List list = (List) stream.collect(Collectors.toList());
            this.subscriptions.values().forEach(inMemorySubscription -> {
                if (isRunning(inMemorySubscription.id())) {
                    Stream stream2 = list.stream();
                    inMemorySubscription.getClass();
                    Stream filter = stream2.filter(inMemorySubscription::matches);
                    inMemorySubscription.getClass();
                    filter.forEach(inMemorySubscription::eventAvailable);
                }
            });
        }
    }

    @PreDestroy
    public void shutdown() {
        synchronized (this.subscriptions) {
            this.shutdown = true;
            this.subscriptions.values().forEach((v0) -> {
                v0.shutdown();
            });
            this.subscriptions.clear();
        }
        this.pausedSubscriptions.clear();
        ExecutorShutdown.shutdownSafely(this.cloudEventDispatcher, 5L, TimeUnit.SECONDS);
    }

    private static Filter getFilter(SubscriptionFilter subscriptionFilter) {
        Filter filter;
        if (subscriptionFilter == null) {
            filter = Filter.all();
        } else {
            if (!(subscriptionFilter instanceof OccurrentSubscriptionFilter)) {
                throw new IllegalArgumentException(InMemorySubscriptionModel.class.getSimpleName() + " only support filters of type " + OccurrentSubscriptionFilter.class.getName());
            }
            filter = ((OccurrentSubscriptionFilter) subscriptionFilter).filter;
        }
        return filter;
    }

    public void stop() {
        this.running = false;
        this.subscriptions.values().forEach(inMemorySubscription -> {
            this.pausedSubscriptions.put(inMemorySubscription.id(), true);
        });
    }

    public void start() {
        this.running = true;
        this.pausedSubscriptions.clear();
    }

    public boolean isRunning() {
        return this.running;
    }

    public boolean isRunning(String str) {
        return this.running && this.subscriptions.containsKey(str) && !this.pausedSubscriptions.containsKey(str);
    }

    public boolean isPaused(String str) {
        return this.pausedSubscriptions.containsKey(str);
    }

    public Subscription resumeSubscription(String str) {
        if (!isPaused(str)) {
            throw new IllegalArgumentException("Subscription " + str + " is not paused");
        }
        this.running = true;
        this.pausedSubscriptions.remove(str);
        return this.subscriptions.get(str);
    }

    public void pauseSubscription(String str) {
        if (!isRunning(str)) {
            throw new IllegalArgumentException("Subscription " + str + " is not running");
        }
        this.pausedSubscriptions.put(str, true);
    }
}
