package me.ahoo.eventbus.jdbc;

import com.google.common.base.Throwables;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.List;
import me.ahoo.eventbus.core.repository.ConcurrentVersionConflictException;
import me.ahoo.eventbus.core.repository.PublishEventRepository;
import me.ahoo.eventbus.core.repository.PublishIdentity;
import me.ahoo.eventbus.core.repository.PublishStatus;
import me.ahoo.eventbus.core.repository.Version;
import me.ahoo.eventbus.core.repository.entity.PublishEventCompensationEntity;
import me.ahoo.eventbus.core.repository.entity.PublishEventEntity;
import me.ahoo.eventbus.core.utils.Dates;
import me.ahoo.eventbus.core.utils.Jsons;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;

/* loaded from: input_file:me/ahoo/eventbus/jdbc/JdbcPublishEventRepository.class */
public class JdbcPublishEventRepository implements PublishEventRepository {
    private final NamedParameterJdbcTemplate jdbcTemplate;
    private static final String SQL_INITIALIZED = "insert publish_event (event_name, event_data, status,version,create_time) values (:event_name, :event_data, :status,:version,:create_time);";
    private static final String SQL_MARK_SUCCEEDED = "update publish_event set status=1,version=version+1,published_time=:published_time where id=:id and version=:version;";
    private static final String SQL_MARK_FAILED = "update publish_event set status=2,version=version+1 where id=:id and version=:version;";
    private static final String SQL_PUBLISH_FAILED = "insert publish_event_failed(publish_event_id, failed_msg,create_time) values (:publish_event_id, :failed_msg,:create_time)";
    private static final String SQL_QUERY_FAILED = "select id, event_name, event_data, status, published_time, version, create_time from publish_event where status<>1 and create_time<:before and version<:max_version order by version asc limit :limit;";
    private static String SQL_COMPENSATE = "insert publish_event_compensation (publish_event_id, start_time, taken, failed_msg) values (:publish_event_id, :start_time, :taken, :failed_msg);";

    public JdbcPublishEventRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
        this.jdbcTemplate = namedParameterJdbcTemplate;
    }

    public PublishIdentity initialize(String str, Object obj) {
        PublishIdentity build = PublishIdentity.builder().status(PublishStatus.INITIALIZED).version(Version.INITIAL_VALUE).eventName(str).createTime(LocalDateTime.now()).build();
        MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource("event_name", str);
        mapSqlParameterSource.addValue("event_data", Jsons.serializeAsString(obj));
        mapSqlParameterSource.addValue("status", Integer.valueOf(build.getStatus().getValue()));
        mapSqlParameterSource.addValue("version", build.getVersion());
        mapSqlParameterSource.addValue("create_time", build.getCreateTime());
        GeneratedKeyHolder generatedKeyHolder = new GeneratedKeyHolder();
        this.jdbcTemplate.update(SQL_INITIALIZED, mapSqlParameterSource, generatedKeyHolder);
        build.setId(Long.valueOf(generatedKeyHolder.getKey().longValue()));
        return build;
    }

    private void checkAffected(PublishIdentity publishIdentity, int i) {
        if (i <= 0) {
            throw new ConcurrentVersionConflictException(String.format("Publish [%s] mark [%d]@[%d] to status [%s] error.", publishIdentity.getEventName(), publishIdentity.getId(), publishIdentity.getVersion(), publishIdentity.getStatus().name()), publishIdentity);
        }
    }

    public int markSucceeded(PublishIdentity publishIdentity) {
        MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource("id", publishIdentity.getId());
        mapSqlParameterSource.addValue("version", publishIdentity.getVersion());
        mapSqlParameterSource.addValue("published_time", LocalDateTime.now());
        int update = this.jdbcTemplate.update(SQL_MARK_SUCCEEDED, mapSqlParameterSource);
        checkAffected(publishIdentity, update);
        return update;
    }

    public int markFailed(PublishIdentity publishIdentity, Throwable th) {
        insertPublishEventFailed(publishIdentity, th);
        MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource("id", publishIdentity.getId());
        mapSqlParameterSource.addValue("version", publishIdentity.getVersion());
        int update = this.jdbcTemplate.update(SQL_MARK_FAILED, mapSqlParameterSource);
        checkAffected(publishIdentity, update);
        return update;
    }

    private void insertPublishEventFailed(PublishIdentity publishIdentity, Throwable th) {
        MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource("publish_event_id", publishIdentity.getId());
        mapSqlParameterSource.addValue("failed_msg", Throwables.getStackTraceAsString(th));
        mapSqlParameterSource.addValue("create_time", LocalDateTime.now());
        this.jdbcTemplate.update(SQL_PUBLISH_FAILED, mapSqlParameterSource);
    }

    public List<PublishEventEntity> queryFailed(int i, int i2, int i3) {
        MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource("max_version", Integer.valueOf(i3));
        mapSqlParameterSource.addValue("before", LocalDateTime.now().minus(i2, (TemporalUnit) ChronoUnit.MINUTES));
        mapSqlParameterSource.addValue("limit", Integer.valueOf(i));
        return this.jdbcTemplate.query(SQL_QUERY_FAILED, mapSqlParameterSource, (resultSet, i4) -> {
            long j = resultSet.getLong("id");
            String string = resultSet.getString("event_name");
            String string2 = resultSet.getString("event_data");
            int i4 = resultSet.getInt("status");
            Timestamp timestamp = resultSet.getTimestamp("published_time");
            int i5 = resultSet.getInt("version");
            Timestamp timestamp2 = resultSet.getTimestamp("create_time");
            PublishEventEntity publishEventEntity = new PublishEventEntity();
            publishEventEntity.setId(Long.valueOf(j));
            publishEventEntity.setEventName(string);
            publishEventEntity.setEventData(string2);
            publishEventEntity.setStatus(PublishStatus.valueOf(i4));
            publishEventEntity.setVersion(Integer.valueOf(i5));
            publishEventEntity.setPublishedTime(Dates.of(timestamp));
            publishEventEntity.setCreateTime(Dates.of(timestamp2));
            return publishEventEntity;
        });
    }

    public int compensate(PublishEventCompensationEntity publishEventCompensationEntity) {
        MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource("publish_event_id", publishEventCompensationEntity.getPublishEventId());
        mapSqlParameterSource.addValue("start_time", publishEventCompensationEntity.getStartTime());
        mapSqlParameterSource.addValue("taken", publishEventCompensationEntity.getTaken());
        mapSqlParameterSource.addValue("failed_msg", publishEventCompensationEntity.getFailedMsg());
        return this.jdbcTemplate.update(SQL_COMPENSATE, mapSqlParameterSource);
    }
}
