2013-04-17 09:48:58.0|分类: hadoop|浏览量: 1617
FileSystem类有一系列创建文件的方法。最简单的是给拟创建的文件指定一个路径对象,然后返回一个用来写的输出流: public FSDataOutputStream create(Path f) throws IOException /** * Opens an FSDataOutputStream at the indicated Path. Files are overwritten * by default. */ public FSDataOutputStream create(Path f) throws IOException { return create(f, true); } /** * Opens an FSDataOutputStream at the indicated Path. * * overwrite 是否强制覆盖已有的文件 */ public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { return create(f, overwrite, getConf().getInt("io.file.buffer.size", 4096), getDefaultReplication(), getDefaultBlockSize()); } /** * Opens an FSDataOutputStream at the indicated Path. * @param f the file name to open * @param overwrite if a file with this name already exists, then if true, * the file will be overwritten, and if false an error will be thrown. * @param bufferSize the size of the buffer to be used. 写入文件时的缓冲大小 * @param replication(复制) required block replication for the file. * @param blockSize - 文件块大小 */ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize ) throws IOException { return create(f, overwrite, bufferSize, replication, blockSize, null); } /** * Opens an FSDataOutputStream at the indicated Path with write-progress * reporting. * @param f the file name to open * @param overwrite if a file with this name already exists, then if true, * the file will be overwritten, and if false an error will be thrown. * @param bufferSize the size of the buffer to be used. * @param replication required block replication for the file. */ public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress ) throws IOException { return this.create(f, FsPermission.getDefault(), overwrite, bufferSize, replication, blockSize, progress); } /** * Opens an FSDataOutputStream at the indicated Path with write-progress * reporting. * @param f the file name to open * @param permission * @param overwrite if a file with this name already exists, then if true, * the file will be overwritten, and if false an error will be thrown. * @param bufferSize the size of the buffer to be used. * @param replication required block replication for the file. * @param blockSize * @param progress * @throws IOException * @see #setPermission(Path, FsPermission) * * 这个方法有重载的版本允许我们指定是否强制覆盖已有的文件、文件副本数量、 * 写入文件时的缓冲大小、文件块大小以及文件许可 */ public abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException; 这个create()方法有重载的版本允许我们指定是否强制覆盖已有的文件、文件副本数量、写入文件时的缓冲大小、文件块大小以及文件许可。 参数默认:overwrite 默认强制覆盖已有的文件 bufferSize 写入文件时的缓冲大小(4096) replication 默认的是1 blockSize 32MB Progressable 默认的为null permission文件权限 是777 注意:create()方法为需要写入的文件而创建的父目录可能原先并不存在。虽然这样很方便,但有时并不希望这样。如果我们想在父目录不存在时不执行写入,就必须在调用exists()首先检查父目录是否存在。 还有一个用于传递回调接口的重载方法Progressable,如此一来,我们所写的应用就会被告知数据写入数据节点的进度: package org.apache.hadoop.util; ublic interface Progressable { public void progress();新建文件的另一种方法是使用append()在一个已有文件中追加(也有一些其他重载版本): public FSDataOutputStream append(Path f) throws IOException 这个操作允许一个写入者打开已有文件并在其末尾写入数据。有了这个API,会产生无边界文件的应用,以日志文件为例,就可以在重启后在已有文件上继续写入。此添加操作是可选的,并不是所有Hadoop文件系统都有实现。HDFS支持添加,但S3就不支持了。 例3-4展示了如何将本地文件复制到Hadoop文件系统。我们在每次Hadoop调用progress()方法时,也就是在每64 KB数据包写入数据节点管道后打印一个句号来展示整个过程。(注意,这个动作并不是API指定的,因此在Hadoop后面的版本中大多被改变了。API仅仅是让我们注意到"发生了一些事"。) 例3-4:将本地文件复制到Hadoop文件系统并显示进度 // vv FileCopyWithProgress public class FileCopyWithProgress { public static void main(String[] args) throws Exception { String localSrc = args[0]; String dst = args[1]; InputStream in = new BufferedInputStream(new FileInputStream(localSrc)); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(dst), conf); OutputStream out = fs.create(new Path(dst), new Progressable() { public void progress() { System.out.print("."); } }); IOUtils.copyBytes(in, out, 4096, true); } } // ^^ FileCopyWithProgress 典型用途: % hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/1400-8.txt 目前,其他Hadoop文件系统在写入时都不会调用progress()。通过后面几章的描述,我们会感到进度之于MapReduce应用的重要性。 FSDataOutputStream FSDataInputStream类是关于从HDFS读数据的,而FSDataOutputStream类是向HDFS中写数据的 FileSystem中的create()方法返回了一个FSDataOutputStream,与FSDataInputStream类似,它也有一个查询文件当前位置的方法: package org.apache.hadoop.fs; public class FSDataOutputStream extends DataOutputStream implements Syncable { public void write(int b) throws IOException { out.write(b); position++; if (statistics != null) { statistics.incrementBytesWritten(1); } } public long getPos() throws IOException { // implementation elided } // implementation elided } 但是,与FSDataInputStream不同,FSDataOutputStream不允许定位。这是因为HDFS只允许对一个打开的文件顺序写入,或向一个已有文件添加。换句话说,它不支持除文件尾部的其他位置的写入,这样一来,写入时的定位就没有什么意义。 |