package stream.monitor;

import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.ProcessContext;

/* loaded from: input_file:stream/monitor/TimeRate.class */
public class TimeRate extends StreamMonitor implements TimeRateService {
    static Logger logger = LoggerFactory.getLogger(TimeRate.class);
    protected Long start;
    protected Long startIndex;
    protected Long nowIndex;
    protected long n;
    protected float mean;
    protected Float rate;
    protected Float time;
    protected Integer every = null;
    protected String index;

    public TimeRate() {
        try {
            reset();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String getIndex() {
        return this.index;
    }

    public void setIndex(String str) {
        this.index = str;
    }

    public Integer getEvery() {
        return this.every;
    }

    public void setEvery(Integer num) {
        this.every = num;
    }

    @Override // stream.monitor.StreamMonitor
    public void init(ProcessContext processContext) throws Exception {
        if (this.dweet.booleanValue()) {
            this.keys = new String[]{"@timeRate"};
        }
        super.init(processContext);
    }

    public Data process(Data data) {
        if (this.start == null) {
            this.start = Long.valueOf(System.currentTimeMillis());
            this.startIndex = getIndex(data);
        }
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        long longValue = valueOf.longValue() - this.start.longValue();
        if (longValue > this.every.intValue()) {
            this.nowIndex = getIndex(data);
            if (this.nowIndex != null) {
                long longValue2 = this.nowIndex.longValue() - this.startIndex.longValue();
                this.rate = Float.valueOf((1.0f * ((float) longValue2)) / ((float) longValue));
                this.time = Float.valueOf((1.0f * ((float) longValue2)) / 1000.0f);
                data.put("@timeRate", this.rate);
                data.put("@processedTime", this.time);
                if (this.log.booleanValue()) {
                    logger.info("Time rate {}. {} time (s) processed. @index={}.Time-rate is: {}/second", new Object[]{getId(), this.time, this.nowIndex, this.rate});
                }
                if (this.dweet.booleanValue()) {
                    this.n++;
                    this.mean += (this.rate.floatValue() - this.mean) / ((float) this.n);
                    data.put("@timeRate", Float.valueOf(this.mean));
                    this.dweetWriter.process(data);
                }
                this.start = valueOf;
                this.startIndex = this.nowIndex;
            }
        }
        return data;
    }

    private Long getIndex(Data data) {
        Serializable serializable = (Serializable) data.get(this.index);
        if (serializable == null || !(serializable instanceof Long)) {
            return null;
        }
        return (Long) serializable;
    }

    public void finish() throws Exception {
        super.finish();
        logger.info("TimeRate finished");
    }

    @Override // stream.monitor.TimeRateService
    public Double getTimeRate() {
        return new Double(this.rate.floatValue());
    }

    public void reset() throws Exception {
        this.n = 0L;
        this.start = null;
        this.startIndex = null;
        this.nowIndex = null;
        this.rate = new Float(0.0f);
        this.time = new Float(0.0f);
        this.mean = 0.0f;
    }
}
