package com.taobao.common.store.journal;

import com.taobao.common.store.journal.JournalStore;
import com.taobao.common.store.journal.impl.OpItemHashMap;
import com.taobao.common.store.util.BytesKey;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/taobao/common/store/journal/DataFileAppender.class */
public class DataFileAppender {
    /** Background appender thread; created lazily by {@code startAppendThreadIfNessary()}. */
    private Thread thread;
    /** Copied from {@code JournalStore.maxWriteBatchSize}; compared against a batch's data size in {@code WriteBatch.canAppend}. */
    protected int maxWriteBatchSize;
    /** The batch currently accepting commands; taken and reset to {@code null} by {@code processQueue()}. */
    private WriteBatch nextWriteBatch;
    /** The journal store this appender writes for. */
    private final JournalStore journal;
    /** Set by {@code close()}; {@code store}/{@code remove} refuse new work once it is true. */
    private volatile boolean shutdown = false;
    /** True once the appender thread has been started; cleared again by {@code close()}. */
    private boolean running = false;
    /** Lock under which {@code nextWriteBatch} is read and replaced; all conditions below belong to it. */
    private final Lock enqueueLock = new ReentrantLock();
    /** Signalled by {@code processQueue()} after it has taken the pending batch; awaited while {@code nextWriteBatch != null}. */
    private final Condition notEmpty = this.enqueueLock.newCondition();
    /** Signalled when a new batch is published or on {@code close()}; awaited by {@code processQueue()} while there is no batch. */
    private final Condition empty = this.enqueueLock.newCondition();
    /** Payloads of non-sync ADD commands that were enqueued; entries are counted down again in {@code writeDataAndLog}. */
    protected final Map<BytesKey, JournalStore.InflyWriteData> inflyWrites = new ConcurrentHashMap(OpItemHashMap.DEFAULT_CAPACITY);
    /** Condition handed to {@code DataFile.sync(...)} from {@code sync()}. */
    final Condition notSync = this.enqueueLock.newCondition();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/taobao/common/store/journal/DataFileAppender$WriteBatch.class */
    /**
     * A group of write commands that the appender thread processes together.
     * A batch is bound to one data file and one log file for its whole life.
     */
    public class WriteBatch {
        /** Counted down by the appender thread once the batch has been processed. */
        final CountDownLatch latch = new CountDownLatch(1);
        /** Commands in the order they were added. */
        final List<WriteCommand> cmdList = new ArrayList();
        /** Number of remove commands (op 2) collected in this batch. */
        int removeOPCount;
        final DataFile dataFile;
        final LogFile logFile;
        /** Sum of the payload lengths of all ADD commands in this batch. */
        int dataSize;
        /** Offset of the first ADD payload of this batch, or -1 while no ADD has been recorded. */
        long offset = -1L;
        /** I/O failure recorded while processing the batch, if any. */
        volatile IOException exception;
        /** File number taken from the op item of the command that created the batch. */
        final int number;

        /**
         * Creates a batch seeded with its first command.
         *
         * @param writeCommand first command of the batch
         * @param dataFile     data file this batch is bound to
         * @param logFile      log file this batch is bound to
         * @throws RuntimeException if the op type is neither ADD nor 2
         */
        public WriteBatch(WriteCommand writeCommand, DataFile dataFile, LogFile logFile) {
            this.dataFile = dataFile;
            this.number = writeCommand.opItem.number;
            this.logFile = logFile;
            final int op = writeCommand.opItem.op;
            if (op == OpItem.OP_ADD) {
                // The caller has already assigned the op item's offset; adopt it as the batch start.
                this.offset = writeCommand.opItem.offset;
                this.dataSize += writeCommand.data.length;
                this.dataFile.increment();
            } else if (op == 2) {
                this.removeOPCount++;
            } else {
                throw new RuntimeException("Unknow op type " + writeCommand.opItem);
            }
            this.cmdList.add(writeCommand);
        }

        /**
         * Tells whether the given command may join this batch.
         * An ADD fits while both the data file length and the batch data size stay
         * below their limits; a remove fits only when it targets this batch's file number.
         *
         * @throws IOException      propagated from {@code DataFile.getLength()}
         * @throws RuntimeException if the op type is neither ADD nor 2
         */
        public boolean canAppend(WriteCommand writeCommand) throws IOException {
            final int op = writeCommand.opItem.op;
            if (op == OpItem.OP_ADD) {
                final long fileLength = this.dataFile.getLength();
                final int payloadLength = writeCommand.data.length;
                if (fileLength + ((long) payloadLength) >= 67108864) {
                    return false;
                }
                return this.dataSize + payloadLength < DataFileAppender.this.maxWriteBatchSize;
            }
            if (op == 2) {
                return writeCommand.opItem.number == this.number;
            }
            throw new RuntimeException("Unknow op type " + writeCommand.opItem);
        }

