Java 类org.apache.hadoop.fs.Seekable 实例源码

项目:hadoop-oss    文件:CryptoInputStream.java   
/**
 * Seeks to the given absolute position of the stream.
 *
 * <p>If the target position falls within the decrypted bytes still unread in
 * {@code outBuffer}, only the buffer position is moved forward; otherwise the
 * wrapped stream is repositioned and the stream offset is reset to
 * {@code pos}.
 *
 * @param pos absolute position to seek to; must not be negative
 * @throws IOException propagated from the stream check or the wrapped
 *         stream's seek
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs while seeking (e.g. the wrapped stream is not
 *         {@link Seekable}); the original exception is attached as the cause
 */
@Override
public void seek(long pos) throws IOException {
  Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
  checkStream();
  try {
    /*
     * If data of target pos in the underlying stream has already been read
     * and decrypted in outBuffer, we just need to re-position outBuffer.
     */
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
      }
    } else {
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
    }
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seek.", e);
  }
}
项目:hadoop-oss    文件:CryptoStreamsTestBase.java   
/** Checks that getPos() matches the number of bytes read so far. */
@Test(timeout=120000)
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Read back through a stream created with the default buffer size.
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];

  // First pass: request a third of the data and compare the position.
  final int firstRead = readAll(in, result, 0, dataLen / 3);
  long posAfterFirstRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead, posAfterFirstRead);

  // Second pass: request the remainder and compare against the running total.
  final int secondRead = readAll(in, result, firstRead, dataLen - firstRead);
  long posAfterSecondRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead + secondRead, posAfterSecondRead);

  in.close();
}
项目:hadoop    文件:CryptoInputStream.java   
/**
 * Seeks to the given absolute position of the stream.
 *
 * <p>If the target position falls within the decrypted bytes still unread in
 * {@code outBuffer}, only the buffer position is moved forward; otherwise the
 * wrapped stream is repositioned and the stream offset is reset to
 * {@code pos}.
 *
 * @param pos absolute position to seek to; must not be negative
 * @throws IOException propagated from the stream check or the wrapped
 *         stream's seek
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs while seeking (e.g. the wrapped stream is not
 *         {@link Seekable}); the original exception is attached as the cause
 */
@Override
public void seek(long pos) throws IOException {
  Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
  checkStream();
  try {
    /*
     * If data of target pos in the underlying stream has already been read
     * and decrypted in outBuffer, we just need to re-position outBuffer.
     */
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
      }
    } else {
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
    }
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seek.", e);
  }
}
项目:hadoop    文件:CryptoStreamsTestBase.java   
/** Checks that getPos() matches the number of bytes read so far. */
@Test(timeout=120000)
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Read back through a stream created with the default buffer size.
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];

  // First pass: request a third of the data and compare the position.
  final int firstRead = readAll(in, result, 0, dataLen / 3);
  long posAfterFirstRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead, posAfterFirstRead);

  // Second pass: request the remainder and compare against the running total.
  final int secondRead = readAll(in, result, firstRead, dataLen - firstRead);
  long posAfterSecondRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead + secondRead, posAfterSecondRead);

  in.close();
}
项目:dremio-oss    文件:TestPDFSProtocol.java   
/**
 * Stubs the stream's read to return 42 and checks that seek(7) happens before
 * the read and that the response reports 42 bytes.
 */
