package org.jgrapes.io;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Optional;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.io.events.IOError;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.util.ManagedBuffer;
import org.jgrapes.io.util.ManagedBufferPool;
import org.jgrapes.util.events.ConfigurationUpdate;

/* loaded from: input_file:org/jgrapes/io/InputStreamMonitor.class */
/**
 * A component that reads from an {@link InputStream} on a dedicated daemon
 * thread and fires everything it reads as {@link Input} events on a
 * configurable data channel.
 *
 * <p>The reader thread is created when a {@link Start} event is handled and
 * is interrupted when a {@link Stop} event is handled. While the thread is
 * running, the component is registered as a generator; the registration is
 * removed again when the thread ends or when the component is stopped,
 * whichever happens first.
 *
 * <p>The size of the read buffers can be set with
 * {@link #setBufferSize(int)} or via a {@link ConfigurationUpdate} event
 * carrying a {@code bufferSize} value for this component's path. The size is
 * only read when buffers are allocated by the pool that is created in
 * {@link #onStart(Start)}.
 */
public class InputStreamMonitor extends Component implements Runnable {

    /** Buffer size used unless changed by setter or configuration. */
    private static final int DEFAULT_BUFFER_SIZE = 2048;

    /** Channel on which the {@link Input} events are fired. */
    private final Channel dataChannel;
    /** The stream that is being monitored. */
    private final InputStream input;
    /** {@code true} while this component is registered as a generator. */
    private boolean registered;
    /** The reader thread, {@code null} if not started or already stopped. */
    private Thread runner;
    /** Pool that provides the buffers handed out with the events. */
    private ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> buffers;
    /** Size passed to {@link ByteBuffer#allocateDirect(int)}. */
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    /**
     * Creates a new monitor that fires its {@link Input} events on a
     * channel that may differ from the component's own channel.
     *
     * @param componentChannel the channel passed on to {@link Component}
     * @param input the stream to read from
     * @param dataChannel the channel on which {@link Input} events are fired
     */
    public InputStreamMonitor(Channel componentChannel, InputStream input,
            Channel dataChannel) {
        super(componentChannel);
        this.input = input;
        this.dataChannel = dataChannel;
    }

    /**
     * Creates a new monitor that uses the same channel both as component
     * channel and as data channel.
     *
     * @param componentChannel the channel used for both purposes
     * @param input the stream to read from
     */
    public InputStreamMonitor(Channel componentChannel, InputStream input) {
        this(componentChannel, input, componentChannel);
    }

    /**
     * Sets the size used when allocating read buffers.
     *
     * @param bufferSize the new buffer size
     * @return this monitor, for easy chaining
     */
    public InputStreamMonitor setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }

    /**
     * Returns the size used when allocating read buffers.
     *
     * @return the buffer size
     */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Applies the {@code bufferSize} value from the configuration values
     * provided for this component's path, if there is one.
     *
     * @param event the configuration update
     * @throws NumberFormatException if the configured value is not a
     *     parsable integer (propagated from {@link Integer#parseInt(String)})
     */
    @Handler
    public void onConfigurationUpdate(ConfigurationUpdate event) {
        event.values(componentPath()).ifPresent(values ->
            Optional.ofNullable((String) values.get("bufferSize"))
                .ifPresent(size -> setBufferSize(Integer.parseInt(size))));
    }

    /**
     * Creates the buffer pool and starts the reader thread. Does nothing
     * if a reader thread has already been created.
     *
     * @param event the start event
     */
    @Handler
    public void onStart(Start event) {
        synchronized (this) {
            if (runner != null) {
                return;
            }
            // NOTE(review): the last argument is presumably the number of
            // buffers in the pool -- confirm against ManagedBufferPool.
            buffers = new ManagedBufferPool<>(ManagedBuffer::new,
                () -> ByteBuffer.allocateDirect(bufferSize), 2);
            runner = new Thread(this, Components.simpleObjectName(this));
            // Daemon thread, so a pending read does not keep the JVM alive.
            runner.setDaemon(true);
            runner.start();
        }
    }

    /**
     * Interrupts the reader thread and removes the generator registration
     * if it is still in place. Does nothing if there is no reader thread.
     *
     * @param event the stop event
     * @throws InterruptedException declared for interface compatibility
     */
    @Handler(priority = -10000)
    public void onStop(Stop event) throws InterruptedException {
        synchronized (this) {
            if (runner == null) {
                return;
            }
            runner.interrupt();
            // Unregister right away instead of waiting for the reader
            // thread to notice the interrupt.
            unregisterIfRegistered();
            runner = null;
        }
    }

    /**
     * The reader loop. Registers the component as generator, then
     * repeatedly acquires a buffer from the pool, fills it from the stream
     * and fires it as {@link Input} event on the data channel. The loop
     * ends when the thread is interrupted or when the end of the stream
     * has been reached (the last event fired is flagged accordingly).
     * An {@link IOException} is reported as {@link IOError} event on the
     * component's channel. The generator registration is removed in all
     * cases before the method returns or propagates a throwable.
     */
    @Override
    public void run() {
        Thread.currentThread().setName(Components.simpleObjectName(this));
        try {
            synchronized (this) {
                registerAsGenerator();
                registered = true;
            }
            ReadableByteChannel source = Channels.newChannel(input);
            while (!Thread.currentThread().isInterrupted()) {
                ManagedBuffer<ByteBuffer> buffer = buffers.acquire();
                boolean endOfStream = buffer.fillFromChannel(source) == -1;
                fire(Input.fromSink(buffer, endOfStream), dataChannel);
                if (endOfStream) {
                    break;
                }
            }
        } catch (IOException e) {
            fire(new IOError((Event<?>) null, e), channel());
        } catch (InterruptedException ignored) {
            // onStop() interrupts this thread to end the loop; the thread
            // terminates right after the cleanup, nothing else to do.
        } finally {
            unregisterIfRegistered();
        }
    }

    /**
     * Removes the generator registration if it is in place. Safe to call
     * from both the reader thread and the stop handler; the
     * {@code registered} flag ensures the registration is removed only
     * once.
     */
    private void unregisterIfRegistered() {
        synchronized (this) {
            if (registered) {
                unregisterAsGenerator();
                registered = false;
            }
        }
    }
}
