package io.fluxcapacitor.testserver.endpoints;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.VoidResult;
import io.fluxcapacitor.common.api.tracking.ClaimSegment;
import io.fluxcapacitor.common.api.tracking.ClaimSegmentResult;
import io.fluxcapacitor.common.api.tracking.DisconnectTracker;
import io.fluxcapacitor.common.api.tracking.GetPosition;
import io.fluxcapacitor.common.api.tracking.GetPositionResult;
import io.fluxcapacitor.common.api.tracking.Read;
import io.fluxcapacitor.common.api.tracking.ReadFromIndex;
import io.fluxcapacitor.common.api.tracking.ReadFromIndexResult;
import io.fluxcapacitor.common.api.tracking.ReadResult;
import io.fluxcapacitor.common.api.tracking.ResetPosition;
import io.fluxcapacitor.common.api.tracking.StorePosition;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import io.fluxcapacitor.testserver.Handle;
import io.fluxcapacitor.testserver.WebsocketEndpoint;
import jakarta.websocket.CloseReason;
import jakarta.websocket.Session;
import java.beans.ConstructorProperties;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/testserver/endpoints/ConsumerEndpoint.class */
public class ConsumerEndpoint extends WebsocketEndpoint {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumerEndpoint.class);
    private final InMemoryMessageStore store;
    private final MessageType messageType;

    @Handle
    public void handle(Read read, Session session) {
        this.store.read(new WebSocketTrackerRead(read, getClientId(session), session.getId(), this.messageType)).whenComplete((messageBatch, th) -> {
            if (th != null) {
                log.error("Failed to complete read", th);
            } else {
                sendResult(session, new ReadResult(read.getRequestId(), messageBatch));
            }
        });
    }

    @Handle
    public void handle(ClaimSegment claimSegment, Session session) {
        this.store.claimSegment(new WebSocketTrackerRead(claimSegment, getClientId(session), session.getId(), this.messageType)).whenComplete((iArr, th) -> {
            if (th != null) {
                log.error("Failed to complete claim segment", th);
            } else {
                sendResult(session, new ClaimSegmentResult(claimSegment.getRequestId(), this.store.getPosition(claimSegment.getConsumer()), iArr));
            }
        });
    }

    @Handle
    public VoidResult handle(StorePosition storePosition) {
        this.store.storePosition(storePosition.getConsumer(), storePosition.getSegment(), storePosition.getLastIndex());
        return new VoidResult(storePosition.getRequestId());
    }

    @Handle
    public VoidResult handle(ResetPosition resetPosition) {
        this.store.resetPosition(resetPosition.getConsumer(), resetPosition.getLastIndex());
        return new VoidResult(resetPosition.getRequestId());
    }

    @Handle
    public void handle(DisconnectTracker disconnectTracker) {
        this.store.disconnectTracker(disconnectTracker.getConsumer(), disconnectTracker.getTrackerId(), disconnectTracker.isSendFinalEmptyBatch());
    }

    @Handle
    public ReadFromIndexResult handle(ReadFromIndex readFromIndex) {
        return new ReadFromIndexResult(readFromIndex.getRequestId(), this.store.readFromIndex(readFromIndex.getMinIndex() - 1, readFromIndex.getMaxSize()));
    }

    @Handle
    public GetPositionResult handle(GetPosition getPosition) {
        return new GetPositionResult(getPosition.getRequestId(), this.store.getPosition(getPosition.getConsumer()));
    }

    @Override // io.fluxcapacitor.testserver.WebsocketEndpoint, jakarta.websocket.Endpoint
    public void onClose(Session session, CloseReason closeReason) {
        super.onClose(session, closeReason);
        this.store.disconnectTrackersMatching(webSocketTrackerRead -> {
            return Objects.equals(webSocketTrackerRead.getSessionId(), session.getId());
        });
    }

    @ConstructorProperties({"store", "messageType"})
    public ConsumerEndpoint(InMemoryMessageStore inMemoryMessageStore, MessageType messageType) {
        this.store = inMemoryMessageStore;
        this.messageType = messageType;
    }
}
