package org.eclipse.ditto.services.models.acks;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityIdWithType;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.model.things.WithThingId;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.acks.things.ThingAcknowledgementFactory;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayCommandTimeoutException;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingCommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;

/* loaded from: input_file:org/eclipse/ditto/services/models/acks/AcknowledgementAggregatorActor.class */
/**
 * Actor that collects the acknowledgements requested by the headers of a single command.
 *
 * <p>Incoming {@link ThingCommandResponse}s and {@link MessageCommandResponse}s are converted into
 * acknowledgements and fed, together with plain {@link Acknowledgement}s / {@link Acknowledgements}, into an
 * {@link AcknowledgementAggregator}. As soon as the aggregator reports that every requested acknowledgement was
 * received, or when the Akka receive timeout fires, or when a {@link DittoRuntimeException} arrives, exactly one
 * result object is passed to the configured response consumer and the actor stops itself.
 */
public final class AcknowledgementAggregatorActor extends AbstractActor {

    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final String correlationId;
    private final DittoHeaders requestCommandHeaders;
    private final AcknowledgementAggregator ackregator;
    private final Consumer<Object> responseSignalConsumer;
    private final Duration timeout;

    /**
     * Instantiated reflectively by Akka through {@link #props}; the parameter order must match the argument order
     * given to {@code Props.create}.
     *
     * @param thingId the ID of the thing the aggregated acknowledgements belong to.
     * @param dittoHeaders the headers of the originating command; they provide the correlation ID, the optional
     * timeout and the requested acknowledgements.
     * @param acknowledgementConfig supplies the fallback timeout used when the headers define none.
     * @param headerTranslator passed on to the {@link AcknowledgementAggregator}.
     * @param consumer receives the single result object produced by this actor.
     * @throws NullPointerException if {@code acknowledgementConfig} or {@code dittoHeaders} is {@code null}.
     */
    private AcknowledgementAggregatorActor(ThingId thingId, DittoHeaders dittoHeaders, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Object> consumer) {
        Objects.requireNonNull(acknowledgementConfig, "acknowledgementConfig");
        this.responseSignalConsumer = consumer;
        this.requestCommandHeaders = dittoHeaders;

        // Without a correlation ID in the headers the name of this actor is used instead.
        this.correlationId = this.requestCommandHeaders.getCorrelationId()
                .orElseGet(() -> getSelf().path().name());

        this.timeout = this.requestCommandHeaders.getTimeout()
                .orElseGet(acknowledgementConfig::getForwarderFallbackTimeout);
        getContext().setReceiveTimeout(this.timeout);

        // Explicit upcasts retained from the original code.
        // NOTE(review): possibly required to select the intended getInstance overload - confirm before removing.
        this.ackregator = AcknowledgementAggregator.getInstance((NamespacedEntityIdWithType) thingId, (CharSequence) this.correlationId, this.timeout, headerTranslator);
        // The getter is invoked again for logging so that no raw-typed local has to hold the set.
        this.ackregator.addAcknowledgementRequests(this.requestCommandHeaders.getAcknowledgementRequests());
        this.log.withCorrelationId(this.correlationId).info("Starting to wait for all requested acknowledgements <{}> for a maximum duration of <{}>.", this.requestCommandHeaders.getAcknowledgementRequests(), this.timeout);
    }

    /**
     * Creates the Akka configuration object for this actor from a signal.
     *
     * @param signal the signal whose entity ID and headers are handed to the actor.
     * @param acknowledgementConfig supplies the fallback timeout.
     * @param headerTranslator passed on to the acknowledgement aggregator.
     * @param consumer receives the single result object produced by the actor.
     * @return the Props.
     */
    public static Props props(Signal<?> signal, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Object> consumer) {
        // NOTE(review): the entity ID is passed where a ThingId is expected without a visible cast - verify the
        // declared return type of Signal#getEntityId against the model version in use.
        return props(signal.getEntityId(), signal.getDittoHeaders(), acknowledgementConfig, headerTranslator, consumer);
    }

    /**
     * Creates the Props from the individual constructor arguments.
     */
    private static Props props(ThingId thingId, DittoHeaders dittoHeaders, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, Consumer<Object> consumer) {
        return Props.create(AcknowledgementAggregatorActor.class, thingId, dittoHeaders, acknowledgementConfig, headerTranslator, consumer);
    }