@Test
public void testOnMessageSuccessful() throws IOException {
  // Mocked InputStream that additionally implements Seekable and
  // PositionedReadable.
  InputStream mockStream = mock(InputStream.class,
      withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(42).when(mockStream).read(any(byte[].class), anyInt(), anyInt());

  FSDataInputStream wrappedStream = new FSDataInputStream(mockStream);
  Response response = getResponse(7L, 4096, wrappedStream);

  // The seek must be observed before the read.
  InOrder callOrder = Mockito.inOrder(mockStream);
  callOrder.verify((Seekable) mockStream).seek(7);
  callOrder.verify(mockStream).read(any(byte[].class), anyInt(), anyInt());

  DFS.GetFileDataResponse body = (DFS.GetFileDataResponse) response.pBody;
  assertEquals(42, body.getRead());
  assertEquals(42, response.dBodies[0].readableBytes());
}
项目:dremio-oss    文件:TestPDFSProtocol.java   
/**
 * Stubs the stream's read to return -1 and checks that seek(7) happens before
 * the read, that the response reports -1, and that it carries no data bodies.
 */
@Test
public void testOnMessageEOF() throws IOException {
  // Mocked InputStream that additionally implements Seekable and
  // PositionedReadable.
  InputStream mockStream = mock(InputStream.class,
      withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(-1).when(mockStream).read(any(byte[].class), anyInt(), anyInt());

  FSDataInputStream wrappedStream = new FSDataInputStream(mockStream);
  Response response = getResponse(7L, 4096, wrappedStream);

  // The seek must be observed before the read.
  InOrder callOrder = Mockito.inOrder(mockStream);
  callOrder.verify((Seekable) mockStream).seek(7);
  callOrder.verify(mockStream).read(any(byte[].class), anyInt(), anyInt());

  DFS.GetFileDataResponse body = (DFS.GetFileDataResponse) response.pBody;
  assertEquals(-1, body.getRead());
  assertEquals(0, response.dBodies.length);
}
项目:aliyun-oss-hadoop-fs    文件:CryptoInputStream.java   
/**
 * Seeks to the given absolute position of the stream.
 *
 * <p>If the target position falls within the decrypted bytes still unread in
 * {@code outBuffer}, only the buffer position is moved forward; otherwise the
 * wrapped stream is repositioned and the stream offset is reset to
 * {@code pos}.
 *
 * @param pos absolute position to seek to; must not be negative
 * @throws IOException propagated from the stream check or the wrapped
 *         stream's seek
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs while seeking (e.g. the wrapped stream is not
 *         {@link Seekable}); the original exception is attached as the cause
 */
@Override
public void seek(long pos) throws IOException {
  Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
  checkStream();
  try {
    /*
     * If data of target pos in the underlying stream has already been read
     * and decrypted in outBuffer, we just need to re-position outBuffer.
     */
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
      }
    } else {
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
    }
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seek.", e);
  }
}
项目:aliyun-oss-hadoop-fs    文件:CryptoStreamsTestBase.java   
/** Checks that getPos() matches the number of bytes read so far. */
@Test(timeout=120000)
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Read back through a stream created with the default buffer size.
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];

  // First pass: request a third of the data and compare the position.
  final int firstRead = readAll(in, result, 0, dataLen / 3);
  long posAfterFirstRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead, posAfterFirstRead);

  // Second pass: request the remainder and compare against the running total.
  final int secondRead = readAll(in, result, firstRead, dataLen - firstRead);
  long posAfterSecondRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead + secondRead, posAfterSecondRead);

  in.close();
}
项目:big-c    文件:CryptoInputStream.java   
/**
 * Seeks to the given absolute position of the stream.
 *
 * <p>If the target position falls within the decrypted bytes still unread in
 * {@code outBuffer}, only the buffer position is moved forward; otherwise the
 * wrapped stream is repositioned and the stream offset is reset to
 * {@code pos}.
 *
 * @param pos absolute position to seek to; must not be negative
 * @throws IOException propagated from the stream check or the wrapped
 *         stream's seek
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs while seeking (e.g. the wrapped stream is not
 *         {@link Seekable}); the original exception is attached as the cause
 */
@Override
public void seek(long pos) throws IOException {
  Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
  checkStream();
  try {
    /*
     * If data of target pos in the underlying stream has already been read
     * and decrypted in outBuffer, we just need to re-position outBuffer.
     */
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
      }
    } else {
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
    }
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seek.", e);
  }
}
项目:big-c    文件:CryptoStreamsTestBase.java   
/** Checks that getPos() matches the number of bytes read so far. */
@Test(timeout=120000)
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Read back through a stream created with the default buffer size.
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];

  // First pass: request a third of the data and compare the position.
  final int firstRead = readAll(in, result, 0, dataLen / 3);
  long posAfterFirstRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead, posAfterFirstRead);

  // Second pass: request the remainder and compare against the running total.
  final int secondRead = readAll(in, result, firstRead, dataLen - firstRead);
  long posAfterSecondRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead + secondRead, posAfterSecondRead);

  in.close();
}
项目:hadoop-2.6.0-cdh5.4.3    文件:CryptoInputStream.java   
/**
 * Seeks to the given absolute position of the stream.
 *
 * <p>If the target position falls within the decrypted bytes still unread in
 * {@code outBuffer}, only the buffer position is moved forward; otherwise the
 * wrapped stream is repositioned and the stream offset is reset to
 * {@code pos}.
 *
 * @param pos absolute position to seek to; must not be negative
 * @throws IOException propagated from the stream check or the wrapped
 *         stream's seek
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs while seeking (e.g. the wrapped stream is not
 *         {@link Seekable}); the original exception is attached as the cause
 */
