package li.pitschmann.knx.core.communication.communicator;

import java.nio.channels.ByteChannel;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import li.pitschmann.knx.core.annotations.Nullable;
import li.pitschmann.knx.core.body.Body;
import li.pitschmann.knx.core.body.RequestBody;
import li.pitschmann.knx.core.body.ResponseBody;
import li.pitschmann.knx.core.communication.InternalKnxClient;
import li.pitschmann.knx.core.communication.InternalKnxEventPool;
import li.pitschmann.knx.core.communication.event.KnxEvent;
import li.pitschmann.knx.core.communication.event.KnxMultiEvent;
import li.pitschmann.knx.core.communication.queue.AbstractInboxQueue;
import li.pitschmann.knx.core.communication.queue.AbstractOutboxQueue;
import li.pitschmann.knx.core.communication.queue.DefaultInboxQueue;
import li.pitschmann.knx.core.communication.queue.DefaultOutboxQueue;
import li.pitschmann.knx.core.config.CoreConfigs;
import li.pitschmann.knx.core.utils.Closeables;
import li.pitschmann.knx.core.utils.Sleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:li/pitschmann/knx/core/communication/communicator/AbstractChannelCommunicator.class */
public abstract class AbstractChannelCommunicator extends SubmissionPublisher<Body> implements Runnable {
    private final InternalKnxClient client;
    private final ExecutorService queueExecutor;
    private final ExecutorService communicationExecutor;
    private final SelectableChannel channel;
    private final AbstractInboxQueue<? extends ByteChannel> inboxQueue;
    private final AbstractOutboxQueue<? extends ByteChannel> outboxQueue;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final AtomicBoolean closed = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractChannelCommunicator(InternalKnxClient internalKnxClient) {
        this.client = (InternalKnxClient) Objects.requireNonNull(internalKnxClient);
        this.channel = (SelectableChannel) Objects.requireNonNull(newChannel(this.client));
        this.log.debug("Channel registered: {} (open: {}, registered: {}, blocking: {})", new Object[]{this.channel, Boolean.valueOf(this.channel.isOpen()), Boolean.valueOf(this.channel.isRegistered()), Boolean.valueOf(this.channel.isBlocking())});
        this.inboxQueue = createInboxQueue(this.client, this.channel);
        this.outboxQueue = createOutboxQueue(this.client, this.channel);
        this.log.debug("Inbox and Outbox Queues created: InboxQueue={}, OutboxQueue={}.", this.inboxQueue, this.outboxQueue);
        this.queueExecutor = Executors.newFixedThreadPool(2);
        this.queueExecutor.submit(this.inboxQueue);
        this.queueExecutor.submit(this.outboxQueue);
        this.queueExecutor.shutdown();
        this.log.debug("Queue Executor created: {}", this.queueExecutor);
        Integer num = (Integer) this.client.getConfig(CoreConfigs.Communication.EXECUTOR_POOL_SIZE);
        this.communicationExecutor = Executors.newFixedThreadPool(num.intValue());
        this.log.debug("Communication Executor created with size of {}: {}", num, this.communicationExecutor);
    }

    protected abstract SelectableChannel newChannel(InternalKnxClient internalKnxClient);

    protected AbstractInboxQueue<? extends ByteChannel> createInboxQueue(InternalKnxClient internalKnxClient, SelectableChannel selectableChannel) {
        return new DefaultInboxQueue(internalKnxClient, selectableChannel);
    }

    protected AbstractOutboxQueue<? extends ByteChannel> createOutboxQueue(InternalKnxClient internalKnxClient, SelectableChannel selectableChannel) {
        return new DefaultOutboxQueue(internalKnxClient, selectableChannel);
    }

