package org.joyqueue.store;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.store.file.Checkpoint;
import org.joyqueue.store.file.DiskFullException;
import org.joyqueue.store.file.PositioningStore;
import org.joyqueue.store.file.RollBackException;
import org.joyqueue.store.file.StoreMessageSerializer;
import org.joyqueue.store.file.WriteException;
import org.joyqueue.store.index.IndexItem;
import org.joyqueue.store.index.IndexSerializer;
import org.joyqueue.store.message.BatchMessageParser;
import org.joyqueue.store.message.MessageParser;
import org.joyqueue.store.replication.ReplicableStore;
import org.joyqueue.store.utils.PreloadBufferPool;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.format.Format;
import org.joyqueue.toolkit.lang.LifeCycle;
import org.joyqueue.toolkit.metric.Metric;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/store/PartitionGroupStoreManager.class */
public class PartitionGroupStoreManager implements ReplicableStore, LifeCycle, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(PartitionGroupStoreManager.class);
    // Not referenced by name in the visible code; CallbackPositioningBelt.callbackBefore uses the
    // same value (60000) as a literal when expiring callbacks — presumably this constant, inlined.
    private static final long EVENT_TIMEOUT_MILLS = 60000;
    // Message log of this partition group; entries are read and appended as ByteBuffers.
    private final PositioningStore<ByteBuffer> store;
    // Directory of this partition group; holds the message log, the "index" sub-directory and the checkpoint file.
    private final File base;
    private final String topic;
    private final int partitionGroup;
    private final Config config;
    // Buffer pool handed to the message store and to every per-partition index store.
    private final PreloadBufferPool bufferPool;
    // Worker loops built in the constructor: doWork is bound to this::write and this::flush respectively.
    private final LoopThread writeLoopThread;
    private final LoopThread flushLoopThread;
    // Periodic metric reporter; null when Config.printMetricIntervalMs <= 0 (see initMetrics).
    private final LoopThread metricThread;
    // Bounded queue of pending write commands; capacity is Config.writeRequestCacheSize.
    private final BlockingQueue<WriteCommand> writeCommandCache;
    // Initialised to store.flushPosition() in the constructor.
    private long replicationPosition;
    // Position in the message log up to which partition indices have been built (see recover/recoverIndices).
    private long indexPosition;
    // Initialised from getMaxTerm(store) in the constructor.
    private int term;
    // Read-side metric instance; stays null unless initMetrics enabled metrics.
    private Metric.MetricInstance consumeMetric;
    // NOTE(review): the two intervals below are not used in the visible part of the file;
    // by name they throttle disk-space checks and checkpoint flushes (milliseconds) — confirm.
    private static final long CHECK_DISK_SPACE_COOL_DOWN = 1000;
    private static final long FLUSH_CHECKPOINT_INTERVAL_MS = 60000;
    // Name of the checkpoint file inside the partition group directory (read by recoverCheckpoint).
    static final String CHECKPOINT_FILE = "checkpoint.json";
    // Pending-callback belts per QoS level; the constructor registers PERSISTENCE, REPLICATION and ALL.
    private final Map<QosLevel, CallbackPositioningBelt> callbackMap = new HashMap(3);
    // Partition id -> partition (wrapper around that partition's index store).
    private final Map<Short, Partition> partitionMap = new ConcurrentHashMap();
    // One QosStore facade per QoS level, all backed by this manager.
    private final QosStore[] qosStores = {new QosStore(this, QosLevel.ONE_WAY), new QosStore(this, QosLevel.RECEIVE), new QosStore(this, QosLevel.PERSISTENCE), new QosStore(this, QosLevel.REPLICATION), new QosStore(this, QosLevel.ALL)};
    // Metric containers and the write-side instance; all remain null when metrics are disabled.
    private Metric produceMetrics = null;
    private Metric consumeMetrics = null;
    private Metric.MetricInstance produceMetric = null;
    private final Lock writeLock = new ReentrantLock();
    private AtomicLong lastCheckDiskSpaceTimestamp = new AtomicLong(0);
    private volatile boolean isDiskFull = false;
    private long lastFlushCheckpointTimestamp = 0;
    // Term of the last log entry; resetLastEntryTerm sets it to -1 when the log is empty.
    private int lastEntryTerm = -1;
    private AtomicBoolean started = new AtomicBoolean(false);
    private AtomicBoolean enabled = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/joyqueue/store/PartitionGroupStoreManager$Callback.class */
    /**
     * A pending write acknowledgement: the listener to notify, the partition indices to report
     * and the log position that must be reached before the write counts as successful.
     */
    public static class Callback {
        /** Creation time; compared against the timeout cutoff in {@code CallbackPositioningBelt.callbackBefore}. */
        long timestamp = SystemClock.now();
        /** Log position that has to be passed before the listener is notified; not set by the constructor. */
        long position;
        /** Indices reported to the listener on success. */
        long[] indices;
        QosLevel qosLevel;
        EventListener<WriteResult> listener;

        /**
         * @param qosLevel      QoS level the write was issued with
         * @param eventListener listener that receives the {@link WriteResult}
         * @param jArr          indices assigned to the written messages
         */
        Callback(QosLevel qosLevel, EventListener<WriteResult> eventListener, long[] jArr) {
            this.qosLevel = qosLevel;
            this.indices = jArr;
            this.listener = eventListener;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/store/PartitionGroupStoreManager$CallbackPositioningBelt.class */
    /**
     * FIFO belt of pending {@link Callback}s plus the highest position that has already been
     * acknowledged through {@link #callbackBefore(long)}.
     */
    public class CallbackPositioningBelt {
        /** Age after which a pending callback is answered with a timeout; same value as EVENT_TIMEOUT_MILLS. */
        private static final long CALLBACK_TIMEOUT_MS = 60000L;

        private final ConcurrentLinkedQueue<Callback> queue = new ConcurrentLinkedQueue<>();
        private AtomicLong callbackPosition = new AtomicLong(0);

        CallbackPositioningBelt() {
        }

        /**
         * @return the oldest pending callback without removing it
         * @throws NoSuchElementException if the belt is empty
         */
        Callback getFirst() {
            Callback peek = this.queue.peek();
            if (peek == null) {
                throw new NoSuchElementException();
            }
            return peek;
        }

        /**
         * @return the oldest pending callback, removed from the belt
         * @throws NoSuchElementException if the belt is empty
         */
        Callback removeFirst() {
            Callback poll = this.queue.poll();
            if (poll == null) {
                throw new NoSuchElementException();
            }
            return poll;
        }

        /** Removes the given callback; returns {@code false} if it was no longer queued. */
        boolean remove(Callback callback) {
            return this.queue.remove(callback);
        }

        /** Appends a callback to the end of the belt. */
        void addLast(Callback callback) {
            this.queue.add(callback);
        }

        /**
         * Acknowledges every callback at the head of the belt whose position is not beyond
         * {@code j}, then answers callbacks older than the timeout with
         * {@code SE_WRITE_TIMEOUT}.
         *
         * <p>The queue is walked with {@code peek}/{@code poll} null checks instead of relying on
         * {@link NoSuchElementException} for loop termination, so an exception thrown by a
         * listener is no longer swallowed by an empty catch block.
         *
         * @param j position up to which (inclusive) writes are considered complete
         */
        void callbackBefore(long j) {
            if (j > this.callbackPosition.get()) {
                this.callbackPosition.set(j);
                Callback head = this.queue.peek();
                while (head != null && head.position <= j) {
                    Callback done = this.queue.poll();
                    if (done == null) {
                        // Drained between peek and poll; the original left the method here as well.
                        return;
                    }
                    done.listener.onEvent(new WriteResult(JoyQueueCode.SUCCESS, done.indices));
                    head = this.queue.peek();
                }
            }
            long cutoff = SystemClock.now() - CALLBACK_TIMEOUT_MS;
            Callback oldest = this.queue.peek();
            while (oldest != null && oldest.timestamp < cutoff) {
                Callback expired = this.queue.poll();
                if (expired == null) {
                    return;
                }
                expired.listener.onEvent(new WriteResult(JoyQueueCode.SE_WRITE_TIMEOUT, (long[]) null));
                oldest = this.queue.peek();
            }
        }

        /**
         * Queues a callback. If its position has already been acknowledged, it is taken back off
         * the belt and completed immediately; {@code remove} returning {@code false} means
         * another caller already took it, so nothing more is done here.
         */
        void put(Callback callback) {
            addLast(callback);
            if (callback.position <= this.callbackPosition.get() && remove(callback)) {
                callback.listener.onEvent(new WriteResult(JoyQueueCode.SUCCESS, callback.indices));
            }
        }
    }

    /* loaded from: input_file:org/joyqueue/store/PartitionGroupStoreManager$Config.class */
    /** Tunables of a partition group store together with the configs of its message and index stores. */
    public static class Config {
        public static final int DEFAULT_MAX_MESSAGE_LENGTH = 4194304;
        public static final int DEFAULT_WRITE_REQUEST_CACHE_SIZE = 128;
        public static final long DEFAULT_FLUSH_INTERVAL_MS = 50;
        public static final long DEFAULT_WRITE_TIMEOUT_MS = 3000;
        public static final long DEFAULT_MAX_DIRTY_SIZE = 10485760;
        public static final long DEFAULT_PRINT_METRIC_INTERVAL_MS = 0;

        private final int maxMessageLength;
        private final int writeRequestCacheSize;
        private final long flushIntervalMs;
        private final long writeTimeoutMs;
        private final long maxDirtySize;
        private final long printMetricIntervalMs;
        private PositioningStore.Config storeConfig;
        private PositioningStore.Config indexStoreConfig;

        /** Builds a config from the {@code DEFAULT_*} values and two store configs created with 134217728. */
        public Config() {
            this(
                    DEFAULT_MAX_MESSAGE_LENGTH,
                    DEFAULT_WRITE_REQUEST_CACHE_SIZE,
                    DEFAULT_FLUSH_INTERVAL_MS,
                    DEFAULT_WRITE_TIMEOUT_MS,
                    DEFAULT_MAX_DIRTY_SIZE,
                    DEFAULT_PRINT_METRIC_INTERVAL_MS,
                    new PositioningStore.Config(134217728),
                    new PositioningStore.Config(134217728));
        }

        /**
         * @param i       maximum message length
         * @param i2      capacity of the write request cache
         * @param j       flush interval in milliseconds
         * @param j2      write timeout in milliseconds
         * @param j3      maximum dirty size
         * @param j4      metric print interval in milliseconds; values {@code <= 0} disable the metric thread
         * @param config  config of the message store
         * @param config2 config of the per-partition index stores
         */
        public Config(int i, int i2, long j, long j2, long j3, long j4, PositioningStore.Config config, PositioningStore.Config config2) {
            this.storeConfig = config;
            this.indexStoreConfig = config2;
            this.maxMessageLength = i;
            this.writeRequestCacheSize = i2;
            this.flushIntervalMs = j;
            this.writeTimeoutMs = j2;
            this.maxDirtySize = j3;
            this.printMetricIntervalMs = j4;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/joyqueue/store/PartitionGroupStoreManager$Partition.class */
    /** A partition of the group, represented by its index store (12 bytes per index entry). */
    public static class Partition {
        private final PositioningStore<IndexItem> store;

        private Partition(PositioningStore<IndexItem> positioningStore) {
            this.store = positioningStore;
        }

        /**
         * Truncates the index store so that it no longer references messages ending beyond the
         * given message-log position.
         *
         * <p>Scans index entries backwards from the right edge until one is found whose message
         * ends at or before {@code j}, then moves the right edge just behind that entry. If the
         * scan reaches (or passes) the left edge, the right edge is set to 0.
         *
         * @param j message-log position to roll back to
         * @throws IOException if an index entry cannot be read
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void rollbackTo(long j) throws IOException {
            long cursor = this.store.right() - 12;
            while (cursor >= this.store.left()) {
                IndexItem entry = this.store.read(cursor);
                if (entry == null) {
                    throw new RollBackException(String.format("Failed to rollback store %s to position %d, batchRead index failed!", this.store.base().getAbsolutePath(), Long.valueOf(j)));
                }
                if (entry.getOffset() + entry.getLength() <= j) {
                    // This entry's message lies completely before the rollback position: keep it.
                    break;
                }
                cursor -= 12;
            }
            long newRight = cursor <= this.store.left() ? 0L : cursor + 12;
            this.store.setRight(newRight);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/joyqueue/store/PartitionGroupStoreManager$WriteCommand.class */
    /** Immutable write request queued in the write command cache until it is picked up for writing. */
    public static class WriteCommand {
        /** Messages to append. */
        private final ByteBuffer[] messages;
        /** QoS level requested for this write. */
        private final QosLevel qosLevel;
        /** Listener to notify with the write result. */
        private final EventListener<WriteResult> eventListener;

        private WriteCommand(QosLevel qosLevel, EventListener<WriteResult> eventListener, ByteBuffer[] byteBufferArr) {
            this.messages = byteBufferArr;
            this.eventListener = eventListener;
            this.qosLevel = qosLevel;
        }
    }

    public PartitionGroupStoreManager(String str, int i, File file, Config config, PreloadBufferPool preloadBufferPool) {
        this.base = file;
        this.topic = str;
        this.partitionGroup = i;
        this.config = config;
        this.writeCommandCache = new LinkedBlockingQueue(config.writeRequestCacheSize);
        this.bufferPool = preloadBufferPool;
        this.callbackMap.put(QosLevel.PERSISTENCE, new CallbackPositioningBelt());
        this.callbackMap.put(QosLevel.REPLICATION, new CallbackPositioningBelt());
        this.callbackMap.put(QosLevel.ALL, new CallbackPositioningBelt());
        this.store = new PositioningStore<>(file, config.storeConfig, preloadBufferPool, new StoreMessageSerializer(config.maxMessageLength));
        if (!file.isDirectory()) {
            throw new StoreInitializeException(String.format("Partition group directory: %s not available!", file.getAbsolutePath()));
        }
        this.replicationPosition = this.store.flushPosition();
        this.term = getMaxTerm(this.store);
        this.writeLoopThread = LoopThread.builder().name(String.format("WriteThread-%s-%d", str, Integer.valueOf(i))).doWork(this::write).sleepTime(0L, 0L).onException(th -> {
            logger.warn("Write Exception: ", th);
        }).build();
        this.flushLoopThread = LoopThread.builder().name(String.format("FlushThread-%s-%d", str, Integer.valueOf(i))).doWork(this::flush).sleepTime(config.flushIntervalMs, config.flushIntervalMs).onException(th2 -> {
            logger.warn("Flush Exception: ", th2);
        }).build();
        this.metricThread = initMetrics(config);
    }

    /**
     * Sets up the produce/consume metrics and the thread that periodically reports them.
     *
     * @param config store configuration; metrics are enabled only when its print interval is positive
     * @return the reporting thread, or {@code null} when metrics are disabled
     */
    private LoopThread initMetrics(Config config) {
        final long intervalMs = config.printMetricIntervalMs;
        if (intervalMs <= 0) {
            return null;
        }

        final String suffix = this.topic + "-" + this.partitionGroup;

        String[] writeLatencies = {"WriteLatency", "FlushLatency"};
        String[] writeCounters = {"WriteCount", "FlushCount"};
        String[] writeTraffics = {"WriteTraffic", "FlushTraffic"};
        this.produceMetrics = new Metric("WriteMetric-" + suffix, 1, writeLatencies, writeCounters, writeTraffics);
        this.produceMetric = (Metric.MetricInstance) this.produceMetrics.getMetricInstances().get(0);

        String[] readLatencies = {"ReadLatency"};
        String[] readCounters = {"ReadCount"};
        String[] readTraffics = {"ReadTraffic"};
        this.consumeMetrics = new Metric("ReadMetric-" + suffix, 1, readLatencies, readCounters, readTraffics);
        this.consumeMetric = (Metric.MetricInstance) this.consumeMetrics.getMetricInstances().get(0);

        return LoopThread.builder()
                .sleepTime(intervalMs, intervalMs)
                .name("Metric-Thread")
                .onException(error -> logger.warn("Exception:", error))
                .doWork(() -> {
                    this.consumeMetrics.reportAndReset();
                    this.produceMetrics.reportAndReset();
                    // Dirty size = bytes appended to the message store but not yet flushed.
                    long dirtySize = this.store.right() - this.store.flushPosition();
                    logger.info("{}-{} WriteCommandCache size: {}, dirty size: {}/{}.",
                            this.topic,
                            Integer.valueOf(this.partitionGroup),
                            Integer.valueOf(this.writeCommandCache.size()),
                            Long.valueOf(dirtySize),
                            Long.valueOf(config.maxDirtySize));
                })
                .build();
    }

    /**
     * Recovers the message store, the partition index stores and the checkpoint, then rebuilds
     * any indices missing between the resulting index position and the end of the message log.
     *
     * <p>The position derived from the index stores is kept as a safe fallback: if rebuilding
     * fails while {@code indexPosition} differs from it, rebuilding is retried once from the
     * safe position.
     *
     * @throws StoreInitializeException if recovery fails with an {@link IOException}
     */
    public void recover() {
        try {
            final String path = this.base.getAbsolutePath();

            logger.info("Recovering message store {}...", path);
            this.store.recover();
            resetLastEntryTerm();

            logger.info("Recovering index store {}...", path);
            final long safePosition = recoverPartitions();
            this.indexPosition = safePosition;

            logger.info("Recovering the checkpoint {}...", path);
            final long checkpointPosition = recoverCheckpoint();
            this.indexPosition = checkpointPosition;

            logger.info("Building indices {}...", path);
            try {
                recoverIndices();
            } catch (Throwable th) {
                // NOTE(review): this compares against the current indexPosition, which recoverIndices
                // may already have advanced, not against the checkpoint position — confirm intent.
                if (this.indexPosition != safePosition) {
                    this.indexPosition = safePosition;
                    logger.warn("Exception while recover indices using indexPosition {} from Checkpoint.json. Fall back safe index position {} and retry recover indices...",
                            Format.formatWithComma(checkpointPosition), Format.formatWithComma(safePosition), th);
                    recoverIndices();
                } else {
                    throw th;
                }
            }
            logger.info("Store recovered: {}...", path);
        } catch (IOException e) {
            throw new StoreInitializeException(e);
        }
    }

    /** Re-reads the term of the last log entry from the message store; -1 when the store's right edge is 0. */
    private void resetLastEntryTerm() {
        this.lastEntryTerm = this.store.right() > 0
                ? getEntryTerm(this.store.toLogStart(this.store.right()))
                : -1;
    }

    /**
     * Rebuilds partition indices from the message log, starting at {@code indexPosition}.
     *
     * <p>Every partition index is first rolled back to {@code indexPosition}; the log is then
     * scanned up to the right edge of the message store and an index entry is written for each
     * message whose index is the next expected one. Index stores are flushed whenever their
     * unflushed size reaches {@code Config.DEFAULT_MAX_DIRTY_SIZE}, and once more at the end.
     *
     * @throws IOException if reading the log or writing an index fails
     */
    private void recoverIndices() throws IOException {
        for (Partition each : this.partitionMap.values()) {
            each.rollbackTo(this.indexPosition);
        }

        while (this.indexPosition < this.store.right()) {
            final ByteBuffer log = this.store.read(this.indexPosition);
            if (log == null) {
                throw new ReadException(String.format("Read log failed! store: %s, position: %d.", this.store.base().getAbsolutePath(), Long.valueOf(this.indexPosition)));
            }
            final IndexItem item = IndexItem.parseMessage(log, this.indexPosition);
            final Partition partition = this.partitionMap.get(Short.valueOf(item.getPartition()));
            if (partition == null) {
                // Message of a partition that is not part of this group: skip it.
                this.indexPosition += item.getLength();
                continue;
            }

            final PositioningStore<IndexItem> indexStore = partition.store;
            if (indexStore.right() == 0) {
                // Empty index store: start it at the index of the first message seen for the partition.
                indexStore.setRight(item.getIndex() * 12);
            }
            final long nextIndex = indexStore.right() / 12;

            if (item.getIndex() == nextIndex) {
                if (BatchMessageParser.isBatch(log)) {
                    short batchSize = BatchMessageParser.getBatchSize(log);
                    item.setBatchMessage(true);
                    item.setBatchMessageSize(batchSize);
                }
                writeIndex(item, partition.store);
            } else if (item.getIndex() < nextIndex) {
                // Already indexed: the stored entry must point at this very log position.
                // NOTE(review): nothing visible in this branch advances indexPosition — confirm
                // it cannot be reached after the rollback above, otherwise the loop would spin.
                if (indexStore.read(item.getIndex() * 12).getOffset() != this.indexPosition) {
                    throw new WriteException(String.format("Index mismatch, store: %s, partition: %d, next index of the partition: %s，index in log: %s, log position: %s, log: \n%s", this.base, Short.valueOf(item.getPartition()), Format.formatWithComma(nextIndex), Format.formatWithComma(item.getIndex()), Format.formatWithComma(this.indexPosition), MessageParser.getString(log)));
                }
            } else {
                // Index in the log is ahead of the index store: there is a gap.
                throw new WriteException(String.format("Index must be continuous, store: %s, partition: %d, next index of the partition: %s，index in log: %s, log position: %s, log: \n%s", this.base, Short.valueOf(item.getPartition()), Format.formatWithComma(nextIndex), Format.formatWithComma(item.getIndex()), Format.formatWithComma(this.indexPosition), MessageParser.getString(log)));
            }

            if (indexStore.right() - indexStore.flushPosition() >= Config.DEFAULT_MAX_DIRTY_SIZE) {
                indexStore.flush();
                logger.info("Recovering index, topic: {}, group: {}, Write position: {}, index position: {}",
                        this.topic, Integer.valueOf(this.partitionGroup), Long.valueOf(this.store.right()), Long.valueOf(this.indexPosition));
            }
        }

        for (Partition each : this.partitionMap.values()) {
            final PositioningStore<IndexItem> indexStore = each.store;
            if (indexStore.right() > indexStore.flushPosition()) {
                indexStore.flush();
            }
        }
    }

    /**
     * Rolls every partition index back to the given message-log position.
     *
     * @param j message-log position to roll back to
     * @throws IOException if any index store fails to roll back
     */
    private void rollbackPartitions(long j) throws IOException {
        for (Partition partition : this.partitionMap.values()) {
            partition.rollbackTo(j);
        }
    }

    /**
     * Returns the term of the last entry in the given store, or 0 when the store is empty
     * (its right edge is not beyond its left edge).
     */
    private int getMaxTerm(PositioningStore<ByteBuffer> positioningStore) {
        if (positioningStore.right() <= positioningStore.left()) {
            return 0;
        }
        return getEntryTerm(positioningStore.toLogStart(positioningStore.right()));
    }

    /**
     * Tries to obtain a more advanced index position from the checkpoint file.
     *
     * <p>The checkpoint is used only when its index position is beyond the current
     * {@code indexPosition} and every partition listed in it exists with an index store that
     * already covers the recorded index. Any failure (missing file, unreadable or malformed
     * content) is logged and the current {@code indexPosition} is returned; this method is
     * deliberately best-effort and never throws.
     *
     * @return the index position to continue recovery from
     */
    private long recoverCheckpoint() {
        try {
            File file = new File(this.base, CHECKPOINT_FILE);
            if (!file.isFile()) {
                logger.info("Checkpoint file is NOT found, continue recover...");
                return this.indexPosition;
            }

            byte[] content = new byte[(int) file.length()];
            try (FileInputStream in = new FileInputStream(file)) {
                // A single read() may return fewer bytes than requested, so keep reading until
                // the buffer is full; hitting end-of-file early means the file shrank.
                int filled = 0;
                while (filled < content.length) {
                    int count = in.read(content, filled, content.length - filled);
                    if (count < 0) {
                        throw new IOException("File length not match!");
                    }
                    filled += count;
                }
            }

            Checkpoint checkpoint = (Checkpoint) JSON.parseObject(new String(content, StandardCharsets.UTF_8), Checkpoint.class);
            if (checkpoint == null) {
                // Empty content parses to null; previously this surfaced as a NullPointerException.
                logger.warn("Checkpoint file {} is empty, continue recover...", file.getAbsolutePath());
                return this.indexPosition;
            }

            if (checkpoint.getIndexPosition() > this.indexPosition
                    && null != checkpoint.getPartitions()
                    && checkpoint.getPartitions().entrySet().stream().allMatch(entry -> {
                        short partitionId = ((Short) entry.getKey()).shortValue();
                        long index = ((Long) entry.getValue()).longValue();
                        Partition partition = this.partitionMap.get(Short.valueOf(partitionId));
                        // The index store must already contain the entry recorded in the checkpoint.
                        return null != partition && partition.store.right() >= index * 12;
                    })) {
                logger.info("Using indexPosition: {} from the checkpoint file.", Long.valueOf(checkpoint.getIndexPosition()));
                return checkpoint.getIndexPosition();
            }
        } catch (Throwable th) {
            logger.warn("Recover checkpoint exception, continue recover...", th);
        }
        return this.indexPosition;
    }

    /* JADX WARN: Type inference failed for: r4v1, types: [org.joyqueue.store.utils.PreloadBufferPool] */
    private long recoverPartitions() throws IOException {
        IndexItem read;
        File file = new File(this.base, "index");
        long right = this.store.right();
        if (!file.isDirectory()) {
            throw new StoreInitializeException(String.format("Index directory: %s not found! ", file.getAbsolutePath()));
        }
        Short[] loadPartitionIndices = loadPartitionIndices(file);
        if (loadPartitionIndices == null) {
            return this.store.left();
        }
        for (Short sh : loadPartitionIndices) {
            short shortValue = sh.shortValue();
            File file2 = new File(file, String.valueOf((int) shortValue));
            PositioningStore.Config config = this.config.indexStoreConfig;
            ?? r4 = this.bufferPool;
            PositioningStore<IndexItem> positioningStore = new PositioningStore<>(file2, config, r4, new IndexSerializer());
            positioningStore.recover();
            positioningStore.setRight(positioningStore.right() - (positioningStore.right() % 12));
            long right2 = positioningStore.right();
            IndexItem indexItem = null;
            do {
                long j = right2 - 12;
                right2 = r4;
                if (j < positioningStore.left() + 12) {
                    break;
                }
                read = null == indexItem ? positioningStore.read(right2) : indexItem;
                indexItem = positioningStore.read(right2 - 12);
            } while (!verifyCurrentIndex(read, indexItem));
            positioningStore.setRight(right2 + 12);
            this.partitionMap.put(Short.valueOf(shortValue), new Partition(positioningStore));
            if (positioningStore.right() - positioningStore.left() > 0) {
                IndexItem read2 = positioningStore.read(positioningStore.right() - 12);
                if (read2 == null) {
                    throw new ReadException(String.format("Failed to recover index store %s to position %s, batchRead index failed!", positioningStore.base().getAbsolutePath(), Format.formatWithComma(positioningStore.right() - 12)));
                }
                verifyBatchMessage(read2, positioningStore, this.store);
                long offset = read2.getOffset();
                logger.info("Topic: {}, group: {}, partition: {}, maxIndexedMessageOffset: {}.", new Object[]{this.topic, Integer.valueOf(this.partitionGroup), Short.valueOf(shortValue), Format.formatWithComma(offset)});
                if (right > offset) {
                    logger.info("Topic: {}, group: {}, set indexPosition from {} to {}.", new Object[]{this.topic, Integer.valueOf(this.partitionGroup), Format.formatWithComma(right), Format.formatWithComma(offset)});
                    right = offset;
                }
            } else {
                right = this.store.left();
            }
        }
        return right;
    }

    /**
     * Checks that an index entry is plausible relative to its predecessor.
     *
     * @param indexItem  entry being verified
     * @param indexItem2 entry stored directly before it
     * @return {@code true} if the entry has a positive length and a greater offset than its predecessor
     */
    private boolean verifyCurrentIndex(IndexItem indexItem, IndexItem indexItem2) {
        if (indexItem.getLength() <= 0) {
            return false;
        }
        return indexItem.getOffset() > indexItem2.getOffset();
    }

    /**
     * Lists the partition ids that have a sub-directory in the given index directory.
     *
     * <p>Only directories whose name consists solely of digits are considered; names that do not
     * fit into a {@code short} are ignored.
     *
     * @param file the index directory
     * @return the partition ids found, or {@code null} if the directory cannot be listed
     */
    private Short[] loadPartitionIndices(File file) {
        File[] partitionDirs = file.listFiles(candidate -> candidate.isDirectory() && candidate.getName().matches("^\\d+$"));
        if (partitionDirs == null) {
            return null;
        }
        List<Short> ids = new ArrayList<>(partitionDirs.length);
        for (File dir : partitionDirs) {
            short id;
            try {
                id = Short.parseShort(dir.getName());
            } catch (NumberFormatException ignored) {
                // All-digit name that is out of range for a short: not a partition directory.
                continue;
            }
            if (id >= 0) {
                ids.add(Short.valueOf(id));
            }
        }
        return ids.toArray(new Short[0]);
    }

    /**
     * Rolls an index store back to the start of a batch whose index entries were only partially
     * written.
     *
     * <p>If the message referenced by {@code indexItem} is a batch and the index store ends
     * before the last index of that batch, the right edge is moved back to the batch's first
     * index.
     *
     * @param indexItem         last index entry of the partition
     * @param positioningStore  index store of the partition
     * @param positioningStore2 message store
     * @throws IOException if the message cannot be read
     */
    private void verifyBatchMessage(IndexItem indexItem, PositioningStore<IndexItem> positioningStore, PositioningStore<ByteBuffer> positioningStore2) throws IOException {
        if (indexItem.getOffset() >= positioningStore2.right()) {
            // The referenced message lies beyond the message store: nothing to verify against.
            return;
        }
        ByteBuffer message = positioningStore2.read(indexItem.getOffset());
        if (!BatchMessageParser.isBatch(message)) {
            return;
        }
        short batchSize = BatchMessageParser.getBatchSize(message);
        long firstIndex = MessageParser.getLong(message, MessageParser.INDEX);
        if (positioningStore.right() >= (batchSize + firstIndex) * 12) {
            return;
        }
        logger.info("Incomplete batch message indices found, roll back index store to {}, index: {}, message position: {}, store: {}.",
                Format.formatWithComma(firstIndex * 12),
                Format.formatWithComma(indexItem.getIndex()),
                Format.formatWithComma(indexItem.getOffset()),
                positioningStore.base().getAbsolutePath());
        positioningStore.setRight(firstIndex * 12);
    }

    /** @return the topic this partition group belongs to */
    public String getTopic() {
        return topic;
    }

    /** @return the id of this partition group */
    public int getPartitionGroup() {
        return partitionGroup;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return a snapshot array of the partition ids currently registered in this group */
    public Short[] listPartitions() {
        Set<Short> partitionIds = this.partitionMap.keySet();
        return partitionIds.toArray(new Short[0]);
    }

    /**
     * Unregisters a partition and renames its index directory to
     * {@code <name>.d.<timestamp>} instead of deleting it. Does nothing if the partition is not
     * registered; a failed rename is only logged.
     *
     * @param s partition id
     */
    private void removePartition(short s) {
        if (this.partitionMap.remove(Short.valueOf(s)) == null) {
            return;
        }
        File indexDir = new File(this.base, "index" + File.separator + ((int) s));
        File retiredDir = new File(indexDir.getParent(), indexDir.getName() + ".d." + SystemClock.now());
        if (!indexDir.renameTo(retiredDir)) {
            logger.warn("Rename directory {} failed!", indexDir.getAbsolutePath());
        }
    }

    /**
     * Registers a new partition with a freshly created index directory and index store. Does
     * nothing if the partition is already registered.
     *
     * @param s partition id
     * @throws IOException if the index directory cannot be created or the store cannot be recovered
     */
    private void addPartition(short s) throws IOException {
        final Short partitionId = Short.valueOf(s);
        if (this.partitionMap.get(partitionId) != null) {
            return;
        }
        // NOTE(review): removePartition only renames the directory when the partition was still
        // registered, which it is not on this path — a leftover directory makes mkdirs() fail below.
        removePartition(s);
        File indexDir = new File(this.base, "index" + File.separator + ((int) s));
        if (!indexDir.mkdirs()) {
            throw new IOException(String.format("Create directory: %s failed!", indexDir.getAbsolutePath()));
        }
        PositioningStore<IndexItem> indexStore =
                new PositioningStore<>(indexDir, this.config.indexStoreConfig, this.bufferPool, new IndexSerializer());
        indexStore.recover();
        this.partitionMap.put(partitionId, new Partition(indexStore));
    }

    /**
     * Reads up to {@code i} messages of a partition, starting at message index {@code j}.
     *
     * <p>Index entries are read in one batch; entries pointing at the same log offset as the
     * previously returned message are skipped, as are entries at or beyond
     * {@code commitPosition()}. Each message is first read using the length recorded in the
     * index; if that read fails or the length does not match the message header, it is re-read
     * without a length hint.
     *
     * <p>When {@code j2 > 0}, reading stops as soon as the accumulated size of the messages
     * reaches {@code j2}; the message that hits the limit is not returned.
     *
     * <p>The decompiled original stored {@code Object[]} values into numeric locals
     * ({@code i4 = objArr}, {@code j3 = i2}) and did not compile; the size accumulator is
     * restored here.
     *
     * @param s  partition id
     * @param j  index of the first message to read
     * @param i  maximum number of index entries to read
     * @param j2 maximum total size in bytes, or a value {@code <= 0} for no limit
     * @return the messages read, with {@code eop} set when fewer than {@code i} index entries were available
     * @throws IOException   if the stores cannot be read
     * @throws ReadException if the partition does not exist or a message cannot be read
     */
    public ReadResult read(short s, long j, int i, long j2) throws IOException {
        final long startNanos = System.nanoTime();
        final ReadResult readResult = new ReadResult();
        checkPartition(s);

        final List<IndexItem> indexItems = this.partitionMap.get(Short.valueOf(s)).store.batchRead(j * 12, i);
        readResult.setEop(indexItems.size() < i);

        final List<ByteBuffer> messages = new ArrayList<>(i);
        long totalSize = 0;
        IndexItem lastRead = null;

        for (int k = 0; k < indexItems.size(); k++) {
            final IndexItem item = indexItems.get(k);
            // Consecutive entries with the same offset point at a message that was already returned.
            final boolean sameAsLast = null != lastRead && item.getOffset() == lastRead.getOffset();
            if (sameAsLast || item.getOffset() >= commitPosition()) {
                continue;
            }
            try {
                ByteBuffer message;
                try {
                    message = this.store.read(item.getOffset(), item.getLength());
                    int lengthInMessage = MessageParser.getInt(message, MessageParser.LENGTH);
                    if (lengthInMessage != item.getLength()) {
                        // The length in the index disagrees with the message header: re-read
                        // without a length hint.
                        logger.warn("索引中消息长度不正确！index: {} , offset: {}, message length (from index/from message): {}/{}, partition: {}, store: {}.",
                                Format.formatWithComma(j + k),
                                Format.formatWithComma(item.getOffset()),
                                Integer.valueOf(item.getLength()),
                                Integer.valueOf(lengthInMessage),
                                Short.valueOf(s),
                                this.base.getAbsolutePath());
                        message = this.store.read(item.getOffset());
                    }
                } catch (Throwable readError) {
                    logger.warn("Exception on read, try to read without length! index: {} , offset: {}, message length: {}, partition: {}, store: {}.",
                            Format.formatWithComma(j + k),
                            Format.formatWithComma(item.getOffset()),
                            Integer.valueOf(item.getLength()),
                            Short.valueOf(s),
                            this.base.getAbsolutePath(),
                            readError);
                    message = this.store.read(item.getOffset());
                }
                if (null == message) {
                    throw new ReadException(String.format("Read log failed! store: %s, position: %d.", this.store.base().getAbsolutePath(), Long.valueOf(item.getOffset())));
                }
                if (j2 > 0) {
                    totalSize += message.remaining();
                    if (totalSize >= j2) {
                        break;
                    }
                }
                messages.add(message);
                lastRead = item;
            } catch (Throwable th) {
                logger.warn("Exception on read! index: {} , offset: {}, message length: {}, partition: {}, store: {}.",
                        Format.formatWithComma(j + k),
                        Format.formatWithComma(item.getOffset()),
                        Integer.valueOf(item.getLength()),
                        Short.valueOf(s),
                        this.base.getAbsolutePath(),
                        th);
                throw th;
            }
        }

        readResult.setMessages(messages.toArray(new ByteBuffer[0]));
        readResult.setCode(JoyQueueCode.SUCCESS);
        if (null != this.consumeMetric) {
            this.consumeMetric.addCounter("ReadCount", messages.size());
            this.consumeMetric.addLatency("ReadLatency", System.nanoTime() - startNanos);
            this.consumeMetric.addTraffic("ReadTraffic", messages.stream().mapToInt(ByteBuffer::remaining).sum());
        }
        return readResult;
    }

    /**
     * Ensures that the given partition is registered in this partition group.
     *
     * @param s partition id
     * @throws ReadException if the partition is unknown
     */
    private void checkPartition(short s) {
        if (this.partitionMap.containsKey(Short.valueOf(s))) {
            return;
        }
        throw new ReadException(String.format("No such partition: %d in topic: %s, partition group: %d.", Short.valueOf(s), this.topic, Integer.valueOf(this.partitionGroup)));
    }

    /**
     * Appends the given messages to the message store and writes the index entry (or entries)
     * of each message into the index store of the partition named in the message.
     *
     * <p>Before anything is written, the right position of the message store and of every
     * partition index store is captured. If any step fails, those values are handed to
     * {@code onWriteException} and the failure is rethrown.
     *
     * @param byteBufferArr serialized messages; each one is sliced before use
     * @return for every message, in input order, the index assigned to it (right position of the
     *         partition index store divided by 12 at the time the message is processed)
     * @throws IOException    if one of the underlying stores fails
     * @throws WriteException if a message is longer than {@code config.maxMessageLength}
     */
    private long[] write(ByteBuffer... byteBufferArr) throws IOException {
        final long startPosition = this.store.right();
        final Map<Short, Long> partitionSnapshot = createPartitionSnapshot();
        final long[] indices = new long[byteBufferArr.length];
        // Position in the message store at which the next message will be placed.
        long messagePosition = startPosition;
        try {
            for (int i = 0; i < byteBufferArr.length; i++) {
                ByteBuffer message = byteBufferArr[i].slice();
                int messageLength = message.remaining();
                if (messageLength > this.config.maxMessageLength) {
                    throw new WriteException(String.format("Message too large! Message length: %d, limit: %d", Integer.valueOf(messageLength), Integer.valueOf(this.config.maxMessageLength)));
                }
                IndexItem indexItem = IndexItem.parseMessage(message, messagePosition);
                Partition partition = this.partitionMap.get(Short.valueOf(indexItem.getPartition()));
                long index = partition.store.right() / 12;
                indices[i] = index;
                // The index is stamped into the message itself before the message is appended.
                MessageParser.setLong(message, MessageParser.INDEX, index);
                indexItem.setIndex(index);
                // Pass the buffer itself: the decompiler-emitted cast of the ByteBuffer to the
                // store type was residue and does not type-check.
                messagePosition = this.store.append(message);
                updateLastEntryTerm(message);
                if (BatchMessageParser.isBatch(message)) {
                    indexItem.setBatchMessage(true);
                    indexItem.setBatchMessageSize(BatchMessageParser.getBatchSize(message));
                }
                writeIndex(indexItem, partition.store);
                this.flushLoopThread.wakeup();
            }
            return indices;
        } catch (Throwable th) {
            onWriteException(startPosition, partitionSnapshot, th);
            throw th;
        }
    }

    /**
     * Captures the current right position of every partition index store.
     *
     * @return a new map from partition id to the value of {@code store.right()} for that partition
     */
    private Map<Short, Long> createPartitionSnapshot() {
        Map<Short, Long> snapshot = new HashMap<>();
        for (Map.Entry<Short, Partition> entry : this.partitionMap.entrySet()) {
            snapshot.put(entry.getKey(), Long.valueOf(entry.getValue().store.right()));
        }
        return snapshot;
    }

    /**
     * Writes the index entry of one message into a partition index store and advances
     * {@code indexPosition} by the message length.
     *
     * <p>A batch message is delegated to {@code appendBatchMessageIndices}; any other message is
     * appended as a single item.
     *
     * @param indexItem        index entry describing the message
     * @param positioningStore index store of the partition the message belongs to
     * @throws IOException if the index store fails
     */
    private void writeIndex(IndexItem indexItem, PositioningStore<IndexItem> positioningStore) throws IOException {
        if (indexItem.isBatchMessage()) {
            appendBatchMessageIndices(positioningStore, indexItem);
        } else {
            // Append the item itself: the decompiler-emitted cast of the IndexItem to the store
            // type was residue and does not type-check.
            positioningStore.append(indexItem);
        }
        this.indexPosition += indexItem.getLength();
    }

    /**
     * Serializes the same index item once per message of the batch into one buffer and appends
     * that buffer to the index store in a single call.
     *
     * @param positioningStore index store to append to
     * @param indexItem        index item of the batch; its batch size decides how many copies are written
     * @throws IOException if the index store fails
     */
    private void appendBatchMessageIndices(PositioningStore<IndexItem> positioningStore, IndexItem indexItem) throws IOException {
        // 12 bytes are reserved per serialized copy.
        final int capacity = indexItem.getBatchMessageSize() * 12;
        final ByteBuffer serialized = ByteBuffer.allocate(capacity);
        for (int written = 0; written < indexItem.getBatchMessageSize(); written++) {
            indexItem.serializeTo(serialized);
        }
        serialized.flip();
        positioningStore.appendByteBuffer(serialized);
    }

    /**
     * Takes one write command from {@code writeCommandCache} and executes it while holding
     * {@code writeLock}.
     *
     * <p>Failures while processing the command are reported to the command's listener (when
     * there is one) instead of being propagated:
     * <ul>
     *   <li>interruption: the thread's interrupt flag is restored and nothing is reported;</li>
     *   <li>{@code DiskFullException}: {@code SE_DISK_FULL};</li>
     *   <li>{@code IllegalStateException} (e.g. from {@code verifyState}): {@code CY_STATUS_ERROR};</li>
     *   <li>anything else: {@code SE_WRITE_FAILED}.</li>
     * </ul>
     *
     * <p>The lock is released exactly once, in a {@code finally} block, and only if it was
     * actually acquired.
     *
     * @throws IllegalStateException if the write lock cannot be acquired immediately
     * @throws IOException           declared for callers; failures of the write itself are handled here
     * @throws InterruptedException  declared for callers; interruption is handled here
     */
    private void write() throws IOException, InterruptedException {
        // Check the lock outside the try block so that a failed tryLock() never reaches unlock().
        if (!this.writeLock.tryLock()) {
            throw new IllegalStateException("Acquire write lock failed!");
        }
        WriteCommand writeCommand = null;
        try {
            writeCommand = this.writeCommandCache.take();
            if (null != this.produceMetric) {
                this.produceMetric.addTraffic("WriteTraffic", Arrays.stream(writeCommand.messages).mapToInt(ByteBuffer::remaining).sum());
            }
            long startNanos = System.nanoTime();
            verifyState(true);
            if (waitForFlush()) {
                writeCommand.eventListener.onEvent(new WriteResult(JoyQueueCode.SE_WRITE_TIMEOUT, (long[]) null));
            } else {
                // NOTE(review): arguments are evaluated left to right, so store.right() is read
                // before write(...) appends anything — confirm that the callback is meant to
                // carry the pre-write position.
                handleCallback(writeCommand, this.store.right(), write(writeCommand.messages));
            }
            long endNanos = System.nanoTime();
            if (null != this.produceMetric) {
                this.produceMetric.addLatency("WriteLatency", endNanos - startNanos);
                this.produceMetric.addCounter("WriteCount", 1L);
            }
        } catch (InterruptedException e) {
            // Preserve the interruption for whoever drives this method.
            Thread.currentThread().interrupt();
        } catch (DiskFullException e) {
            if (null != writeCommand && writeCommand.eventListener != null) {
                writeCommand.eventListener.onEvent(new WriteResult(JoyQueueCode.SE_DISK_FULL, (long[]) null));
            }
            logger.warn("Write failed, cause: disk full! Store: {}.", this.base.getAbsolutePath());
        } catch (IllegalStateException e) {
            if (null != writeCommand && writeCommand.eventListener != null) {
                writeCommand.eventListener.onEvent(new WriteResult(JoyQueueCode.CY_STATUS_ERROR, (long[]) null));
            }
            logger.warn("Write failed, cause: store disabled! Store: {}.", this.base.getAbsolutePath());
        } catch (Throwable th) {
            if (null != writeCommand && writeCommand.eventListener != null) {
                writeCommand.eventListener.onEvent(new WriteResult(JoyQueueCode.SE_WRITE_FAILED, (long[]) null));
            }
            logger.warn("Write failed, cause: exception! Store: {}.", this.base.getAbsolutePath(), th);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Checks that the {@code enabled} flag currently has the expected value.
     *
     * @param z expected value of {@code enabled}
     * @throws IllegalStateException if the flag differs from {@code z}
     */
    private void verifyState(boolean z) {
        boolean currentlyEnabled = this.enabled.get();
        if (currentlyEnabled == z) {
            return;
        }
        throw new IllegalStateException();
    }

    /**
     * Spins (yielding the thread) while the amount of unflushed data in the message store is at
     * least {@code config.maxDirtySize}, for at most {@code config.writeTimeoutMs} milliseconds.
     *
     * @return {@code true} if more than {@code config.writeTimeoutMs} elapsed since this method
     *         was entered, {@code false} otherwise
     */
    private boolean waitForFlush() {
        final long startedAt = SystemClock.now();
        while (true) {
            boolean tooDirty = this.store.right() - this.store.flushPosition() >= this.config.maxDirtySize;
            if (!tooDirty) {
                break;
            }
            if (SystemClock.now() - startedAt > this.config.writeTimeoutMs) {
                break;
            }
            Thread.yield();
        }
        return SystemClock.now() - startedAt > this.config.writeTimeoutMs;
    }

    /**
     * Builds a {@code Callback} for a finished write command and queues it on the belt that
     * {@code callbackMap} holds for the command's QoS level, if there is one.
     *
     * @param writeCommand command that was written
     * @param j            position stored on the callback
     * @param jArr         indices passed to the callback
     */
    private void handleCallback(WriteCommand writeCommand, long j, long[] jArr) {
        Callback pending = new Callback(writeCommand.qosLevel, writeCommand.eventListener, jArr);
        pending.position = j;
        CallbackPositioningBelt belt = this.callbackMap.get(writeCommand.qosLevel);
        if (belt == null) {
            // No belt registered for this QoS level: the callback is dropped.
            return;
        }
        belt.put(pending);
    }

    /**
     * Flushes the message store and all partition index stores, repeating for as long as a pass
     * flushed something.
     *
     * <p>After a pass that flushed something, flush metrics are recorded (when
     * {@code produceMetric} is set) and the {@code PERSISTENCE} callbacks up to the current flush
     * position are fired. Every pass ends with {@code flushCheckpointPeriodically()}. An
     * {@link IOException} is logged and ends the method.
     */
    private void flush() {
        boolean flushedSomething = true;
        while (flushedSomething) {
            try {
                final long startNanos = System.nanoTime();
                final long positionBefore = this.store.flushPosition();
                // Non-short-circuit OR on purpose: the indices are flushed even when the message
                // store reported that it flushed something.
                flushedSomething = this.store.flush() | flushIndices();
                if (null != this.produceMetric && flushedSomething) {
                    final long endNanos = System.nanoTime();
                    this.produceMetric.addTraffic("FlushTraffic", this.store.flushPosition() - positionBefore);
                    this.produceMetric.addLatency("FlushLatency", endNanos - startNanos);
                    this.produceMetric.addCounter("FlushCount", 1L);
                }
                if (flushedSomething) {
                    this.callbackMap.get(QosLevel.PERSISTENCE).callbackBefore(flushPosition());
                }
                flushCheckpointPeriodically();
            } catch (IOException e) {
                logger.warn("Exception:", e);
                return;
            }
        }
    }

    /**
     * Flushes every partition index store, repeating passes over all partitions for as long as
     * a pass flushed something.
     *
     * <p>An exception during a pass is logged; the overall result is only updated by passes
     * that complete without an exception.
     *
     * @return {@code true} if any completed pass flushed something
     */
    private boolean flushIndices() {
        boolean anyFlushed = false;
        boolean passFlushed;
        do {
            passFlushed = false;
            try {
                for (Partition partition : this.partitionMap.values()) {
                    // flush() is the left operand, so it runs for every partition.
                    passFlushed = partition.store.flush() || passFlushed;
                }
                anyFlushed = anyFlushed || passFlushed;
            } catch (Exception e) {
                logger.warn("Exception: ", e);
            }
        } while (passFlushed);
        return anyFlushed;
    }

    /**
     * Returns the cached disk-full flag, refreshing it from the message store first when more
     * than {@code CHECK_DISK_SPACE_COOL_DOWN} has passed since the last refresh.
     *
     * <p>The refresh only happens in the caller that wins the compare-and-set on
     * {@code lastCheckDiskSpaceTimestamp}.
     *
     * @return the value of the {@code isDiskFull} field after the optional refresh
     */
    private boolean isDiskFull() {
        final long lastCheck = this.lastCheckDiskSpaceTimestamp.get();
        final boolean coolDownElapsed = SystemClock.now() - lastCheck > CHECK_DISK_SPACE_COOL_DOWN;
        if (coolDownElapsed && this.lastCheckDiskSpaceTimestamp.compareAndSet(lastCheck, SystemClock.now())) {
            this.isDiskFull = this.store.isDiskFull();
        }
        return this.isDiskFull;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Validates and stamps the given write requests and queues them as one write command.
     *
     * <p>For every request the partition, the current term and the storage timestamp (current
     * time minus the client timestamp found in the message, truncated to {@code int}) are
     * written into the message buffer before it is queued.
     *
     * <p>Listener notifications issued directly by this method:
     * <ul>
     *   <li>{@code SE_DISK_FULL} when {@code isDiskFull()} reports a full disk (nothing is queued);</li>
     *   <li>{@code SE_WRITE_FAILED} when the thread is interrupted while queueing; the interrupt
     *       flag is restored and no further notification is sent;</li>
     *   <li>{@code SUCCESS} right after queueing when the QoS level is {@code RECEIVE}.</li>
     * </ul>
     *
     * @param qosLevel        QoS level stored on the queued command
     * @param eventListener   listener to notify, may be {@code null}
     * @param writeRequestArr requests to write
     * @throws WriteException if the store is disabled, a message's length field does not match
     *                        its buffer, or a request names a partition that is not in
     *                        {@code partitionMap}
     */
    public void asyncWrite(QosLevel qosLevel, EventListener<WriteResult> eventListener, WriteRequest... writeRequestArr) {
        if (isDiskFull()) {
            if (eventListener != null) {
                eventListener.onEvent(new WriteResult(JoyQueueCode.SE_DISK_FULL, (long[]) null));
            }
            return;
        }
        if (!this.enabled.get()) {
            throw new WriteException(String.format("Store disabled! topic: %s, partitionGroup: %d.", this.topic, Integer.valueOf(this.partitionGroup)));
        }
        ByteBuffer[] messages = new ByteBuffer[writeRequestArr.length];
        for (int i = 0; i < writeRequestArr.length; i++) {
            WriteRequest writeRequest = writeRequestArr[i];
            ByteBuffer buffer = writeRequest.getBuffer();
            int declaredLength = MessageParser.getInt(buffer, MessageParser.LENGTH);
            if (declaredLength != buffer.remaining()) {
                throw new WriteException(String.format("Message length check error! Expect: %d, actual: %d", Integer.valueOf(declaredLength), Integer.valueOf(buffer.remaining())));
            }
            if (!this.partitionMap.containsKey(Short.valueOf(writeRequest.getPartition()))) {
                throw new WriteException(String.format("No partition %d in partition group %d of topic %s!", Short.valueOf(writeRequest.getPartition()), Integer.valueOf(this.partitionGroup), this.topic));
            }
            MessageParser.setShort(buffer, MessageParser.PARTITION, writeRequest.getPartition());
            MessageParser.setInt(buffer, MessageParser.TERM, this.term);
            MessageParser.setInt(buffer, MessageParser.STORAGE_TIMESTAMP, (int) (SystemClock.now() - MessageParser.getLong(buffer, MessageParser.CLIENT_TIMESTAMP)));
            messages[i] = writeRequest.getBuffer();
        }
        try {
            this.writeCommandCache.put(new WriteCommand(qosLevel, eventListener, messages));
        } catch (InterruptedException e) {
            // Nothing was queued: restore the interrupt flag, report the failure once and stop,
            // so a RECEIVE listener is not told SUCCESS for a command that was never queued.
            Thread.currentThread().interrupt();
            logger.warn("Exception: ", e);
            if (eventListener != null) {
                eventListener.onEvent(new WriteResult(JoyQueueCode.SE_WRITE_FAILED, (long[]) null));
            }
            return;
        }
        if (qosLevel == QosLevel.RECEIVE && null != eventListener) {
            eventListener.onEvent(new WriteResult(JoyQueueCode.SUCCESS, (long[]) null));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the current value of the {@code indexPosition} field
     */
    public long indexPosition() {
        return indexPosition;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the index store of a partition.
     *
     * <p>The partition is fetched with a single map lookup, so a partition removed between a
     * separate existence check and the fetch cannot cause a {@code NullPointerException}.
     *
     * @param s partition id
     * @return the partition's index store, or {@code null} if {@code partitionMap} has no such partition
     */
    public PositioningStore<IndexItem> indexStore(short s) {
        Partition partition = this.partitionMap.get(Short.valueOf(s));
        return partition == null ? null : partition.store;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the message store held in the {@code store} field
     */
    public PositioningStore<ByteBuffer> messageStore() {
        return store;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Collects the index stores of all partitions in {@code partitionMap}.
     *
     * @return a new set containing every partition's index store
     */
    public Set<PositioningStore<IndexItem>> meetPositioningStores() {
        return this.partitionMap.values().stream()
                .map(partition -> partition.store)
                .collect(Collectors.toSet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deletes old index files of the given partitions and then the message-store data that no
     * remaining index entry points at.
     *
     * <p>For each partition in {@code map} that has an index store, the leftmost index file is
     * deleted when the store has more than one file, {@code meetMinStoreFile} of the limit
     * position is greater than 1 and, if {@code j > 0}, {@code hasEarly(indexStore, j)} holds.
     * The limit position is {@code Long.MAX_VALUE} when {@code z} is {@code false} or the map
     * value is {@code Long.MAX_VALUE}; otherwise it is the map value times 12.
     *
     * <p>Afterwards the smallest message offset referenced by the leftmost entry of any of the
     * partitions is determined, and the message store is physically deleted up to that offset.
     *
     * @param j   when positive, passed to {@code hasEarly} as an additional condition for
     *            deleting an index file; when zero or negative, no such condition applies
     * @param map partition id to index (presumably the position up to which data may be
     *            removed — confirm with callers)
     * @param z   whether the map values limit the deletion
     * @return the sum of the values returned by the physical delete calls
     * @throws IOException if a store fails
     */
    public long clean(long j, Map<Short, Long> map, boolean z) throws IOException {
        long deleted = 0;
        long minMessageOffset = -1;
        final boolean bySize = j <= 0;
        for (Map.Entry<Short, Long> entry : map.entrySet()) {
            Short partitionId = entry.getKey();
            long index = entry.getValue().longValue();
            PositioningStore<IndexItem> indexStore = indexStore(partitionId.shortValue());
            if (indexStore == null) {
                continue;
            }
            long limitPosition;
            if (index == Long.MAX_VALUE || !z) {
                limitPosition = Long.MAX_VALUE;
            } else {
                limitPosition = index * 12;
            }
            if (indexStore.fileCount() > 1 && indexStore.meetMinStoreFile(limitPosition) > 1 && (bySize || hasEarly(indexStore, j))) {
                deleted += indexStore.physicalDeleteLeftFile();
                // The message is emitted at info level but only when debug logging is enabled.
                if (logger.isDebugEnabled()) {
                    if (bySize) {
                        logger.info("Delete PositioningStore physical index file by size, partition: <{}>, offset position: <{}>", partitionId, Long.valueOf(limitPosition));
                    } else {
                        logger.info("Delete PositioningStore physical index file by time, partition: <{}>, offset position: <{}>", partitionId, Long.valueOf(limitPosition));
                    }
                }
            }
            try {
                long leftmostOffset = indexStore.read(indexStore.left()).getOffset();
                if (minMessageOffset < 0 || minMessageOffset > leftmostOffset) {
                    minMessageOffset = leftmostOffset;
                }
            } catch (PositionOverflowException ignored) {
                // Reading the leftmost entry overflowed; this partition does not contribute
                // to the minimum offset.
            }
        }
        if (minMessageOffset >= 0) {
            deleted += this.store.physicalDeleteTo(minMessageOffset);
            if (logger.isDebugEnabled()) {
                logger.info("Delete PositioningStore physical message file, offset position: <{}>", Long.valueOf(minMessageOffset));
            }
        }
        return deleted;
    }

    /**
     * Tells whether the message referenced by the leftmost entry of an index store was stored
     * before the given time.
     *
     * @param positioningStore index store whose leftmost entry is inspected
     * @param j                timestamp to compare against
     * @return {@code true} if the message's client timestamp plus its storage timestamp field
     *         is less than {@code j}
     * @throws IOException if a store fails
     */
    private boolean hasEarly(PositioningStore<IndexItem> positioningStore, long j) throws IOException {
        long leftmostOffset = positioningStore.read(positioningStore.left()).getOffset();
        ByteBuffer message = this.store.read(leftmostOffset);
        long clientTimestamp = MessageParser.getLong(message, MessageParser.CLIENT_TIMESTAMP);
        long storageDelta = MessageParser.getInt(message, MessageParser.STORAGE_TIMESTAMP);
        return clientTimestamp + storageDelta < j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Makes {@code partitionMap} match the given partition ids: partitions that are missing are
     * added first, then partitions that are not listed are removed.
     *
     * @param shArr the complete list of partition ids this group should contain
     * @throws IOException if adding or removing a partition fails
     */
    public synchronized void rePartition(Short[] shArr) throws IOException {
        for (Short wanted : shArr) {
            short id = wanted.shortValue();
            if (!this.partitionMap.containsKey(Short.valueOf(id))) {
                addPartition(id);
            }
        }
        List<Short> wantedIds = Arrays.asList(shArr);
        // Collect first, remove afterwards, so the map is not modified while it is iterated.
        List<Short> obsolete = new ArrayList<>();
        for (Short existing : this.partitionMap.keySet()) {
            if (!wantedIds.contains(existing)) {
                obsolete.add(existing);
            }
        }
        for (Short id : obsolete) {
            removePartition(id.shortValue());
        }
    }

    /**
     * Starts the metric thread (only when {@code config.printMetricIntervalMs} is positive) and
     * the flush thread, then marks the store as started.
     */
    public synchronized void start() {
        boolean metricsEnabled = this.config.printMetricIntervalMs > 0;
        if (metricsEnabled) {
            this.metricThread.start();
        }
        startFlushThread();
        this.started.set(true);
    }

    /** Starts {@code flushLoopThread}. */
    private void startFlushThread() {
        flushLoopThread.start();
    }

    /** Starts {@code writeLoopThread}. */
    private void startWriteThread() {
        writeLoopThread.start();
    }

    /**
     * Stops the store if it is currently started.
     *
     * <p>Waits (polling every 50 ms) until {@code isAllStoreClean()} holds, stops the flush
     * thread, writes a checkpoint and stops the metric thread when
     * {@code config.printMetricIntervalMs} is positive. Progress is printed to standard output.
     * Any throwable raised on the way is logged and not propagated.
     */
    public synchronized void stop() {
        try {
            if (!this.started.compareAndSet(true, false)) {
                // Not started (or already stopped): nothing to do.
                return;
            }
            System.out.println("Waiting for flush finished...");
            while (!isAllStoreClean()) {
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
            System.out.println("Stopping flush thread...");
            stopFlushThread();
            flushCheckpoint();
            System.out.println("Stopping callback thread...");
            if (this.config.printMetricIntervalMs > 0) {
                this.metricThread.stop();
            }
            System.out.println("Store stopped. " + this.base.getAbsolutePath());
        } catch (Throwable th) {
            logger.error(th.getMessage(), th);
        }
    }

    /**
     * @return {@code true} if the message store and every partition index store report
     *         {@code isClean()}; the message store is checked first
     */
    private boolean isAllStoreClean() {
        if (!this.store.isClean()) {
            return false;
        }
        return this.partitionMap.values().stream().allMatch(partition -> partition.store.isClean());
    }

    /** Stops {@code flushLoopThread}. */
    private void stopFlushThread() {
        flushLoopThread.stop();
    }

    /** Stops {@code writeLoopThread}. */
    private void stopWriteThread() {
        writeLoopThread.stop();
    }

    /**
     * @return the current value of the {@code started} flag
     */
    public boolean isStarted() {
        return started.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the leftmost index of a partition.
     *
     * @param s partition id
     * @return the left position of the partition's index store divided by 12, or {@code -1} if
     *         {@code partitionMap} has no such partition
     */
    public long getLeftIndex(short s) {
        Partition partition = this.partitionMap.get(Short.valueOf(s));
        if (partition == null) {
            return -1;
        }
        return partition.store.left() / 12;
    }

    /**
     * Returns the right index of a partition.
     *
     * @param s partition id
     * @return the right position of the partition's index store divided by 12, or {@code -1} if
     *         {@code partitionMap} has no such partition
     */
    public long getRightIndex(short s) {
        Partition partition = this.partitionMap.get(Short.valueOf(s));
        if (partition == null) {
            return -1;
        }
        return partition.store.right() / 12;
    }

    /**
     * @return the current value of the {@code enabled} flag
     */
    public boolean serviceStatus() {
        return enabled.get();
    }

    /**
     * Sets the {@code enabled} flag and starts the write thread; does nothing when the flag is
     * already set.
     */
    public void enable() {
        if (!this.enabled.get()) {
            this.enabled.set(true);
            startWriteThread();
        }
    }

    /**
     * Clears the queued write commands, stops the write thread and clears the {@code enabled}
     * flag, in that order; does nothing when the flag is not set.
     */
    public void disable() {
        if (!this.enabled.get()) {
            return;
        }
        this.writeCommandCache.clear();
        stopWriteThread();
        this.enabled.set(false);
    }

    /**
     * Rolls the store back to the given position while the flush thread is stopped; the flush
     * thread is started again afterwards, whether or not the rollback succeeded.
     *
     * @param j position handed to {@code rollback(long)}
     * @throws IOException if the rollback fails
     */
    public void setRightPosition(long j) throws IOException {
        stopFlushThread();
        try {
            rollback(j);
        } finally {
            // Always resume flushing, even when the rollback threw.
            startFlushThread();
        }
    }

    /**
     * Resets every partition index store to right position 0 and clears the message store,
     * with the flush thread stopped for the duration.
     *
     * <p>The flush thread is restarted exactly once in a {@code finally} block, whether or not
     * clearing succeeded.
     *
     * @param j value handed to the message store's {@code clear}
     * @throws IOException if one of the stores fails
     */
    public void clear(long j) throws IOException {
        stopFlushThread();
        try {
            for (Partition partition : this.partitionMap.values()) {
                partition.store.setRight(0L);
            }
            this.store.clear(j);
        } finally {
            startFlushThread();
        }
    }

    /**
     * Moves the right edge of the message store to the given position and brings the partition
     * index stores in line with it.
     *
     * <p>If {@code indexPosition} lies beyond the target it is pulled back and a checkpoint is
     * written. When the target is greater than {@code leftPosition()} and not greater than
     * {@code rightPosition()}, {@code rollbackPartitions(j)} is used; otherwise every partition
     * index store is reset to right position 0. Finally the last entry term is reset.
     *
     * @param j new right position of the message store
     * @throws IOException if a store or the checkpoint write fails
     */
    private void rollback(long j) throws IOException {
        if (this.indexPosition > j) {
            this.indexPosition = j;
            flushCheckpoint();
        }
        boolean insideStore = j > leftPosition() && j <= rightPosition();
        if (insideStore) {
            rollbackPartitions(j);
        } else {
            for (Partition partition : this.partitionMap.values()) {
                partition.store.setRight(0L);
            }
        }
        this.store.setRight(j);
        resetLastEntryTerm();
    }

    /**
     * Writes a checkpoint if more than 60000 ms have passed since
     * {@code lastFlushCheckpointTimestamp}, and then updates that timestamp.
     *
     * @throws IOException if writing the checkpoint fails
     */
    private void flushCheckpointPeriodically() throws IOException {
        if (SystemClock.now() <= this.lastFlushCheckpointTimestamp + 60000) {
            return;
        }
        flushCheckpoint();
        this.lastFlushCheckpointTimestamp = SystemClock.now();
    }

    /**
     * Serializes a {@code Checkpoint} built from {@code indexPosition} and the right index
     * (right position divided by 12) of every partition as pretty-printed UTF-8 JSON, and
     * writes it to the file {@code CHECKPOINT_FILE} under {@code base}.
     *
     * @throws IOException if the file cannot be opened, written or closed
     */
    private void flushCheckpoint() throws IOException {
        final long checkpointIndexPosition = this.indexPosition;
        final Map<Short, Long> partitionIndices = this.partitionMap.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().store.right() / 12));
        final Checkpoint checkpoint = new Checkpoint(checkpointIndexPosition, partitionIndices);
        final String json = JSON.toJSONString(checkpoint, SerializerFeature.PrettyFormat, SerializerFeature.DisableCircularReferenceDetect);
        final byte[] payload = json.getBytes(StandardCharsets.UTF_8);
        // The stream is closed on every path; a close failure after a write failure is
        // attached to the write failure as a suppressed exception.
        try (FileOutputStream out = new FileOutputStream(new File(this.base, CHECKPOINT_FILE))) {
            out.write(payload);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the flush position reported by the message store
     */
    public long flushPosition() {
        return store.flushPosition();
    }

    /**
     * @return the current value of the {@code replicationPosition} field
     */
    public long commitPosition() {
        return replicationPosition;
    }

    /**
     * @return the current value of the {@code term} field
     */
    public int term() {
        return term;
    }

    /**
     * Sets the {@code term} field.
     *
     * @param i new term
     */
    public void term(int i) {
        term = i;
    }

    /**
     * Reads raw bytes from the message store and, when {@code consumeMetric} is set, records
     * read count, latency and traffic.
     *
     * @param j position handed to the store's {@code readByteBuffer}
     * @param i length handed to the store's {@code readByteBuffer}
     * @return the buffer returned by the message store
     * @throws IOException if the store fails
     */
    public ByteBuffer readEntryBuffer(long j, int i) throws IOException {
        final long startNanos = System.nanoTime();
        final ByteBuffer entries = this.store.readByteBuffer(j, i);
        if (this.consumeMetric == null) {
            return entries;
        }
        this.consumeMetric.addCounter("ReadCount", 1L);
        this.consumeMetric.addLatency("ReadLatency", System.nanoTime() - startNanos);
        this.consumeMetric.addTraffic("ReadTraffic", entries.remaining());
        return entries;
    }

    /**
     * Appends a buffer of already-serialized messages to the message store and then writes the
     * index entries for every message contained in it.
     *
     * <p>The method requires the write lock and requires the {@code enabled} flag to be
     * {@code false}. The whole buffer is appended in one call (through a read-only view), after
     * which the buffer is walked message by message, advancing its position by each message's
     * length. On any failure {@code onWriteException} is called with the positions captured
     * before the write, and the failure is rethrown.
     *
     * @param byteBuffer buffer holding one or more messages; its position is advanced to its
     *                   limit on success
     * @return the value returned by the message store's {@code appendByteBuffer}
     * @throws IOException           if a store fails
     * @throws TimeoutException      if {@code waitForFlush()} reports a timeout
     * @throws IllegalStateException if the write lock cannot be acquired immediately, or the
     *                               store is enabled
     * @throws WriteException        if a message's index does not continue the partition's
     *                               index store
     */
    public long appendEntryBuffer(ByteBuffer byteBuffer) throws IOException, TimeoutException {
        if (!this.writeLock.tryLock()) {
            throw new IllegalStateException("Acquire write lock failed!");
        }
        try {
            verifyState(false);
            long nanoTime = System.nanoTime();
            if (waitForFlush()) {
                throw new TimeoutException("Wait for flush timeout! The broker is too much busy to write data to disks.");
            }
            // Captured before writing so that a failure can be undone.
            long right = this.store.right();
            Map<Short, Long> createPartitionSnapshot = createPartitionSnapshot();
            // Number of messages indexed so far (reported as WriteCount).
            int i = 0;
            int remaining = byteBuffer.remaining();
            try {
                // A read-only view is appended, so the position of byteBuffer itself is not
                // moved by this call.
                long appendByteBuffer = this.store.appendByteBuffer(byteBuffer.asReadOnlyBuffer());
                while (byteBuffer.hasRemaining()) {
                    // The message offset is computed as the pre-write right position plus the
                    // current buffer position.
                    IndexItem parseMessage = IndexItem.parseMessage(byteBuffer, right + byteBuffer.position());
                    Partition partition = this.partitionMap.get(Short.valueOf(parseMessage.getPartition()));
                    if (partition.store.right() == 0) {
                        // Index store at position 0: start it at the index carried by the message.
                        partition.store.setRight(parseMessage.getIndex() * 12);
                    } else if (parseMessage.getIndex() * 12 != partition.store.right()) {
                        throw new WriteException(String.format("Index must be continuous, store: %s, partition: %d, next index of the partition: %s，index in log: %s, log position: %s, log: \n%s", this.base, Short.valueOf(parseMessage.getPartition()), Format.formatWithComma(partition.store.right() / 12), Format.formatWithComma(parseMessage.getIndex()), Format.formatWithComma(right + byteBuffer.position()), MessageParser.getString(byteBuffer)));
                    }
                    if (BatchMessageParser.isBatch(byteBuffer)) {
                        short batchSize = BatchMessageParser.getBatchSize(byteBuffer);
                        parseMessage.setBatchMessage(true);
                        parseMessage.setBatchMessageSize(batchSize);
                    }
                    writeIndex(parseMessage, partition.store);
                    updateLastEntryTerm(byteBuffer);
                    // Step over the message that was just indexed.
                    byteBuffer.position(byteBuffer.position() + parseMessage.getLength());
                    i++;
                }
                if (null != this.produceMetric) {
                    long nanoTime2 = System.nanoTime();
                    this.produceMetric.addTraffic("WriteTraffic", remaining);
                    this.produceMetric.addLatency("WriteLatency", nanoTime2 - nanoTime);
                    this.produceMetric.addCounter("WriteCount", i);
                }
                return appendByteBuffer;
            } catch (Throwable th) {
                onWriteException(right, createPartitionSnapshot, th);
                throw th;
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Reads the term field of the message in the buffer and stores it in {@code lastEntryTerm}.
     *
     * @param byteBuffer buffer containing the message
     * @throws WriteException if the term read from the message is negative
     */
    private void updateLastEntryTerm(ByteBuffer byteBuffer) {
        final int entryTerm = MessageParser.getInt(byteBuffer, MessageParser.TERM);
        if (entryTerm >= 0) {
            this.lastEntryTerm = entryTerm;
            return;
        }
        throw new WriteException(String.format("Invalid term %d at position %d!", Integer.valueOf(entryTerm), Integer.valueOf(byteBuffer.position())));
    }

    /**
     * Handles a failed write: rolls the stores back to the positions captured before the write
     * and then either logs the failure or, for a {@code DiskFullException}, sleeps for
     * {@code CHECK_DISK_SPACE_COOL_DOWN}.
     *
     * <p>A failing rollback is logged and does not prevent the rest of the handling. If the
     * cool-down sleep is interrupted, the thread's interrupt flag is restored so that callers
     * can still observe the interruption.
     *
     * @param j   message store position to roll back to
     * @param map partition id to index store position to roll back to
     * @param th  the failure that triggered the rollback
     */
    private void onWriteException(long j, Map<Short, Long> map, Throwable th) {
        try {
            rollback(j, map);
        } catch (Throwable rollbackFailure) {
            logger.warn("Rollback failed, rollback to position: {}, topic={}, partitionGroup={}.", new Object[]{Long.valueOf(j), this.topic, Integer.valueOf(this.partitionGroup), rollbackFailure});
        }
        if (th instanceof DiskFullException) {
            try {
                Thread.sleep(CHECK_DISK_SPACE_COOL_DOWN);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            logger.warn("Write failed, rollback to position: {}, topic={}, partitionGroup={}.", new Object[]{Long.valueOf(j), this.topic, Integer.valueOf(this.partitionGroup), th});
        }
    }

    /**
     * Restores the positions captured before a failed write, with the flush thread stopped for
     * the duration.
     *
     * <p>Each partition index store is reset to its position from {@code map}; a partition that
     * fails, or that is no longer present in {@code partitionMap}, is logged and skipped so the
     * remaining partitions and the message store are still rolled back. Then
     * {@code indexPosition} is reset, a checkpoint is written on a best-effort basis (a failure
     * is logged), and the message store's right position is reset. The flush thread is
     * restarted exactly once in a {@code finally} block.
     *
     * @param j   message store position to roll back to
     * @param map partition id to index store position to roll back to
     * @throws IOException if resetting the message store fails
     */
    private void rollback(long j, Map<Short, Long> map) throws IOException {
        stopFlushThread();
        try {
            map.forEach((partitionId, rollbackPosition) -> {
                Partition partition = this.partitionMap.get(partitionId);
                if (partition == null) {
                    // Looking the partition up again inside the failure handler would throw a
                    // second NullPointerException and abort the whole rollback.
                    logger.warn("Rollback partition skipped, partition not found! topic: {}, group: {}, partition: {}, rollback position: {}, store: {}.", new Object[]{this.topic, Integer.valueOf(this.partitionGroup), partitionId, rollbackPosition, this.base.getAbsoluteFile()});
                    return;
                }
                try {
                    partition.store.setRight(rollbackPosition.longValue());
                } catch (Throwable th) {
                    logger.warn("Rollback partition failed! topic: {}, group: {}, partition: {}, rollback position: {}, current position: {}, store: {}.", new Object[]{this.topic, Integer.valueOf(this.partitionGroup), partitionId, rollbackPosition, Long.valueOf(partition.store.right()), this.base.getAbsoluteFile(), th});
                }
            });
            this.indexPosition = j;
            try {
                flushCheckpoint();
            } catch (Throwable th) {
                // Best effort: the rollback continues, but the failure is no longer silent.
                logger.warn("Flush checkpoint failed during rollback! store: {}.", this.base.getAbsoluteFile(), th);
            }
            this.store.setRight(j);
        } finally {
            startFlushThread();
        }
    }

    /**
     * Delegates to the message store's {@code position(long, int)}.
     *
     * @param j first argument passed through to the store
     * @param i second argument passed through to the store
     * @return the value returned by the message store
     */
    public long position(long j, int i) {
        return store.position(j, i);
    }

    /**
     * @return the current value of the {@code lastEntryTerm} field
     */
    public int lastEntryTerm() {
        return lastEntryTerm;
    }

    /**
     * Advances the replication position and fires the callbacks that become due.
     *
     * <p>If {@code j} is beyond the current {@code replicationPosition}, the position is updated
     * and the {@code REPLICATION} callbacks up to the commit position are fired. In every case
     * the {@code ALL} callbacks are fired up to the smaller of the flush position and the
     * commit position.
     *
     * @param j new replication position; ignored for the update when not greater than the
     *          current one
     */
    public void commit(long j) {
        boolean advanced = j > this.replicationPosition;
        if (advanced) {
            this.replicationPosition = j;
            this.callbackMap.get(QosLevel.REPLICATION).callbackBefore(commitPosition());
        }
        CallbackPositioningBelt allBelt = this.callbackMap.get(QosLevel.ALL);
        allBelt.callbackBefore(Math.min(flushPosition(), commitPosition()));
    }

    /**
     * Reads the term of the message stored at the given position.
     *
     * @param j position of the message in the message store
     * @return the term read from the message; {@code 0} if the store's right position is not
     *         greater than its left position, or if the term read is negative
     * @throws ReadException if the message cannot be read (any exception raised while reading,
     *                       including a {@code null} result, is wrapped)
     */
    public int getEntryTerm(long j) {
        if (this.store.right() <= this.store.left()) {
            return 0;
        }
        try {
            ByteBuffer entry = this.store.read(j);
            if (entry == null) {
                throw new ReadException(String.format("Read log failed! store: %s, position: %d.", this.store.base().getAbsolutePath(), Long.valueOf(j)));
            }
            int entryTerm = MessageParser.getInt(entry, MessageParser.TERM);
            return entryTerm >= 0 ? entryTerm : 0;
        } catch (Exception e) {
            throw new ReadException(String.format("Read log failed! store: %s, position: %d.", this.store.base().getAbsolutePath(), Long.valueOf(j)), e);
        }
    }

    /**
     * @return the left position reported by the message store
     */
    public long leftPosition() {
        return store.left();
    }

    /**
     * @return the right position reported by the message store
     */
    public long rightPosition() {
        return store.right();
    }

    /**
     * Closes the message store (when it is not {@code null}) and then the index store of every
     * partition.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.store != null) {
            this.store.close();
        }
        for (Partition partition : this.partitionMap.values()) {
            partition.store.close();
        }
    }

    /**
     * Finds the index of the first message of a partition whose storage time (client timestamp
     * plus storage timestamp field) is not earlier than the given timestamp.
     *
     * <p>A binary search locates a matching message; because consecutive messages may share the
     * same time, the result is then walked back to the first of them. The walk-back bound is the
     * partition's left <em>index</em> (left position divided by 12), the same unit as the search
     * result.
     *
     * @param s partition id
     * @param j timestamp to search for
     * @return the index found, or {@code -1} if the partition does not exist, no message
     *         matches, or reading fails
     */
    public long getIndex(short s, long j) {
        try {
            Partition partition = this.partitionMap.get(Short.valueOf(s));
            if (partition == null) {
                return -1L;
            }
            PositioningStore<IndexItem> indexStore = partition.store;
            long found = binarySearchByTimestamp(j, this.store, indexStore, indexStore.left() / 12, (indexStore.right() / 12) - 1);
            // left() is a byte position; convert it to an index before comparing.
            while (found - 1 >= indexStore.left() / 12 && j <= getStorageTimestamp(this.store, indexStore, found - 1)) {
                found--;
            }
            return found;
        } catch (IOException | PositionOverflowException | PositionUnderflowException e) {
            logger.warn("Exception: ", e);
            return -1L;
        }
    }

    /**
     * Returns the storage time of the message at the given index: the client timestamp in the
     * message plus the message's storage timestamp field.
     *
     * @param positioningStore  message store to read the message from
     * @param positioningStore2 index store of the partition
     * @param j                 message index; the index entry is read at position {@code j * 12}
     * @return client timestamp plus storage timestamp field of the message
     * @throws IOException if a store fails
     */
    private long getStorageTimestamp(PositioningStore<ByteBuffer> positioningStore, PositioningStore<IndexItem> positioningStore2, long j) throws IOException {
        IndexItem indexItem = positioningStore2.read(j * 12);
        // Read the message once and take both fields from the same buffer.
        ByteBuffer message = positioningStore.read(indexItem.getOffset(), indexItem.getLength());
        long clientTimestamp = MessageParser.getLong(message, MessageParser.CLIENT_TIMESTAMP);
        return clientTimestamp + MessageParser.getInt(message, MessageParser.STORAGE_TIMESTAMP);
    }

    /**
     * Searches the index range {@code [j2, j3]} (both inclusive) for a message whose storage
     * time is not earlier than the given timestamp.
     *
     * <p>A range containing a single message ({@code j2 == j3}) is searched like any other
     * range; only an empty range ({@code j3 < j2}) is rejected up front.
     *
     * @param j                 timestamp to search for
     * @param positioningStore  message store
     * @param positioningStore2 index store of the partition
     * @param j2                leftmost index of the range, inclusive
     * @param j3                rightmost index of the range, inclusive
     * @return {@code j2} if the leftmost message is already not earlier than {@code j};
     *         {@code -1} if the range is empty or even the rightmost message is earlier than
     *         {@code j}; otherwise an index inside the range found by bisection
     * @throws IOException if a store fails
     */
    private long binarySearchByTimestamp(long j, PositioningStore<ByteBuffer> positioningStore, PositioningStore<IndexItem> positioningStore2, long j2, long j3) throws IOException {
        if (j3 < j2) {
            return -1L;
        }
        if (j <= getStorageTimestamp(positioningStore, positioningStore2, j2)) {
            return j2;
        }
        if (j > getStorageTimestamp(positioningStore, positioningStore2, j3)) {
            return -1L;
        }
        if (j2 + 1 == j3) {
            return j3;
        }
        // Overflow-safe midpoint; it lies strictly between j2 and j3 here, so both recursive
        // ranges are smaller than the current one.
        long middle = j2 + ((j3 - j2) / 2);
        if (j < getStorageTimestamp(positioningStore, positioningStore2, middle)) {
            return binarySearchByTimestamp(j, positioningStore, positioningStore2, j2, middle);
        }
        return binarySearchByTimestamp(j, positioningStore, positioningStore2, middle, j3);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the store wrapper registered for a QoS level.
     *
     * @param qosLevel QoS level; its {@code value()} is used as the index into {@code qosStores}
     * @return the element of {@code qosStores} at that index
     */
    public QosStore getQosStore(QosLevel qosLevel) {
        final int slot = qosLevel.value();
        return this.qosStores[slot];
    }
}
