package me.ahoo.eventbus.core.consistency.impl;

import com.google.common.base.Stopwatch;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import me.ahoo.eventbus.core.compensation.CompensationPublishEventWrapper;
import me.ahoo.eventbus.core.consistency.ConsistencyPublisher;
import me.ahoo.eventbus.core.publisher.EventDescriptor;
import me.ahoo.eventbus.core.publisher.EventDescriptorParser;
import me.ahoo.eventbus.core.publisher.PublishEventWrapper;
import me.ahoo.eventbus.core.publisher.Publisher;
import me.ahoo.eventbus.core.repository.PublishEventRepository;
import me.ahoo.eventbus.core.repository.PublishIdentity;
import me.ahoo.eventbus.core.utils.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;

/* loaded from: input_file:me/ahoo/eventbus/core/consistency/impl/ConsistencyPublisherImpl.class */
public class ConsistencyPublisherImpl implements ConsistencyPublisher, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ConsistencyPublisherImpl.class);
    private final Publisher publisher;
    private final EventDescriptorParser eventDescriptorParser;
    private final PublishEventRepository publishEventRepository;
    private final PlatformTransactionManager transactionManager;
    private final int EXECUTOR_CORE_POOL_SIZE = 1;
    private final int EXECUTOR_MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private final int EXECUTOR_BLOCKING_QUEUE_SIZE = 10000;
    private final ExecutorService executorService = new ThreadPoolExecutor(1, this.EXECUTOR_MAX_POOL_SIZE, 10, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10000), Threads.defaultFactory("ConsistencyPublisher"));

    public ConsistencyPublisherImpl(Publisher publisher, EventDescriptorParser eventDescriptorParser, PublishEventRepository publishEventRepository, PlatformTransactionManager platformTransactionManager) {
        this.publisher = publisher;
        this.eventDescriptorParser = eventDescriptorParser;
        this.publishEventRepository = publishEventRepository;
        this.transactionManager = platformTransactionManager;
    }

    @Override // me.ahoo.eventbus.core.consistency.ConsistencyPublisher
    public Object publish(Supplier<Object> supplier) {
        Stopwatch createStarted = Stopwatch.createStarted();
        PublishIdentity publishIdentity = null;
        Object obj = null;
        TransactionStatus transaction = this.transactionManager.getTransaction((TransactionDefinition) null);
        try {
            Object obj2 = supplier.get();
            if (Objects.nonNull(obj2)) {
                EventDescriptor parse = this.eventDescriptorParser.parse(obj2);
                obj = parse.getEventData(obj2);
                if (Objects.nonNull(obj)) {
                    publishIdentity = this.publishEventRepository.initialize(parse.getEventName(), obj);
                }
            }
            this.transactionManager.commit(transaction);
            log.info("Publish Inner succeeded,taken:[{}].", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
            if (Objects.isNull(publishIdentity)) {
                log.info("publish -> Ignore publish event when publishEvent is null.");
                return obj2;
            }
            publish(publishIdentity, obj);
            return obj2;
        } catch (Throwable th) {
            this.transactionManager.rollback(transaction);
            log.warn("Publish Inner failed,taken:[{}].error:{}", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)), th.getMessage());
            throw th;
        }
    }

    @Override // me.ahoo.eventbus.core.consistency.ConsistencyPublisher
    public Future<?> publish(PublishIdentity publishIdentity, Object obj) {
        return this.executorService.submit(() -> {
            doPublish(publishIdentity, obj);
        });
    }

    private void doPublish(PublishIdentity publishIdentity, Object obj) {
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            this.publisher.publish(obj instanceof CompensationPublishEventWrapper ? (CompensationPublishEventWrapper) obj : new PublishEventWrapper(publishIdentity.getId(), publishIdentity.getEventName(), obj, LocalDateTime.now()));
            log.debug("publish event to bus succeeded! taken:[{}].", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
            try {
                this.publishEventRepository.markSucceeded(publishIdentity);
                log.debug("mark publish event to succeeded! taken:[{}].", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
                log.info("Publish succeeded,taken:[{}]", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
            } catch (Throwable th) {
                log.error(String.format("mark publish event to succeeded error. -> id:[%d] error,taken:[%d]!", publishIdentity.getId(), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))), th);
                throw th;
            }
        } catch (Throwable th2) {
            log.error(String.format("publish event to bus error -> id:[%d] error,taken:[%d]!", publishIdentity.getId(), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))), th2);
            try {
                this.publishEventRepository.markFailed(publishIdentity, th2);
            } catch (Throwable th3) {
                log.error(String.format("mark publish event status to failed error. -> id:[%d] error,taken:[%d]!", publishIdentity.getId(), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))), th3);
            }
            throw th2;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.executorService.shutdown();
    }
}
