package org.yamcs.replication;

import com.google.common.util.concurrent.Service;
import com.google.protobuf.TextFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.yamcs.AbstractYamcsService;
import org.yamcs.ConfigurationException;
import org.yamcs.InitException;
import org.yamcs.Spec;
import org.yamcs.YConfiguration;
import org.yamcs.YamcsException;
import org.yamcs.YamcsServer;
import org.yamcs.YamcsServerInstance;
import org.yamcs.replication.protobuf.ColumnInfo;
import org.yamcs.replication.protobuf.Request;
import org.yamcs.replication.protobuf.Response;
import org.yamcs.replication.protobuf.StreamInfo;
import org.yamcs.replication.protobuf.TimeMessage;
import org.yamcs.time.SimulationTimeService;
import org.yamcs.time.TimeService;
import org.yamcs.utils.DecodingException;
import org.yamcs.yarch.ColumnDefinition;
import org.yamcs.yarch.ColumnSerializer;
import org.yamcs.yarch.ColumnSerializerFactory;
import org.yamcs.yarch.DataType;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabase;

/* loaded from: input_file:org/yamcs/replication/ReplicationSlave.class */
/**
 * Yamcs service that receives replicated stream data from a master instance and emits the decoded tuples
 * on local streams.
 * <p>
 * With {@code tcpRole} CLIENT the service opens the connection itself through a {@link ReplicationClient};
 * otherwise it registers with the global {@link ReplicationServer} and waits for a handler to be requested
 * via {@link #newChannelHandler()}.
 * <p>
 * The id of the last transaction seen is written to a small text file each time a tuple is emitted, and is
 * read back by {@code init} so that the next request to the master starts at {@code lastTxId + 1}.
 */
public class ReplicationSlave extends AbstractYamcsService {
    private TcpRole tcpRole;
    int port;
    String host;
    ReplicationClient tcpClient;
    long reconnectionInterval;
    String masterInstance;
    long lastTxId;
    SlaveChannelHandler slaveChannelHandler;
    RandomAccessFile lastTxFile;
    Path txtfilePath;
    int localInstanceId;
    int maxTupleSize;
    long timeoutMillis;
    // master stream name -> local stream name
    Map<String, String> streamNames = new HashMap<>();
    SslContext sslCtx = null;
    SimulationTimeService simTimeService = null;

    /**
     * Netty inbound handler that decodes the replication messages received from the master and dispatches
     * them: transaction data is written to local streams, stream info creates the per-stream writers,
     * responses and time messages are handled by the enclosing service.
     */
    public class SlaveChannelHandler extends ChannelInboundHandlerAdapter {
        ReplicationSlave replSlave;
        private ChannelHandlerContext channelHandlerContext;
        // stream id (as announced in the stream info messages) -> writer for that stream
        Map<Integer, ByteBufToStream> streamWriters = new HashMap<>();
        long lastMsgReceivedTime = System.currentTimeMillis();
        private ScheduledFuture<?> timeoutFuture;

        /**
         * Deserializes the tuples of one replicated stream and emits them on the local stream.
         */
        public class ByteBufToStream {
            TupleDefinition completeTuple = new TupleDefinition();
            ColumnSerializer<?>[] serializers;
            Stream stream;

            /**
             * Builds the column definitions and serializers from the received stream info.
             * <p>
             * If a column id does not match its position, a warning is logged and construction stops;
             * only the columns before the mismatch are then known to this writer.
             *
             * @param stream
             *            local stream on which the tuples are emitted
             * @param streamInfo
             *            stream description received from the master
             */
            public ByteBufToStream(Stream stream, StreamInfo streamInfo) {
                this.stream = stream;
                this.serializers = new ColumnSerializer[streamInfo.getColumnsCount()];
                for (int i = 0; i < this.serializers.length; i++) {
                    ColumnInfo columnInfo = streamInfo.getColumns(i);
                    if (columnInfo.getId() != i) {
                        ReplicationSlave.this.log.warn("Corrupted metadata? c[{}].getId = {} (should be {})",
                                i, columnInfo.getId(), i);
                        return;
                    }
                    ColumnDefinition columnDefinition = new ColumnDefinition(columnInfo.getName(),
                            DataType.byName(columnInfo.getType()));
                    this.completeTuple.addColumn(columnDefinition);
                    this.serializers[i] = ColumnSerializerFactory.getColumnSerializerForReplication(columnDefinition);
                }
            }

