/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

/**
 * Streams a byte range of a single block replica, together with its
 * checksums, to an output stream as a sequence of packets.
 *
 * <p>The constructor opens the replica's data and metadata streams and
 * computes the chunk-aligned range to send; {@code sendBlock} writes the
 * packets and then closes this sender.
 */
class BlockSender
implements Closeable {
    // Both loggers are shared with DataNode.
    static final Log LOG = DataNode.LOG;
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    // True when the JVM reports a 32-bit data model.
    // NOTE(review): throws NullPointerException at class-init time if the
    // "sun.arch.data.model" property is absent - confirm target JVMs set it.
    private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32");
    // 64 KiB; the same literal is the lower bound of TRANSFERTO_BUFFER_SIZE.
    private static final int MIN_BUFFER_WITH_TRANSFERTO = 65536;
    // Number of data bytes per packet when the transferTo path is used.
    private static final int TRANSFERTO_BUFFER_SIZE = Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE, 65536);
    // Block being sent.
    private final ExtendedBlock block;
    // Stream over the block data, opened at the chunk-aligned start offset.
    private InputStream blockIn;
    // File position used for transferTo; captured in doSendBlock, -1 until then.
    private long blockInPosition = -1L;
    // Stream over the checksum metadata; null when no checksums are available.
    private DataInputStream checksumIn;
    // Checksum algorithm/geometry used for the block.
    private final DataChecksum checksum;
    // Value of offset when doSendBlock started.
    private long initialOffset;
    // Current read position in the block; advanced after every packet.
    private long offset;
    // Exclusive end of the range to send.
    private final long endOffset;
    // Bytes of data covered by one checksum.
    private final int chunkSize;
    // Size in bytes of one checksum.
    private final int checksumSize;
    // Whether missing or unreadable checksums are tolerated.
    private final boolean corruptChecksumOk;
    // Sequence number of the next packet.
    private long seqno;
    // Whether the transferTo path may be used at all.
    private final boolean transferToAllowed;
    // Set once the terminating empty packet has been written and flushed.
    private boolean sentEntireByteRange;
    // Whether data is verified against its checksums before being written.
    private final boolean verifyChecksum;
    // Format string for the client trace log line; may be null.
    private final String clientTraceFmt;
    // Checksum of the last chunk captured at construction time; when set it
    // overrides the on-disk checksum of the final chunk in sendPacket.
    private volatile ChunkChecksum lastChunkChecksum = null;
    private DataNode datanode;
    // Descriptor of blockIn when it is a FileInputStream, otherwise null.
    private FileDescriptor blockInFd;
    // Reference on the volume holding the block; released in close().
    private FsVolumeReference volumeRef;
    private final Replica replica;
    // Readahead window passed to the readahead pool.
    private final long readaheadLength;
    // Most recent readahead request; cancelled in close().
    private ReadaheadPool.ReadaheadRequest curReadahead;
    // True when the caching strategy explicitly specified a readahead value.
    private final boolean alwaysReadahead;
    // Drop-behind policy for reads classified as long by isLongRead().
    private final boolean dropCacheBehindLargeReads;
    // Drop-behind policy for every read.
    private final boolean dropCacheBehindAllReads;
    // Offset up to which cached pages have already been dropped.
    private long lastCacheDropOffset;
    // Minimum progress (1 MiB) between two drop-behind advisories; non-final so tests can change it.
    @VisibleForTesting
    static long CACHE_DROP_INTERVAL_BYTES = 0x100000L;
    // 256 KiB; same value as the threshold isLongRead() compares against.
    private static final long LONG_READ_THRESHOLD_BYTES = 262144L;

    /**
     * Looks up the replica for {@code block}, opens its data and metadata
     * streams, and computes the chunk-aligned byte range that will be sent.
     *
     * @param block the block to send; its generation stamp is raised to the
     *        replica's when the replica's stamp is newer
     * @param startOffset first requested byte; must lie within the replica
     * @param length number of bytes requested; a negative value is replaced
     *        by the replica's visible length
     * @param corruptChecksumOk when true, a missing metadata stream is only
     *        logged instead of raising {@link FileNotFoundException}
     * @param verifyChecksum whether data is verified while sending; requires
     *        {@code sendChecksum}
     * @param sendChecksum whether checksums are read from the metadata stream
     * @param datanode the owning datanode (configuration and dataset access)
     * @param clientTraceFmt format string for the client trace log, may be null
     * @param cachingStrategy per-request drop-behind and readahead overrides;
     *        unset values fall back to the datanode configuration
     * @throws IOException if the replica is missing, unreadable, older than
     *         the requested generation stamp, or the requested range does not
     *         fit; all resources opened so far are released first
     */
    BlockSender(ExtendedBlock block, long startOffset, long length, boolean corruptChecksumOk, boolean verifyChecksum, boolean sendChecksum, DataNode datanode, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException {
        try {
            long checksumSkip;
            long end;
            int size;
            long replicaVisibleLength;
            this.block = block;
            this.corruptChecksumOk = corruptChecksumOk;
            this.verifyChecksum = verifyChecksum;
            this.clientTraceFmt = clientTraceFmt;
            // An explicit drop-behind setting applies to all reads; otherwise
            // only the datanode-wide default for large reads is used.
            if (cachingStrategy.getDropBehind() == null) {
                this.dropCacheBehindAllReads = false;
                this.dropCacheBehindLargeReads = datanode.getDnConf().dropCacheBehindReads;
            } else {
                this.dropCacheBehindAllReads = this.dropCacheBehindLargeReads = cachingStrategy.getDropBehind().booleanValue();
            }
            // Same pattern for readahead: an explicit value forces readahead.
            if (cachingStrategy.getReadahead() == null) {
                this.alwaysReadahead = false;
                this.readaheadLength = datanode.getDnConf().readaheadLength;
            } else {
                this.alwaysReadahead = true;
                this.readaheadLength = cachingStrategy.getReadahead();
            }
            this.datanode = datanode;
            if (verifyChecksum) {
                Preconditions.checkArgument((boolean)sendChecksum, (Object)"If verifying checksum, currently must also send it.");
            }
            ChunkChecksum chunkChecksum = null;
            FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = datanode.data;
            // Replica lookup, visible length and (for finalized replicas) the
            // last chunk checksum are read under the dataset monitor.
            synchronized (fsDatasetSpi) {
                this.replica = BlockSender.getReplica(block, datanode);
                replicaVisibleLength = this.replica.getVisibleLength();
                if (this.replica instanceof FinalizedReplica) {
                    FinalizedReplica frep = (FinalizedReplica)this.replica;
                    chunkChecksum = frep.getLastChecksumAndDataLen();
                }
            }
            // For a replica still being written, wait (outside the monitor)
            // until enough bytes are on disk, then capture its last checksum.
            if (this.replica instanceof ReplicaBeingWritten) {
                ReplicaBeingWritten rbw = (ReplicaBeingWritten)this.replica;
                BlockSender.waitForMinLength(rbw, startOffset + length);
                chunkChecksum = rbw.getLastChecksumAndDataLen();
            }
            if (this.replica.getGenerationStamp() < block.getGenerationStamp()) {
                throw new IOException("Replica gen stamp < block genstamp, block=" + block + ", replica=" + this.replica);
            }
            if (this.replica.getGenerationStamp() > block.getGenerationStamp()) {
                if (DataNode.LOG.isDebugEnabled()) {
                    DataNode.LOG.debug((Object)("Bumping up the client provided block's genstamp to latest " + this.replica.getGenerationStamp() + " for block " + block));
                }
                block.setGenerationStamp(this.replica.getGenerationStamp());
            }
            if (replicaVisibleLength < 0L) {
                throw new IOException("Replica is not readable, block=" + block + ", replica=" + this.replica);
            }
            if (DataNode.LOG.isDebugEnabled()) {
                DataNode.LOG.debug((Object)("block=" + block + ", replica=" + this.replica));
            }
            // On a 32-bit JVM transferTo is only allowed for lengths that fit in an int.
            this.transferToAllowed = datanode.getDnConf().transferToAllowed && (!is32Bit || length <= Integer.MAX_VALUE);
            // Hold a reference on the volume for the lifetime of this sender; released in close().
            this.volumeRef = datanode.data.getVolume(block).obtainReference();
            DataChecksum csum = null;
            if (verifyChecksum || sendChecksum) {
                LengthInputStream metaIn = null;
                boolean keepMetaInOpen = false;
                try {
                    metaIn = datanode.data.getMetaDataInputStream(block);
                    if (!corruptChecksumOk || metaIn != null) {
                        if (metaIn == null) {
                            throw new FileNotFoundException("Meta-data not found for " + block);
                        }
                        // The metadata stream is only used when the replica is not on
                        // transient storage and the stream is at least one header long.
                        if (!this.replica.isOnTransientStorage() && metaIn.getLength() >= (long)BlockMetadataHeader.getHeaderSize()) {
                            this.checksumIn = new DataInputStream(new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
                            csum = BlockMetadataHeader.readDataChecksum(this.checksumIn, block);
                            keepMetaInOpen = true;
                        }
                    } else {
                        LOG.warn((Object)("Could not find metadata file for " + block));
                    }
                }
                finally {
                    // Ownership of metaIn passes to checksumIn only on success.
                    if (!keepMetaInOpen) {
                        IOUtils.closeStream((Closeable)metaIn);
                    }
                }
            }
            // No usable metadata: fall back to a NULL checksum with 512-byte chunks.
            if (csum == null) {
                csum = DataChecksum.newDataChecksum((DataChecksum.Type)DataChecksum.Type.NULL, (int)512);
            }
            // Cap an oversized bytes-per-checksum (> 10 MiB and larger than the
            // replica) so later buffers are not sized from it.
            if ((size = csum.getBytesPerChecksum()) > 0xA00000 && (long)size > replicaVisibleLength) {
                csum = DataChecksum.newDataChecksum((DataChecksum.Type)csum.getChecksumType(), (int)Math.max((int)replicaVisibleLength, 0xA00000));
                size = csum.getBytesPerChecksum();
            }
            this.chunkSize = size;
            this.checksum = csum;
            this.checksumSize = this.checksum.getChecksumSize();
            length = length < 0L ? replicaVisibleLength : length;
            // Upper bound of readable data: the length recorded with the captured
            // checksum when available, otherwise the bytes on disk.
            // NOTE(review): the local `l` is an unused decompiler artifact.
            long l = end = chunkChecksum != null ? chunkChecksum.getDataLength() : this.replica.getBytesOnDisk();
            if (startOffset < 0L || startOffset > end || length + startOffset > end) {
                String msg = " Offset " + startOffset + " and length " + length + " don't match block " + block + " ( blockLen " + end + " )";
                LOG.warn((Object)(datanode.getDNRegistrationForBP(block.getBlockPoolId()) + ":sendBlock() : " + msg));
                throw new IOException(msg);
            }
            // Round the start down to a chunk boundary.
            this.offset = startOffset - startOffset % (long)this.chunkSize;
            if (length >= 0L) {
                // Round the requested end up to a chunk boundary.
                long tmpLen = startOffset + length;
                if (tmpLen % (long)this.chunkSize != 0L) {
                    tmpLen += (long)this.chunkSize - tmpLen % (long)this.chunkSize;
                }
                if (tmpLen < end) {
                    end = tmpLen;
                } else if (chunkChecksum != null) {
                    // The range reaches the end of the data: remember the captured
                    // checksum so sendPacket can substitute it for the last chunk.
                    this.lastChunkChecksum = chunkChecksum;
                }
            }
            this.endOffset = end;
            // Skip the checksums of the chunks that precede the start offset.
            if (this.offset > 0L && this.checksumIn != null && (checksumSkip = this.offset / (long)this.chunkSize * (long)this.checksumSize) > 0L) {
                IOUtils.skipFully((InputStream)this.checksumIn, (long)checksumSkip);
            }
            this.seqno = 0L;
            if (DataNode.LOG.isDebugEnabled()) {
                DataNode.LOG.debug((Object)("replica=" + this.replica));
            }
            this.blockIn = datanode.data.getBlockInputStream(block, this.offset);
            this.blockInFd = this.blockIn instanceof FileInputStream ? ((FileInputStream)this.blockIn).getFD() : null;
        }
        catch (IOException ioe) {
            // Release everything opened so far before propagating.
            // NOTE(review): only IOException is handled here; a RuntimeException
            // thrown after volumeRef is obtained would skip this cleanup.
            IOUtils.closeStream((Closeable)this);
            // close() already nulls blockIn, so this second call is a no-op in practice.
            IOUtils.closeStream((Closeable)this.blockIn);
            throw ioe;
        }
    }

    /**
     * Releases every resource held by this sender: advises the OS to drop the
     * cached pages read since the last drop (when drop-behind applies),
     * cancels any outstanding readahead, closes the checksum and data streams
     * and releases the volume reference.
     *
     * @throws IOException the failure from closing a stream; if both streams
     *         fail, the data stream's failure is the one reported
     */
    @Override
    public void close() throws IOException {
        final FileDescriptor fd = this.blockInFd;
        if (fd != null) {
            boolean dropBehind = this.dropCacheBehindAllReads;
            if (!dropBehind && this.dropCacheBehindLargeReads) {
                dropBehind = this.isLongRead();
            }
            if (dropBehind) {
                try {
                    final long dropLength = this.offset - this.lastCacheDropOffset;
                    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(this.block.getBlockName(), fd, this.lastCacheDropOffset, dropLength, 4);
                }
                catch (Exception e) {
                    // Best effort only: a failed advisory must not prevent cleanup.
                    LOG.warn((Object)"Unable to drop cache on file close", (Throwable)e);
                }
            }
        }

        final ReadaheadPool.ReadaheadRequest pendingReadahead = this.curReadahead;
        if (pendingReadahead != null) {
            pendingReadahead.cancel();
        }

        IOException closeFailure = null;

        final DataInputStream metaStream = this.checksumIn;
        if (metaStream != null) {
            try {
                metaStream.close();
            }
            catch (IOException e) {
                closeFailure = e;
            }
            this.checksumIn = null;
        }

        final InputStream dataStream = this.blockIn;
        if (dataStream != null) {
            try {
                dataStream.close();
            }
            catch (IOException e) {
                closeFailure = e;
            }
            this.blockIn = null;
            this.blockInFd = null;
        }

        if (this.volumeRef != null) {
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{this.volumeRef});
            this.volumeRef = null;
        }

        if (closeFailure == null) {
            return;
        }
        throw closeFailure;
    }

    /**
     * Looks up the replica for {@code block} in the datanode's dataset.
     *
     * @param block block whose pool id and block id identify the replica
     * @param datanode datanode whose dataset is queried
     * @return the replica, never null
     * @throws ReplicaNotFoundException if the dataset has no such replica
     */
    private static Replica getReplica(ExtendedBlock block, DataNode datanode) throws ReplicaNotFoundException {
        final Replica found = datanode.data.getReplica(block.getBlockPoolId(), block.getBlockId());
        if (found != null) {
            return found;
        }
        throw new ReplicaNotFoundException(block);
    }

    /**
     * Waits until the replica being written has at least {@code len} bytes on
     * disk, polling up to 30 times with a 100 ms sleep between polls.
     *
     * @param rbw replica that is still being written
     * @param len minimum number of on-disk bytes required
     * @throws IOException if the bytes do not become available in time, or if
     *         the waiting thread is interrupted (the interrupt flag is
     *         restored before throwing)
     */
    private static void waitForMinLength(ReplicaBeingWritten rbw, long len) throws IOException {
        for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; ++i) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ie) {
                // Restore the interrupt status so callers further up the stack
                // can still observe that the thread was asked to stop.
                Thread.currentThread().interrupt();
                throw new IOException(ie);
            }
        }
        long bytesOnDisk = rbw.getBytesOnDisk();
        if (bytesOnDisk < len) {
            throw new IOException(String.format("Need %d bytes, but only %d bytes available", len, bytesOnDisk));
        }
    }

    /**
     * Converts a plain {@link IOException} into a {@link SocketException}
     * that carries the original as its cause and reuses its stack trace.
     * Subclasses of {@code IOException} are returned unchanged.
     *
     * @param ioe the exception raised while writing to the peer
     * @return a {@code SocketException} wrapper, or {@code ioe} itself when it
     *         is already a more specific type
     */
    private static IOException ioeToSocketException(IOException ioe) {
        final boolean isPlainIoException = ioe.getClass().equals(IOException.class);
        if (!isPlainIoException) {
            return ioe;
        }
        final SocketException wrapped = new SocketException("Original Exception : " + ioe);
        wrapped.initCause(ioe);
        wrapped.setStackTrace(ioe.getStackTrace());
        return wrapped;
    }

    /**
     * Returns how many chunks are needed to hold {@code datalen} bytes,
     * counting a trailing partial chunk as a whole one.
     *
     * @param datalen number of data bytes
     * @return {@code datalen} divided by the chunk size, rounded up
     */
    private int numberOfChunks(long datalen) {
        final long bytesPerChunk = (long)this.chunkSize;
        final long roundedUp = datalen + bytesPerChunk - 1L;
        return (int)(roundedUp / bytesPerChunk);
    }

    /**
     * Builds one packet in {@code pkt} and writes it to {@code out}.
     *
     * <p>The packet holds the header, the checksums for the chunks it covers
     * and then the data. On the transferTo path only the header and checksums
     * are written from the buffer; the data goes straight from the file
     * channel to the socket. When no bytes remain, an empty packet is written.
     *
     * @param pkt scratch buffer backed by an array, large enough for the
     *        header, checksums and (on the copy path) the data
     * @param maxChunks maximum number of chunks to place in this packet
     * @param out destination stream; must be a {@link SocketOutputStream}
     *        when {@code transferTo} is true
     * @param transferTo whether to send the data via the file channel
     * @param throttler optional bandwidth throttler, may be null
     * @return number of data bytes sent in this packet
     * @throws IOException on a read, verification or write failure; plain
     *         {@code IOException}s from the write are rethrown as
     *         {@link java.net.SocketException}
     */
    private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, boolean transferTo, DataTransferThrottler throttler) throws IOException {
        int dataLen = (int)Math.min(this.endOffset - this.offset, (long)this.chunkSize * (long)maxChunks);
        int numChunks = this.numberOfChunks(dataLen);
        int checksumDataLen = numChunks * this.checksumSize;
        // The packet length counts its own 4-byte length field.
        int packetLen = dataLen + checksumDataLen + 4;
        boolean lastDataPacket = this.offset + (long)dataLen == this.endOffset && dataLen > 0;
        int headerLen = this.writePacketHeader(pkt, dataLen, packetLen);
        int headerOff = pkt.position() - headerLen;
        int checksumOff = pkt.position();
        byte[] buf = pkt.array();
        if (this.checksumSize > 0 && this.checksumIn != null) {
            this.readChecksum(buf, checksumOff, checksumDataLen);
            // For the final data packet, prefer the last-chunk checksum captured
            // at construction time over whatever is currently on disk.
            if (lastDataPacket && this.lastChunkChecksum != null) {
                int start = checksumOff + checksumDataLen - this.checksumSize;
                byte[] updatedChecksum = this.lastChunkChecksum.getChecksum();
                if (updatedChecksum != null) {
                    System.arraycopy(updatedChecksum, 0, buf, start, this.checksumSize);
                }
            }
        }
        int dataOff = checksumOff + checksumDataLen;
        if (!transferTo) {
            IOUtils.readFully((InputStream)this.blockIn, (byte[])buf, (int)dataOff, (int)dataLen);
            if (this.verifyChecksum) {
                this.verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
            }
        }
        try {
            if (transferTo) {
                SocketOutputStream sockOut = (SocketOutputStream)out;
                // Header and checksums come from the buffer...
                sockOut.write(buf, headerOff, dataOff - headerOff);
                // ...and the data directly from the block file.
                FileChannel fileCh = ((FileInputStream)this.blockIn).getChannel();
                LongWritable waitTime = new LongWritable();
                LongWritable transferTime = new LongWritable();
                sockOut.transferToFully(fileCh, this.blockInPosition, dataLen, waitTime, transferTime);
                this.datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
                this.datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
                this.blockInPosition += (long)dataLen;
            } else {
                out.write(buf, headerOff, dataOff + dataLen - headerOff);
            }
        }
        catch (IOException e) {
            // Timeouts and peer-initiated disconnects are expected; anything else
            // may indicate a problem with the block and is reported to the scanner.
            if (!(e instanceof SocketTimeoutException)) {
                // getMessage() may legitimately be null; treat that as "not a
                // known disconnect" instead of failing with a NullPointerException
                // that would hide the original error.
                String ioem = e.getMessage();
                boolean peerDisconnected = ioem != null && (ioem.startsWith("Broken pipe") || ioem.startsWith("Connection reset"));
                if (!peerDisconnected) {
                    LOG.error((Object)"BlockSender.sendChunks() exception: ", (Throwable)e);
                    this.datanode.getBlockScanner().markSuspectBlock(this.volumeRef.getVolume().getStorageID(), this.block);
                }
            }
            throw BlockSender.ioeToSocketException(e);
        }
        if (throttler != null) {
            throttler.throttle(packetLen);
        }
        return dataLen;
    }

    /**
     * Reads {@code checksumLen} bytes of checksum data from the metadata
     * stream into {@code buf} starting at {@code checksumOffset}.
     *
     * <p>Does nothing when there are no checksums to read (zero checksum size
     * or no open metadata stream). If the read fails, the metadata stream is
     * closed and dropped; the failure is then either propagated or, when
     * corrupt checksums are tolerated, replaced by zero-filled checksums.
     *
     * @param buf destination buffer
     * @param checksumOffset index in {@code buf} where the checksums start
     * @param checksumLen number of checksum bytes to read
     * @throws IOException if the read fails and corrupt checksums are not
     *         tolerated
     */
    private void readChecksum(byte[] buf, int checksumOffset, int checksumLen) throws IOException {
        // Either condition alone means there is nothing to read; requiring both
        // would dereference a null stream below.
        if (this.checksumSize <= 0 || this.checksumIn == null) {
            return;
        }
        try {
            this.checksumIn.readFully(buf, checksumOffset, checksumLen);
        }
        catch (IOException e) {
            LOG.warn((Object)(" Could not read or failed to veirfy checksum for data at offset " + this.offset + " for block " + this.block), (Throwable)e);
            IOUtils.closeStream((Closeable)this.checksumIn);
            this.checksumIn = null;
            if (!this.corruptChecksumOk) {
                throw e;
            }
            // Tolerated: hand out zeroed checksums for this packet. The end index
            // of Arrays.fill is exclusive and absolute, hence offset + length.
            if (checksumLen > 0) {
                Arrays.fill(buf, checksumOffset, checksumOffset + checksumLen, (byte)0);
            }
        }
    }

    /**
     * Recomputes the checksum of every chunk in {@code buf} and compares it
     * with the stored checksum for that chunk.
     *
     * @param buf buffer holding both the data and the checksums
     * @param dataOffset index of the first data byte
     * @param datalen number of data bytes to verify
     * @param numChunks number of chunks the data spans
     * @param checksumOffset index of the first checksum byte
     * @throws ChecksumException at the block position of the first chunk whose
     *         checksum does not match
     */
    public void verifyChecksum(byte[] buf, int dataOffset, int datalen, int numChunks, int checksumOffset) throws ChecksumException {
        int remaining = datalen;
        int dataCursor = dataOffset;
        int checksumCursor = checksumOffset;
        int chunkIndex = 0;
        while (chunkIndex < numChunks) {
            // The last chunk may be shorter than a full chunk.
            final int chunkLen = Math.min(remaining, this.chunkSize);
            this.checksum.reset();
            this.checksum.update(buf, dataCursor, chunkLen);
            final boolean matches = this.checksum.compare(buf, checksumCursor);
            if (!matches) {
                final long failedPos = this.offset + (long)datalen - (long)remaining;
                final String replicaInfo = this.replica != null ? " for replica: " + this.replica.toString() : "";
                throw new ChecksumException("Checksum failed at " + failedPos + replicaInfo, failedPos);
            }
            remaining -= chunkLen;
            dataCursor += chunkLen;
            checksumCursor += this.checksumSize;
            ++chunkIndex;
        }
    }

    /**
     * Sends the configured byte range to {@code out} inside a trace span
     * named after the block id.
     *
     * @param out stream the packets are written to
     * @param baseStream underlying stream of {@code out}; used directly when
     *        the transferTo path is taken
     * @param throttler optional bandwidth throttler, may be null
     * @return total number of data and checksum bytes sent
     * @throws IOException if reading the block or writing to the peer fails
     */
    long sendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException {
        final String spanName = "sendBlock_" + this.block.getBlockId();
        try (TraceScope scope = Trace.startSpan(spanName, Sampler.NEVER)) {
            return this.doSendBlock(out, baseStream, throttler);
        }
    }

    /**
     * Writes the whole byte range as packets, followed by an empty packet
     * that marks the end, and always closes this sender afterwards.
     *
     * <p>If the current thread is interrupted the loop stops early, no
     * terminating packet is sent and {@code sentEntireByteRange} stays false.
     *
     * @param out stream the packets are written to; must not be null
     * @param baseStream underlying stream of {@code out}; packets are written
     *        to it directly when the transferTo path is used
     * @param throttler optional bandwidth throttler, may be null
     * @return total number of data and checksum bytes sent
     * @throws IOException if {@code out} is null, or reading/writing fails
     */
    private long doSendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException {
        if (out == null) {
            throw new IOException("out stream is null");
        }
        this.initialOffset = this.offset;
        this.lastCacheDropOffset = this.initialOffset;
        long bytesSent = 0L;
        OutputStream packetSink = out;

        // Advisory issued once up front for long reads (advice value 2).
        if (this.isLongRead() && this.blockInFd != null) {
            NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(this.block.getBlockName(), this.blockInFd, 0L, 0L, 2);
        }
        this.manageOsCache();

        final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0L;
        try {
            final boolean zeroCopy = this.transferToAllowed && !this.verifyChecksum && baseStream instanceof SocketOutputStream && this.blockIn instanceof FileInputStream;
            final int chunksPerPacket;
            int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN;
            if (zeroCopy) {
                // Data bypasses the buffer, so it only needs room for checksums.
                FileChannel channel = ((FileInputStream)this.blockIn).getChannel();
                this.blockInPosition = channel.position();
                packetSink = baseStream;
                chunksPerPacket = this.numberOfChunks(TRANSFERTO_BUFFER_SIZE);
                bufferSize += this.checksumSize * chunksPerPacket;
            } else {
                chunksPerPacket = Math.max(1, this.numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
                bufferSize += (this.chunkSize + this.checksumSize) * chunksPerPacket;
            }
            final ByteBuffer packet = ByteBuffer.allocate(bufferSize);

            while (this.endOffset > this.offset && !Thread.currentThread().isInterrupted()) {
                this.manageOsCache();
                final long sent = this.sendPacket(packet, chunksPerPacket, packetSink, zeroCopy, throttler);
                this.offset += sent;
                bytesSent += sent + (long)(this.numberOfChunks(sent) * this.checksumSize);
                ++this.seqno;
            }

            // An interrupted sender did not deliver the full range, so the
            // end-of-block packet is deliberately withheld.
            if (!Thread.currentThread().isInterrupted()) {
                try {
                    this.sendPacket(packet, chunksPerPacket, packetSink, zeroCopy, throttler);
                    out.flush();
                }
                catch (IOException e) {
                    throw BlockSender.ioeToSocketException(e);
                }
                this.sentEntireByteRange = true;
            }
        }
        finally {
            if (this.clientTraceFmt != null && ClientTraceLog.isDebugEnabled()) {
                final long endTime = System.nanoTime();
                ClientTraceLog.debug((Object)String.format(this.clientTraceFmt, bytesSent, this.initialOffset, endTime - startTime));
            }
            this.close();
        }
        return bytesSent;
    }

    /**
     * Issues readahead and drop-behind hints for the block file at the
     * current offset. Does nothing when the data stream has no file
     * descriptor.
     *
     * <p>Readahead is requested when a readahead length and pool are
     * configured and the read is long or readahead was explicitly requested.
     * Cached pages are dropped only after at least
     * {@code CACHE_DROP_INTERVAL_BYTES} have been read since the last drop.
     *
     * @throws IOException if the cache advisory fails
     */
    private void manageOsCache() throws IOException {
        final FileDescriptor fd = this.blockInFd;
        if (fd == null) {
            return;
        }

        final boolean wantReadahead = this.readaheadLength > 0L
                && this.datanode.readaheadPool != null
                && (this.alwaysReadahead || this.isLongRead());
        if (wantReadahead) {
            this.curReadahead = this.datanode.readaheadPool.readaheadStream(this.clientTraceFmt, fd, this.offset, this.readaheadLength, Long.MAX_VALUE, this.curReadahead);
        }

        final boolean dropBehind = this.dropCacheBehindAllReads
                || (this.dropCacheBehindLargeReads && this.isLongRead());
        if (!dropBehind) {
            return;
        }
        final long nextCacheDropOffset = this.lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
        if (this.offset < nextCacheDropOffset) {
            return;
        }
        final long dropStart = this.lastCacheDropOffset;
        final long dropLength = this.offset - dropStart;
        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(this.block.getBlockName(), fd, dropStart, dropLength, 4);
        this.lastCacheDropOffset = this.offset;
    }

    /**
     * Tells whether the span from the initial offset to the end offset
     * exceeds {@code LONG_READ_THRESHOLD_BYTES}.
     *
     * @return true when more than the threshold number of bytes is covered
     */
    private boolean isLongRead() {
        final long span = this.endOffset - this.initialOffset;
        return span > LONG_READ_THRESHOLD_BYTES;
    }

    /**
     * Resets {@code pkt} and writes the packet header so that it ends exactly
     * at index {@code PacketHeader.PKT_MAX_HEADER_LEN}; the buffer's position
     * is left just past the header.
     *
     * @param pkt buffer to write the header into
     * @param dataLen number of data bytes in the packet; zero marks the
     *        packet as the last one in the block
     * @param packetLen packet length recorded in the header
     * @return serialized size of the header in bytes
     */
    private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
        pkt.clear();
        final boolean lastPacketInBlock = dataLen == 0;
        final PacketHeader header = new PacketHeader(packetLen, this.offset, this.seqno, lastPacketInBlock, dataLen, false);
        final int serializedLen = header.getSerializedSize();
        // Right-align the variable-length header within the reserved space.
        final int headerStart = PacketHeader.PKT_MAX_HEADER_LEN - serializedLen;
        pkt.position(headerStart);
        header.putInBuffer(pkt);
        return serializedLen;
    }

    /**
     * @return true once the whole range and the terminating empty packet have
     *         been written and flushed
     */
    boolean didSendEntireByteRange() {
        return sentEntireByteRange;
    }

    /**
     * @return the checksum object chosen for this block at construction time
     */
    DataChecksum getChecksum() {
        return checksum;
    }

    /**
     * @return the current position in the block; starts at the chunk-aligned
     *         start offset and advances as packets are sent
     */
    long getOffset() {
        return offset;
    }
}

