package net.ravendb.client.connection;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Iterator;
import net.ravendb.abstractions.basic.CleanCloseable;
import net.ravendb.abstractions.closure.Action0;
import net.ravendb.abstractions.closure.Predicate;
import net.ravendb.client.changes.IObservable;
import net.ravendb.client.changes.IObserver;
import net.ravendb.client.connection.profiling.ConcurrentSet;
import org.apache.commons.io.IOUtils;

/**
 * Reads an {@link InputStream} on a background thread, splits the bytes into
 * CRLF-terminated lines and publishes the payload of every line that starts with
 * {@code data:} to the subscribed observers.
 *
 * <p>Lines that do not start with {@code data:} (including empty lines) are skipped.
 * Payloads are decoded as UTF-8. Any failure while reading or dispatching, including
 * end of stream, closes the underlying stream and is reported through
 * {@link IObserver#onError(Exception)} unless {@link #close()} was called first.
 */
public class ObservableLineStream implements IObservable<String>, Closeable {
    /** Charset used to decode line payloads; resolved once instead of per line. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Bytes that a line must start with for its remainder to be published. */
    private static final byte[] DATA_PREFIX = {'d', 'a', 't', 'a', ':'};

    private final InputStream stream;

    /** Number of bytes at the start of {@link #buffer} that belong to a not yet terminated line. */
    protected int posInBuffer;

    private final Action0 onDispose;
    private Thread task;

    /** Fixed-size read buffer; the pending (unterminated) line always starts at index 0. */
    protected final byte[] buffer = new byte[8192];

    private volatile boolean disposed = false;
    private final ConcurrentSet<IObserver<String>> subscribers = new ConcurrentSet<>();

    /**
     * @param inputStream stream to read lines from
     * @param action0 callback invoked from {@link #close()}
     */
    public ObservableLineStream(InputStream inputStream, Action0 action0) {
        this.stream = inputStream;
        this.onDispose = action0;
    }

    /**
     * Starts the daemon thread named {@code "ObservableLineStream"} that reads the stream
     * until an error or end of stream occurs.
     */
    public void start() {
        this.task = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        if (ObservableLineStream.this.posInBuffer >= ObservableLineStream.this.buffer.length) {
                            // A single unterminated line filled the whole buffer. Reading with a
                            // zero-length window would return 0 forever, so fail instead of spinning.
                            throw new IOException("Line exceeds buffer capacity of "
                                    + ObservableLineStream.this.buffer.length + " bytes");
                        }
                        int read = ObservableLineStream.this.read();
                        if (read == -1) {
                            // Raised inside the try block so that end of stream takes the same
                            // path as any other connection failure.
                            throw new EOFException();
                        }
                        ObservableLineStream.this.processChunk(read);
                    }
                } catch (Exception e) {
                    IOUtils.closeQuietly(ObservableLineStream.this.stream);
                    ObservableLineStream.this.disposeAndSignalConnectionError(e);
                }
            }
        }, "ObservableLineStream");
        this.task.setDaemon(true);
        this.task.start();
    }

    /**
     * Scans the {@code read} bytes just appended at {@link #posInBuffer} for line terminators,
     * publishes every completed line and moves any trailing partial line to the buffer start.
     */
    private void processChunk(int read) {
        int end = this.posInBuffer + read;
        int lineStart = 0;
        boolean foundLine = false;
        // Seed with the last byte of the previous read so that a CR/LF pair split across
        // two reads is still recognised as a terminator.
        byte previous = this.posInBuffer > 0 ? this.buffer[this.posInBuffer - 1] : 0;
        for (int i = this.posInBuffer; i < end; i++) {
            byte current = this.buffer[i];
            if (previous == '\r' && current == '\n') {
                foundLine = true;
                publishIfData(lineStart, i - 1);
                lineStart = i + 1;
            }
            previous = current;
        }
        this.posInBuffer = end;
        if (lineStart >= this.posInBuffer) {
            // Everything read so far was consumed.
            this.posInBuffer = 0;
        } else if (foundLine) {
            // Keep the trailing partial line, shifted to the start of the buffer.
            System.arraycopy(this.buffer, lineStart, this.buffer, 0, this.posInBuffer - lineStart);
            this.posInBuffer -= lineStart;
        }
    }

    /**
     * Publishes the payload of the line occupying {@code [lineStart, crIndex)} when the line
     * starts with {@code data:}; other lines are ignored.
     *
     * @param lineStart index of the first byte of the line
     * @param crIndex index of the carriage return that terminates the line
     */
    private void publishIfData(int lineStart, int crIndex) {
        int lineLength = crIndex - lineStart;
        if (lineLength < DATA_PREFIX.length) {
            return;
        }
        for (int k = 0; k < DATA_PREFIX.length; k++) {
            if (this.buffer[lineStart + k] != DATA_PREFIX[k]) {
                return;
            }
        }
        String payload = new String(this.buffer, lineStart + DATA_PREFIX.length,
                lineLength - DATA_PREFIX.length, UTF_8);
        Iterator<IObserver<String>> it = this.subscribers.iterator();
        while (it.hasNext()) {
            it.next().onNext(payload);
        }
    }

    /**
     * Forwards {@code exc} to every subscriber via {@code onError}, unless {@link #close()}
     * has already been called.
     *
     * @param exc the failure to report
     */
    public void disposeAndSignalConnectionError(Exception exc) {
        if (this.disposed) {
            return;
        }
        Iterator<IObserver<String>> it = this.subscribers.iterator();
        while (it.hasNext()) {
            it.next().onError(exc);
        }
    }

    /**
     * @return the reader thread, or {@code null} if {@link #start()} has not been called
     */
    public Thread getTask() {
        return this.task;
    }

    /**
     * Reads from the stream into the free tail of {@link #buffer}, starting at
     * {@link #posInBuffer}.
     *
     * @return the value returned by {@link InputStream#read(byte[], int, int)}
     * @throws IOException if the underlying read fails
     */
    public int read() throws IOException {
        return this.stream.read(this.buffer, this.posInBuffer, this.buffer.length - this.posInBuffer);
    }

    /**
     * Marks this instance as disposed, notifies every subscriber with {@code onCompleted}
     * and then invokes the dispose callback passed to the constructor.
     */
    @Override
    public void close() throws IOException {
        this.disposed = true;
        Iterator<IObserver<String>> it = this.subscribers.iterator();
        while (it.hasNext()) {
            it.next().onCompleted();
        }
        this.onDispose.apply();
    }

    /**
     * Registers {@code iObserver} to receive published lines.
     *
     * @return a handle whose {@code close()} removes the observer again
     */
    @Override
    public CleanCloseable subscribe(final IObserver<String> iObserver) {
        this.subscribers.add(iObserver);
        return new CleanCloseable() {
            @Override
            public void close() {
                ObservableLineStream.this.subscribers.remove(iObserver);
            }
        };
    }

    /**
     * Not supported by this observable.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public IObservable<String> where(Predicate<String> predicate) {
        throw new UnsupportedOperationException("You can't use ObservableLineStream with where predicate");
    }
}