            /**
             * Reads column values from the buffer until the end marker (-1) is found, then emits the
             * resulting tuple on the stream and persists the last transaction id.
             * <p>
             * Each value is preceded by an int: the lower 16 bits are the column index and the upper 8
             * bits the type id. On an unknown column index, a type mismatch or any exception during
             * deserialization, a warning is logged and nothing is emitted.
             *
             * @param j
             *            transaction id, used only in log messages
             * @param byteBuffer
             *            buffer positioned at the first column header
             */
            public void processData(long j, ByteBuffer byteBuffer) {
                TupleDefinition tupleDefinition = new TupleDefinition();
                ArrayList<Object> values = new ArrayList<>();
                while (true) {
                    try {
                        int header = byteBuffer.getInt();
                        if (header == -1) {
                            // end of tuple
                            this.stream.emitTuple(new Tuple(tupleDefinition, values));
                            ReplicationSlave.this.updateLastTxFile();
                            return;
                        }
                        int columnIndex = header & 0xFFFF;
                        if (columnIndex >= this.completeTuple.size()) {
                            ReplicationSlave.this.log.warn(
                                    "TX{}: when deserializing data for stream {}: reference to unknown column index {}",
                                    j, this.stream.getName(), columnIndex);
                            return;
                        }
                        int typeId = header >>> 24;
                        ColumnDefinition column = this.completeTuple.getColumn(columnIndex);
                        ColumnSerializer<?> columnSerializer = this.serializers[columnIndex];
                        if (column.getType().getTypeId() != typeId) {
                            ReplicationSlave.this.log.warn(
                                    "TX{}: when deserializing data for stream {}: type id for index {} (column {}) is {}; expected {}",
                                    j, this.stream.getName(), columnIndex, column.getName(), typeId,
                                    column.getType().getTypeId());
                            return;
                        }
                        Object value = columnSerializer.deserialize(byteBuffer, column);
                        tupleDefinition.addColumn(column);
                        values.add(value);
                    } catch (Exception e) {
                        ReplicationSlave.this.log.warn("Cannot deserialize data for stream {}", this.stream.getName(), e);
                        return;
                    }
                }
            }
        }

        public SlaveChannelHandler(ReplicationSlave replicationSlave) {
            this.replSlave = replicationSlave;
        }