@Override
public void seek(long pos) throws IOException {
  Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
  checkStream();
  try {
    /*
     * If data of target pos in the underlying stream has already been read
     * and decrypted in outBuffer, we just need to re-position outBuffer.
     */
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
      }
    } else {
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
    }
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seek.", e);
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:CryptoStreamsTestBase.java   
/** Checks that getPos() matches the number of bytes read so far. */
@Test(timeout=120000)
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Read back through a stream created with the default buffer size.
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];

  // First pass: request a third of the data and compare the position.
  final int firstRead = readAll(in, result, 0, dataLen / 3);
  long posAfterFirstRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead, posAfterFirstRead);

  // Second pass: request the remainder and compare against the running total.
  final int secondRead = readAll(in, result, firstRead, dataLen - firstRead);
  long posAfterSecondRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead + secondRead, posAfterSecondRead);

  in.close();
}
项目:hops    文件:CryptoInputStream.java   
/**
 * Seeks to the given absolute position of the stream.
 *
 * <p>If the target position falls within the decrypted bytes still unread in
 * {@code outBuffer}, only the buffer position is moved forward; otherwise the
 * wrapped stream is repositioned and the stream offset is reset to
 * {@code pos}.
 *
 * @param pos absolute position to seek to
 * @throws EOFException if {@code pos} is negative
 * @throws IOException propagated from the stream check or the wrapped
 *         stream's seek
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs while seeking (e.g. the wrapped stream is not
 *         {@link Seekable}); the original exception is attached as the cause
 */
@Override
public void seek(long pos) throws IOException {
  if (pos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
  }
  checkStream();
  try {
    /*
     * If data of target pos in the underlying stream has already been read
     * and decrypted in outBuffer, we just need to re-position outBuffer.
     */
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
      }
    } else {
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
    }
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seek.", e);
  }
}
项目:hops    文件:CryptoStreamsTestBase.java   
/** Checks that getPos() matches the number of bytes read so far. */
@Test(timeout=120000)
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Read back through a stream created with the default buffer size.
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];

  // First pass: request a third of the data and compare the position.
  final int firstRead = readAll(in, result, 0, dataLen / 3);
  long posAfterFirstRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead, posAfterFirstRead);

  // Second pass: request the remainder and compare against the running total.
  final int secondRead = readAll(in, result, firstRead, dataLen - firstRead);
  long posAfterSecondRead = ((Seekable) in).getPos();
  Assert.assertEquals(firstRead + secondRead, posAfterSecondRead);

  in.close();
}
项目:Hive-XML-SerDe    文件:XmlInputFormat.java   
/**
 * Scans for the next record delimited by the start and end tags.
 *
 * <p>On success the key receives the record's start position and the value
 * receives the buffered bytes. Whenever a start tag was matched, the buffer is
 * reset before returning, and a seekable input whose reported position
 * differs from {@code pos} causes a {@link RuntimeException}.
 *
 * @param key receives the start position of the record
 * @param value receives the buffered record bytes
 * @return {@code true} if a complete record was found, {@code false} otherwise
 * @throws IOException if reading from the input fails
 */
@Override
public boolean next(LongWritable key, Text value) throws IOException {
    // Nothing left to scan in this range.
    if (this.pos >= this.end) {
        return false;
    }
    // No further start tag was matched.
    if (!readUntilMatch(this.startTag, false)) {
        return false;
    }

    this.recordStartPos = this.pos - this.startTag.length;
    try {
        this.buffer.write(this.startTag);
        if (readUntilMatch(this.endTag, true)) {
            key.set(this.recordStartPos);
            value.set(this.buffer.getData(), 0, this.buffer.getLength());
            return true;
        }
    } finally {
        // Sanity check: our byte counter must agree with a seekable input.
        if (this.fsin instanceof Seekable
                && this.pos != ((Seekable) this.fsin).getPos()) {
            throw new RuntimeException("bytes consumed error!");
        }
        this.buffer.reset();
    }
    return false;
}
项目:hadoop-oss    文件:CryptoStreamUtils.java   
/**
 * Reports the current position of {@code in} when it implements
 * {@link org.apache.hadoop.fs.Seekable}; any other stream yields 0.
 *
 * @param in stream whose position is queried
 * @return the value of {@code getPos()} for a seekable stream, otherwise 0
 * @throws IOException if querying the position fails
 */
