package group.insyde.statefun.tsukuyomi.dispatcher.job;

import group.insyde.statefun.tsukuyomi.dispatcher.socket.DispatcherSocketHolder;
import java.io.BufferedReader;
import java.io.Serializable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:group/insyde/statefun/tsukuyomi/dispatcher/job/DispatcherSocketSource.class */
class DispatcherSocketSource extends RichSourceFunction<Envelope> implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(DispatcherSocketSource.class);
    private volatile boolean cancelled;
    private transient BufferedReader reader;

    public void open(Configuration configuration) throws Exception {
        this.reader = DispatcherSocketHolder.getSocket().getReader().get();
        log.info("Client connected, start reading...");
    }

    public void run(SourceFunction.SourceContext<Envelope> sourceContext) throws Exception {
        while (!this.cancelled) {
            String readLine = this.reader.readLine();
            if (readLine != null) {
                log.info("Line received: {}", readLine);
                sourceContext.collect(Envelope.fromJson(readLine));
            }
        }
    }

    public void cancel() {
        this.cancelled = true;
    }
}
