package de.microtema.stream.listener.service;

import de.microtema.stream.listener.converter.EventDataToResponseStatusConverter;
import de.microtema.stream.listener.listener.RecordFilterStrategy;
import de.microtema.stream.listener.listener.StreamEventListenerErrorHandler;
import de.microtema.stream.listener.model.EventIdAware;
import de.microtema.stream.listener.model.StreamListenerEndpoint;
import de.microtema.stream.listener.provider.service.StreamListenerDataProvider;
import de.microtema.stream.listener.support.ResponseState;
import de.microtema.stream.listener.support.ResponseStatus;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.log.LogAccessor;
import org.springframework.stereotype.Service;
import org.springframework.util.ReflectionUtils;

@ConditionalOnProperty(prefix = "stream-listener", name = {"enabled"}, havingValue = "true", matchIfMissing = true)
@Service
/* loaded from: input_file:de/microtema/stream/listener/service/StreamListenerExecutionService.class */
public class StreamListenerExecutionService<T extends EventIdAware> {
    private final LogAccessor log = new LogAccessor(LogFactory.getLog(getClass()));
    private final EventDataToResponseStatusConverter<T> responseStatusConverter;

    public StreamListenerExecutionService(EventDataToResponseStatusConverter<T> eventDataToResponseStatusConverter) {
        this.responseStatusConverter = eventDataToResponseStatusConverter;
    }

    public boolean executeEndpointMethod(StreamListenerEndpoint<T> streamListenerEndpoint) {
        List<T> receive = streamListenerEndpoint.getDataProvider().receive(streamListenerEndpoint);
        if (CollectionUtils.isEmpty(receive)) {
            this.log.trace(() -> {
                return "Skip invocation of [" + streamListenerEndpoint.getId() + "] due to empty records";
            });
            return false;
        }
        this.log.trace(() -> {
            return "Received ({}) record(s) for [" + streamListenerEndpoint.getId() + "]";
        });
        executeImpl(streamListenerEndpoint, receive);
        return true;
    }

    private void executeImpl(StreamListenerEndpoint<T> streamListenerEndpoint, List<T> list) {
        StreamListenerDataProvider<T> dataProvider = streamListenerEndpoint.getDataProvider();
        streamListenerEndpoint.getId();
        boolean isBatch = streamListenerEndpoint.isBatch();
        long currentTimeMillis = System.currentTimeMillis();
        List<ResponseStatus> executeEndpoint = isBatch ? executeEndpoint(streamListenerEndpoint, list) : (List) list.stream().map(eventIdAware -> {
            return executeEndpoint((StreamListenerEndpoint<StreamListenerEndpoint>) streamListenerEndpoint, (StreamListenerEndpoint) eventIdAware);
        }).collect(Collectors.toList());
        getDuration(currentTimeMillis);
        dataProvider.commit(streamListenerEndpoint, executeEndpoint);
    }

    private String getDuration(long j) {
        return DurationFormatUtils.formatDurationHMS(System.currentTimeMillis() - j);
    }

    private Object[] concatMethodParameters(Object obj, Object... objArr) {
        Object[] objArr2 = new Object[1 + objArr.length];
        objArr2[0] = obj;
        System.arraycopy(objArr, 0, objArr2, 1, objArr.length);
        return objArr2;
    }

    private ResponseStatus executeEndpoint(StreamListenerEndpoint<T> streamListenerEndpoint, T t) {
        Object[] methodParameters = streamListenerEndpoint.getMethodParameters();
        Object bean = streamListenerEndpoint.getBean();
        Method method = streamListenerEndpoint.getMethod();
        ResponseStatus filterRecordIfNecessary = filterRecordIfNecessary(t, streamListenerEndpoint.getRecordFilterStrategy());
        if (Objects.nonNull(filterRecordIfNecessary)) {
            return filterRecordIfNecessary;
        }
        try {
            executeEndpointMethod(method, bean, concatMethodParameters(t, methodParameters));
            return (ResponseStatus) this.responseStatusConverter.convert(t);
        } catch (Exception e) {
            handleError((StreamListenerExecutionService<T>) t, e, (StreamListenerEndpoint<StreamListenerExecutionService<T>>) streamListenerEndpoint);
            return (ResponseStatus) this.responseStatusConverter.convert(t, String.format("Unable to execute endpoint [%s] record [%s]. Message: %s", streamListenerEndpoint.getId(), t.getEventId(), e.getMessage()));
        }
    }

    private List<ResponseStatus> executeEndpoint(StreamListenerEndpoint<T> streamListenerEndpoint, List<T> list) {
        Object[] methodParameters = streamListenerEndpoint.getMethodParameters();
        Object bean = streamListenerEndpoint.getBean();
        Method method = streamListenerEndpoint.getMethod();
        List<ResponseStatus> filterRecordsIfNecessary = filterRecordsIfNecessary(list, streamListenerEndpoint.getRecordFilterStrategy());
        try {
            executeEndpointMethod(method, bean, concatMethodParameters(list, methodParameters));
            filterRecordsIfNecessary.addAll(this.responseStatusConverter.convertList(list));
        } catch (Exception e) {
            handleError(list, e, streamListenerEndpoint);
            filterRecordsIfNecessary.addAll(this.responseStatusConverter.convertList(list, String.format("Unable to execute endpoint [%s] within (%s) record(s). Message: %s", streamListenerEndpoint.getId(), Integer.valueOf(list.size()), e.getMessage())));
        }
        return filterRecordsIfNecessary;
    }

    private List<ResponseStatus> filterRecordsIfNecessary(List<T> list, RecordFilterStrategy<T> recordFilterStrategy) {
        if (Objects.isNull(recordFilterStrategy)) {
            return new ArrayList();
        }
        List convertList = this.responseStatusConverter.convertList(recordFilterStrategy.filterBatch(list));
        convertList.forEach(responseStatus -> {
            responseStatus.setState(ResponseState.SKIPPED);
        });
        return new ArrayList(convertList);
    }

    private ResponseStatus filterRecordIfNecessary(T t, RecordFilterStrategy<T> recordFilterStrategy) {
        if (Objects.isNull(recordFilterStrategy) || recordFilterStrategy.filter(t)) {
            return null;
        }
        ResponseStatus responseStatus = (ResponseStatus) this.responseStatusConverter.convert(t);
        responseStatus.setState(ResponseState.SKIPPED);
        return responseStatus;
    }

    private void handleError(T t, Exception exc, StreamListenerEndpoint<T> streamListenerEndpoint) {
        StreamEventListenerErrorHandler<T> errorHandler = streamListenerEndpoint.getErrorHandler();
        if (Objects.isNull(errorHandler)) {
            return;
        }
        errorHandler.handleError((StreamEventListenerErrorHandler<T>) t, exc, (StreamListenerEndpoint<StreamEventListenerErrorHandler<T>>) streamListenerEndpoint);
    }

    private void handleError(List<T> list, Exception exc, StreamListenerEndpoint<T> streamListenerEndpoint) {
        StreamEventListenerErrorHandler<T> errorHandler = streamListenerEndpoint.getErrorHandler();
        if (Objects.isNull(errorHandler)) {
            return;
        }
        errorHandler.handleError(list, exc, streamListenerEndpoint);
    }

    private void executeEndpointMethod(Method method, Object obj, Object... objArr) {
        try {
            ReflectionUtils.makeAccessible(method);
            method.invoke(obj, objArr);
        } catch (IllegalAccessException e) {
            throw new UndeclaredThrowableException(e);
        } catch (InvocationTargetException e2) {
            ReflectionUtils.rethrowRuntimeException(e2.getTargetException());
        }
    }
}
