package me.ahoo.eventbus.jdbc;

import com.google.common.base.Throwables;
import java.util.List;
import java.util.Optional;
import me.ahoo.eventbus.core.publisher.PublishEvent;
import me.ahoo.eventbus.core.repository.ConcurrentVersionConflictException;
import me.ahoo.eventbus.core.repository.RepeatedSubscribeException;
import me.ahoo.eventbus.core.repository.SubscribeEventRepository;
import me.ahoo.eventbus.core.repository.SubscribeIdentity;
import me.ahoo.eventbus.core.repository.SubscribeStatus;
import me.ahoo.eventbus.core.repository.Version;
import me.ahoo.eventbus.core.repository.entity.CompensateLeader;
import me.ahoo.eventbus.core.repository.entity.SubscribeEventCompensateEntity;
import me.ahoo.eventbus.core.repository.entity.SubscribeEventEntity;
import me.ahoo.eventbus.core.serialize.Serializer;
import me.ahoo.eventbus.core.subscriber.Subscriber;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;

/* loaded from: input_file:me/ahoo/eventbus/jdbc/JdbcSubscribeEventRepository.class */
/**
 * JDBC-backed {@link SubscribeEventRepository} built on Spring's {@link NamedParameterJdbcTemplate}.
 *
 * <p>Rows are read from and written to the {@code subscribe_event}, {@code subscribe_event_failed}
 * and {@code subscribe_event_compensate} tables. Status transitions use an optimistic check on the
 * {@code version} column: an update only matches when the stored version equals the version carried
 * by the {@link SubscribeIdentity}, and a zero-row update is reported as a
 * {@link ConcurrentVersionConflictException}.
 */
public class JdbcSubscribeEventRepository implements SubscribeEventRepository {
    private static final String SQL_GET_SUBSCRIBE_EVENT = "select id,status,version from subscribe_event where event_id=:event_id and event_name=:event_name and subscribe_name=:subscribe_name limit 1;";
    private static final String SQL_SUBSCRIBE_INITIALIZED = "insert subscribe_event(subscribe_name, status,subscribe_time, event_id, event_name, event_data, event_create_time, version,create_time)values (:subscribe_name, :status,:subscribe_time, :event_id, :event_name, :event_data, :event_create_time, :version,:create_time);";
    private static final String SQL_MARK_SUCCEEDED = "update subscribe_event set status=1,version=version+1 where id=:id and version=:version;";
    private static final String SQL_MARK_FAILED = "update subscribe_event set status=2,version=version+1 where id=:id and version=:version;";
    private static final String SQL_SUBSCRIBE_FAILED = "insert subscribe_event_failed (subscribe_event_id, failed_msg,create_time) values (:subscribe_event_id, :failed_msg,:create_time)";
    private static final String SQL_QUERY_FAILED = "select id, subscribe_name, status, subscribe_time, event_id, event_name, event_data, event_create_time, version, create_time from subscribe_event where status<>1 and create_time<:before and version<:max_version order by version asc limit :limit;";
    private static final String SQL_COMPENSATE = "insert subscribe_event_compensate (subscribe_event_id, start_time, taken, failed_msg) values (:subscribe_event_id, :start_time, :taken, :failed_msg) ";

    /** Name passed to the shared leader helpers of {@link JdbcPublishEventRepository}. */
    private static final String LEADER_NAME = "subscribe_leader";

    private final Serializer serializer;
    private final NamedParameterJdbcTemplate jdbcTemplate;

    /**
     * Creates the repository.
     *
     * @param serializer                 used to serialize the event payload before it is stored
     * @param namedParameterJdbcTemplate template used for every SQL statement issued by this class
     */
    public JdbcSubscribeEventRepository(Serializer serializer, NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
        this.serializer = serializer;
        this.jdbcTemplate = namedParameterJdbcTemplate;
    }

    /**
     * Looks up the subscription row for the given subscriber / event id / event name.
     *
     * @return the stored identity, or {@link Optional#empty()} when the query yields no row
     */
    private Optional<SubscribeIdentity> getSubscribeIdentity(Subscriber subscriber, Long eventId, String eventName) {
        MapSqlParameterSource params = new MapSqlParameterSource("event_id", eventId);
        params.addValue("event_name", eventName);
        params.addValue("subscribe_name", subscriber.getName());
        try {
            // The lambda parameter must not share a name with any local declared in the body.
            SubscribeIdentity found = this.jdbcTemplate.queryForObject(SQL_GET_SUBSCRIBE_EVENT, params, (resultSet, rowNum) -> {
                long id = resultSet.getLong("id");
                int status = resultSet.getInt("status");
                int version = resultSet.getInt("version");
                SubscribeIdentity identity = new SubscribeIdentity();
                identity.setId(Long.valueOf(id));
                identity.setSubscriberName(subscriber.getName());
                identity.setStatus(SubscribeStatus.valeOf(status));
                identity.setVersion(Integer.valueOf(version));
                return identity;
            });
            return Optional.of(found);
        } catch (EmptyResultDataAccessException e) {
            // queryForObject signals "no row" with this exception; translate it to an empty Optional.
            return Optional.empty();
        }
    }

