package co.com.sofka.infraestructure.asyn;

import co.com.sofka.business.asyn.ListenerEvent;
import co.com.sofka.domain.generic.DomainEvent;
import co.com.sofka.domain.generic.Identity;
import co.com.sofka.infraestructure.bus.ErrorEvent;
import co.com.sofka.infraestructure.bus.EventBus;
import co.com.sofka.infraestructure.repository.EventStoreRepository;
import co.com.sofka.infraestructure.store.StoredEvent;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Flow;

/* loaded from: input_file:co/com/sofka/infraestructure/asyn/SubscriberEvent.class */
public class SubscriberEvent<T extends Identity> implements Flow.Subscriber<DomainEvent> {
    private final EventStoreRepository<T> repository;
    private final EventBus eventBus;
    private final ListenerEvent listenerEvent;
    private Flow.Subscription subscription;

    public SubscriberEvent(EventStoreRepository<T> eventStoreRepository, EventBus eventBus, ListenerEvent listenerEvent) {
        this.repository = eventStoreRepository;
        this.eventBus = eventBus;
        this.listenerEvent = listenerEvent;
    }

    public SubscriberEvent(EventStoreRepository<T> eventStoreRepository, EventBus eventBus) {
        this(eventStoreRepository, eventBus, null);
    }

    public SubscriberEvent(EventStoreRepository<T> eventStoreRepository) {
        this(eventStoreRepository, null, null);
    }

    public SubscriberEvent() {
        this(null, null, null);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
        Optional.ofNullable(this.listenerEvent).ifPresent(listenerEvent -> {
            listenerEvent.onSubscribe(subscription);
        });
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onNext(DomainEvent domainEvent) {
        Optional.ofNullable(this.eventBus).ifPresent(eventBus -> {
            eventBus.publish(domainEvent);
        });
        Optional.ofNullable(this.repository).ifPresent(eventStoreRepository -> {
            eventStoreRepository.saveEvent(domainEvent.aggregateRootId(), StoredEvent.wrapEvent(domainEvent));
        });
        Optional.ofNullable(this.listenerEvent).ifPresent(listenerEvent -> {
            listenerEvent.setSubscriber(this);
            listenerEvent.onNext(domainEvent);
        });
        this.subscription.request(1L);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        String str = (String) Optional.ofNullable(th.getCause()).map(th2 -> {
            return Arrays.toString(th2.getStackTrace()).substring(0, 250);
        }).orElse("");
        Optional.ofNullable(this.eventBus).ifPresent(eventBus -> {
            eventBus.publishError(new ErrorEvent(504, str, th.getMessage()));
        });
        Optional.ofNullable(this.listenerEvent).ifPresent(listenerEvent -> {
            listenerEvent.onError(th);
        });
        this.subscription.cancel();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        Optional.ofNullable(this.listenerEvent).ifPresent((v0) -> {
            v0.onComplete();
        });
    }
}