        /**
         * Processes one received message and releases the buffer afterwards, whether or not the
         * processing threw.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object obj) {
            ByteBuf byteBuf = (ByteBuf) obj;
            try {
                doChannelRead(ctx, byteBuf);
            } finally {
                byteBuf.release();
            }
        }

        /**
         * Decodes the message and dispatches on its type. Messages are ignored unless the service is
         * RUNNING. A message that cannot be decoded closes the connection.
         */
        private void doChannelRead(ChannelHandlerContext ctx, ByteBuf byteBuf) {
            ByteBuffer nioBuffer = byteBuf.nioBuffer();
            if (ReplicationSlave.this.state() != Service.State.RUNNING) {
                return;
            }
            try {
                Message msg = Message.decode(nioBuffer);
                this.lastMsgReceivedTime = System.currentTimeMillis();

                if (msg.type == 5) {
                    // transaction carrying tuple data for one stream
                    TransactionMessage dataMsg = (TransactionMessage) msg;
                    if (dataMsg.txId <= ReplicationSlave.this.lastTxId) {
                        // only warned about; the data is still processed below
                        ReplicationSlave.this.log.warn("Received data from the past txId={}, lastTxId={}",
                                dataMsg.txId, ReplicationSlave.this.lastTxId);
                    } else {
                        checkMissing(dataMsg);
                    }
                    int streamId = dataMsg.buf.getInt();
                    if (dataMsg.instanceId == ReplicationSlave.this.localInstanceId) {
                        ReplicationSlave.this.log.trace("Skipping data originating from myself (serverId: {})",
                                dataMsg.instanceId);
                        return;
                    }
                    ByteBufToStream writer = this.streamWriters.get(streamId);
                    if (writer == null) {
                        ReplicationSlave.this.log.trace("Skipping data for unknown stream {}", streamId);
                        return;
                    }
                    if (ReplicationSlave.this.log.isTraceEnabled()) {
                        ReplicationSlave.this.log.trace("TX{} received data for stream {}, length {}",
                                dataMsg.txId, writer.stream.getName(), dataMsg.buf.remaining());
                    }
                    writer.processData(dataMsg.txId, dataMsg.buf);
                } else if (msg.type == 4) {
                    // transaction carrying the description of a stream
                    TransactionMessage infoMsg = (TransactionMessage) msg;
                    if (infoMsg.txId > ReplicationSlave.this.lastTxId) {
                        checkMissing(infoMsg);
                    }
                    StreamInfo streamInfo = (StreamInfo) msg.protoMsg;
                    if (!streamInfo.hasName() || !streamInfo.hasId()) {
                        // report the offending stream info (not the service object) in the failure message
                        ReplicationSlave.this.failService("TX" + infoMsg.txId + ": received invalid stream info: "
                                + TextFormat.shortDebugString(streamInfo));
                        return;
                    }
                    ReplicationSlave.this.log.debug("TX{}: received stream info {}", infoMsg.txId,
                            TextFormat.shortDebugString(streamInfo));
                    String masterStreamName = streamInfo.getName();
                    if (!ReplicationSlave.this.streamNames.containsKey(masterStreamName)) {
                        ReplicationSlave.this.log.debug(
                                "TX{}: Ignoring stream {} because it is not in the list configured",
                                infoMsg.txId, masterStreamName);
                        return;
                    }
                    String localStreamName = ReplicationSlave.this.streamNames.get(masterStreamName);
                    Stream stream = YarchDatabase.getInstance(ReplicationSlave.this.yamcsInstance)
                            .getStream(localStreamName);
                    if (stream == null) {
                        ReplicationSlave.this.log.warn("TX{}: Received data for stream {} which does not exist",
                                infoMsg.txId, localStreamName);
                    } else {
                        this.streamWriters.put(streamInfo.getId(), new ByteBufToStream(stream, streamInfo));
                    }
                } else if (msg.type == 3) {
                    // response to the request sent by sendRequest()
                    Response response = (Response) msg.protoMsg;
                    if (response.getResult() != 0) {
                        ReplicationSlave.this.failService("Received negative response: " + response.getErrorMsg());
                    } else {
                        ReplicationSlave.this.log.info("Received response {}", response);
                    }
                } else if (msg.type == 6) {
                    ReplicationSlave.this.processTimeMessage((TimeMessage) msg.protoMsg);
                } else {
                    ReplicationSlave.this.failService("Unexpected message type " + msg.type
                            + " received from the master");
                }
            } catch (DecodingException e) {
                ReplicationSlave.this.log.warn("TX{} Failed to decode message {}; closing connection",
                        ReplicationSlave.this.lastTxId, ByteBufUtil.hexDump(byteBuf), e);
                ctx.close();
            }
        }

        /**
         * Warns if there is a gap between the last seen transaction and this one, then records this
         * transaction id as the last seen.
         */
        private void checkMissing(TransactionMessage transactionMessage) {
            if (transactionMessage.txId != ReplicationSlave.this.lastTxId + 1) {
                ReplicationSlave.this.log.warn("Transactions {} to {} are missing",
                        ReplicationSlave.this.lastTxId + 1, transactionMessage.txId - 1);
            }
            ReplicationSlave.this.lastTxId = transactionMessage.txId;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            sendRequest();
        }

