/*
 * Decompiled with CFR 0.152.
 */
package org.commoncrawl.hadoop.mergeutils;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.commoncrawl.hadoop.mergeutils.RawDataSpillWriter;
import org.commoncrawl.hadoop.mergeutils.SequenceFileIndexWriter;
import org.commoncrawl.util.shared.CCStringUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class SequenceFileSpillWriter<KeyType extends WritableComparable, ValueType extends Writable>
implements RawDataSpillWriter<KeyType, ValueType> {
    private static final Log LOG = LogFactory.getLog(SequenceFileSpillWriter.class);
    private static final int BUFFER_QUEUE_CAPACITY = 100;
    private SequenceFileIndexWriter<KeyType, ValueType> _indexWriter = null;
    private SequenceFile.Writer writer = null;
    private long _recordCount = 0L;
    private FSDataOutputStream _outputStream;
    private ByteBuffer _activeBuffer = null;
    private Thread _writerThread = null;
    private IOException _writerException = null;
    private int _spillBufferSize = -1;
    public static final int DEFAULT_SPILL_BUFFER_SIZE = 1000000;
    public static final String SPILL_WRITER_BUFFER_SIZE_PARAM = "commoncrawl.spillwriter.buffer.size";
    public static final String QUEUE_CAPACITY_PARAM = "commoncrawl.spillwriter.queue.capacity";
    private LinkedBlockingQueue<QueuedBufferItem> _bufferQueue;

    public SequenceFileSpillWriter(FileSystem fileSystem, Configuration conf, Path outputFilePath, Class<KeyType> keyClass, Class<ValueType> valueClass, SequenceFileIndexWriter<KeyType, ValueType> optionalIndexWriter, CompressionCodec codec, short replicationFactor) throws IOException {
        this._bufferQueue = new LinkedBlockingQueue(conf.getInt(QUEUE_CAPACITY_PARAM, 100));
        this._spillBufferSize = conf.getInt(SPILL_WRITER_BUFFER_SIZE_PARAM, 1000000);
        this._outputStream = fileSystem.create(outputFilePath, true, 0xA00000, replicationFactor, fileSystem.getDefaultBlockSize());
        this._activeBuffer = ByteBuffer.allocate(this._spillBufferSize);
        this._indexWriter = optionalIndexWriter;
        this.writer = codec != null ? SequenceFile.createWriter((Configuration)conf, (FSDataOutputStream)this._outputStream, keyClass, valueClass, (SequenceFile.CompressionType)SequenceFile.CompressionType.BLOCK, (CompressionCodec)codec) : SequenceFile.createWriter((Configuration)conf, (FSDataOutputStream)this._outputStream, keyClass, valueClass, (SequenceFile.CompressionType)SequenceFile.CompressionType.NONE, null);
        this._writerThread = new Thread(new Runnable(){

            public void run() {
                block4: while (true) {
                    QueuedBufferItem queuedBufferItem = null;
                    try {
                        queuedBufferItem = (QueuedBufferItem)SequenceFileSpillWriter.this._bufferQueue.take();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (queuedBufferItem._buffer == null) {
                        return;
                    }
                    ByteBuffer theBuffer = queuedBufferItem._buffer;
                    byte[] bufferAsBytes = theBuffer.array();
                    int itemsWritten = 0;
                    long timeStart = System.currentTimeMillis();
                    while (true) {
                        if (theBuffer.remaining() == 0) continue block4;
                        int keyLen = theBuffer.getInt();
                        int keyPos = theBuffer.position();
                        theBuffer.position(keyPos + keyLen);
                        int valueLen = theBuffer.getInt();
                        int valuePosition = theBuffer.position();
                        theBuffer.position(valuePosition + valueLen);
                        try {
                            SequenceFileSpillWriter.this.spillRawRecord2(bufferAsBytes, keyPos, keyLen, bufferAsBytes, valuePosition, valueLen);
                        }
                        catch (IOException e) {
                            LOG.error((Object)("Writer Thread Failed with Error:" + CCStringUtils.stringifyException((Throwable)e)));
                            SequenceFileSpillWriter.this._writerException = e;
                            return;
                        }
                        ++itemsWritten;
                    }
                    break;
                }
            }
        });
        this._writerThread.start();
    }

    @Override
    public void close() throws IOException {
        this.flushActiveBuffer();
        if (this._writerThread != null) {
            try {
                this._bufferQueue.put(new QueuedBufferItem(null));
                this._writerThread.join();
                this._writerThread = null;
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        if (this.writer != null) {
            this.writer.close();
            this.writer = null;
        }
        if (this._indexWriter != null) {
            this._indexWriter.close();
            this._indexWriter = null;
        }
        if (this._outputStream != null) {
            this._outputStream.close();
            this._outputStream = null;
        }
    }

    private static OutputStream newOutputStream(final ByteBuffer buf) {
        return new OutputStream(){

            public void write(int b) throws IOException {
                buf.put((byte)(b & 0xFF));
            }

            public void write(byte[] src, int off, int len) throws IOException {
                buf.put(src, off, len);
            }
        };
    }

    @Override
    public void spillRecord(KeyType key, ValueType value) throws IOException {
        int startPosition = -1;
        boolean done = false;
        while (!done) {
            if (this._activeBuffer == null) {
                this.allocateActiveBuffer();
            }
            startPosition = this._activeBuffer.position();
            DataOutputStream outputStream = new DataOutputStream(SequenceFileSpillWriter.newOutputStream(this._activeBuffer));
            int keySizePos = 0;
            int valueSizePos = 0;
            int keySize = 0;
            int endPosition = 0;
            int valueSize = 0;
            boolean overflow = false;
            try {
                keySizePos = this._activeBuffer.position();
                this._activeBuffer.position(keySizePos + 4);
                key.write((DataOutput)outputStream);
                valueSizePos = this._activeBuffer.position();
                keySize = valueSizePos - keySizePos - 4;
                this._activeBuffer.position(keySizePos);
                this._activeBuffer.putInt(keySize);
                this._activeBuffer.position(valueSizePos + 4);
                value.write((DataOutput)outputStream);
                endPosition = this._activeBuffer.position();
                valueSize = endPosition - valueSizePos - 4;
                this._activeBuffer.position(valueSizePos);
                this._activeBuffer.putInt(valueSize);
                this._activeBuffer.position(endPosition);
                done = true;
            }
            catch (IllegalArgumentException e) {
                overflow = true;
            }
            catch (BufferOverflowException e) {
                overflow = true;
            }
            if (!overflow) continue;
            if (startPosition == 0) {
                throw new IOException("Key + Value Size too Big for SpillBuffer !");
            }
            this._activeBuffer.position(startPosition);
            this.flushActiveBuffer();
        }
    }

    @Override
    public void spillRawRecord(byte[] keyData, int keyOffset, int keyLength, byte[] valueData, int valueOffset, int valueLength) throws IOException {
        int startPosition = -1;
        boolean done = false;
        while (!done) {
            if (this._activeBuffer == null) {
                this.allocateActiveBuffer();
            }
            startPosition = this._activeBuffer.position();
            int keySizePos = 0;
            int valueSizePos = 0;
            int keySize = 0;
            int endPosition = 0;
            int valueSize = 0;
            boolean overflow = false;
            try {
                keySizePos = this._activeBuffer.position();
                this._activeBuffer.position(keySizePos + 4);
                this._activeBuffer.put(keyData, keyOffset, keyLength);
                valueSizePos = this._activeBuffer.position();
                keySize = valueSizePos - keySizePos - 4;
                this._activeBuffer.position(keySizePos);
                this._activeBuffer.putInt(keySize);
                this._activeBuffer.position(valueSizePos + 4);
                this._activeBuffer.put(valueData, valueOffset, valueLength);
                endPosition = this._activeBuffer.position();
                valueSize = endPosition - valueSizePos - 4;
                this._activeBuffer.position(valueSizePos);
                this._activeBuffer.putInt(valueSize);
                this._activeBuffer.position(endPosition);
                done = true;
            }
            catch (IllegalArgumentException e) {
                overflow = true;
            }
            catch (BufferOverflowException e) {
                overflow = true;
            }
            if (!overflow) continue;
            if (startPosition == 0) {
                throw new IOException("Key + Value Size too Big for SpillBuffer !");
            }
            this._activeBuffer.position(startPosition);
            this.flushActiveBuffer();
        }
    }

    private void allocateActiveBuffer() throws IOException {
        this._activeBuffer = ByteBuffer.allocate(this._spillBufferSize);
    }

    private void flushActiveBuffer() throws IOException {
        try {
            if (this._activeBuffer != null && this._activeBuffer.position() != 0) {
                this._activeBuffer.flip();
                this._bufferQueue.put(new QueuedBufferItem(this._activeBuffer));
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this._activeBuffer = null;
    }

    public void spillRawRecord2(byte[] keyData, int keyOffset, int keyLength, final byte[] valueData, final int valueOffset, final int valueLength) throws IOException {
        if (this._indexWriter != null) {
            this._indexWriter.indexItem(keyData, keyOffset, keyLength, valueData, valueOffset, valueLength, this.writer.getLength());
        }
        ++this._recordCount;
        this.writer.appendRaw(keyData, keyOffset, keyLength, new SequenceFile.ValueBytes(){

            public int getSize() {
                return valueLength;
            }

            public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
                throw new IOException("UnSupported Method");
            }

            public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
                outStream.write(valueData, valueOffset, valueLength);
            }
        });
    }

    private static class QueuedBufferItem {
        public ByteBuffer _buffer;

        QueuedBufferItem(ByteBuffer buffer) {
            this._buffer = buffer;
        }
    }
}

