package org.apache.rocketmq.store.dledger;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.BatchAppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFile;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.StoreStatsService;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.9.4.jar:org/apache/rocketmq/store/dledger/DLedgerCommitLog.class */
public class DLedgerCommitLog extends CommitLog {
    private final DLedgerServer dLedgerServer;
    private final DLedgerConfig dLedgerConfig;
    private final DLedgerMmapFileStore dLedgerFileStore;
    private final MmapFileList dLedgerFileList;
    private final int id;
    private final MessageSerializer messageSerializer;
    private volatile long beginTimeInDledgerLock;
    private long dividedCommitlogOffset;
    private boolean isInrecoveringOldCommitlog;
    private final StringBuilder msgIdBuilder;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.9.4.jar:org/apache/rocketmq/store/dledger/DLedgerCommitLog$DLedgerSelectMappedBufferResult.class */
    public static class DLedgerSelectMappedBufferResult extends SelectMappedBufferResult {
        private SelectMmapBufferResult sbr;

        public DLedgerSelectMappedBufferResult(SelectMmapBufferResult selectMmapBufferResult) {
            super(selectMmapBufferResult.getStartOffset(), selectMmapBufferResult.getByteBuffer(), selectMmapBufferResult.getSize(), null);
            this.sbr = selectMmapBufferResult;
        }