        /**
         * When the slave is not in the CLIENT role, remembers the context and sends the request as soon
         * as the handler is added to a pipeline. In the CLIENT role nothing is done here; the request is
         * sent from {@link #channelActive(ChannelHandlerContext)}.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            super.handlerAdded(ctx);
            if (ReplicationSlave.this.tcpRole == TcpRole.CLIENT) {
                return;
            }
            this.channelHandlerContext = ctx;
            sendRequest();
        }

        /**
         * Sends the replication request to the master (starting after the last known transaction, if
         * any) and (re)starts the periodic timeout check.
         */
        private void sendRequest() {
            Request.Builder requestBuilder = Request.newBuilder().setRequestSeq(1)
                    .setYamcsInstance(ReplicationSlave.this.masterInstance);
            if (ReplicationSlave.this.lastTxId >= 0) {
                requestBuilder.setStartTxId(ReplicationSlave.this.lastTxId + 1);
            }
            // NOTE(review): m372build looks like a decompiler-renamed build(); confirm against Request.Builder
            Request request = requestBuilder.m372build();
            ReplicationSlave.this.log.debug("Connection {} opened, sending request {}",
                    this.channelHandlerContext.channel().remoteAddress(), TextFormat.shortDebugString(request));
            this.channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(Message.get(request).encode()));
            cancelTimeoutFuture();
            this.timeoutFuture = this.channelHandlerContext.executor().scheduleAtFixedRate(this::checkTimeout,
                    ReplicationSlave.this.timeoutMillis, ReplicationSlave.this.timeoutMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Closes the connection if no message has been received for longer than the configured timeout.
         */
        void checkTimeout() {
            long now = System.currentTimeMillis();
            if (now - this.lastMsgReceivedTime > ReplicationSlave.this.timeoutMillis) {
                ReplicationSlave.this.log.warn("No message received in the last {} seconds. Closing the connection",
                        (now - this.lastMsgReceivedTime) / 1000);
                this.channelHandlerContext.close();
                cancelTimeoutFuture();
            }
        }

        void cancelTimeoutFuture() {
            ScheduledFuture<?> scheduledFuture = this.timeoutFuture;
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        }

        /**
         * Remembers the context for later use. The event is not forwarded to the superclass.
         */
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            this.channelHandlerContext = ctx;
        }

        /**
         * Closes the connection (if one was ever established) and stops the timeout check.
         */
        public void shutdown() {
            // the context is only set once the handler has been registered or added to a pipeline
            if (this.channelHandlerContext != null) {
                this.channelHandlerContext.close();
            }
            cancelTimeoutFuture();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable th) {
            ReplicationSlave.this.log.warn("Caught exception", th);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ReplicationSlave.this.log.debug("Connection {} closed", ctx.channel().remoteAddress());
            super.channelInactive(ctx);
            cancelTimeoutFuture();
            // allow a new handler to be created by newChannelHandler()
            ReplicationSlave.this.slaveChannelHandler = null;
        }
    }

    /**
     * Reads the configuration, prepares the TCP role specific setup and loads the last transaction id
     * from the last-tx file.
     *
     * @param yamcsInstance
     *            name of the Yamcs instance this service belongs to
     * @param serviceName
     *            second argument passed on to {@code super.init}; used as prefix of the default last-tx
     *            file name (presumably the service name; confirm against the YamcsService contract)
     * @param config
     *            service configuration
     * @throws InitException
     *             if TLS cannot be initialized, the replication server is required but not defined, or
     *             the last-tx file cannot be opened, read or parsed
     */
    @Override
    public void init(String yamcsInstance, String serviceName, YConfiguration config) throws InitException {
        super.init(yamcsInstance, serviceName, config);
        YamcsServerInstance serverInstance = YamcsServer.getServer().getInstance(yamcsInstance);
        this.localInstanceId = serverInstance.getInstanceId();

        if (config.getBoolean("updateSimTime")) {
            TimeService timeService = serverInstance.getTimeService();
            if (!(timeService instanceof SimulationTimeService)) {
                throw new ConfigurationException(
                        "Cannot use updateSimTime unless the simulated time service is configured");
            }
            this.simTimeService = (SimulationTimeService) timeService;
            this.simTimeService.setTime0(0L);
        }

        // each entry is either "name" or "masterName -> localName"
        List<String> streamSpecs = config.getList("streams");
        for (String streamSpec : streamSpecs) {
            String[] parts = streamSpec.split("\\s*\\-\\>\\s*");
            if (parts.length == 1) {
                this.streamNames.put(parts[0], parts[0]);
            } else if (parts.length == 2) {
                this.streamNames.put(parts[0], parts[1]);
            } else {
                throw new ConfigurationException("Invalid stream spec '" + streamSpec + "'");
            }
        }

        this.tcpRole = config.getEnum("tcpRole", TcpRole.class, TcpRole.CLIENT);
        if (this.tcpRole == TcpRole.CLIENT) {
            this.host = config.getString("masterHost");
            this.port = config.getInt("masterPort");
            this.reconnectionInterval = 1000 * config.getLong("reconnectionIntervalSec", 30L);
            if (config.getBoolean("enableTls", false)) {
                try {
                    this.sslCtx = SslContextBuilder.forClient().build();
                } catch (SSLException e) {
                    throw initFailure("Failed to initialize the TLS: " + e.toString(), e);
                }
            }
        } else {
            getReplicationServer().registerSlave(this);
        }

        this.masterInstance = config.getString("masterInstance", yamcsInstance);
        Path replicationDir = Paths.get(YarchDatabase.getDataDir()).resolve(yamcsInstance).resolve("replication");
        replicationDir.toFile().mkdirs();
        String lastTxFileName = config.getString("lastTxFile", serviceName + "-lastid.txt");
        this.maxTupleSize = config.getInt("maxTupleSize");
        this.timeoutMillis = (long) (config.getDouble("timeoutSec") * 1000.0d);
        this.txtfilePath = replicationDir.resolve(lastTxFileName);

        RandomAccessFile txFile = null;
        try {
            txFile = new RandomAccessFile(this.txtfilePath.toFile(), "rw");
            String firstLine = txFile.readLine();
            // an empty file means no transaction has been received yet
            this.lastTxId = (firstLine != null) ? Long.parseLong(firstLine) : -1L;
        } catch (IOException e) {
            closeQuietly(txFile);
            throw new InitException(e);
        } catch (NumberFormatException e) {
            closeQuietly(txFile);
            throw initFailure("Cannot parse number from " + this.txtfilePath + ": " + e, e);
        }
        // only keep the file once it has been read successfully, so a failed init does not leak it
        this.lastTxFile = txFile;
    }

