HBase File Locality in HDFS
One of the more ambiguous things in Hadoop is block replication: it happens automatically and you should not have to worry about it. HBase relies on it 100% to provide data safety, as it stores its files in the distributed file system. While that works completely transparently, one of the more advanced questions asked is: how does this affect performance? This usually arises when the user starts writing MapReduce jobs against either HBase or Hadoop directly. Especially with larger amounts of data being stored in HBase, how does the system take care of placing the data close to where it is needed? This is referred to as data locality, and in the case of HBase using the Hadoop file system (HDFS) there may be doubts about how that works.
First let's see how Hadoop handles this. The MapReduce documentation advertises the fact that tasks run close to the data they process. This is achieved by breaking up large files in HDFS into smaller chunks, the so-called blocks. That is also the reason why the block size in Hadoop is much larger than what you may know from operating systems and their file systems. The default setting is 64MB, but usually 128MB is chosen, if not even larger when you are sure all your files are larger than a single block in size. Each block maps to a task run to process the contained data. That also means larger block sizes equal fewer map tasks to run, as the number of mappers is driven by the number of blocks that need processing. Hadoop knows where blocks are located and runs the map tasks directly on the node that hosts the block (actually one of them, as replication means it has a few hosts to choose from). This is how it provides data locality during MapReduce.
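To make the relationship between block size and the number of map tasks concrete, here is a minimal sketch. It assumes one map task per block, as described above; the class and method names (BlockMath, numBlocks) are made up for illustration and are not part of Hadoop.

```java
// Sketch only: estimates how many HDFS blocks (and hence map tasks,
// assuming one task per block) a file of a given size yields.
final class BlockMath {

    /** Number of blocks needed to hold the file: ceil(fileSize / blockSize). */
    static long numBlocks(long fileSizeBytes, long blockSizeBytes) {
        if (blockSizeBytes <= 0) {
            throw new IllegalArgumentException("block size must be positive");
        }
        if (fileSizeBytes < 0) {
            throw new IllegalArgumentException("file size must not be negative");
        }
        // Integer ceiling division; the last block may be only partially filled.
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long fileSize = 1024L * mb; // a 1GB file
        System.out.println("64MB blocks:  " + numBlocks(fileSize, 64 * mb));
        System.out.println("128MB blocks: " + numBlocks(fileSize, 128 * mb));
    }
}
```

Doubling the block size from 64MB to 128MB halves the number of blocks for the same file, and with it the number of map tasks.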
Back to HBase. When you have arrived at that point with Hadoop and you now understand that it can process data locally, you start to question how this may work with HBase. If you have read my post on HBase's storage architecture you saw that HBase simply stores files in HDFS. It does so for the actual data files (HFile) as well as its log (WAL). And if you look into the code it simply uses
FileSystem.create(Path path)
to create these. When you then consider two access patterns, a) direct random access and b) MapReduce scanning of tables, you wonder if care was taken that the HDFS blocks are close to where they are read by HBase.
One thing upfront: if you do not share your cluster between Hadoop and HBase but instead employ a separate Hadoop cluster as well as a stand-alone HBase cluster, then there is no data locality, and there can't be. That equals running a separate MapReduce cluster, which would not be able to execute tasks directly on the datanode. It is imperative for data locality to have them all running on the same cluster: Hadoop (as in HDFS), MapReduce and HBase. End of story.
OK, you have them all co-located on a single (hopefully larger) cluster? Then read on. How does Hadoop figure out where data is located as HBase accesses it? Remember the access patterns above: both go through a single piece of software called a RegionServer. Case a) uses random access patterns while b) scans all contiguous rows of a table but does so through the same API. As explained in my referenced post and mentioned above, HBase simply stores files and those get distributed as replicated blocks across all data nodes of the HDFS. Now imagine you stop HBase after saving a lot of data and restart it subsequently. The region servers are restarted and are assigned a seemingly random set of regions. At this very point there is no data locality guaranteed - how could there be?
The most important factor is that HBase is not restarted frequently and that it performs housekeeping on a regular basis. These so-called compactions rewrite files as new data is added over time. All files in HDFS, once written, are immutable (for all sorts of reasons). Because of that, data is written into new files, and as their number grows HBase compacts them into another set of new, consolidated files. And here is the kicker: HDFS is smart enough to put the data where it is needed! How does that work, you ask? We need to take a deep dive into Hadoop's source code and see how the above
FileSystem.create(Path path)
that HBase uses works. We are running on HDFS here, so we are actually using
DistributedFileSystem.create(Path path)
which looks like this:
public FSDataOutputStream create(Path f) throws IOException {
  return create(f, true);
}
It returns a
FSDataOutputStream
and that is created like so:
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  return new FSDataOutputStream(dfs.create(getPathName(f), permission,
      overwrite, replication, blockSize, progress, bufferSize), statistics);
}
It uses a
DFSClient
instance that is the "umbilical" cord connecting the client with the NameNode:
this.dfs = new DFSClient(namenode, conf, statistics);
What is returned though is a
DFSClient.DFSOutputStream
instance. As you write data into the stream the
DFSClient
aggregates it into "packets" which are then written as blocks to the data nodes. This happens in
DFSClient.DFSOutputStream.DataStreamer
(please hang in there, we are close!) which runs as a daemon thread in the background. The magic unfolds now in a few hops on the stack, first in the daemon
run()
it gets the list of nodes to store the data on:
nodes = nextBlockOutputStream(src);
This in turn calls:
long startTime = System.currentTimeMillis();
lb = locateFollowingBlock(startTime);
block = lb.getBlock();
nodes = lb.getLocations();
We follow further down and see that
locateFollowingBlock()
calls:
return namenode.addBlock(src, clientName);
Here is where it all comes together. The name node is called to add a new block and the
src
parameter indicates for what file, while
clientName
is the name of the
DFSClient
instance. I skip one more small method in between and show you the next bigger step involved:
public LocatedBlock getAdditionalBlock(String src, String clientName) throws IOException {
  ...
  INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
  ...
  fileLength = pendingFile.computeContentSummary().getLength();
  blockSize = pendingFile.getPreferredBlockSize();
  clientNode = pendingFile.getClientNode();
  replication = (int)pendingFile.getReplication();

  // choose targets for the new block to be allocated.
  DatanodeDescriptor targets[] = replicator.chooseTarget(replication, clientNode, null, blockSize);
  ...
}
We are finally getting to the core of this code in the
replicator.chooseTarget()
call:
private DatanodeDescriptor chooseTarget(int numOfReplicas, DatanodeDescriptor writer,
    List<Node> excludedNodes, long blocksize, int maxNodesPerRack,
    List<DatanodeDescriptor> results) {

  if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
    return writer;
  }

  int numOfResults = results.size();
  boolean newBlock = (numOfResults==0);
  if (writer == null && !newBlock) {
    writer = (DatanodeDescriptor)results.get(0);
  }

  try {
    switch(numOfResults) {
    case 0:
      writer = chooseLocalNode(writer, excludedNodes, blocksize, maxNodesPerRack, results);
      if (--numOfReplicas == 0) {
        break;
      }
    case 1:
      chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results);
      if (--numOfReplicas == 0) {
        break;
      }
    case 2:
      if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results);
      } else if (newBlock) {
        chooseLocalRack(results.get(1), excludedNodes, blocksize, maxNodesPerRack, results);
      } else {
        chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, results);
      }
      if (--numOfReplicas == 0) {
        break;
      }
    default:
      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
    }
  } catch (NotEnoughReplicasException e) {
    FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of " + numOfReplicas);
  }
  return writer;
}
Recall that we have started with the
DFSClient
and created a file which was subsequently filled with data. As the blocks need writing out, the above code first checks if that can be done on the same host that the client is on, i.e. the "writer". That is "case 0". In "case 1" the code tries to find a remote rack to have a distant replica of the block. Lastly it fills the list of required replicas with machines on the local rack or on another rack.
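The order of these choices can be replayed in a small stand-alone sketch. This is not Hadoop's implementation: the Node type and the pick() and chooseTargets() helpers are hypothetical, and where Hadoop picks randomly among suitable nodes (and honours excluded nodes and per-rack limits) this sketch simply takes the first suitable node so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified, hypothetical model of the placement order for a new block.
final class PlacementSketch {

    /** Stand-in for a data node: a host name and the rack it sits in. */
    static final class Node {
        final String host;
        final String rack;
        Node(String host, String rack) { this.host = host; this.rack = rack; }
        @Override public String toString() { return host + "/" + rack; }
    }

    /** First unused node matching the preference, else any unused node, else null. */
    static Node pick(List<Node> cluster, List<Node> chosen, Predicate<Node> preferred) {
        Node fallback = null;
        for (Node n : cluster) {
            if (chosen.contains(n)) continue;
            if (preferred.test(n)) return n;
            if (fallback == null) fallback = n;
        }
        return fallback;
    }

    static List<Node> chooseTargets(Node writer, List<Node> cluster, int replicas) {
        List<Node> chosen = new ArrayList<>();
        for (int i = 0; i < replicas; i++) {
            Predicate<Node> preferred;
            if (i == 0) {
                // "case 0": the node the writing client runs on.
                preferred = n -> n == writer;
            } else if (i == 1) {
                // "case 1": a node on a different rack than the first replica.
                Node first = chosen.get(0);
                preferred = n -> !n.rack.equals(first.rack);
            } else if (i == 2) {
                // "case 2": same rack as the second replica, unless the first
                // two ended up on one rack; then look for a remote rack again.
                Node first = chosen.get(0);
                Node second = chosen.get(1);
                if (first.rack.equals(second.rack)) {
                    preferred = n -> !n.rack.equals(first.rack);
                } else {
                    preferred = n -> n.rack.equals(second.rack);
                }
            } else {
                // Any further replicas: no preference in this sketch.
                preferred = n -> true;
            }
            Node next = pick(cluster, chosen, preferred);
            if (next == null) break; // cluster has fewer nodes than replicas
            chosen.add(next);
        }
        return chosen;
    }

    public static void main(String[] args) {
        Node a1 = new Node("a1", "rackA");
        Node a2 = new Node("a2", "rackA");
        Node b1 = new Node("b1", "rackB");
        Node b2 = new Node("b2", "rackB");
        List<Node> cluster = List.of(a1, a2, b1, b2);
        // The writer (think: a region server's host) is a2.
        System.out.println(chooseTargets(a2, cluster, 3));
    }
}
```

With the writer on a2, the first replica lands on a2 itself, and the other two go to the remote rack. The first replica is the one that matters for HBase.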
So for HBase this means that as long as the region server stays up long enough (which is the default), then after a major compaction on all tables - which can be invoked manually or is triggered by a configuration setting - it has the files local on the same host. The data node that shares the same physical host has a copy of all the data the region server requires. If you are running a scan or get or any other use case, the reads can be served from the local data node, which gives you the best performance.
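One way to picture this effect is as the fraction of a region server's blocks that have a replica on its own host. The sketch below computes that fraction from a plain map of block IDs to replica hosts; the names (LocalitySketch, localFraction) and the sample data are invented for illustration. On a real cluster the host lists could be obtained from Hadoop's FileSystem.getFileBlockLocations(), which is not shown here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: how much of a region server's data is local?
final class LocalitySketch {

    /** Fraction (0.0 to 1.0) of blocks that have a replica on the given host. */
    static double localFraction(String regionServerHost, Map<String, List<String>> blockHosts) {
        if (blockHosts.isEmpty()) {
            return 0.0;
        }
        long local = 0;
        for (List<String> hosts : blockHosts.values()) {
            if (hosts.contains(regionServerHost)) {
                local++;
            }
        }
        return (double) local / blockHosts.size();
    }

    public static void main(String[] args) {
        // Made-up state right after a restart: regions were assigned without
        // regard to where the blocks of their files live.
        Map<String, List<String>> afterRestart = new HashMap<>();
        afterRestart.put("blk_1", Arrays.asList("hostB", "hostC", "hostD"));
        afterRestart.put("blk_2", Arrays.asList("hostA", "hostB", "hostC"));

        // Made-up state after a major compaction run by the region server on
        // hostA: every rewritten block has its first replica on hostA.
        Map<String, List<String>> afterCompaction = new HashMap<>();
        afterCompaction.put("blk_3", Arrays.asList("hostA", "hostC", "hostD"));
        afterCompaction.put("blk_4", Arrays.asList("hostA", "hostB", "hostD"));

        System.out.println("after restart:    " + localFraction("hostA", afterRestart));
        System.out.println("after compaction: " + localFraction("hostA", afterCompaction));
    }
}
```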
Finally, a good overview of the HDFS design and data replication can be found here. Also note that the HBase team is working on redesigning how the Master assigns regions to servers. The plan is to improve it so that regions are deployed on the server where most of their blocks are. This will be particularly useful after a restart because it would give better data locality right off the bat. Stay tuned!