我们开始来分析Hadoop MapReduce的内部的运行机制。用户向Hadoop提交Job(作业),作业在JobTracker对象的控制下执行。Job被分解成为Task(任务),分发到集群中,在TaskTracker的控制下运行。Task包括MapTask和ReduceTask,是MapReduce的Map操作和Reduce操作执行的地方。这中任务分布的方法比较类似于HDFS中NameNode和DataNode的分工,NameNode对应的是JobTracker,DataNode对应的是TaskTracker。JobTracker,TaskTracker和MapReduce的客户端通过RPC通信,具体可以参考HDFS部分的分析。
我们先来分析一些辅助类,首先是和ID有关的类,ID的继承树如下:
* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce;
import java.io.DataInput;
/**
* A general identifier, which internally stores the id
* as an integer. This is the super class of {@link JobID},
* {@link TaskID} and {@link TaskAttemptID}.
*
* @see JobID
* @see TaskID
* @see TaskAttemptID
*/
public abstract class ID implements WritableComparable<ID> {
protected static final char SEPARATOR = '_';
protected int id;
/** constructs an ID object from the given int */
public ID(int id) {
this.id = id;
}
protected ID() {
}
/** returns the int which represents the identifier */
public int getId() {
return id;
}
@Override
public String toString() {
return String.valueOf(id);
}
@Override
public int hashCode() {
return Integer.valueOf(id).hashCode();
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if(o == null)
return false;
if (o.getClass() == this.getClass()) {
ID that = (ID) o;
return this.id == that.id;
}
else
return false;
}
/** Compare IDs by associated numbers */
public int compareTo(ID that) {
return this.id - that.id;
}
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
}
public void write(DataOutput out) throws IOException {
out.writeInt(id);
}
}
* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce;
import java.io.DataInput;
/**
* JobID represents the immutable and unique identifier for
* the job. JobID consists of two parts. First part
* represents the jobtracker identifier, so that jobID to jobtracker map
* is defined. For cluster setup this string is the jobtracker
* start time, for local setting, it is "local".
* Second part of the JobID is the job number. <br>
* An example JobID is :
* <code>job_200707121733_0003</code> , which represents the third job
* running at the jobtracker started at <code>200707121733</code>.
* <p>
* Applications should never construct or parse JobID strings, but rather
* use appropriate constructors or {@link #forName(String)} method.
*
* @see TaskID
* @see TaskAttemptID
* @see org.apache.hadoop.mapred.JobTracker#getNewJobId()
* @see org.apache.hadoop.mapred.JobTracker#getStartTime()
*/
public class JobID extends org.apache.hadoop.mapred.ID
implements Comparable<ID> {
protected static final String JOB = "job";
private final Text jtIdentifier;
protected static final NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(4);
}
/**
* Constructs a JobID object
* @param jtIdentifier jobTracker identifier
* @param id job number
*/
public JobID(String jtIdentifier, int id) {
super(id);
this.jtIdentifier = new Text(jtIdentifier);
}
public JobID() {
jtIdentifier = new Text();
}
public String getJtIdentifier() {
return jtIdentifier.toString();
}
@Override
public boolean equals(Object o) {
if (!super.equals(o))
return false;
JobID that = (JobID)o;
return this.jtIdentifier.equals(that.jtIdentifier);
}
/**Compare JobIds by first jtIdentifiers, then by job numbers*/
@Override
public int compareTo(ID o) {
JobID that = (JobID)o;
int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
if(jtComp == 0) {
return this.id - that.id;
}
else return jtComp;
}
/**
* Add the stuff after the "job" prefix to the given builder. This is useful,
* because the sub-ids use this substring at the start of their string.
* @param builder the builder to append to
* @return the builder that was passed in
*/
public StringBuilder appendTo(StringBuilder builder) {
builder.append(SEPARATOR);
builder.append(jtIdentifier);
builder.append(SEPARATOR);
builder.append(idFormat.format(id));
return builder;
}
@Override
public int hashCode() {
return jtIdentifier.hashCode() + id;
}
@Override
public String toString() {
return appendTo(new StringBuilder(JOB)).toString();
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.jtIdentifier.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
jtIdentifier.write(out);
}
/** Construct a JobId object from given string
* @return constructed JobId object or null if the given String is null
* @throws IllegalArgumentException if the given string is malformed
*/
public static JobID forName(String str) throws IllegalArgumentException {
if(str == null)
return null;
try {
String[] parts = str.split("_");
if(parts.length == 3) {
if(parts[0].equals(JOB)) {
return new org.apache.hadoop.mapred.JobID(parts[1],
Integer.parseInt(parts[2]));
}
}
}catch (Exception ex) {//fall below
}
throw new IllegalArgumentException("JobId string : " + str
+ " is not properly formed");
}
}
这张图可以看出现在Hadoop的org.apache.hadoop.mapred向org.apache.hadoop.mapreduce迁移带来的一些问题,其中灰色是标注为@Deprecated的。ID携带一个整型,实现了WritableComparable接口,这表明它可以比较,而且可以被Hadoop的io机制串行化/解串行化(必须实现compareTo/readFields/write方法)。JobID是系统分配给作业的唯一标识符,它的toString结果是job_<jobtrackerID>_<jobNumber>。例子:job_200707121733_0003表明这是jobtracker 200707121733(利用jobtracker的开始时间作为ID)的第3号作业。
作业分成任务执行,任务号TaskID包含了它所属的作业ID,同时也有任务ID,同时还保持了这是否是一个Map任务(成员变量isMap)。任务号的字符串表示为task_<jobtrackerID>_<jobNumber>_[m|r]_<taskNumber>,如task_200707121733_0003_m_000005表示作业200707121733_0003的000005号任务,改任务是一个Map任务。
一个任务有可能有多个执行(错误恢复/消除Stragglers等),所以必须区分任务的多个执行,这是通过类TaskAttemptID来完成,它在任务号的基础上添加了尝试号。一个任务尝试号的例子是attempt_200707121733_0003_m_000005_0,它是任务task_200707121733_0003_m_000005的第0号尝试。
JVMId用于管理任务执行过程中的Java虚拟机,我们后面再讨论。
|