/** Read a compressed buffer */ private synchronized void readBuffer(DataInputBuffer buffer, CompressionInputStream filter) throws IOException { // Read data into a temporary buffer DataOutputBuffer dataBuffer = new DataOutputBuffer(); try { int dataBufferLength = WritableUtils.readVInt(in); dataBuffer.write(in, dataBufferLength); // Set up 'buffer' connected to the input-stream buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); } finally { dataBuffer.close(); } // Reset the codec filter.resetState(); }
/**
 * Builds the LZO decompression pipeline on top of {@code downStream}.
 *
 * <p>The raw stream is buffered first when {@code downStreamBufferSize} is positive, then fed
 * to the codec, and the codec's output is buffered again with {@code DATA_IBUF_SIZE}.
 *
 * @param downStream stream carrying the compressed bytes
 * @param decompressor decompressor handed to the codec
 * @param downStreamBufferSize buffer size for the raw stream; values {@code <= 0} disable
 *     buffering of the raw stream
 * @return a buffered stream yielding decompressed bytes
 * @throws IOException if the codec is not supported or the codec stream cannot be created
 */
@Override
public synchronized InputStream createDecompressionStream(
    InputStream downStream, Decompressor decompressor, int downStreamBufferSize)
    throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }

  // Only add a buffering layer underneath the codec when a positive size was requested.
  final InputStream compressedSource = (downStreamBufferSize > 0)
      ? new BufferedInputStream(downStream, downStreamBufferSize)
      : downStream;

  // The codec buffer size is written into the configuration before the codec stream is created.
  conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
  final CompressionInputStream decompressed =
      codec.createInputStream(compressedSource, decompressor);

  return new BufferedInputStream(decompressed, DATA_IBUF_SIZE);
}
/**
 * Decompresses up to {@code mark} bytes from {@code source} and re-compresses them into
 * {@code dest}.
 *
 * <p>The input side (the decompression stream and the underlying file stream) is always
 * closed before this method returns. On success the output stream is returned without being
 * closed. If anything fails, the output stream and the underlying destination file stream
 * are closed as well so that no file handle is leaked.
 *
 * @param source file to read compressed data from
 * @param dest file to write the re-compressed data to
 * @param codec codec used to create both the input and the output stream
 * @param compressor compressor for the output stream; may be null, in which case the codec
 *     creates the stream without an explicit compressor
 * @param decomp decompressor for the input stream; may be null, in which case the codec
 *     creates the stream without an explicit decompressor
 * @param mark number of decompressed bytes to copy
 * @return the still-open compression stream writing to {@code dest}
 * @throws IOException if a file cannot be opened or the copy fails
 */
public static final CompressionOutputStream copy(File source, File dest,
        CompressionCodec codec, Compressor compressor, Decompressor decomp,
        long mark) throws IOException {
    FileInputStream fileInput = null;
    CompressionInputStream in = null;
    FileOutputStream fileOut = null;
    CompressionOutputStream out = null;
    boolean succeeded = false;
    try {
        fileInput = new FileInputStream(source);
        in = (decomp == null)
                ? codec.createInputStream(fileInput)
                : codec.createInputStream(fileInput, decomp);

        fileOut = new FileOutputStream(dest);
        out = (compressor == null)
                ? codec.createOutputStream(fileOut)
                : codec.createOutputStream(fileOut, compressor);

        copy(in, out, mark);
        succeeded = true;
        return out;
    } finally {
        // The input side is never handed to the caller, so it is always released here.
        if (in != null) {
            IOUtils.closeQuietly(in);
        }
        if (fileInput != null) {
            IOUtils.closeQuietly(fileInput);
        }
        // The output side is only released on failure; on success it is the return value.
        if (!succeeded) {
            if (out != null) {
                IOUtils.closeQuietly(out);
            }
            if (fileOut != null) {
                IOUtils.closeQuietly(fileOut);
            }
        }
    }
}
/**
 * Implements the copy algorithm using a buffer of at most 4k.
 *
 * <p>Copies up to {@code mark} bytes from {@code in} to {@code out}. Copying stops early if
 * {@code in} reaches end-of-stream before {@code mark} bytes were read. A non-positive
 * {@code mark} copies nothing. Neither stream is closed or flushed here.
 *
 * @param in stream to read from
 * @param out stream to write to
 * @param mark maximum number of bytes to copy
 * @throws IOException if reading or writing fails
 */