public static long getInputStreamOffset(InputStream in) throws IOException {
  return (in instanceof Seekable) ? ((Seekable) in).getPos() : 0;
}
项目:hadoop-oss    文件:CryptoInputStream.java   
/**
 * Delegates to the wrapped stream's {@code seekToNewSource} and then resets
 * the stream offset to {@code targetPos}.
 *
 * @param targetPos position to seek to; must not be negative
 * @return the value returned by the wrapped stream's {@code seekToNewSource}
 * @throws IOException propagated from the stream check or the wrapped stream
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs (e.g. the wrapped stream is not {@link Seekable}); the
 *         original exception is attached as the cause
 */
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
  Preconditions.checkArgument(targetPos >= 0, 
      "Cannot seek to negative offset.");
  checkStream();
  try {
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    resetStreamOffset(targetPos);
    return result;
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seekToNewSource.", e);
  }
}
项目:hadoop-oss    文件:CryptoInputStream.java   
/**
 * Reads a buffer through the wrapped stream's enhanced byte-buffer access and
 * passes any returned bytes to {@code decrypt}.
 *
 * <p>If {@code outBuffer} still holds unread decrypted data, the wrapped
 * stream is first repositioned to {@code getPos()} and the stream offset is
 * reset, so the read starts at the caller-visible position.
 *
 * @param bufferPool pool handed to the wrapped stream
 * @param maxLength maximum number of bytes to read
 * @param opts read options handed to the wrapped stream
 * @return the buffer returned by the wrapped stream; may be {@code null}
 * @throws IOException propagated from the stream check or the wrapped stream
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs (e.g. the wrapped stream lacks a required interface); the
 *         original exception is attached as the cause
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  checkStream();
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += n; // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  } catch (ClassCastException e) {
    // Keep the original exception as the cause: the cast that failed may be
    // either the Seekable or the HasEnhancedByteBufferAccess one.
    throw new UnsupportedOperationException("This stream does not support " + 
        "enhanced byte buffer access.", e);
  }
}
项目:hadoop-oss    文件:CompressionInputStream.java   
/**
 * Returns the current position in the stream.
 *
 * <p>The wrapped stream's own position is used when it implements both
 * {@code Seekable} and {@code PositionedReadable}; otherwise the position is
 * derived from {@code maxAvailableData} and {@code available()}.
 *
 * @return Current position in stream as a long
 * @throws IOException if the wrapped stream fails to report its state
 */
@Override
public long getPos() throws IOException {
  if (in instanceof Seekable && in instanceof PositionedReadable) {
    return ((Seekable)this.in).getPos();
  }
  // Fallback estimate. available() returns an int, so this does not work for
  // files whose size cannot fit in an int.
  return (this.maxAvailableData - this.in.available());
}
项目:hadoop-oss    文件:CryptoStreamsTestBase.java   
/**
 * Seeks {@code in} to {@code pos}, reads with {@code readAll}, and checks that
 * the bytes read equal the reference {@code data} from {@code pos} onwards.
 *
 * @param in stream under test; cast to {@link Seekable}
 * @param pos position to seek to before reading
 * @throws Exception if seeking or reading fails
 */
private void seekCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seek(pos);
  int n = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n + pos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, pos, expectedData, 0, n);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);
}
项目:hadoop-oss    文件:CryptoStreamsTestBase.java   
/**
 * Test skip: reads a third of the data, skips, reads the remainder and
 * compares it with the tail of the reference data; then checks that a
 * negative skip is rejected and that skipping after the final read returns 0.
 */
@Test(timeout=120000)
public void testSkip() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Default buffer size
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];
  int n1 = readAll(in, result, 0, dataLen / 3);
  Assert.assertEquals(n1, ((Seekable) in).getPos());

  long skipped = in.skip(dataLen / 3);
  int n2 = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n1 + skipped + n2);
  byte[] readData = new byte[n2];
  System.arraycopy(result, 0, readData, 0, n2);
  byte[] expectedData = new byte[n2];
  System.arraycopy(data, dataLen - n2, expectedData, 0, n2);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);

  try {
    skipped = in.skip(-3);
    Assert.fail("Skip Negative length should fail.");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains("Negative skip length", e);
  }

  // Skip after EOF
  skipped = in.skip(3);
  Assert.assertEquals(0, skipped);

  in.close();
}
项目:hadoop-oss    文件:CryptoStreamsTestBase.java   
/**
 * Calls {@code seekToNewSource(targetPos)} on {@code in}, reads with
 * {@code readAll}, and checks that the bytes read equal the reference
 * {@code data} from {@code targetPos} onwards.
 *
 * @param in stream under test; cast to {@link Seekable}
 * @param targetPos position passed to {@code seekToNewSource}
 * @throws Exception if seeking or reading fails
 */