    /**
     * Inserts a new {@code subscribe_event} row in {@link SubscribeStatus#INITIALIZED} state with
     * version {@link Version#INITIAL_VALUE} and returns an identity carrying the generated id.
     *
     * @throws IllegalStateException if the insert did not report a generated key
     */
    private SubscribeIdentity initializeSubscribeIdentity(Subscriber subscriber, PublishEvent publishEvent, String eventName) {
        SubscribeIdentity identity = new SubscribeIdentity();
        identity.setStatus(SubscribeStatus.INITIALIZED);
        identity.setVersion(Version.INITIAL_VALUE);
        identity.setSubscriberName(subscriber.getName());

        // One clock read so subscribe_time and create_time of the same row agree.
        Long now = Long.valueOf(System.currentTimeMillis());
        MapSqlParameterSource params = new MapSqlParameterSource("subscribe_name", subscriber.getName());
        params.addValue("status", Integer.valueOf(identity.getStatus().getValue()));
        params.addValue("subscribe_time", now);
        params.addValue("event_id", publishEvent.getId());
        params.addValue("event_name", eventName);
        params.addValue("event_data", this.serializer.serialize(publishEvent.getEventData()));
        params.addValue("event_create_time", publishEvent.getCreateTime());
        params.addValue("version", identity.getVersion());
        params.addValue("create_time", now);

        GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
        this.jdbcTemplate.update(SQL_SUBSCRIBE_INITIALIZED, params, keyHolder);
        Number generatedId = keyHolder.getKey();
        if (generatedId == null) {
            // getKey() may return null when no key was produced; fail with context instead of an NPE.
            throw new IllegalStateException(String.format(
                    "No generated key returned when initializing subscribe event for subscriber [%s], event [%s].",
                    subscriber.getName(), eventName));
        }
        identity.setId(Long.valueOf(generatedId.longValue()));
        return identity;
    }

    /**
     * Returns the subscription identity for {@code subscriber} and {@code publishEvent}, inserting
     * a new row when none is stored yet.
     *
     * @param subscriber   the subscriber handling the event
     * @param publishEvent the event being delivered
     * @return the newly inserted identity, or the already stored one when it has not succeeded
     * @throws RepeatedSubscribeException if a stored row exists with status
     *                                    {@link SubscribeStatus#SUCCEEDED}
     */
    public SubscribeIdentity initialize(Subscriber subscriber, PublishEvent publishEvent) throws RepeatedSubscribeException {
        String eventName = publishEvent.getEventName();
        Optional<SubscribeIdentity> existing = getSubscribeIdentity(subscriber, publishEvent.getId(), eventName);
        if (!existing.isPresent()) {
            return initializeSubscribeIdentity(subscriber, publishEvent, eventName);
        }
        SubscribeIdentity identity = existing.get();
        if (identity.getStatus().equals(SubscribeStatus.SUCCEEDED)) {
            throw new RepeatedSubscribeException(subscriber, publishEvent);
        }
        return identity;
    }

    /**
     * Throws when a versioned update matched no row.
     *
     * @throws ConcurrentVersionConflictException if {@code affected} is zero
     */
    private void checkAffected(SubscribeIdentity subscribeIdentity, int affected) {
        if (affected == 0) {
            throw new ConcurrentVersionConflictException(String.format("Subscribe [%s] mark id:[%d] on version:[%d] to status [%s] error.", subscribeIdentity.getSubscriberName(), subscribeIdentity.getId(), subscribeIdentity.getVersion(), subscribeIdentity.getStatus().name()), subscribeIdentity);
        }
    }

    /**
     * Sets {@code status=1} and increments the version of the row identified by
     * {@code subscribeIdentity}, provided the stored version still matches.
     *
     * @param subscribeIdentity identity supplying the row id and expected version
     * @return the number of rows updated
     * @throws ConcurrentVersionConflictException if no row matched the id and version
     */
    public int markSucceeded(SubscribeIdentity subscribeIdentity) {
        int affected = this.jdbcTemplate.update(SQL_MARK_SUCCEEDED, idAndVersion(subscribeIdentity));
        checkAffected(subscribeIdentity, affected);
        return affected;
    }

    /**
     * Records the failure and then sets {@code status=2} and increments the version of the row
     * identified by {@code subscribeIdentity}, provided the stored version still matches.
     *
     * <p>The failure record (stack trace of {@code th}) is inserted before the status update is
     * attempted.
     *
     * @param subscribeIdentity identity supplying the row id and expected version
     * @param th                the failure whose stack trace is stored
     * @return the number of rows updated
     * @throws ConcurrentVersionConflictException if no row matched the id and version
     */
    public int markFailed(SubscribeIdentity subscribeIdentity, Throwable th) {
        insertSubscribeEventFailed(subscribeIdentity, th);
        int affected = this.jdbcTemplate.update(SQL_MARK_FAILED, idAndVersion(subscribeIdentity));
        checkAffected(subscribeIdentity, affected);
        return affected;
    }