    /**
     * Dispatches each supported message type to its handler; anything else is logged as a warning and ignored.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(ThingCommandResponse.class, this::handleThingCommandResponse)
                .match(MessageCommandResponse.class, this::handleMessageCommandResponse)
                .match(Acknowledgement.class, this::handleAcknowledgement)
                .match(Acknowledgements.class, this::handleAcknowledgements)
                .match(DittoRuntimeException.class, this::handleDittoRuntimeException)
                .match(ReceiveTimeout.class, this::handleReceiveTimeout)
                .matchAny(unexpected -> this.log.warning("Received unexpected message: <{}>", unexpected))
                .build();
    }

    /**
     * Treats the response as a live response if its channel header equals the LIVE channel name, otherwise as a
     * twin response.
     */
    private void handleThingCommandResponse(ThingCommandResponse<?> thingCommandResponse) {
        final String liveChannelName = TopicPath.Channel.LIVE.getName();
        final boolean isLiveResponse = thingCommandResponse.getDittoHeaders()
                .getChannel()
                .stream()
                .anyMatch(liveChannelName::equals);
        addCommandResponse(thingCommandResponse, thingCommandResponse, isLiveResponse);
    }

    /**
     * Message command responses are always recorded as live responses.
     */
    private void handleMessageCommandResponse(MessageCommandResponse<?, ?> messageCommandResponse) {
        addCommandResponse(messageCommandResponse, messageCommandResponse, true);
    }

    /**
     * Converts a command response into an acknowledgement, records it and completes if nothing is outstanding.
     *
     * @param commandResponse the received response; source of status code, headers and payload.
     * @param withThingId provides the thing ID for the created acknowledgement.
     * @param isLiveResponse {@code true} to label the acknowledgement {@code LIVE_RESPONSE}, {@code false} to
     * label it {@code TWIN_PERSISTED}.
     */
    private void addCommandResponse(CommandResponse<?> commandResponse, WithThingId withThingId, boolean isLiveResponse) {
        this.log.withCorrelationId(this.correlationId).debug("Received command response <{}>.", commandResponse);
        final DittoAcknowledgementLabel label = isLiveResponse
                ? DittoAcknowledgementLabel.LIVE_RESPONSE
                : DittoAcknowledgementLabel.TWIN_PERSISTED;
        this.ackregator.addReceivedAcknowledgment(ThingAcknowledgementFactory.newAcknowledgement(label, withThingId.getThingEntityId(), commandResponse.getStatusCode(), commandResponse.getDittoHeaders(), getPayload(commandResponse).orElse(null)));
        potentiallyCompleteAcknowledgements(commandResponse);
    }

    /**
     * Extracts the payload of a command response.
     *
     * @return the optional entity for responses implementing {@link WithOptionalEntity}, the value of the
     * {@code JSON_MESSAGE} field for message command responses, otherwise an empty Optional.
     */
    private static Optional<JsonValue> getPayload(CommandResponse<?> commandResponse) {
        if (commandResponse instanceof WithOptionalEntity) {
            return ((WithOptionalEntity) commandResponse).getEntity(commandResponse.getImplementedSchemaVersion());
        }
        if (commandResponse instanceof MessageCommandResponse) {
            // The mapping returns its argument unchanged; it only adapts the element type to JsonValue.
            return commandResponse.toJson()
                    .getValue(MessageCommandResponse.JsonFields.JSON_MESSAGE)
                    .map(message -> message);
        }
        return Optional.empty();
    }

    /**
     * Completes with whatever was aggregated so far once the receive timeout fires.
     */
    private void handleReceiveTimeout(ReceiveTimeout receiveTimeout) {
        this.log.withCorrelationId(this.correlationId).info("Timed out waiting for all requested acknowledgements, completing Acknowledgements with timeouts...");
        completeAcknowledgements(null, this.requestCommandHeaders);
    }

