package de.xab.porter.core;

import de.xab.porter.api.dataconnection.SrcConnection;
import de.xab.porter.api.task.Context;
import de.xab.porter.api.task.Properties;
import de.xab.porter.common.spi.ExtensionLoader;
import de.xab.porter.common.util.Loggers;
import de.xab.porter.transfer.channel.Channel;
import de.xab.porter.transfer.exception.ConnectionException;
import de.xab.porter.transfer.reader.Reader;
import de.xab.porter.transfer.reporter.Reporter;
import de.xab.porter.transfer.writer.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/* loaded from: input_file:de/xab/porter/core/Task.class */
public class Task {
    private final Logger logger = Loggers.getLogger(getClass());
    private final Context context;
    private Map<? extends Reader<?>, String> readers;
    private List<Map.Entry<? extends Writer<?>, Channel>> writers;

    public Task(Context context) {
        this.context = context;
    }

    public void init() {
        List of;
        SrcConnection srcConnection = this.context.getSrcConnection();
        Reader reader = (Reader) ExtensionLoader.getExtensionLoader(Reader.class).loadExtension(srcConnection.getConnectorType(), srcConnection.getType());
        try {
            if (srcConnection.getProperties().isSplit()) {
                try {
                    reader.connect(srcConnection);
                    of = reader.split();
                    reader.close();
                } catch (ConnectionException e) {
                    of = List.of(srcConnection.getSql());
                    this.logger.warning("reader connection failed " + e.getMessage());
                    reader.close();
                }
                this.readers = (Map) of.stream().map(str -> {
                    return Map.entry((Reader) ExtensionLoader.getExtensionLoader(Reader.class).loadExtension(srcConnection.getConnectorType(), srcConnection.getType()), str);
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                }));
            } else {
                this.readers = Map.of(reader, srcConnection.getSql());
            }
            this.readers.forEach((reader2, str2) -> {
                reader2.setChannels(new ArrayList());
            });
            registerChannel();
        } catch (Throwable th) {
            reader.close();
            throw th;
        }
    }

    public void registerChannel() {
        Properties properties = this.context.getProperties();
        List sinkConnections = this.context.getSinkConnections();
        Reporter reporter = (Reporter) ExtensionLoader.getExtensionLoader(Reporter.class).loadExtension((String) null, properties.getReporter());
        this.writers = (List) sinkConnections.stream().map(sinkConnection -> {
            Writer writer = (Writer) ExtensionLoader.getExtensionLoader(Writer.class).loadExtension(sinkConnection.getConnectorType(), sinkConnection.getType());
            try {
                writer.connect(sinkConnection);
            } catch (ConnectionException e) {
                this.logger.severe("writer connection failed" + e.getMessage());
                writer.close();
            }
            Channel channel = (Channel) ExtensionLoader.getExtensionLoader(Channel.class).loadExtension((String) null, this.context.getProperties().getChannel());
            channel.setOnReadListener(result -> {
                writer.write(result);
                reporter.report(result);
            });
            this.readers.forEach((reader, str) -> {
                reader.getChannels().add(channel);
            });
            return Map.entry(writer, channel);
        }).collect(Collectors.toList());
    }

    public void start() {
        CountDownLatch countDownLatch = new CountDownLatch(this.readers.size());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.readers.size());
        ArrayList arrayList = new ArrayList(this.readers.size());
        this.readers.forEach((reader, str) -> {
            arrayList.add(newFixedThreadPool.submit(() -> {
                long j = 0;
                try {
                    try {
                        reader.connect(this.context.getSrcConnection());
                        j = reader.read(str);
                        reader.close();
                        countDownLatch.countDown();
                    } catch (ConnectionException e) {
                        this.logger.severe("reader connection failed " + e.getMessage());
                        reader.close();
                        countDownLatch.countDown();
                    }
                    return Long.valueOf(j);
                } catch (Throwable th) {
                    reader.close();
                    countDownLatch.countDown();
                    throw th;
                }
            }));
        });
        try {
            try {
                countDownLatch.await();
                this.logger.info("task execution over, total read " + arrayList.stream().mapToLong(future -> {
                    try {
                        return ((Long) future.get()).longValue();
                    } catch (InterruptedException e) {
                        this.logger.severe("future is interrupted, " + e.getMessage());
                        return 0L;
                    } catch (ExecutionException e2) {
                        this.logger.severe("future execution failed, " + e2.getMessage());
                        return 0L;
                    }
                }).sum() + " rows.");
                this.writers.forEach(entry -> {
                    ((Writer) entry.getKey()).close();
                });
                newFixedThreadPool.shutdown();
            } catch (InterruptedException e) {
                this.logger.warning("reader interrupted");
                this.writers.forEach(entry2 -> {
                    ((Writer) entry2.getKey()).close();
                });
                newFixedThreadPool.shutdown();
            }
        } catch (Throwable th) {
            this.writers.forEach(entry22 -> {
                ((Writer) entry22.getKey()).close();
            });
            newFixedThreadPool.shutdown();
            throw th;
        }
    }
}