private void seekToNewSourceCheck(InputStream in, int targetPos) 
    throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seekToNewSource(targetPos);
  int n = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n + targetPos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, targetPos, expectedData, 0, n);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);
}
项目:QDrill    文件:TextInput.java   
/**
 * Creates a new instance with the mandatory characters for handling newlines transparently.
 *
 * <p>The line separator and the normalized newline byte are taken from {@code settings}
 * ({@code getNewLineDelimiter()} and {@code getNormalizedNewLine()}).
 *
 * @param settings parsing settings; its newline delimiter must be non-null and 1 or 2 bytes long
 * @param input stream to read from; must implement {@link Seekable}. If it is a
 *        {@link CompressionInputStream}, {@code startPos} must be 0
 * @param readBuffer buffer whose memory address and NIO view are captured by this instance
 * @param startPos stored as the start position (presumably the split start; verify against caller)
 * @param endPos stored as the end position; replaced by {@code Long.MAX_VALUE} for a compressed
 *        stream when positive
 */
public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
  byte[] lineSeparator = settings.getNewLineDelimiter();
  byte normalizedLineSeparator = settings.getNormalizedNewLine();
  // Validate the arguments before any field is assigned.
  Preconditions.checkArgument(lineSeparator != null && (lineSeparator.length == 1 || lineSeparator.length == 2), "Invalid line separator. Expected 1 to 2 characters");
  Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
  boolean isCompressed = input instanceof CompressionInputStream ;
  Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");

  // splits aren't allowed with compressed data.  The split length will be the compressed size which means we'll normally end prematurely.
  if(isCompressed && endPos > 0){
    endPos = Long.MAX_VALUE;
  }

  this.input = input;
  // Safe cast: the Seekable check above has already passed.
  this.seekable = (Seekable) input;
  this.settings = settings;

  // Record whether the stream wrapped by an FSDataInputStream is ByteBufferReadable.
  if(input instanceof FSDataInputStream){
    this.inputFS = (FSDataInputStream) input;
    this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
  }else{
    this.inputFS = null;
    this.bufferReadable = false;
  }

  this.startPos = startPos;
  this.endPos = endPos;

  // A one-byte separator leaves the second separator slot as NULL_BYTE.
  this.lineSeparator1 = lineSeparator[0];
  this.lineSeparator2 = lineSeparator.length == 2 ? lineSeparator[1] : NULL_BYTE;
  this.normalizedLineSeparator = normalizedLineSeparator;

  this.buffer = readBuffer;
  this.bStart = buffer.memoryAddress();
  this.bStartMinus1 = bStart -1;
  // NIO view over the full capacity of the read buffer.
  this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
}
项目:hadoop    文件:CryptoStreamUtils.java   
/**
 * Reports the current position of {@code in} when it implements
 * {@link org.apache.hadoop.fs.Seekable}; any other stream yields 0.
 *
 * @param in stream whose position is queried
 * @return the value of {@code getPos()} for a seekable stream, otherwise 0
 * @throws IOException if querying the position fails
 */