private final static void copy(CompressionInputStream in,
        CompressionOutputStream out, long mark) throws IOException {
    if (mark <= 0) {
        return;
    }

    // Size the buffer in long arithmetic so a mark above Integer.MAX_VALUE cannot
    // wrap around to a negative array size.
    final int bufferSize = (int) Math.min(4096L, mark);
    final byte[] buff = new byte[bufferSize];

    long remaining = mark;
    while (remaining > 0) {
        final int wanted = (int) Math.min((long) bufferSize, remaining);
        final int len = in.read(buff, 0, wanted);
        if (len < 0) {
            // End of stream: a -1 must neither be passed to write() nor be counted.
            break;
        }
        out.write(buff, 0, len);
        remaining -= len;
    }
}
/**
 * Opens {@code file} for line-oriented reading, decompressing it when a codec is registered
 * for its path.
 *
 * <p>The codec is looked up through a {@link CompressionCodecFactory} built from the job
 * configuration of {@code context}. When no codec matches, a plain {@link FileReader} is
 * used. Otherwise a decompressor is borrowed from {@link CodecPool} and handed, together
 * with the decompressing stream, to a {@code BufferedReaderExt}.
 *
 * <p>If building the decompressing reader fails, the file stream opened here is closed and
 * the borrowed decompressor is returned to the pool before the exception propagates.
 *
 * @param file file to read
 * @param context context supplying the job configuration
 * @return a reader over the (decompressed) content of {@code file}
 * @throws IOException if the file cannot be opened or the decompression stream cannot be
 *     created
 */
public static BufferedReader getBufferedReader(File file, MapredContext context)
        throws IOException {
    URI fileuri = file.toURI();
    Path path = new Path(fileuri);

    Configuration conf = context.getJobConf();
    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
    CompressionCodec codec = ccf.getCodec(path);

    if (codec == null) {
        return new BufferedReader(new FileReader(file));
    }

    Decompressor decompressor = CodecPool.getDecompressor(codec);
    FileInputStream fis = null;
    boolean succeeded = false;
    try {
        fis = new FileInputStream(file);
        CompressionInputStream cis = codec.createInputStream(fis, decompressor);
        // NOTE(review): BufferedReaderExt presumably takes over the decompressor and releases
        // it when the reader is closed - confirm against its implementation.
        BufferedReader br = new BufferedReaderExt(new InputStreamReader(cis), decompressor);
        succeeded = true;
        return br;
    } finally {
        if (!succeeded) {
            // Nothing was handed to the caller, so release what was acquired above.
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
            CodecPool.returnDecompressor(decompressor);
        }
    }
}
/**
 * Builds the LZMA decompression pipeline on top of {@code downStream}.
 *
 * <p>The raw stream is buffered first when {@code downStreamBufferSize} is positive, then fed
 * to the codec, and the codec's output is buffered again with {@code DATA_IBUF_SIZE}.
 *
 * @param downStream stream carrying the compressed bytes
 * @param decompressor decompressor handed to the codec
 * @param downStreamBufferSize buffer size for the raw stream; values {@code <= 0} disable
 *     buffering of the raw stream
 * @return a buffered stream yielding decompressed bytes
 * @throws IOException if the codec is not supported or the codec stream cannot be created
 */
@Override
public synchronized InputStream createDecompressionStream(
    InputStream downStream, Decompressor decompressor, int downStreamBufferSize)
    throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZMA codec cannot be loaded. "
            + "You may want to check LD_LIBRARY_PATH.");
  }

  // Only add a buffering layer underneath the codec when a positive size was requested.
  final InputStream compressedSource = (downStreamBufferSize > 0)
      ? new BufferedInputStream(downStream, downStreamBufferSize)
      : downStream;

  // The codec buffer size is written into the configuration before the codec stream is created.
  conf.setInt("io.compression.codec.lzma.buffersize", 64 * 1024);
  final CompressionInputStream decompressed =
      codec.createInputStream(compressedSource, decompressor);

  return new BufferedInputStream(decompressed, DATA_IBUF_SIZE);
}
/**
 * Builds the LZO decompression pipeline on top of {@code downStream}.
 *
 * <p>The raw stream is buffered first when {@code downStreamBufferSize} is positive. The
 * codec buffer size is then set in the configuration from
 * {@code IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY} /
 * {@code IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT}, the codec stream is created, and its
 * output is buffered again with {@code DATA_IBUF_SIZE}.
 *
 * @param downStream stream carrying the compressed bytes
 * @param decompressor decompressor handed to the codec
 * @param downStreamBufferSize buffer size for the raw stream; values {@code <= 0} disable
 *     buffering of the raw stream
 * @return a buffered stream yielding decompressed bytes
 * @throws IOException if the codec is not supported or the codec stream cannot be created
 */