    public SelectableChannel getChannel() {
        return this.channel;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.log.trace("*** START ***");
        while (!Thread.interrupted() && !isClosed()) {
            try {
                this.log.debug("Waiting for next packet from channel");
                Body next = this.inboxQueue.next();
                if (!isCompatible(next)) {
                    this.log.warn("Body is not expected for this channel and therefore ignored: {}", next);
                } else if (isClosed()) {
                    this.log.warn("Body not sent to subscribers because submission publisher is closed: {}", next);
                } else {
                    this.log.debug("Body from channel to be sent to subscribers: {}", next);
                    submit(next);
                }
            } catch (InterruptedException e) {
                this.log.debug("Channel receiver is cancelled");
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                if (!isClosed()) {
                    this.log.error("Throwable caught during running communicator", th);
                    throw th;
                }
            }
        }
        this.log.trace("*** END ***");
    }

    public abstract boolean isCompatible(Body body);

    public final void send(Body body) {
        this.outboxQueue.send((Body) Objects.requireNonNull(body));
        this.log.debug("Body added to outbox queue: {}", body);
    }

    public final <T extends ResponseBody> CompletableFuture<T> send(RequestBody requestBody, long j) {
        return send(requestBody, null, j);
    }

    public final <T extends ResponseBody> CompletableFuture<T> send(RequestBody requestBody, @Nullable Predicate<T> predicate, long j) {
        return CompletableFuture.supplyAsync(() -> {
            return sendAndWaitInternal(requestBody, predicate, j);
        }, this.communicationExecutor).exceptionally(th -> {
            this.client.notifyError(th);
            return null;
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v39, types: [java.lang.Object, li.pitschmann.knx.core.body.ResponseBody] */
    /* JADX WARN: Type inference failed for: r0v48, types: [li.pitschmann.knx.core.body.ResponseBody] */
    /* JADX WARN: Type inference failed for: r0v53, types: [li.pitschmann.knx.core.body.ResponseBody] */
    /* JADX WARN: Type inference failed for: r9v0, types: [java.util.function.Predicate, java.util.function.Predicate<T extends li.pitschmann.knx.core.body.ResponseBody>] */
    @Nullable
    private final <T extends ResponseBody> T sendAndWaitInternal(RequestBody requestBody, @Nullable Predicate<T> predicate, long j) {
        InternalKnxEventPool eventPool = this.client.getEventPool();
        KnxEvent knxEvent = eventPool.get((InternalKnxEventPool) requestBody);
        knxEvent.setRequest(requestBody);
        knxEvent.clearResponse();
        this.log.trace("Request Body added to event pool.");
        this.client.getStatusPool().setDirty(requestBody);
        int i = 1;
        T t = null;
        do {
            send(requestBody);
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                KnxEvent knxEvent2 = eventPool.get((InternalKnxEventPool) requestBody);
                if (knxEvent2.hasResponse()) {
                    if (predicate == 0) {
                        t = knxEvent2.getResponse();
                    } else if (knxEvent2 instanceof KnxMultiEvent) {
                        t = ((KnxMultiEvent) knxEvent2).getResponse((Predicate<T>) predicate);
                    } else {
                        ?? response = knxEvent2.getResponse();
                        t = predicate.test(response) ? response : null;
                    }
                    if (t != null) {
                        this.log.debug("Response received for request ({}/{}): {}", new Object[]{Integer.valueOf(i), 3, t});
                        break;
                    }
                }
                if (!Sleeper.milliseconds(10L) || System.currentTimeMillis() - currentTimeMillis >= j) {
                    break;
                }
            }
            if (t == null) {
                this.log.warn("No response received yet for request ({}/{}): {}", new Object[]{Integer.valueOf(i), 3, requestBody});
            }
            if (t != null) {
                break;
            }
            int i2 = i;
            i++;
            if (i2 >= 3) {
                break;
            }
        } while (!Thread.currentThread().isInterrupted());
        return t;
    }

    @Override // java.util.concurrent.SubmissionPublisher, java.lang.AutoCloseable
    public final void close() {
        this.log.trace("Method 'close()' invoked.");
        if (this.closed.getAndSet(true)) {
            return;
        }
        super.close();
        try {
            cleanUp();
        } finally {
            Closeables.closeQuietly((Channel) this.channel);
            Closeables.shutdownQuietly(this.queueExecutor);
            Closeables.shutdownQuietly(this.communicationExecutor);
            this.log.debug("Method 'close()' called.");
        }
    }

    protected void cleanUp() {
    }
}