    /** Creates an InitException with the given message and attaches the original exception as its cause. */
    private static InitException initFailure(String message, Throwable cause) {
        InitException initException = new InitException(message);
        initException.initCause(cause);
        return initException;
    }

    /** Closes the file if it was opened; used only on init failure paths where the original error is reported. */
    private static void closeQuietly(RandomAccessFile file) {
        if (file == null) {
            return;
        }
        try {
            file.close();
        } catch (IOException ignored) {
            // the caller is already reporting the failure that led here
        }
    }

    /**
     * @return the specification of the configuration options accepted by this service
     */
    @Override
    public Spec getSpec() {
        Spec spec = new Spec();
        spec.addOption("streams", Spec.OptionType.LIST).withElementType(Spec.OptionType.STRING).withRequired(true);
        spec.addOption("tcpRole", Spec.OptionType.STRING);
        spec.addOption("masterHost", Spec.OptionType.STRING);
        spec.addOption("masterPort", Spec.OptionType.INTEGER);
        spec.addOption("reconnectionIntervalSec", Spec.OptionType.INTEGER);
        spec.addOption("enableTls", Spec.OptionType.BOOLEAN);
        spec.addOption("masterInstance", Spec.OptionType.STRING);
        spec.addOption("lastTxFile", Spec.OptionType.STRING);
        spec.addOption("maxTupleSize", Spec.OptionType.INTEGER).withDefault(131072)
                .withDescription("Maximum size of the serialized tuple");
        spec.addOption("timeoutSec", Spec.OptionType.FLOAT)
                .withDescription("Timeout in seconds. If no message is received in this time, the connection will be closed")
                .withDefault(30);
        spec.addOption("updateSimTime", Spec.OptionType.BOOLEAN).withDefault(false)
                .withDescription("If true, update the simulation time with the time received from the master");
        return spec;
    }

    /**
     * In the CLIENT role, creates and starts the TCP client; then reports the service as started.
     */
    @Override
    protected void doStart() {
        if (this.tcpRole == TcpRole.CLIENT) {
            this.tcpClient = new ReplicationClient(this.yamcsInstance, this.host, this.port, this.sslCtx,
                    this.reconnectionInterval, this.maxTupleSize, () -> new SlaveChannelHandler(this));
            this.tcpClient.start();
        }
        notifyStarted();
    }

    @Override
    protected void doStop() {
        shutdown();
        notifyStopped();
    }

