package com.hotels.bdp.circustrain.aws.sns.event;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.model.PublishRequest;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.annotations.VisibleForTesting;
import com.hotels.bdp.circustrain.api.CompletionCode;
import com.hotels.bdp.circustrain.api.event.CopierListener;
import com.hotels.bdp.circustrain.api.event.EventPartition;
import com.hotels.bdp.circustrain.api.event.EventPartitions;
import com.hotels.bdp.circustrain.api.event.EventReplicaCatalog;
import com.hotels.bdp.circustrain.api.event.EventSourceCatalog;
import com.hotels.bdp.circustrain.api.event.EventTable;
import com.hotels.bdp.circustrain.api.event.EventTableReplication;
import com.hotels.bdp.circustrain.api.event.LocomotiveListener;
import com.hotels.bdp.circustrain.api.event.ReplicaCatalogListener;
import com.hotels.bdp.circustrain.api.event.SourceCatalogListener;
import com.hotels.bdp.circustrain.api.event.TableReplicationListener;
import com.hotels.bdp.circustrain.api.metrics.Metrics;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/hotels/bdp/circustrain/aws/sns/event/SnsListener.class */
public class SnsListener implements LocomotiveListener, SourceCatalogListener, ReplicaCatalogListener, TableReplicationListener, CopierListener {
    private static final Logger LOG = LoggerFactory.getLogger(SnsListener.class);
    private static final int SNS_MESSAGE_SIZE_LIMIT = 262144;
    private final AmazonSNSAsyncClient sns;
    private final ListenerConfig config;
    private final ObjectWriter startWriter;
    private final Clock clock;
    private Metrics metrics;
    private List<EventPartition> partitionsToCreate;
    private List<EventPartition> partitionsToAlter;
    private String startTime;
    private EventSourceCatalog sourceCatalog;
    private EventReplicaCatalog replicaCatalog;
    private LinkedHashMap<String, String> partitionKeyTypes;

    @Autowired
    public SnsListener(AmazonSNSAsyncClient amazonSNSAsyncClient, ListenerConfig listenerConfig) {
        this(amazonSNSAsyncClient, listenerConfig, Clock.DEFAULT);
    }

    SnsListener(AmazonSNSAsyncClient amazonSNSAsyncClient, ListenerConfig listenerConfig, Clock clock) {
        this.clock = clock;
        LOG.info("Starting listener, topics: start={}, success={}, fail={}", new Object[]{listenerConfig.getStartTopic(), listenerConfig.getSuccessTopic(), listenerConfig.getFailTopic()});
        this.sns = amazonSNSAsyncClient;
        this.config = listenerConfig;
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        this.startWriter = objectMapper.writer();
    }

    public void circusTrainStartUp(String[] strArr, EventSourceCatalog eventSourceCatalog, EventReplicaCatalog eventReplicaCatalog) {
        this.sourceCatalog = eventSourceCatalog;
        this.replicaCatalog = eventReplicaCatalog;
    }

    public void copierEnd(Metrics metrics) {
        this.metrics = metrics;
    }

    public void tableReplicationStart(EventTableReplication eventTableReplication, String str) {
        this.startTime = this.clock.getTime();
        publish(this.config.getStartTopic(), new SnsMessage(SnsMessageType.START, this.config.getHeaders(), this.startTime, null, str, this.sourceCatalog.getName(), this.replicaCatalog.getName(), this.replicaCatalog.getHiveMetastoreUris(), eventTableReplication.getSourceTable().getQualifiedName(), eventTableReplication.getQualifiedReplicaName(), eventTableReplication.getReplicaTable().getTableLocation(), this.partitionKeyTypes, null, null, null));
    }

    public void tableReplicationSuccess(EventTableReplication eventTableReplication, String str) {
        publish(this.config.getSuccessTopic(), new SnsMessage(SnsMessageType.SUCCESS, this.config.getHeaders(), this.startTime, this.clock.getTime(), str, this.sourceCatalog.getName(), this.replicaCatalog.getName(), this.replicaCatalog.getHiveMetastoreUris(), eventTableReplication.getSourceTable().getQualifiedName(), eventTableReplication.getQualifiedReplicaName(), eventTableReplication.getReplicaTable().getTableLocation(), this.partitionKeyTypes, getModifiedPartitions(this.partitionsToAlter, this.partitionsToCreate), getBytesReplicated(), null));
    }

