package streams.spark.example;

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.io.Sink;

/* loaded from: input_file:streams/spark/example/PerformanceSink.class */
public class PerformanceSink implements Sink {
    private static final transient Logger log = LoggerFactory.getLogger(PerformanceSink.class);
    private String id;
    private String url;
    private transient FileSystem hdfs;
    private transient BufferedWriter bw;
    private transient long startTime;
    private transient long numItems;

    public PerformanceSink() {
    }

    public PerformanceSink(String str) throws Exception {
        setUrl(str);
    }

    public void init() throws Exception {
        if (this.url == null) {
            log.warn("No URL was set for sink {}. Will not write performance to any file.");
        }
        this.startTime = System.currentTimeMillis();
        this.numItems = 0L;
    }

    public boolean write(Data data) throws Exception {
        if (this.hdfs != null) {
            if (this.bw == null) {
                this.bw = new BufferedWriter(new OutputStreamWriter(this.hdfs.create(new Path(this.url), true)));
            }
            try {
                this.bw.write(Long.toString(System.currentTimeMillis() - this.startTime));
                this.bw.newLine();
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
        this.numItems++;
        return true;
    }

    public boolean write(Collection<Data> collection) throws Exception {
        Iterator<Data> it = collection.iterator();
        while (it.hasNext()) {
            if (!write(it.next())) {
                return false;
            }
        }
        return true;
    }

    public void close() throws Exception {
        if (this.bw != null) {
            this.bw.flush();
            this.bw.close();
        }
        log.info("Received {} items with an approximate rate of {} items per second", Long.valueOf(this.numItems), String.format("%.3f", Double.valueOf((1000.0d * this.numItems) / (System.currentTimeMillis() - this.startTime))));
    }

    public String getId() {
        return this.id;
    }

    public void setId(String str) {
        this.id = str;
    }

    public String getUrl() {
        return this.url;
    }

    public void setUrl(String str) throws Exception {
        Matcher matcher = Pattern.compile("^[a-z]+://[^:]+:[0-9]+").matcher(str);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Invalid URL format, filesystem could not be determined: " + str);
        }
        FileSystem fileSystem = FileSystem.get(new URI(matcher.group()), new Configuration());
        this.url = str;
        this.hdfs = fileSystem;
    }
}