    /**
     * Logs the reason, releases all resources and moves the service to the failed state.
     */
    private void failService(String str) {
        this.log.warn("Replication failed: {}", str);
        this.log.warn("Shutting down the service");
        shutdown();
        notifyFailed(new Exception(str));
    }

    /**
     * Stops the TCP client, unregisters from the replication server (SERVER role), shuts down the active
     * channel handler and closes the last-tx file.
     */
    private void shutdown() {
        this.log.debug("Shutting down the replication slave");
        if (this.tcpClient != null) {
            this.tcpClient.stop();
        }
        if (this.tcpRole == TcpRole.SERVER) {
            try {
                getReplicationServer().unregisterSlave(this);
            } catch (InitException e) {
                throw new RuntimeException(e);
            }
        }
        if (this.slaveChannelHandler != null) {
            this.slaveChannelHandler.shutdown();
            this.slaveChannelHandler = null;
        }
        if (this.lastTxFile != null) {
            try {
                this.lastTxFile.close();
            } catch (IOException e) {
                this.log.error("Failed to close the last TX id file");
                notifyFailed(e);
            }
        }
    }

    /**
     * Overwrites the beginning of the last-tx file with the current last transaction id followed by a
     * newline. Failures are logged and otherwise ignored.
     */
    private void updateLastTxFile() {
        try {
            this.lastTxFile.seek(0L);
            this.lastTxFile.writeBytes(Long.toString(this.lastTxId) + "\n");
        } catch (IOException e) {
            this.log.warn("Failed to update the last tx file " + this.txtfilePath, e);
        }
    }

    /**
     * @return the first ReplicationServer global service
     * @throws InitException
     *             if no ReplicationServer global service is defined
     */
    private ReplicationServer getReplicationServer() throws InitException {
        List<ReplicationServer> servers = YamcsServer.getServer().getGlobalServices(ReplicationServer.class);
        if (servers.isEmpty()) {
            throw new InitException(
                    "ReplicationSlave is defined with the role Server; that requires the ReplicationServer global service (yamcs.yaml) to be defined");
        }
        if (servers.size() > 1) {
            this.log.warn("There are {} ReplicationServer services defined. Registering to the first one.",
                    servers.size());
        }
        return servers.get(0);
    }

    /**
     * @return the configured streams, each as {@code name} when the master and local names are equal, or
     *         {@code masterName->localName} otherwise
     */
    public List<String> getStreamNames() {
        return this.streamNames.entrySet().stream()
                .map(entry -> entry.getKey().equals(entry.getValue())
                        ? entry.getKey()
                        : entry.getKey() + "->" + entry.getValue())
                .collect(Collectors.toList());
    }

    /** @return true if this slave is configured with the CLIENT TCP role */
    public boolean isTcpClient() {
        return this.tcpRole == TcpRole.CLIENT;
    }

    /** @return the TCP client; null unless the service was started in the CLIENT role */
    public ReplicationClient getTcpClient() {
        return this.tcpClient;
    }

    public String getMasterHost() {
        return this.host;
    }

    public int getMasterPort() {
        return this.port;
    }

    public String getMasterInstance() {
        return this.masterInstance;
    }

    /** @return the id of the last transaction recorded by this slave, or -1 if none was recorded */
    public long getTxId() {
        return this.lastTxId;
    }

    /**
     * Creates the handler for a new connection to this slave and remembers it as the active one.
     *
     * @return the new channel handler
     * @throws YamcsException
     *             if a handler is already active
     */
    public ChannelHandler newChannelHandler() throws YamcsException {
        if (this.slaveChannelHandler != null) {
            throw new YamcsException("There is already a connection open to this slave");
        }
        this.slaveChannelHandler = new SlaveChannelHandler(this);
        return this.slaveChannelHandler;
    }

    /**
     * Updates the simulated time service from a time message of the master; does nothing unless
     * {@code updateSimTime} was enabled.
     */
    private void processTimeMessage(TimeMessage timeMessage) {
        if (this.simTimeService != null) {
            this.simTimeService.setSimElapsedTime(timeMessage.getLocalTime(), timeMessage.getMissionTime());
            if (timeMessage.hasSpeed()) {
                this.simTimeService.setSimSpeed(timeMessage.getSpeed());
            }
        }
    }
}