    public void tableReplicationFailure(EventTableReplication eventTableReplication, String str, Throwable th) {
        if (this.startTime == null) {
            this.startTime = this.clock.getTime();
        }
        publish(this.config.getFailTopic(), new SnsMessage(SnsMessageType.FAILURE, this.config.getHeaders(), this.startTime, this.clock.getTime(), str, this.sourceCatalog.getName(), this.replicaCatalog.getName(), this.replicaCatalog.getHiveMetastoreUris(), eventTableReplication.getSourceTable().getQualifiedName(), eventTableReplication.getQualifiedReplicaName(), eventTableReplication.getReplicaTable().getTableLocation(), this.partitionKeyTypes, getModifiedPartitions(this.partitionsToAlter, this.partitionsToCreate), getBytesReplicated(), th.getMessage()));
    }

    private Long getBytesReplicated() {
        if (this.metrics != null) {
            return Long.valueOf(this.metrics.getBytesReplicated());
        }
        return 0L;
    }

    public void partitionsToCreate(EventPartitions eventPartitions) {
        this.partitionsToCreate = eventPartitions.getEventPartitions();
        setPartitionKeyTypes(eventPartitions.getPartitionKeyTypes());
    }

    public void partitionsToAlter(EventPartitions eventPartitions) {
        this.partitionsToAlter = eventPartitions.getEventPartitions();
        setPartitionKeyTypes(eventPartitions.getPartitionKeyTypes());
    }

    private void setPartitionKeyTypes(LinkedHashMap<String, String> linkedHashMap) {
        if (linkedHashMap != null) {
            this.partitionKeyTypes = linkedHashMap;
        }
    }

    @VisibleForTesting
    static List<List<String>> getModifiedPartitions(List<EventPartition> list, List<EventPartition> list2) {
        if (list == null && list2 == null) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        for (List list3 : Arrays.asList(list, list2)) {
            if (list3 != null) {
                Iterator it = list3.iterator();
                while (it.hasNext()) {
                    arrayList.add(((EventPartition) it.next()).getValues());
                }
            }
        }
        return arrayList;
    }

    private void publish(String str, SnsMessage snsMessage) {
        if (str != null) {
            try {
                String writeValueAsString = this.startWriter.writeValueAsString(snsMessage);
                int length = writeValueAsString.getBytes(StandardCharsets.UTF_8).length;
                if (length > SNS_MESSAGE_SIZE_LIMIT) {
                    LOG.warn("Message length of {} exceeds SNS limit ({} bytes).", Integer.valueOf(length), Integer.valueOf(SNS_MESSAGE_SIZE_LIMIT));
                }
                LOG.debug("Attempting to send message to topic '{}': {}", str, writeValueAsString);
                PublishRequest publishRequest = new PublishRequest(str, writeValueAsString);
                if (this.config.getSubject() != null) {
                    publishRequest.setSubject(this.config.getSubject());
                }
                try {
                    this.sns.publish(publishRequest);
                } catch (AmazonClientException e) {
                    LOG.error("Could not publish message to SNS topic '{}'.", str, e);
                }
            } catch (JsonProcessingException e2) {
                LOG.error("Could not serialize message '{}'.", snsMessage, e2);
            }
        }
    }

    @PreDestroy
    public void flush() throws InterruptedException {
        LOG.debug("Terminating...");
        this.sns.getExecutorService().shutdown();
        this.sns.getExecutorService().awaitTermination(60L, TimeUnit.SECONDS);
        this.sns.shutdown();
        LOG.debug("Terminated.");
    }

    public void copierStart(String str) {
    }

    public void resolvedReplicaLocation(URI uri) {
    }

    public void existingReplicaPartitions(EventPartitions eventPartitions) {
    }

    public void deprecatedReplicaLocations(List<URI> list) {
    }

    public void resolvedMetaStoreSourceTable(EventTable eventTable) {
    }

    public void resolvedSourcePartitions(EventPartitions eventPartitions) {
    }

    public void resolvedSourceLocation(URI uri) {
    }

    public void circusTrainShutDown(CompletionCode completionCode, Map<String, Long> map) {
    }
}