public static long getInputStreamOffset(InputStream in) throws IOException {
  return (in instanceof Seekable) ? ((Seekable) in).getPos() : 0;
}
项目:hadoop    文件:CryptoInputStream.java   
/**
 * Delegates to the wrapped stream's {@code seekToNewSource} and then resets
 * the stream offset to {@code targetPos}.
 *
 * @param targetPos position to seek to; must not be negative
 * @return the value returned by the wrapped stream's {@code seekToNewSource}
 * @throws IOException propagated from the stream check or the wrapped stream
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs (e.g. the wrapped stream is not {@link Seekable}); the
 *         original exception is attached as the cause
 */
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
  Preconditions.checkArgument(targetPos >= 0, 
      "Cannot seek to negative offset.");
  checkStream();
  try {
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    resetStreamOffset(targetPos);
    return result;
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seekToNewSource.", e);
  }
}
项目:hadoop    文件:CryptoInputStream.java   
/**
 * Reads a buffer through the wrapped stream's enhanced byte-buffer access and
 * passes any returned bytes to {@code decrypt}.
 *
 * <p>If {@code outBuffer} still holds unread decrypted data, the wrapped
 * stream is first repositioned to {@code getPos()} and the stream offset is
 * reset, so the read starts at the caller-visible position.
 *
 * @param bufferPool pool handed to the wrapped stream
 * @param maxLength maximum number of bytes to read
 * @param opts read options handed to the wrapped stream
 * @return the buffer returned by the wrapped stream; may be {@code null}
 * @throws IOException propagated from the stream check or the wrapped stream
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs (e.g. the wrapped stream lacks a required interface); the
 *         original exception is attached as the cause
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  checkStream();
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += n; // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  } catch (ClassCastException e) {
    // Keep the original exception as the cause: the cast that failed may be
    // either the Seekable or the HasEnhancedByteBufferAccess one.
    throw new UnsupportedOperationException("This stream does not support " + 
        "enhanced byte buffer access.", e);
  }
}
项目:hadoop    文件:CompressionInputStream.java   
/**
 * Returns the current position in the stream.
 *
 * <p>The wrapped stream's own position is used when it implements both
 * {@code Seekable} and {@code PositionedReadable}; otherwise the position is
 * derived from {@code maxAvailableData} and {@code available()}.
 *
 * @return Current position in stream as a long
 * @throws IOException if the wrapped stream fails to report its state
 */
@Override
public long getPos() throws IOException {
  if (in instanceof Seekable && in instanceof PositionedReadable) {
    return ((Seekable)this.in).getPos();
  }
  // Fallback estimate. available() returns an int, so this does not work for
  // files whose size cannot fit in an int.
  return (this.maxAvailableData - this.in.available());
}
项目:hadoop    文件:CryptoStreamsTestBase.java   
/**
 * Seeks {@code in} to {@code pos}, reads with {@code readAll}, and checks that
 * the bytes read equal the reference {@code data} from {@code pos} onwards.
 *
 * @param in stream under test; cast to {@link Seekable}
 * @param pos position to seek to before reading
 * @throws Exception if seeking or reading fails
 */
private void seekCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seek(pos);
  int n = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n + pos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, pos, expectedData, 0, n);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);
}
项目:hadoop    文件:CryptoStreamsTestBase.java   
/**
 * Test skip: reads a third of the data, skips, reads the remainder and
 * compares it with the tail of the reference data; then checks that a
 * negative skip is rejected and that skipping after the final read returns 0.
 */
@Test(timeout=120000)
public void testSkip() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Default buffer size
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];
  int n1 = readAll(in, result, 0, dataLen / 3);
  Assert.assertEquals(n1, ((Seekable) in).getPos());

  long skipped = in.skip(dataLen / 3);
  int n2 = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n1 + skipped + n2);
  byte[] readData = new byte[n2];
  System.arraycopy(result, 0, readData, 0, n2);
  byte[] expectedData = new byte[n2];
  System.arraycopy(data, dataLen - n2, expectedData, 0, n2);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);

  try {
    skipped = in.skip(-3);
    Assert.fail("Skip Negative length should fail.");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains("Negative skip length", e);
  }

  // Skip after EOF
  skipped = in.skip(3);
  Assert.assertEquals(0, skipped);

  in.close();
}
项目:hadoop    文件:CryptoStreamsTestBase.java   
/**
 * Calls {@code seekToNewSource(targetPos)} on {@code in}, reads with
 * {@code readAll}, and checks that the bytes read equal the reference
 * {@code data} from {@code targetPos} onwards.
 *
 * @param in stream under test; cast to {@link Seekable}
 * @param targetPos position passed to {@code seekToNewSource}
 * @throws Exception if seeking or reading fails
 */
private void seekToNewSourceCheck(InputStream in, int targetPos) 
    throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seekToNewSource(targetPos);
  int n = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n + targetPos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, targetPos, expectedData, 0, n);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);
}
项目:dremio-oss    文件:TextInput.java   
/**
 * Creates a new instance with the mandatory characters for handling newlines transparently.
 *
 * <p>The line separator and the normalized newline byte are taken from {@code settings}
 * ({@code getNewLineDelimiter()} and {@code getNormalizedNewLine()}).
 *
 * @param settings parsing settings supplying the newline delimiter and normalized newline
 * @param input stream to read from; must implement {@link Seekable}. If it is a
 *        {@link CompressionInputStream}, {@code startPos} must be 0
 * @param readBuffer buffer whose memory address and NIO view are captured by this instance
 * @param startPos stored as the start position (presumably the split start; verify against caller)
 * @param endPos stored as the end position; replaced by {@code Long.MAX_VALUE} for a compressed
 *        stream when positive
 */