    /** Builds the {@code :id} / {@code :version} parameters shared by the status updates. */
    private MapSqlParameterSource idAndVersion(SubscribeIdentity subscribeIdentity) {
        MapSqlParameterSource params = new MapSqlParameterSource("id", subscribeIdentity.getId());
        params.addValue("version", subscribeIdentity.getVersion());
        return params;
    }

    /** Inserts a {@code subscribe_event_failed} row holding the stack trace of {@code failure}. */
    private void insertSubscribeEventFailed(SubscribeIdentity subscribeIdentity, Throwable failure) {
        MapSqlParameterSource params = new MapSqlParameterSource("subscribe_event_id", subscribeIdentity.getId());
        params.addValue("failed_msg", Throwables.getStackTraceAsString(failure));
        params.addValue("create_time", Long.valueOf(System.currentTimeMillis()));
        this.jdbcTemplate.update(SQL_SUBSCRIBE_FAILED, params);
    }

    /**
     * Queries {@code subscribe_event} rows whose status is not 1, whose {@code create_time} is
     * earlier than the current time minus {@code j}, and whose version is below {@code i2},
     * ordered by ascending version.
     *
     * @param i  bound to {@code :limit}; maximum number of rows returned
     * @param j  subtracted from {@link System#currentTimeMillis()} to form the {@code :before} bound
     * @param i2 bound to {@code :max_version}; exclusive upper bound on the row version
     * @return the matching rows mapped to entities
     */
    public List<SubscribeEventEntity> queryFailed(int i, long j, int i2) {
        MapSqlParameterSource params = new MapSqlParameterSource("max_version", Integer.valueOf(i2));
        params.addValue("before", Long.valueOf(System.currentTimeMillis() - j));
        params.addValue("limit", Integer.valueOf(i));
        // The lambda parameter must not share a name with any local declared in the body.
        return this.jdbcTemplate.query(SQL_QUERY_FAILED, params, (resultSet, rowNum) -> {
            SubscribeEventEntity entity = new SubscribeEventEntity();
            entity.setId(Long.valueOf(resultSet.getLong("id")));
            entity.setSubscriberName(resultSet.getString("subscribe_name"));
            entity.setStatus(SubscribeStatus.valeOf(resultSet.getInt("status")));
            entity.setSubscribeTime(Long.valueOf(resultSet.getLong("subscribe_time")));
            entity.setEventId(Long.valueOf(resultSet.getLong("event_id")));
            entity.setEventName(resultSet.getString("event_name"));
            entity.setEventData(resultSet.getString("event_data"));
            entity.setEventCreateTime(Long.valueOf(resultSet.getLong("event_create_time")));
            entity.setVersion(Integer.valueOf(resultSet.getInt("version")));
            entity.setCreateTime(Long.valueOf(resultSet.getLong("create_time")));
            return entity;
        });
    }

    /**
     * Inserts a {@code subscribe_event_compensate} row from the given entity.
     *
     * @param subscribeEventCompensateEntity supplies the subscribe event id, start time, taken
     *                                       value and failure message
     * @return the number of rows inserted
     */
    public int compensate(SubscribeEventCompensateEntity subscribeEventCompensateEntity) {
        MapSqlParameterSource params = new MapSqlParameterSource("subscribe_event_id", subscribeEventCompensateEntity.getSubscribeEventId());
        params.addValue("start_time", subscribeEventCompensateEntity.getStartTime());
        params.addValue("taken", subscribeEventCompensateEntity.getTaken());
        params.addValue("failed_msg", subscribeEventCompensateEntity.getFailedMsg());
        return this.jdbcTemplate.update(SQL_COMPENSATE, params);
    }

    /**
     * Delegates to {@link JdbcPublishEventRepository#getLeader} for the {@code subscribe_leader}
     * name.
     */
    public CompensateLeader getLeader() {
        return JdbcPublishEventRepository.getLeader(this.jdbcTemplate, LEADER_NAME);
    }

    /**
     * Delegates to {@link JdbcPublishEventRepository#fightLeadership} for the
     * {@code subscribe_leader} name, passing the arguments through unchanged.
     */
    public boolean fightLeadership(long j, long j2, String str, int i) {
        return JdbcPublishEventRepository.fightLeadership(this.jdbcTemplate, LEADER_NAME, j, j2, str, i);
    }

    /**
     * Delegates to {@link JdbcPublishEventRepository#releaseLeadership} for the
     * {@code subscribe_leader} name, passing {@code str} through unchanged.
     */
    public boolean releaseLeadership(String str) {
        return JdbcPublishEventRepository.releaseLeadership(this.jdbcTemplate, LEADER_NAME, str);
    }
}
