package de.microtema.stream.listener.publisher;

import de.microtema.stream.listener.model.StreamListenerEndpoint;
import de.microtema.stream.listener.service.StreamListenerExecutionService;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.log.LogAccessor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;

@ConditionalOnProperty(prefix = "stream-listener", name = {"enabled"}, havingValue = "true", matchIfMissing = true)
@Component
/* loaded from: input_file:de/microtema/stream/listener/publisher/StreamEventPublisher.class */
public class StreamEventPublisher {
    private static final long DELAY = TimeUnit.SECONDS.toMillis(1);
    private final ScheduledExecutorService scheduledExecutorService;
    private final StreamListenerExecutionService streamListenerExecutionService;
    private ScheduledTaskRegistrar scheduledTaskRegistrar;
    private final LogAccessor log = new LogAccessor(LogFactory.getLog(getClass()));
    private final Set<String> endpointIds = Collections.synchronizedSet(new HashSet());
    private final Set<StreamListenerEndpoint> endpoints = Collections.synchronizedSet(new HashSet());

    public StreamEventPublisher(ScheduledExecutorService scheduledExecutorService, StreamListenerExecutionService streamListenerExecutionService) {
        this.scheduledExecutorService = scheduledExecutorService;
        this.streamListenerExecutionService = streamListenerExecutionService;
    }

    public void registerStreamListenerEndpoint(StreamListenerEndpoint streamListenerEndpoint) {
        this.log.info(() -> {
            return String.format("Endpoint [%s] for topic [%s] successfully registered", streamListenerEndpoint.getId(), streamListenerEndpoint.getTopic());
        });
        this.endpoints.add(streamListenerEndpoint);
    }

    public void startup(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        this.scheduledTaskRegistrar = scheduledTaskRegistrar;
        for (StreamListenerEndpoint streamListenerEndpoint : this.endpoints) {
            if (streamListenerEndpoint.isAutoStartup()) {
                publishEvent(streamListenerEndpoint);
            } else {
                scheduledTaskRegistrar.addCronTask(() -> {
                    publishEvent(streamListenerEndpoint);
                }, streamListenerEndpoint.getCron());
            }
        }
    }

    public void destroy() {
        this.scheduledTaskRegistrar.destroy();
    }

    private void publishEvent(StreamListenerEndpoint streamListenerEndpoint) {
        String id = streamListenerEndpoint.getId();
        boolean isAutoStartup = streamListenerEndpoint.isAutoStartup();
        if (!this.endpointIds.add(id)) {
            this.log.warn(() -> {
                return "RACE CONDITION DETECTED on [" + streamListenerEndpoint.getId() + "]";
            });
            return;
        }
        try {
            boolean executeEndpointMethod = this.streamListenerExecutionService.executeEndpointMethod(streamListenerEndpoint);
            this.endpointIds.remove(id);
            if (executeEndpointMethod) {
                if (!isAutoStartup) {
                    this.log.trace(() -> {
                        return "Reschedule next async task on [" + streamListenerEndpoint.getId() + "]";
                    });
                    this.scheduledExecutorService.schedule(() -> {
                        publishEvent(streamListenerEndpoint);
                    }, DELAY, TimeUnit.MILLISECONDS);
                    return;
                }
                this.log.trace(() -> {
                    return "Call next task on [" + streamListenerEndpoint.getId() + "]";
                });
                try {
                    Thread.sleep(DELAY);
                    publishEvent(streamListenerEndpoint);
                } catch (InterruptedException e) {
                    this.log.warn(() -> {
                        return "Interrupted Exception during the thread sleep! Message: " + e.getMessage();
                    });
                    throw new IllegalStateException("Interrupted Exception during the thread sleep!", e);
                }
            }
        } catch (Throwable th) {
            this.endpointIds.remove(id);
            throw th;
        }
    }
}