    /**
     * Records a single acknowledgement and completes if nothing is outstanding.
     */
    private void handleAcknowledgement(Acknowledgement acknowledgement) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgement <{}>.", acknowledgement);
        this.ackregator.addReceivedAcknowledgment(acknowledgement);
        potentiallyCompleteAcknowledgements(null);
    }

    /**
     * Records every contained acknowledgement and completes if nothing is outstanding.
     */
    private void handleAcknowledgements(Acknowledgements acknowledgements) {
        this.log.withCorrelationId(this.correlationId).debug("Received acknowledgements <{}>.", acknowledgements);
        acknowledgements.stream().forEach(this.ackregator::addReceivedAcknowledgment);
        potentiallyCompleteAcknowledgements(null);
    }

    /**
     * Forwards the exception to the response consumer as the result and stops this actor.
     */
    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        this.log.withCorrelationId(this.correlationId).info("Stopped waiting for acknowledgements because of ditto runtime exception <{}>.", dittoRuntimeException);
        handleSignal(dittoRuntimeException);
        getContext().stop(getSelf());
    }

    /**
     * Completes only if the aggregator reports that all requested acknowledgements were received.
     *
     * @param commandResponse the response that triggered this check, or {@code null} if triggered by an
     * acknowledgement.
     */
    private void potentiallyCompleteAcknowledgements(@Nullable CommandResponse<?> commandResponse) {
        if (this.ackregator.receivedAllRequestedAcknowledgements()) {
            completeAcknowledgements(commandResponse, this.requestCommandHeaders);
        }
    }

    /**
     * Emits exactly one result to the response consumer and stops this actor.
     *
     * <ul>
     * <li>If a command response is given and the aggregate consists of nothing but a single twin-persisted or
     * live-response acknowledgement, the command response itself is emitted.</li>
     * <li>Otherwise, if the aggregate contains more than that or all requested acknowledgements were received,
     * the aggregated acknowledgements are emitted.</li>
     * <li>Otherwise a {@link ThingErrorResponse} wrapping a {@link GatewayCommandTimeoutException} is
     * emitted.</li>
     * </ul>
     *
     * @param commandResponse the triggering command response or {@code null}.
     * @param dittoHeaders the headers used for building the aggregated acknowledgements.
     */
    private void completeAcknowledgements(@Nullable CommandResponse<?> commandResponse, DittoHeaders dittoHeaders) {
        final Acknowledgements aggregatedAcknowledgements = this.ackregator.getAggregatedAcknowledgements(dittoHeaders);
        final boolean builtInOnly = containsOnlyTwinPersistedOrLiveResponse(aggregatedAcknowledgements);
        if (null != commandResponse && builtInOnly) {
            handleSignal(commandResponse);
        } else if (!builtInOnly || this.ackregator.receivedAllRequestedAcknowledgements()) {
            this.log.withCorrelationId(dittoHeaders).debug("Completing with collected acknowledgements: {}", aggregatedAcknowledgements);
            handleSignal(aggregatedAcknowledgements);
        } else {
            handleSignal(asThingErrorResponse(aggregatedAcknowledgements));
        }
        getContext().stop(getSelf());
    }

    /**
     * Passes the result object to the response consumer.
     */
    private void handleSignal(Object obj) {
        this.responseSignalConsumer.accept(obj);
    }

    /**
     * Builds an error response carrying a gateway command timeout for the entity and headers of the given
     * acknowledgements.
     */
    private ThingErrorResponse asThingErrorResponse(Acknowledgements acknowledgements) {
        final GatewayCommandTimeoutException timeoutException = GatewayCommandTimeoutException.newBuilder(this.timeout)
                .dittoHeaders(acknowledgements.getDittoHeaders())
                .build();
        return ThingErrorResponse.of(ThingId.of(acknowledgements.getEntityId()), timeoutException);
    }

    /**
     * @return {@code true} if the given acknowledgements hold exactly one entry and that entry is labelled
     * {@code TWIN_PERSISTED} or {@code LIVE_RESPONSE}.
     */
    private static boolean containsOnlyTwinPersistedOrLiveResponse(Acknowledgements acknowledgements) {
        return acknowledgements.getSize() == 1 && acknowledgements.stream().anyMatch(acknowledgement -> {
            final AcknowledgementLabel label = acknowledgement.getLabel();
            return DittoAcknowledgementLabel.TWIN_PERSISTED.equals(label) || DittoAcknowledgementLabel.LIVE_RESPONSE.equals(label);
        });
    }
}
