2013-05-13 14:44:44.0|分类: hadoop|浏览量: 1220
编码器和解码器用以执行压缩解压算法。在Hadoop里,编码/解码器是通过一个压缩解码器接口实现的。 Hadoop可用的编码/解码器。
如果我们压缩的文件有相应压缩格式的扩展名(比如lzo,gz,bzip2等),hadoop就会根据扩展名去选择解码器解压。 hadoop对每个压缩格式的支持,详细见下表:
gzip是一个通用的压缩工具,在空间/时间权衡中,居于其他两种压缩方法之间。bzip2更高效,但是更慢。LZO优化压缩速度,但是压缩效率稍逊一筹。 在hadoop中可以使用CompressionCodec对数据流进行压缩和解压缩。如果要对写入输出流的数据进行压缩,可用createOutputStream(OutputStream out)方法在在底层的数据流中对需要以压缩格式写入在此之前尚未压缩的数据建立一个CompressionOutputStream对象,相反,对输入数据流读取数据进行解压缩时,调用createInputStream(InputStream in)获取CompressionInputStream。 简单的例子: public class StreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class<?> codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf); CompressionOutputStream out = codec.createOutputStream(System.out); IOUtils.copyBytes(System.in, out, 4096, false); out.finish(); } }运行命令: echo "tianbaoxing" | hadoop com.hadoop.ch4.StreamCompressor org.apache.hadoop.io.compress.GzipCodec CompressionCodecFactory方法判断CompressionCodec CompressionCodecFactory提供了getCodec()方法,从而将文件扩展名映射到相应的CompressionCodec。此方法接受一个Path对象。下面的例子显示了一个应用程序,此程序便使用这个功能来解压缩文件。 import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; // vv FileDecompressor public class FileDecompressor { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path inputPath = new Path(uri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if (codec == null) { System.err.println("No codec found for " + uri); System.exit(1); } String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try { in = codec.createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } } }编码/解码器一旦找到,就会被用来去掉文件名后缀生成输出文件名(通过CompressionCodecFactory的静态方法removeSuffix()来实现)。这样,如下调用程序便把一个名为file.gz的文件解压缩为file文件: hadoop FileDecompressor file.gzCompressionCodecFactory从io.compression.codecs配置属性定义的列表中找到编码/解码器。默认情况下,这个列表列出了Hadoop提供的所有编码/解码器(见表4-3),如果你有一个希望要注册的编码/解码器(如外部托管的LZO编码/解码器)你可以改变这个列表。每个编码/解码器知道它的默认文件扩展名,从而使CompressionCodecFactory可以通过搜索这个列表来找到一个给定的扩展名相匹配的编码/解码器(如果有的话)。
本地库 考虑到性能,最好使用一个本地库(native library)来压缩和解压。例如,在一个测试中,使用本地gzip压缩库减少了解压时间50%,压缩时间大约减少了10%(与内置的Java实现相比较)。表4-4展示了Java和本地提供的每个压缩格式的实现。井不是所有的格式都有本地实现(例如bzip2压缩),而另一些则仅有本地实现(例如LZO)。
Hadoop带有预置的32位和64位Linux的本地压缩库,位于库/本地目录。对于其他平台,需要自己编译库,具体请参见Hadoop的维基百科http://wiki.apache.org/hadoop/NativeHadoop。 本地库通过Java系统属性java.library.path来使用。Hadoop的脚本在bin目录中已经设置好这个属性,但如果不使用该脚本,则需要在应用中设置属性。 默认情况下,Hadoop会在它运行的平台上查找本地库,如果发现就自动加载。这意味着不必更改任何配置设置就可以使用本地库。在某些情况下,可能希望禁用本地库,比如在调试压缩相关问题的时候。为此,将属性hadoop.native.lib设置为false,即可确保内置的Java等同内置实现被使用(如果它们可用的话)。 CodecPool(压缩解码池) 如果要用本地库在应用中大量执行压缩解压任务,可以考虑使用CodecPool,从而重用压缩程序和解压缩程序,节约创建这些对象的开销。 下例所用的API只创建了一个很简单的压缩程序,因此不必使用这个池。此应用程序使用一个压缩池程序来压缩从标准输入读入然后将其写入标准愉出的数据: public class PooledStreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class<?> codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); Compressor compressor = null; try { compressor = CodecPool.getCompressor(codec); CompressionOutputStream out = codec.createOutputStream(System.out, compressor); IOUtils.copyBytes(System.in, out, 4096, false); out.finish(); } finally { CodecPool.returnCompressor(compressor); } } }我们从缓冲池中为指定的CompressionCodec检索到一个Compressor实例,codec的重载方法createOutputStream()中使用的便是它。通过使用finally块,我们便可确保此压缩程序会被返回缓冲池,即使在复制数据流之间的字节期间抛出了一个IOException。 |