        /**
         * Adds a command to this batch. For an ADD the op item receives its offset and
         * file number, and the data file position is advanced by the payload length.
         *
         * @throws IOException      declared for callers; see the data file calls
         * @throws RuntimeException if the op type is neither ADD nor 2
         */
        public void append(WriteCommand writeCommand) throws IOException {
            final int op = writeCommand.opItem.op;
            if (op == OpItem.OP_ADD) {
                if (this.offset == -1) {
                    // First ADD of this batch: it starts at the current file position.
                    this.offset = this.dataFile.position();
                    writeCommand.opItem.offset = this.dataFile.position();
                } else {
                    // Later ADDs are laid out directly after the payloads already collected.
                    writeCommand.opItem.offset = this.offset + this.dataSize;
                }
                writeCommand.opItem.number = this.dataFile.getNumber();
                this.dataFile.forward(writeCommand.data.length);
                this.dataSize += writeCommand.data.length;
                this.dataFile.increment();
            } else if (op == 2) {
                this.removeOPCount++;
            } else {
                throw new RuntimeException("Unknow op type " + writeCommand.opItem);
            }
            this.cmdList.add(writeCommand);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/taobao/common/store/journal/DataFileAppender$WriteCommand.class */
    /**
     * One queued operation: the key, its op item, the payload (null for commands
     * created by {@code remove}) and whether the caller waits for completion.
     */
    public class WriteCommand {
        final BytesKey bytesKey;
        final OpItem opItem;
        /** Payload bytes; {@code remove} passes {@code null} here. */
        final byte[] data;
        /** True when the submitting caller blocks until the batch has been processed. */
        final boolean sync;

        /**
         * @param bytesKey key the operation refers to
         * @param opItem   op item describing the operation
         * @param bArr     payload bytes, or {@code null}
         * @param z        whether the caller waits for the batch to be processed
         */
        public WriteCommand(BytesKey bytesKey, OpItem opItem, byte[] bArr, boolean z) {
            this.sync = z;
            this.data = bArr;
            this.opItem = opItem;
            this.bytesKey = bytesKey;
        }

        /** Delegates to the op item's string form. */
        @Override
        public String toString() {
            final OpItem item = this.opItem;
            return item.toString();
        }
    }

    public DataFileAppender(JournalStore journalStore) {
        this.maxWriteBatchSize = journalStore.maxWriteBatchSize;
        this.journal = journalStore;
    }

    /**
     * Queues a command without payload for the given op item.
     *
     * @param opItem   op item describing the entry to remove
     * @param bytesKey key of the entry
     * @param z        when true, block until the batch containing the command was processed
     * @return the same {@code opItem}
     * @throws IOException      if enqueueing fails or, for a waiting caller, the batch recorded a failure
     * @throws RuntimeException if the appender has been closed
     */
    public OpItem remove(OpItem opItem, BytesKey bytesKey, boolean z) throws IOException {
        if (this.shutdown) {
            throw new RuntimeException("DataFileAppender已经关闭");
        }
        final WriteCommand command = new WriteCommand(bytesKey, opItem, null, z);
        return enqueueTryWait(opItem, z, command);
    }

    /**
     * Queues a payload for the given key. The op item's {@code key} and
     * {@code length} fields are filled in before the command is enqueued.
     *
     * @param opItem   op item to populate and queue
     * @param bytesKey key of the entry
     * @param bArr     payload bytes
     * @param z        when true, block until the batch containing the command was processed
     * @return the same {@code opItem}
     * @throws IOException      if enqueueing fails or, for a waiting caller, the batch recorded a failure
     * @throws RuntimeException if the appender has been closed
     */
    public OpItem store(OpItem opItem, BytesKey bytesKey, byte[] bArr, boolean z) throws IOException {
        if (this.shutdown) {
            throw new RuntimeException("DataFileAppender已经关闭");
        }
        opItem.key = bytesKey.getData();
        opItem.length = bArr.length;
        final WriteCommand command = new WriteCommand(bytesKey, opItem, bArr, z);
        return enqueueTryWait(opItem, z, command);
    }

    /**
     * Enqueues the command and, for a waiting caller, blocks on the batch latch
     * and rethrows any failure recorded on the batch.
     *
     * @param opItem       value handed back to the caller
     * @param z            whether to wait for the batch to be processed
     * @param writeCommand command to enqueue
     * @return {@code opItem}
     * @throws IOException from enqueueing, or the failure stored on the batch
     */
    private OpItem enqueueTryWait(OpItem opItem, boolean z, WriteCommand writeCommand) throws IOException {
        final WriteBatch batch = enqueue(writeCommand, z);
        if (!z) {
            // Fire-and-forget: the caller does not wait for the appender thread.
            return opItem;
        }
        try {
            batch.latch.await();
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the caller; the wait is abandoned.
            Thread.currentThread().interrupt();
        }
        final IOException failure = batch.exception;
        if (failure == null) {
            return opItem;
        }
        throw failure;
    }

    /**
     * Marks the appender as shut down, wakes the appender thread and waits for it to exit.
     *
     * <p>The lock is released before joining: the appender thread must re-acquire
     * {@code enqueueLock} to return from {@code empty.await()}, so joining while
     * holding the lock could never complete. If no appender thread was ever
     * started there is nothing to wait for. An interrupt received while waiting
     * is remembered and restored on exit instead of being re-raised inside the
     * loop, where it would make every subsequent {@code join()} fail immediately.
     */
    public void close() {
        final Thread appender;
        this.enqueueLock.lock();
        try {
            if (!this.shutdown) {
                this.shutdown = true;
                this.running = false;
                this.empty.signalAll();
            }
            appender = this.thread;
        } finally {
            this.enqueueLock.unlock();
        }
        if (appender == null) {
            // No write was ever enqueued, so the thread was never created.
            return;
        }
        boolean interrupted = false;
        try {
            while (appender.isAlive()) {
                try {
                    appender.join();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Main loop of the appender thread: waits for a pending batch, takes it,
     * writes it and releases the batch latch. Returns once the appender is
     * shut down and no batch is pending.
     *
     * <p>The whole wait/take sequence runs under {@code enqueueLock} and the lock
     * is released exactly once per iteration; awaiting or signalling a condition
     * without owning the lock raises {@code IllegalMonitorStateException}.
     */
    public void processQueue() {
        while (true) {
            WriteBatch writeBatch;
            this.enqueueLock.lock();
            try {
                while (this.nextWriteBatch == null) {
                    if (this.shutdown) {
                        return;
                    }
                    try {
                        this.empty.await();
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop re-checks the batch and the
                        // shutdown flag, which is how this thread is told to stop.
                    }
                }
                writeBatch = this.nextWriteBatch;
                this.nextWriteBatch = null;
                // Wake callers waiting for the pending slot to become free.
                this.notEmpty.signalAll();
            } finally {
                this.enqueueLock.unlock();
            }
            DataFile dataFile = writeBatch.dataFile;
            LogFile logFile = writeBatch.logFile;
            try {
                writeDataAndLog(writeBatch, dataFile, logFile, writeBatch.cmdList);
                processRemove(writeBatch, dataFile, logFile);
            } finally {
                // Always release waiting callers, even if processing failed.
                writeBatch.latch.countDown();
            }
        }
    }

    /**
     * Applies the batch's remove count to the data file and, when the file has
     * reached the size limit and reports itself unused, drops the data/log file
     * pair from the journal and deletes both. Failures are stored on the batch.
     *
     * @param writeBatch batch whose remove count is applied
     * @param dataFile   data file of the batch; nothing happens when {@code null}
     * @param logFile    log file of the batch; nothing happens when {@code null}
     */
    private void processRemove(WriteBatch writeBatch, DataFile dataFile, LogFile logFile) {
        if (dataFile == null) {
            return;
        }
        if (logFile == null) {
            return;
        }
        dataFile.decrement(writeBatch.removeOPCount);
        this.enqueueLock.lock();
        try {
            final boolean reclaimable = dataFile.getLength() >= 67108864 && dataFile.isUnUsed();
            if (reclaimable) {
                if (this.journal.dataFile == dataFile) {
                    // The file being reclaimed is still the active one: roll over first.
                    this.journal.newDataFile();
                }
                this.journal.dataFiles.remove(Integer.valueOf(dataFile.getNumber()));
                this.journal.logFiles.remove(Integer.valueOf(dataFile.getNumber()));
                dataFile.delete();
                logFile.delete();
            }
        } catch (Exception e) {
            writeBatch.exception = (e instanceof IOException) ? (IOException) e : new IOException(e);
        } finally {
            this.enqueueLock.unlock();
        }
    }

    /**
     * Looks up the payload tracked for the key in {@code inflyWrites}.
     *
     * @param bytesKey key to look up
     * @return the tracked payload, or {@code null} when there is no entry or its count is not positive
     */
    public byte[] getDataFromInFlyWrites(BytesKey bytesKey) {
        final JournalStore.InflyWriteData pending = this.inflyWrites.get(bytesKey);
        return (pending != null && pending.count > 0) ? pending.data : null;
    }

    /**
     * Serialises the batch into a data buffer and a log buffer, writes both, and
     * then counts down the {@code inflyWrites} entries of the non-sync ADD commands.
     *
     * <p>Changes over the previous version: the data buffer is only touched when it
     * exists (a batch whose ADD payloads are all empty has {@code dataSize == 0}
     * and therefore no buffer), and the log entries are not written when the data
     * write failed, so the log never references data that did not reach the file.
     * An {@code IOException} from either write is stored in
     * {@code writeBatch.exception}.
     *
     * @param writeBatch batch being written; receives any I/O failure
     * @param dataFile   target of the payload bytes
     * @param logFile    target of the serialised op items
     * @param list       commands of the batch, in order
     */
    private void writeDataAndLog(WriteBatch writeBatch, DataFile dataFile, LogFile logFile, List<WriteCommand> list) {
        final ByteBuffer dataBuf = writeBatch.dataSize > 0 ? ByteBuffer.allocate(writeBatch.dataSize) : null;
        // 33 bytes are reserved per command — presumably the serialised OpItem size; TODO confirm.
        final ByteBuffer logBuf = ByteBuffer.allocate(list.size() * 33);
        for (WriteCommand command : list) {
            logBuf.put(command.opItem.toByte());
            if (dataBuf != null && command.opItem.op == 1) {
                dataBuf.put(command.data);
            }
        }
        logBuf.flip();
        try {
            if (dataBuf != null) {
                dataBuf.flip();
                dataFile.write(dataBuf);
            }
            logFile.write(logBuf);
        } catch (IOException e) {
            writeBatch.exception = e;
        }
        this.enqueueLock.lock();
        try {
            for (WriteCommand command : list) {
                if (command.sync || command.opItem.op != 1) {
                    continue;
                }
                final JournalStore.InflyWriteData pending = this.inflyWrites.get(command.bytesKey);
                if (pending == null) {
                    continue;
                }
                pending.count--;
                if (pending.count <= 0) {
                    this.inflyWrites.remove(command.bytesKey);
                }
            }
        } finally {
            this.enqueueLock.unlock();
        }
    }

    /**
     * Waits until no batch is pending and then asks every data file of the
     * journal to sync.
     *
     * <p>The lock is acquired once and released once in a {@code finally}. The
     * previous shape never released it when no batch was pending, and released
     * it inside the wait loop otherwise. The wait uses
     * {@code awaitUninterruptibly()}: re-interrupting inside the loop made every
     * following {@code await()} throw immediately while still holding the lock,
     * so the appender thread could never clear the pending batch.
     *
     * <p>The data files are synced while the lock is held because {@code notSync}
     * belongs to {@code enqueueLock}.
     * NOTE(review): this assumes {@code DataFile.sync(Condition)} waits on that
     * condition, which requires owning the lock — confirm against {@code DataFile}.
     */
    public void sync() {
        this.enqueueLock.lock();
        try {
            while (this.nextWriteBatch != null) {
                this.notEmpty.awaitUninterruptibly();
            }
            Iterator<DataFile> it = this.journal.dataFiles.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().sync(this.notSync);
                } catch (Exception ignored) {
                    // Best effort, as before: a failure on one file must not
                    // prevent the remaining files from being synced.
                }
            }
        } finally {
            this.enqueueLock.unlock();
        }
    }

    /**
     * Places the command into the pending batch, creating a new batch when there
     * is none or when the pending one cannot take the command, and records
     * non-sync writes in {@code inflyWrites}.
     *
     * <p>When the pending batch cannot take the command, the caller waits for the
     * appender thread to pick that batch up. The wait uses
     * {@code awaitUninterruptibly()}: with the earlier catch-and-re-interrupt
     * loop, an interrupted caller made {@code await()} throw on every retry
     * without ever releasing the lock, which kept the appender thread from
     * clearing the pending batch. The interrupt status is still set when this
     * method returns.
     *
     * @param writeCommand command to enqueue
     * @param z            true for callers that wait for completion; such writes are not tracked in {@code inflyWrites}
     * @return the batch that now contains the command
     * @throws IOException if a new batch cannot be created or the command cannot be appended
     */
    private WriteBatch enqueue(WriteCommand writeCommand, boolean z) throws IOException {
        WriteBatch batch;
        this.enqueueLock.lock();
        try {
            startAppendThreadIfNessary();
            if (this.nextWriteBatch == null) {
                batch = newWriteBatch(writeCommand);
                this.empty.signalAll();
            } else if (this.nextWriteBatch.canAppend(writeCommand)) {
                this.nextWriteBatch.append(writeCommand);
                batch = this.nextWriteBatch;
            } else {
                // Pending batch is full or targets another file: wait until it is taken.
                while (this.nextWriteBatch != null) {
                    this.notEmpty.awaitUninterruptibly();
                }
                batch = newWriteBatch(writeCommand);
                this.empty.signalAll();
            }
            if (!z) {
                JournalStore.InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey);
                switch (writeCommand.opItem.op) {
                    case OpItem.OP_ADD /* 1 */:
                        if (inflyWriteData != null) {
                            // Same key queued again: keep the newest payload and count the extra write.
                            inflyWriteData.data = writeCommand.data;
                            inflyWriteData.count++;
                        } else {
                            this.inflyWrites.put(writeCommand.bytesKey, new JournalStore.InflyWriteData(writeCommand.data));
                        }
                        break;
                    case 2:
                        if (inflyWriteData != null) {
                            this.inflyWrites.remove(writeCommand.bytesKey);
                        }
                        break;
                }
            }
            return batch;
        } finally {
            this.enqueueLock.unlock();
        }
    }

    /**
     * Creates a batch seeded with the given command and publishes it as
     * {@code nextWriteBatch}.
     *
     * <p>For an ADD (op 1) the key must not already be present in the journal
     * indices; the op item gets its offset and file number from the data file
     * returned by {@code getDataFile()}, whose position is then advanced. For any
     * other op the data and log files are looked up by the op item's file number.
     *
     * @param writeCommand first command of the new batch
     * @return the newly published batch
     * @throws IOException if the key already exists (ADD) or the files for the op item's number are gone
     */
    private WriteBatch newWriteBatch(WriteCommand writeCommand) throws IOException {
        final boolean isAdd = writeCommand.opItem.op == 1;
        if (isAdd) {
            if (this.journal.indices.containsKey(writeCommand.bytesKey)) {
                throw new IOException("发现重复的key");
            }
            final DataFile target = getDataFile();
            writeCommand.opItem.offset = target.position();
            writeCommand.opItem.number = target.getNumber();
            target.forward(writeCommand.data.length);
            this.nextWriteBatch = new WriteBatch(writeCommand, target, this.journal.logFile);
            return this.nextWriteBatch;
        }
        final DataFile existingData = this.journal.dataFiles.get(Integer.valueOf(writeCommand.opItem.number));
        final LogFile existingLog = this.journal.logFiles.get(Integer.valueOf(writeCommand.opItem.number));
        if (existingData == null || existingLog == null) {
            throw new IOException("日志文件已经被删除，编号为" + writeCommand.opItem.number);
        }
        this.nextWriteBatch = new WriteBatch(writeCommand, existingData, existingLog);
        return this.nextWriteBatch;
    }

    /**
     * Starts the appender thread unless {@code running} is already set. The
     * thread is a daemon running {@code processQueue()} at maximum priority.
     */
    private void startAppendThreadIfNessary() {
        if (!this.running) {
            this.running = true;
            this.thread = new Thread() { // from class: com.taobao.common.store.journal.DataFileAppender.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    DataFileAppender.this.processQueue();
                }
            };
            final Thread appender = this.thread;
            appender.setPriority(Thread.MAX_PRIORITY);
            appender.setDaemon(true);
            appender.setName("Store4j file appender");
            appender.start();
        }
    }

    /**
     * Returns the journal's current data file, asking the journal for a new one
     * first when the current file has reached the size limit.
     *
     * @return the data file to write to
     * @throws IOException propagated from the data file or the journal
     */
    private DataFile getDataFile() throws IOException {
        final DataFile current = this.journal.dataFile;
        if (current.getLength() < 67108864) {
            return current;
        }
        return this.journal.newDataFile();
    }
}