public TextInput(TextParsingSettings settings, InputStream input, ArrowBuf readBuffer, long startPos, long endPos) {
  this.lineSeparator = settings.getNewLineDelimiter();
  byte normalizedLineSeparator = settings.getNormalizedNewLine();
  // Validate the stream before the remaining fields are assigned.
  Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
  boolean isCompressed = input instanceof CompressionInputStream ;
  Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");

  // splits aren't allowed with compressed data.  The split length will be the compressed size which means we'll normally end prematurely.
  if(isCompressed && endPos > 0){
    endPos = Long.MAX_VALUE;
  }

  this.input = input;
  // Safe cast: the Seekable check above has already passed.
  this.seekable = (Seekable) input;
  this.settings = settings;

  // Record whether the stream wrapped by an FSDataInputStream is ByteBufferReadable.
  if(input instanceof FSDataInputStream){
    this.inputFS = (FSDataInputStream) input;
    this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
  }else{
    this.inputFS = null;
    this.bufferReadable = false;
  }

  this.startPos = startPos;
  this.endPos = endPos;

  this.normalizedLineSeparator = normalizedLineSeparator;

  this.buffer = readBuffer;
  this.bStart = buffer.memoryAddress();
  this.bStartMinus1 = bStart -1;
  // NIO view over the full capacity of the read buffer.
  this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
}
项目:aliyun-oss-hadoop-fs    文件:CryptoStreamUtils.java   
/**
 * Reports the current position of {@code in} when it implements
 * {@link org.apache.hadoop.fs.Seekable}; any other stream yields 0.
 *
 * @param in stream whose position is queried
 * @return the value of {@code getPos()} for a seekable stream, otherwise 0
 * @throws IOException if querying the position fails
 */
public static long getInputStreamOffset(InputStream in) throws IOException {
  return (in instanceof Seekable) ? ((Seekable) in).getPos() : 0;
}
项目:aliyun-oss-hadoop-fs    文件:CryptoInputStream.java   
/**
 * Delegates to the wrapped stream's {@code seekToNewSource} and then resets
 * the stream offset to {@code targetPos}.
 *
 * @param targetPos position to seek to; must not be negative
 * @return the value returned by the wrapped stream's {@code seekToNewSource}
 * @throws IOException propagated from the stream check or the wrapped stream
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs (e.g. the wrapped stream is not {@link Seekable}); the
 *         original exception is attached as the cause
 */
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
  Preconditions.checkArgument(targetPos >= 0, 
      "Cannot seek to negative offset.");
  checkStream();
  try {
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    resetStreamOffset(targetPos);
    return result;
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seekToNewSource.", e);
  }
}
项目:aliyun-oss-hadoop-fs    文件:CryptoInputStream.java   
/**
 * Reads a buffer through the wrapped stream's enhanced byte-buffer access and
 * passes any returned bytes to {@code decrypt}.
 *
 * <p>If {@code outBuffer} still holds unread decrypted data, the wrapped
 * stream is first repositioned to {@code getPos()} and the stream offset is
 * reset, so the read starts at the caller-visible position.
 *
 * @param bufferPool pool handed to the wrapped stream
 * @param maxLength maximum number of bytes to read
 * @param opts read options handed to the wrapped stream
 * @return the buffer returned by the wrapped stream; may be {@code null}
 * @throws IOException propagated from the stream check or the wrapped stream
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs (e.g. the wrapped stream lacks a required interface); the
 *         original exception is attached as the cause
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  checkStream();
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += n; // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  } catch (ClassCastException e) {
    // Keep the original exception as the cause: the cast that failed may be
    // either the Seekable or the HasEnhancedByteBufferAccess one.
    throw new UnsupportedOperationException("This stream does not support " + 
        "enhanced byte buffer access.", e);
  }
}
项目:aliyun-oss-hadoop-fs    文件:CompressionInputStream.java   
/**
 * Returns the current position in the stream.
 *
 * <p>The wrapped stream's own position is used when it implements both
 * {@code Seekable} and {@code PositionedReadable}; otherwise the position is
 * derived from {@code maxAvailableData} and {@code available()}.
 *
 * @return Current position in stream as a long
 * @throws IOException if the wrapped stream fails to report its state
 */