@Override
public synchronized InputStream createDecompressionStream(
    InputStream downStream, Decompressor decompressor, int downStreamBufferSize)
    throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }

  // Only add a buffering layer underneath the codec when a positive size was requested.
  final InputStream compressedSource = (downStreamBufferSize > 0)
      ? new BufferedInputStream(downStream, downStreamBufferSize)
      : downStream;

  conf.setInt(
      IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
  final CompressionInputStream decompressed =
      codec.createInputStream(compressedSource, decompressor);

  return new BufferedInputStream(decompressed, DATA_IBUF_SIZE);
}
/**
 * Builds the LZO decompression pipeline on top of {@code downStream}.
 *
 * <p>The raw stream is buffered first when {@code downStreamBufferSize} is positive, then fed
 * to the codec, and the codec's output is buffered again with {@code DATA_IBUF_SIZE}.
 *
 * @param downStream stream carrying the compressed bytes
 * @param decompressor decompressor handed to the codec
 * @param downStreamBufferSize buffer size for the raw stream; values {@code <= 0} disable
 *     buffering of the raw stream
 * @return a buffered stream yielding decompressed bytes
 * @throws IOException if the codec is not supported or the codec stream cannot be created
 */
@Override
public synchronized InputStream createDecompressionStream(
    InputStream downStream, Decompressor decompressor, int downStreamBufferSize)
    throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }

  // Only add a buffering layer underneath the codec when a positive size was requested.
  final InputStream compressedSource = (downStreamBufferSize > 0)
      ? new BufferedInputStream(downStream, downStreamBufferSize)
      : downStream;

  final CompressionInputStream decompressed =
      codec.createInputStream(compressedSource, decompressor);

  return new BufferedInputStream(decompressed, DATA_IBUF_SIZE);
}
/**
 * Round-trips a short string through the codec under test: the text is written through a
 * compressing stream into memory, read back through a decompressing stream, and the result
 * must equal the original text.
 */
@Test
public void testCompressAndDecompressConsistent() throws Exception {
  final String expected = "Test String";

  // Compress the text into an in-memory sink.
  final ByteArrayOutputStream compressedSink = new ByteArrayOutputStream();
  final OutputStreamWriter compressingWriter =
      new OutputStreamWriter(subject.createOutputStream(compressedSink));
  compressingWriter.write(expected);
  compressingWriter.flush();
  compressingWriter.close();

  // Decompress what was just produced.
  final byte[] compressedBytes = compressedSink.toByteArray();
  final CompressionInputStream decompressing =
      subject.createInputStream(new ByteArrayInputStream(compressedBytes));
  final StringWriter actual = new StringWriter();
  IOUtils.copy(decompressing, actual);
  decompressing.close();
  actual.flush();
  actual.close();

  Assert.assertEquals(expected, actual.toString());
}
/**
 * Checks that an {@link InternalError} raised by the codec's input stream during
 * {@code ShuffleUtils.shuffleToMemory} reaches the caller as an {@link IOException} whose
 * cause is the {@code InternalError} and whose message contains the codec's error text.
 */
@Test
public void testInternalErrorTranslation() throws Exception {
  final String failureText = "codec failure";

  // A codec stream whose bulk read always fails with an InternalError.
  final CompressionInputStream failingStream = mock(CompressionInputStream.class);
  when(failingStream.read(any(byte[].class), anyInt(), anyInt()))
      .thenThrow(new InternalError(failureText));

  // A codec that hands out the failing stream.
  final Decompressor stubDecompressor = mock(Decompressor.class);
  final CompressionCodec stubCodec = mock(CompressionCodec.class);
  when(stubCodec.createDecompressor()).thenReturn(stubDecompressor);
  when(stubCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
      .thenReturn(failingStream);

  final byte[] header = {(byte) 'T', (byte) 'I', (byte) 'F', (byte) 1};
  final byte[] shuffleTarget = new byte[1024];
  final InputStream shuffleInput = new ByteArrayInputStream(header);

  try {
    ShuffleUtils.shuffleToMemory(
        shuffleTarget, shuffleInput, 1024, 128, stubCodec, false, 0, mock(Logger.class), null);
    Assert.fail("shuffle was supposed to throw!");
  } catch (IOException translated) {
    Assert.assertTrue(translated.getCause() instanceof InternalError);
    Assert.assertTrue(translated.getMessage().contains(failureText));
  }
}