package org.yamcs.parameterarchive;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.yamcs.ConfigurationException;
import org.yamcs.Processor;
import org.yamcs.YConfiguration;
import org.yamcs.parameter.ParameterValue;
import org.yamcs.utils.LoggingUtils;
import org.yamcs.utils.TimeEncoding;

/* loaded from: input_file:org/yamcs/parameterarchive/RealtimeArchiveFiller.class */
public class RealtimeArchiveFiller extends ArchiveFillerTask {
    ScheduledThreadPoolExecutor executor;
    int flushInterval;
    final Logger log;
    String processorName;
    final String yamcsInstance;
    Processor realtimeProcessor;
    int subscriptionId;

    public RealtimeArchiveFiller(ParameterArchiveV2 parameterArchiveV2, Map<String, Object> map) {
        super(parameterArchiveV2);
        this.executor = new ScheduledThreadPoolExecutor(1);
        this.flushInterval = 300;
        this.processorName = "realtime";
        this.yamcsInstance = parameterArchiveV2.getYamcsInstance();
        this.log = LoggingUtils.getLogger(getClass(), this.yamcsInstance);
        if (map != null) {
            parseConfig(map);
        }
    }

    private void parseConfig(Map<String, Object> map) {
        this.flushInterval = YConfiguration.getInt(map, "flushInterval", this.flushInterval);
        this.processorName = YConfiguration.getString(map, "processorName", this.processorName);
    }

    @Override // org.yamcs.parameterarchive.ArchiveFillerTask, org.yamcs.parameter.ParameterConsumer
    public void updateItems(int i, List<ParameterValue> list) {
        this.executor.execute(() -> {
            try {
                super.updateItems(i, list);
            } catch (Exception e) {
                this.log.error("Error when adding data to realtime segments", e);
            }
        });
    }

    @Override // org.yamcs.parameterarchive.ArchiveFillerTask
    public void flush() {
        try {
            for (Map.Entry<Long, Map<Integer, PGSegment>> entry : this.pgSegments.entrySet()) {
                Map<Integer, PGSegment> value = entry.getValue();
                long longValue = entry.getKey().longValue();
                this.log.debug("Writing to archive the segment: [{} - {})", TimeEncoding.toString(longValue), TimeEncoding.toString(SortedTimeSegment.getNextSegmentStart(longValue)));
                consolidateAndWriteToArchive(longValue, value.values());
            }
        } catch (Exception e) {
            this.log.error("Error when flusing data to parameter archive ", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.realtimeProcessor = Processor.getInstance(this.yamcsInstance, this.processorName);
        if (this.realtimeProcessor == null) {
            throw new ConfigurationException("No processor named '" + this.processorName + "' in instance " + this.yamcsInstance);
        }
        this.subscriptionId = this.realtimeProcessor.getParameterRequestManager().subscribeAll(this);
        this.executor.scheduleAtFixedRate(this::flush, this.flushInterval, this.flushInterval, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.realtimeProcessor.getParameterRequestManager().unsubscribeAll(this.subscriptionId);
        this.executor.shutdown();
        flush();
    }

    @Override // org.yamcs.parameterarchive.ArchiveFillerTask
    public /* bridge */ /* synthetic */ long getNumProcessedParameters() {
        return super.getNumProcessedParameters();
    }
}