@Override
public long getPos() throws IOException {
  if (in instanceof Seekable && in instanceof PositionedReadable) {
    return ((Seekable)this.in).getPos();
  }
  // Fallback estimate. available() returns an int, so this does not work for
  // files whose size cannot fit in an int.
  return (this.maxAvailableData - this.in.available());
}
项目:aliyun-oss-hadoop-fs    文件:CryptoStreamsTestBase.java   
/**
 * Seeks {@code in} to {@code pos}, reads with {@code readAll}, and checks that
 * the bytes read equal the reference {@code data} from {@code pos} onwards.
 *
 * @param in stream under test; cast to {@link Seekable}
 * @param pos position to seek to before reading
 * @throws Exception if seeking or reading fails
 */
private void seekCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seek(pos);
  int n = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n + pos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, pos, expectedData, 0, n);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);
}
项目:aliyun-oss-hadoop-fs    文件:CryptoStreamsTestBase.java   
/**
 * Test skip: reads a third of the data, skips, reads the remainder and
 * compares it with the tail of the reference data; then checks that a
 * negative skip is rejected and that skipping after the final read returns 0.
 */
@Test(timeout=120000)
public void testSkip() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  // Default buffer size
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];
  int n1 = readAll(in, result, 0, dataLen / 3);
  Assert.assertEquals(n1, ((Seekable) in).getPos());

  long skipped = in.skip(dataLen / 3);
  int n2 = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n1 + skipped + n2);
  byte[] readData = new byte[n2];
  System.arraycopy(result, 0, readData, 0, n2);
  byte[] expectedData = new byte[n2];
  System.arraycopy(data, dataLen - n2, expectedData, 0, n2);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);

  try {
    skipped = in.skip(-3);
    Assert.fail("Skip Negative length should fail.");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains("Negative skip length", e);
  }

  // Skip after EOF
  skipped = in.skip(3);
  Assert.assertEquals(0, skipped);

  in.close();
}
项目:aliyun-oss-hadoop-fs    文件:CryptoStreamsTestBase.java   
/**
 * Calls {@code seekToNewSource(targetPos)} on {@code in}, reads with
 * {@code readAll}, and checks that the bytes read equal the reference
 * {@code data} from {@code targetPos} onwards.
 *
 * @param in stream under test; cast to {@link Seekable}
 * @param targetPos position passed to {@code seekToNewSource}
 * @throws Exception if seeking or reading fails
 */
private void seekToNewSourceCheck(InputStream in, int targetPos) 
    throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seekToNewSource(targetPos);
  int n = readAll(in, result, 0, dataLen);

  Assert.assertEquals(dataLen, n + targetPos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, targetPos, expectedData, 0, n);
  // JUnit expects (expecteds, actuals); the order matters for the failure
  // message.
  Assert.assertArrayEquals(expectedData, readData);
}
项目:big-c    文件:CryptoStreamUtils.java   
/**
 * Reports the current position of {@code in} when it implements
 * {@link org.apache.hadoop.fs.Seekable}; any other stream yields 0.
 *
 * @param in stream whose position is queried
 * @return the value of {@code getPos()} for a seekable stream, otherwise 0
 * @throws IOException if querying the position fails
 */
public static long getInputStreamOffset(InputStream in) throws IOException {
  return (in instanceof Seekable) ? ((Seekable) in).getPos() : 0;
}
项目:big-c    文件:CryptoInputStream.java   
/**
 * Delegates to the wrapped stream's {@code seekToNewSource} and then resets
 * the stream offset to {@code targetPos}.
 *
 * @param targetPos position to seek to; must not be negative
 * @return the value returned by the wrapped stream's {@code seekToNewSource}
 * @throws IOException propagated from the stream check or the wrapped stream
 * @throws UnsupportedOperationException if a {@link ClassCastException}
 *         occurs (e.g. the wrapped stream is not {@link Seekable}); the
 *         original exception is attached as the cause
 */
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
  Preconditions.checkArgument(targetPos >= 0, 
      "Cannot seek to negative offset.");
  checkStream();
  try {
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    resetStreamOffset(targetPos);
    return result;
  } catch (ClassCastException e) {
    // Keep the original exception as the cause so the failing cast stays
    // visible in stack traces.
    throw new UnsupportedOperationException("This stream does not support " +
        "seekToNewSource.", e);
  }
}