        @Override // org.apache.rocketmq.store.SelectMappedBufferResult
        public synchronized void release() {
            super.release();
            if (this.sbr != null) {
                this.sbr.release();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.9.4.jar:org/apache/rocketmq/store/dledger/DLedgerCommitLog$EncodeResult.class */
    public class EncodeResult {
        private String queueOffsetKey;
        private ByteBuffer data;
        private List<byte[]> batchData;
        private AppendMessageStatus status;
        private int totalMsgLen;

        public EncodeResult(AppendMessageStatus appendMessageStatus, ByteBuffer byteBuffer, String str) {
            this.data = byteBuffer;
            this.status = appendMessageStatus;
            this.queueOffsetKey = str;
        }

        /* JADX WARN: Type inference failed for: r0v10, types: [long, java.nio.ByteBuffer] */
        public void setQueueOffsetKey(long j, boolean z) {
            if (!z) {
                this.data.putLong(20, j);
                return;
            }
            Iterator<byte[]> it = this.batchData.iterator();
            while (it.hasNext()) {
                ?? wrap = ByteBuffer.wrap(it.next());
                j++;
                wrap.putLong(20, wrap);
            }
        }

        public byte[] getData() {
            return this.data.array();
        }

        public EncodeResult(AppendMessageStatus appendMessageStatus, String str, List<byte[]> list, int i) {
            this.batchData = list;
            this.status = appendMessageStatus;
            this.queueOffsetKey = str;
            this.totalMsgLen = i;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.9.4.jar:org/apache/rocketmq/store/dledger/DLedgerCommitLog$MessageSerializer.class */
    class MessageSerializer {
        private final int maxMessageBodySize;

        MessageSerializer(int i) {
            this.maxMessageBodySize = i;
        }

        public EncodeResult serialize(MessageExtBrokerInner messageExtBrokerInner) {
            int sysFlag = messageExtBrokerInner.getSysFlag();
            int i = (sysFlag & 16) == 0 ? 8 : 20;
            int i2 = (sysFlag & 32) == 0 ? 8 : 20;
            ByteBuffer allocate = ByteBuffer.allocate(i);
            ByteBuffer allocate2 = ByteBuffer.allocate(i2);
            String str = messageExtBrokerInner.getTopic() + "-" + messageExtBrokerInner.getQueueId();
            byte[] bytes = messageExtBrokerInner.getPropertiesString() == null ? null : messageExtBrokerInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
            int length = bytes == null ? 0 : bytes.length;
            if (length > 32767) {
                DLedgerCommitLog.log.warn("putMessage message properties length too long. length={}", Integer.valueOf(bytes.length));
                return new EncodeResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED, null, str);
            }
            byte[] bytes2 = messageExtBrokerInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            int length2 = bytes2.length;
            int length3 = messageExtBrokerInner.getBody() == null ? 0 : messageExtBrokerInner.getBody().length;
            int calMsgLength = DLedgerCommitLog.calMsgLength(messageExtBrokerInner.getSysFlag(), length3, length2, length);
            ByteBuffer allocate3 = ByteBuffer.allocate(calMsgLength);
            if (length3 > this.maxMessageBodySize) {
                DLedgerCommitLog.log.warn("message body size exceeded, msg total size: " + calMsgLength + ", msg body size: " + length3 + ", maxMessageBodySize: " + this.maxMessageBodySize);
                return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, str);
            }
            resetByteBuffer(allocate3, calMsgLength);
            allocate3.putInt(calMsgLength);
            allocate3.putInt(-626843481);
            allocate3.putInt(messageExtBrokerInner.getBodyCRC());
            allocate3.putInt(messageExtBrokerInner.getQueueId());
            allocate3.putInt(messageExtBrokerInner.getFlag());
            allocate3.putLong(0L);
            allocate3.putLong(0L);
            allocate3.putInt(messageExtBrokerInner.getSysFlag());
            allocate3.putLong(messageExtBrokerInner.getBornTimestamp());
            resetByteBuffer(allocate, i);
            allocate3.put(messageExtBrokerInner.getBornHostBytes(allocate));
            allocate3.putLong(messageExtBrokerInner.getStoreTimestamp());
            resetByteBuffer(allocate2, i2);
            allocate3.put(messageExtBrokerInner.getStoreHostBytes(allocate2));
            allocate3.putInt(messageExtBrokerInner.getReconsumeTimes());
            allocate3.putLong(messageExtBrokerInner.getPreparedTransactionOffset());
            allocate3.putInt(length3);
            if (length3 > 0) {
                allocate3.put(messageExtBrokerInner.getBody());
            }
            allocate3.put((byte) length2);
            allocate3.put(bytes2);
            allocate3.putShort((short) length);
            if (length > 0) {
                allocate3.put(bytes);
            }
            return new EncodeResult(AppendMessageStatus.PUT_OK, allocate3, str);
        }

        public EncodeResult serialize(MessageExtBatch messageExtBatch) {
            String str = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
            int i = 0;
            ByteBuffer wrap = messageExtBatch.wrap();
            int limit = wrap.limit();
            if (limit > this.maxMessageBodySize) {
                DLedgerCommitLog.log.warn("message body size exceeded, msg body size: " + limit + ", maxMessageBodySize: " + this.maxMessageBodySize);
                throw new RuntimeException("message size exceeded");
            }
            LinkedList linkedList = new LinkedList();
            int sysFlag = messageExtBatch.getSysFlag();
            int i2 = (sysFlag & 16) == 0 ? 8 : 20;
            int i3 = (sysFlag & 32) == 0 ? 8 : 20;
            ByteBuffer allocate = ByteBuffer.allocate(i2);
            ByteBuffer allocate2 = ByteBuffer.allocate(i3);
            while (wrap.hasRemaining()) {
                wrap.getInt();
                wrap.getInt();
                wrap.getInt();
                int i4 = wrap.getInt();
                int i5 = wrap.getInt();
                int position = wrap.position();
                int crc32 = UtilAll.crc32(wrap.array(), position, i5);
                wrap.position(position + i5);
                short s = wrap.getShort();
                int position2 = wrap.position();
                wrap.position(position2 + s);
                byte[] bytes = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
                int length = bytes.length;
                int calMsgLength = DLedgerCommitLog.calMsgLength(messageExtBatch.getSysFlag(), i5, length, s);
                ByteBuffer allocate3 = ByteBuffer.allocate(calMsgLength);
                i += calMsgLength;
                resetByteBuffer(allocate3, calMsgLength);
                allocate3.putInt(calMsgLength);
                allocate3.putInt(-626843481);
                allocate3.putInt(crc32);
                allocate3.putInt(messageExtBatch.getQueueId());
                allocate3.putInt(i4);
                allocate3.putLong(0L);
                allocate3.putLong(0L);
                allocate3.putInt(messageExtBatch.getSysFlag());
                allocate3.putLong(messageExtBatch.getBornTimestamp());
                resetByteBuffer(allocate, i2);
                allocate3.put(messageExtBatch.getBornHostBytes(allocate));
                allocate3.putLong(messageExtBatch.getStoreTimestamp());
                resetByteBuffer(allocate2, i3);
                allocate3.put(messageExtBatch.getStoreHostBytes(allocate2));
                allocate3.putInt(messageExtBatch.getReconsumeTimes());
                allocate3.putLong(0L);
                allocate3.putInt(i5);
                if (i5 > 0) {
                    allocate3.put(wrap.array(), position, i5);
                }
                allocate3.put((byte) length);
                allocate3.put(bytes);
                allocate3.putShort(s);
                if (s > 0) {
                    allocate3.put(wrap.array(), position2, s);
                }
                byte[] bArr = new byte[calMsgLength];
                allocate3.clear();
                allocate3.get(bArr);
                linkedList.add(bArr);
            }
            return new EncodeResult(AppendMessageStatus.PUT_OK, str, linkedList, i);
        }

        private void resetByteBuffer(ByteBuffer byteBuffer, int i) {
            byteBuffer.flip();
            byteBuffer.limit(i);
        }
    }

    public DLedgerCommitLog(DefaultMessageStore defaultMessageStore) {
        super(defaultMessageStore);
        this.beginTimeInDledgerLock = 0L;
        this.dividedCommitlogOffset = -1L;
        this.isInrecoveringOldCommitlog = false;
        this.msgIdBuilder = new StringBuilder();
        this.dLedgerConfig = new DLedgerConfig();
        this.dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
        this.dLedgerConfig.setStoreType(DLedgerConfig.FILE);
        this.dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
        this.dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
        this.dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
        this.dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
        this.dLedgerConfig.setDataStorePath(defaultMessageStore.getMessageStoreConfig().getStorePathDLedgerCommitLog());
        this.dLedgerConfig.setReadOnlyDataStoreDirs(defaultMessageStore.getMessageStoreConfig().getReadOnlyCommitLogStorePaths());
        this.dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
        this.dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
        this.dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
        this.dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId());
        this.dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush());
        this.dLedgerConfig.setDiskSpaceRatioToCheckExpired(defaultMessageStore.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0f);
        this.id = Integer.parseInt(this.dLedgerConfig.getSelfId().substring(1)) + 1;
        this.dLedgerServer = new DLedgerServer(this.dLedgerConfig);
        this.dLedgerFileStore = (DLedgerMmapFileStore) this.dLedgerServer.getdLedgerStore();
        this.dLedgerFileStore.addAppendHook((dLedgerEntry, byteBuffer, i) -> {
            if (!$assertionsDisabled && i != 48) {
                throw new AssertionError();
            }
            byteBuffer.position(byteBuffer.position() + i + 28);
            byteBuffer.putLong(dLedgerEntry.getPos() + i);
        });
        this.dLedgerFileList = this.dLedgerFileStore.getDataFileList();
        this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public boolean load() {
        return super.load();
    }

    private void refreshConfig() {
        this.dLedgerConfig.setEnableDiskForceClean(this.defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
        this.dLedgerConfig.setDeleteWhen(this.defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
        this.dLedgerConfig.setFileReservedHours(this.defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
    }

    private void disableDeleteDledger() {
        this.dLedgerConfig.setEnableDiskForceClean(false);
        this.dLedgerConfig.setFileReservedHours(87600);
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void start() {
        this.dLedgerServer.startup();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void shutdown() {
        this.dLedgerServer.shutdown();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long flush() {
        this.dLedgerFileStore.flush();
        return this.dLedgerFileList.getFlushedWhere();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long getMaxOffset() {
        if (this.dLedgerFileStore.getCommittedPos() > 0) {
            return this.dLedgerFileStore.getCommittedPos();
        }
        if (this.dLedgerFileList.getMinOffset() > 0) {
            return this.dLedgerFileList.getMinOffset();
        }
        return 0L;
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long getMinOffset() {
        return !this.mappedFileQueue.getMappedFiles().isEmpty() ? this.mappedFileQueue.getMinOffset() : this.dLedgerFileList.getMinOffset();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long getConfirmOffset() {
        return getMaxOffset();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void setConfirmOffset(long j) {
        log.warn("Should not set confirm offset {} for dleger commitlog", Long.valueOf(j));
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long remainHowManyDataToCommit() {
        return this.dLedgerFileList.remainHowManyDataToCommit();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long remainHowManyDataToFlush() {
        return this.dLedgerFileList.remainHowManyDataToFlush();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public int deleteExpiredFile(long j, int i, long j2, boolean z) {
        if (this.mappedFileQueue.getMappedFiles().isEmpty()) {
            refreshConfig();
            return Integer.MAX_VALUE;
        }
        disableDeleteDledger();
        int deleteExpiredFile = super.deleteExpiredFile(j, i, j2, z);
        if (deleteExpiredFile > 0 || this.mappedFileQueue.getMappedFiles().size() != 1) {
            return deleteExpiredFile;
        }
        MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
        log.info("Try to delete the last old commitlog file {}", lastMappedFile.getFileName());
        if (System.currentTimeMillis() < lastMappedFile.getLastModifiedTimestamp() + j && !z) {
            return 1;
        }
        while (!lastMappedFile.destroy(AbstractComponentTracker.LINGERING_TIMEOUT)) {
            DLedgerUtils.sleep(1000L);
        }
        this.mappedFileQueue.getMappedFiles().remove(lastMappedFile);
        return 1;
    }

    public SelectMappedBufferResult convertSbr(SelectMmapBufferResult selectMmapBufferResult) {
        if (selectMmapBufferResult == null) {
            return null;
        }
        return new DLedgerSelectMappedBufferResult(selectMmapBufferResult);
    }

    public SelectMmapBufferResult truncate(SelectMmapBufferResult selectMmapBufferResult) {
        long committedPos = this.dLedgerFileStore.getCommittedPos();
        if (selectMmapBufferResult == null || selectMmapBufferResult.getStartOffset() == committedPos) {
            return null;
        }
        if (selectMmapBufferResult.getStartOffset() + selectMmapBufferResult.getSize() <= committedPos) {
            return selectMmapBufferResult;
        }
        selectMmapBufferResult.setSize((int) (committedPos - selectMmapBufferResult.getStartOffset()));
        return selectMmapBufferResult;
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public SelectMappedBufferResult getData(long j) {
        if (j < this.dividedCommitlogOffset) {
            return super.getData(j);
        }
        return getData(j, j == 0);
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public SelectMappedBufferResult getData(long j, boolean z) {
        if (j < this.dividedCommitlogOffset) {
            return super.getData(j, z);
        }
        if (j >= this.dLedgerFileStore.getCommittedPos()) {
            return null;
        }
        int mappedFileSizeForEntryData = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
        MmapFile findMappedFileByOffset = this.dLedgerFileList.findMappedFileByOffset(j, z);
        if (findMappedFileByOffset != null) {
            return convertSbr(truncate(findMappedFileByOffset.selectMappedBuffer((int) (j % mappedFileSizeForEntryData))));
        }
        return null;
    }

    private void recover(long j) {
        this.dLedgerFileStore.load();
        if (this.dLedgerFileList.getMappedFiles().size() > 0) {
            this.dLedgerFileStore.recover();
            this.dividedCommitlogOffset = this.dLedgerFileList.getFirstMappedFile().getFileFromOffset();
            if (this.mappedFileQueue.getLastMappedFile() != null) {
                disableDeleteDledger();
            }
            long maxWrotePosition = this.dLedgerFileList.getMaxWrotePosition();
            if (j >= maxWrotePosition) {
                log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", Long.valueOf(j), Long.valueOf(maxWrotePosition));
                this.defaultMessageStore.truncateDirtyLogicFiles(maxWrotePosition);
                return;
            }
            return;
        }
        this.isInrecoveringOldCommitlog = true;
        super.recoverNormally(j);
        this.isInrecoveringOldCommitlog = false;
        MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
        if (lastMappedFile == null) {
            return;
        }
        ByteBuffer sliceByteBuffer = lastMappedFile.sliceByteBuffer();
        sliceByteBuffer.position(lastMappedFile.getWrotePosition());
        boolean z = true;
        sliceByteBuffer.getInt();
        int i = sliceByteBuffer.getInt();
        if (i == -875286124) {
            z = false;
        } else {
            log.info("Recover old commitlog found a illegal magic code={}", Integer.valueOf(i));
        }
        this.dLedgerConfig.setEnableDiskForceClean(false);
        this.dividedCommitlogOffset = lastMappedFile.getFileFromOffset() + lastMappedFile.getFileSize();
        log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", Boolean.valueOf(z), Long.valueOf(lastMappedFile.getFileFromOffset() + lastMappedFile.getWrotePosition()), lastMappedFile.getFileName(), Long.valueOf(this.dividedCommitlogOffset));
        if (z) {
            sliceByteBuffer.position(lastMappedFile.getWrotePosition());
            sliceByteBuffer.putInt(lastMappedFile.getFileSize() - lastMappedFile.getWrotePosition());
            sliceByteBuffer.putInt(-875286124);
            lastMappedFile.flush(0);
        }
        lastMappedFile.setWrotePosition(lastMappedFile.getFileSize());
        lastMappedFile.setCommittedPosition(lastMappedFile.getFileSize());
        lastMappedFile.setFlushedPosition(lastMappedFile.getFileSize());
        this.dLedgerFileList.getLastMappedFile(this.dividedCommitlogOffset);
        log.info("Will set the initial commitlog offset={} for dledger", Long.valueOf(this.dividedCommitlogOffset));
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void recoverNormally(long j) {
        recover(j);
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void recoverAbnormally(long j) {
        recover(j);
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean z) {
        return checkMessageAndReturnSize(byteBuffer, z, true);
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean z, boolean z2) {
        if (this.isInrecoveringOldCommitlog) {
            return super.checkMessageAndReturnSize(byteBuffer, z, z2);
        }
        try {
            int position = byteBuffer.position();
            int i = byteBuffer.getInt();
            int i2 = byteBuffer.getInt();
            if (i2 == -875286124 || i2 == -626843481) {
                byteBuffer.position(position);
                return super.checkMessageAndReturnSize(byteBuffer, z, z2);
            }
            if (i == -1) {
                return new DispatchRequest(0, true);
            }
            byteBuffer.position(position + 48);
            DispatchRequest checkMessageAndReturnSize = super.checkMessageAndReturnSize(byteBuffer, z, z2);
            if (checkMessageAndReturnSize.isSuccess()) {
                checkMessageAndReturnSize.setBufferSize(checkMessageAndReturnSize.getMsgSize() + 48);
            } else if (checkMessageAndReturnSize.getMsgSize() > 0) {
                checkMessageAndReturnSize.setBufferSize(checkMessageAndReturnSize.getMsgSize() + 48);
            }
            return checkMessageAndReturnSize;
        } catch (Throwable th) {
            return new DispatchRequest(-1, false);
        }
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public boolean resetOffset(long j) {
        return false;
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long getBeginTimeInLock() {
        return this.beginTimeInDledgerLock;
    }

    private void setMessageInfo(MessageExtBrokerInner messageExtBrokerInner, int i) {
        messageExtBrokerInner.setStoreTimestamp(System.currentTimeMillis());
        messageExtBrokerInner.setBodyCRC(UtilAll.crc32(messageExtBrokerInner.getBody()));
        if ((i == 0 || i == 8) && messageExtBrokerInner.getDelayTimeLevel() > 0) {
            if (messageExtBrokerInner.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                messageExtBrokerInner.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            int delayLevel2QueueId = ScheduleMessageService.delayLevel2QueueId(messageExtBrokerInner.getDelayTimeLevel());
            MessageAccessor.putProperty(messageExtBrokerInner, MessageConst.PROPERTY_REAL_TOPIC, messageExtBrokerInner.getTopic());
            MessageAccessor.putProperty(messageExtBrokerInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(messageExtBrokerInner.getQueueId()));
            messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
            messageExtBrokerInner.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
            messageExtBrokerInner.setQueueId(delayLevel2QueueId);
        }
        if (((InetSocketAddress) messageExtBrokerInner.getBornHost()).getAddress() instanceof Inet6Address) {
            messageExtBrokerInner.setBornHostV6Flag();
        }
        if (((InetSocketAddress) messageExtBrokerInner.getStoreHost()).getAddress() instanceof Inet6Address) {
            messageExtBrokerInner.setStoreHostAddressV6Flag();
        }
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner messageExtBrokerInner) {
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        int transactionValue = MessageSysFlag.getTransactionValue(messageExtBrokerInner.getSysFlag());
        setMessageInfo(messageExtBrokerInner, transactionValue);
        String topic = messageExtBrokerInner.getTopic();
        EncodeResult encodeResult = null;
        boolean isMultiDispatchMsg = this.multiDispatch.isMultiDispatchMsg(messageExtBrokerInner);
        if (!isMultiDispatchMsg) {
            encodeResult = this.messageSerializer.serialize(messageExtBrokerInner);
            if (encodeResult.status != AppendMessageStatus.PUT_OK) {
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
            }
        }
        this.putMessageLock.lock();
        try {
            try {
                this.beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
                if (isMultiDispatchMsg) {
                    if (!this.multiDispatch.wrapMultiDispatch(messageExtBrokerInner)) {
                        CompletableFuture<PutMessageResult> completedFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
                        this.beginTimeInDledgerLock = 0L;
                        this.putMessageLock.unlock();
                        return completedFuture;
                    }
                    encodeResult = this.messageSerializer.serialize(messageExtBrokerInner);
                    if (encodeResult.status != AppendMessageStatus.PUT_OK) {
                        CompletableFuture<PutMessageResult> completedFuture2 = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
                        this.beginTimeInDledgerLock = 0L;
                        this.putMessageLock.unlock();
                        return completedFuture2;
                    }
                }
                long queueOffsetByKey = getQueueOffsetByKey(encodeResult.queueOffsetKey, transactionValue);
                encodeResult.setQueueOffsetKey(queueOffsetByKey, false);
                AppendEntryRequest appendEntryRequest = new AppendEntryRequest();
                appendEntryRequest.setGroup(this.dLedgerConfig.getGroup());
                appendEntryRequest.setRemoteId(this.dLedgerServer.getMemberState().getSelfId());
                appendEntryRequest.setBody(encodeResult.getData());
                AppendFuture appendFuture = (AppendFuture) this.dLedgerServer.handleAppend(appendEntryRequest);
                if (appendFuture.getPos() == -1) {
                    CompletableFuture<PutMessageResult> completedFuture3 = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
                    this.beginTimeInDledgerLock = 0L;
                    this.putMessageLock.unlock();
                    return completedFuture3;
                }
                long pos = appendFuture.getPos() + 48;
                String createMessageId = MessageDecoder.createMessageId(ByteBuffer.allocate((messageExtBrokerInner.getSysFlag() & 32) == 0 ? 16 : 28), messageExtBrokerInner.getStoreHostBytes(), pos);
                long now = this.defaultMessageStore.getSystemClock().now() - this.beginTimeInDledgerLock;
                AppendMessageResult appendMessageResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, pos, encodeResult.getData().length, createMessageId, System.currentTimeMillis(), queueOffsetByKey, now);
                switch (transactionValue) {
                    case 0:
                    case 8:
                        this.topicQueueTable.put(encodeResult.queueOffsetKey, Long.valueOf(queueOffsetByKey + 1));
                        this.multiDispatch.updateMultiQueueOffset(messageExtBrokerInner);
                        break;
                    case 4:
                    case 12:
                        break;
                }
                this.beginTimeInDledgerLock = 0L;
                this.putMessageLock.unlock();
                if (now > 500) {
                    log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", Long.valueOf(now), Integer.valueOf(messageExtBrokerInner.getBody().length), appendMessageResult);
                }
                return appendFuture.thenApply(appendEntryResponse -> {
                    PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
                    switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
                        case SUCCESS:
                            putMessageStatus = PutMessageStatus.PUT_OK;
                            break;
                        case INCONSISTENT_LEADER:
                        case NOT_LEADER:
                        case LEADER_NOT_READY:
                        case DISK_FULL:
                            putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
                            break;
                        case WAIT_QUORUM_ACK_TIMEOUT:
                            putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                            break;
                        case LEADER_PENDING_FULL:
                            putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                            break;
                    }
                    PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendMessageResult);
                    if (putMessageStatus == PutMessageStatus.PUT_OK) {
                        storeStatsService.getSinglePutMessageTopicTimesTotal(topic).add(1L);
                        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBrokerInner.getTopic()).add(appendMessageResult.getWroteBytes());
                    }
                    return putMessageResult;
                });
            } catch (Exception e) {
                log.error("Put message error", (Throwable) e);
                CompletableFuture<PutMessageResult> completedFuture4 = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
                this.beginTimeInDledgerLock = 0L;
                this.putMessageLock.unlock();
                return completedFuture4;
            }
        } catch (Throwable th) {
            this.beginTimeInDledgerLock = 0L;
            this.putMessageLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        int transactionValue = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
        if (transactionValue == 0 && messageExtBatch.getDelayTimeLevel() <= 0) {
            messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
            if (((InetSocketAddress) messageExtBatch.getBornHost()).getAddress() instanceof Inet6Address) {
                messageExtBatch.setBornHostV6Flag();
            }
            if (((InetSocketAddress) messageExtBatch.getStoreHost()).getAddress() instanceof Inet6Address) {
                messageExtBatch.setStoreHostAddressV6Flag();
            }
            EncodeResult serialize = this.messageSerializer.serialize(messageExtBatch);
            if (serialize.status != AppendMessageStatus.PUT_OK) {
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(serialize.status)));
            }
            this.putMessageLock.lock();
            this.msgIdBuilder.setLength(0);
            int i = 0;
            try {
                try {
                    this.beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
                    long queueOffsetByKey = getQueueOffsetByKey(serialize.queueOffsetKey, transactionValue);
                    serialize.setQueueOffsetKey(queueOffsetByKey, true);
                    BatchAppendEntryRequest batchAppendEntryRequest = new BatchAppendEntryRequest();
                    batchAppendEntryRequest.setGroup(this.dLedgerConfig.getGroup());
                    batchAppendEntryRequest.setRemoteId(this.dLedgerServer.getMemberState().getSelfId());
                    batchAppendEntryRequest.setBatchMsgs(serialize.batchData);
                    AppendFuture appendFuture = (AppendFuture) this.dLedgerServer.handleAppend(batchAppendEntryRequest);
                    if (appendFuture.getPos() == -1) {
                        log.warn("HandleAppend return false due to error code {}", Integer.valueOf(((AppendEntryResponse) appendFuture.get()).getCode()));
                        CompletableFuture<PutMessageResult> completedFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
                        this.beginTimeInDledgerLock = 0L;
                        this.putMessageLock.unlock();
                        return completedFuture;
                    }
                    BatchAppendFuture batchAppendFuture = (BatchAppendFuture) appendFuture;
                    ByteBuffer allocate = ByteBuffer.allocate((messageExtBatch.getSysFlag() & 32) == 0 ? 16 : 28);
                    boolean z = true;
                    long j = 0;
                    for (long j2 : batchAppendFuture.getPositions()) {
                        long j3 = j2 + 48;
                        if (z) {
                            j = j3;
                            z = false;
                        }
                        String createMessageId = MessageDecoder.createMessageId(allocate, messageExtBatch.getStoreHostBytes(), j3);
                        if (this.msgIdBuilder.length() > 0) {
                            this.msgIdBuilder.append(',').append(createMessageId);
                        } else {
                            this.msgIdBuilder.append(createMessageId);
                        }
                        i++;
                    }
                    long now = this.defaultMessageStore.getSystemClock().now() - this.beginTimeInDledgerLock;
                    AppendMessageResult appendMessageResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, j, serialize.totalMsgLen, this.msgIdBuilder.toString(), System.currentTimeMillis(), queueOffsetByKey, now);
                    appendMessageResult.setMsgNum(i);
                    this.topicQueueTable.put(serialize.queueOffsetKey, Long.valueOf(queueOffsetByKey + i));
                    this.beginTimeInDledgerLock = 0L;
                    this.putMessageLock.unlock();
                    if (now > 500) {
                        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", Long.valueOf(now), Integer.valueOf(messageExtBatch.getBody().length), appendMessageResult);
                    }
                    return batchAppendFuture.thenApply(appendEntryResponse -> {
                        PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
                        switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
                            case SUCCESS:
                                putMessageStatus = PutMessageStatus.PUT_OK;
                                break;
                            case INCONSISTENT_LEADER:
                            case NOT_LEADER:
                            case LEADER_NOT_READY:
                            case DISK_FULL:
                                putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
                                break;
                            case WAIT_QUORUM_ACK_TIMEOUT:
                                putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                                break;
                            case LEADER_PENDING_FULL:
                                putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                                break;
                        }
                        PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendMessageResult);
                        if (putMessageStatus == PutMessageStatus.PUT_OK) {
                            storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(appendMessageResult.getMsgNum());
                            storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(appendMessageResult.getWroteBytes());
                        }
                        return putMessageResult;
                    });
                } catch (Exception e) {
                    log.error("Put message error", (Throwable) e);
                    CompletableFuture<PutMessageResult> completedFuture2 = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
                    this.beginTimeInDledgerLock = 0L;
                    this.putMessageLock.unlock();
                    return completedFuture2;
                }
            } catch (Throwable th) {
                this.beginTimeInDledgerLock = 0L;
                this.putMessageLock.unlock();
                throw th;
            }
        }
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public SelectMappedBufferResult getMessage(long j, int i) {
        if (j < this.dividedCommitlogOffset) {
            return super.getMessage(j, i);
        }
        int mappedFileSizeForEntryData = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
        MmapFile findMappedFileByOffset = this.dLedgerFileList.findMappedFileByOffset(j, j == 0);
        if (findMappedFileByOffset != null) {
            return convertSbr(findMappedFileByOffset.selectMappedBuffer((int) (j % mappedFileSizeForEntryData), i));
        }
        return null;
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long rollNextFile(long j) {
        int mappedFileSizeCommitLog = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        return (j + mappedFileSizeCommitLog) - (j % mappedFileSizeCommitLog);
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public HashMap<String, Long> getTopicQueueTable() {
        return this.topicQueueTable;
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void setTopicQueueTable(HashMap<String, Long> hashMap) {
        this.topicQueueTable = hashMap;
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void destroy() {
        super.destroy();
        this.dLedgerFileList.destroy();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public boolean appendData(long j, byte[] bArr, int i, int i2) {
        return false;
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public void checkSelf() {
        this.dLedgerFileList.checkSelf();
    }

    @Override // org.apache.rocketmq.store.CommitLog
    public long lockTimeMills() {
        long j = 0;
        long j2 = this.beginTimeInDledgerLock;
        if (j2 > 0) {
            j = this.defaultMessageStore.now() - j2;
        }
        if (j < 0) {
            j = 0;
        }
        return j;
    }

    private long getQueueOffsetByKey(String str, int i) {
        long longValue = this.topicQueueTable.computeIfAbsent(str, str2 -> {
            return 0L;
        }).longValue();
        switch (i) {
            case 4:
            case 12:
                longValue = 0;
                break;
        }
        return longValue;
    }

    public DLedgerServer getdLedgerServer() {
        return this.dLedgerServer;
    }

    public int getId() {
        return this.id;
    }

    public long getDividedCommitlogOffset() {
        return this.dividedCommitlogOffset;
    }

    static {
        $assertionsDisabled = !DLedgerCommitLog.class.desiredAssertionStatus();
        System.setProperty("dLedger.multiPath.Splitter", MessageStoreConfig.MULTI_PATH_SPLITTER);
    }
}
