hadoop-common-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dominik Friedrich <domi...@wipe-records.org>
Subject Random thoughts
Date Tue, 06 Jun 2006 14:14:55 GMT
This is a collection about different aspects of hadoop and nutch. Some of 
these I implemented in parts a few months ago but never got so far that I 
would make sense to make the code public as some kind of proof of concept. 
Because I don't have the time to finish a proof of concept I'll summerize in 
this mail my ideas and the experiences with implementing so far.

In the discussion about the namenode performance and scalabilty the idea of 
using creating a java nio based implmentation came up. This was exactly one 
of my ideas I tried to implement. 

The architecture in my experimental server was one selector thread that 
received the messages and put them into a blocking incomming queue, several 
worker threads that read the messages from this queue and handled them, one 
blocking outgoing queue that where this worker threads put their outgoing 
(reply) messages and one sender thread that pulled messages from that 
out-queue and sent them to the right target.

The messages themself were simple serialized java objects. I used the 
externizable interface for serialization but in hadoop one would use the 
writable interface I think. When a worker thread creates a new message it 
includes the target address so the sender thread knows where to send the 
message to and for every incoming message the receiver thread includes the 
source address so the worker thread know where it came from. 
In order to deserialize the received data into the correct object I put a one 
byte identifier and a two byte length field ahead of every serialized 
message. Every message class contained this one byte identifier as public 
static id and all messages that the system understands have been registered 
with the deserializer at system startup but could also be un-/registered 
while runtime. To register this messages the message class has been passed to 
the register function and the deserializer put them into a hashmap with the 
id as key. This way the "protocol" the system understand is just a collection 
of registered messages and can easily be extended, even at runtime. This 
system can also extended to be able to send messages larger than the max 
network packet size. 

To implementing a failover system for the namenode one could send all packages 
to the server via multicast so all namenodes in that multicast group receive 
those messages and update their filesystem accordingly but only the current 
master responds. A heartbeat system between the n namenodes selects the 
current master.

The performance of a dummy client server system I used for simple test was 
quite good. The throughput was almost independend of the number concurrent 
connections. My estimation is that this architecture should be able to handle 
a serveral 1000 node cluster with 20-40 threads. The big problem I ran into 
when I tried to convert namenode and datanode from RPC to this architecture 
was the asynchrony I introduces. If the worker threads read a message from 
the in-queue and just respond to them its easy to implement but if a worker 
thread sends a message and has to wait for a reply you might need some kind 
of sessions so the worker thread that reads the response has all data it 
needs to work on that response correctly. I think in the namenode you could 
just put flags the namesystem since most message will update/read from it. On 
the client side I think you'll need to implement a session memory (maybe just 
a hashmap with sessesion id ->session data) if you don't want to block the 
worker threads. To handle the concurrency in such a system is not that 
difficult with the java.util.concurrent classes introduced in java 1.5 
(another reason to switch :).

When using a message system like above a way to transfer big data chunks is 
needed, too. I'd implent this as a simple tcp server that streams the data. 
E.g. if one tasktracker needs the map output from another one it opens a tcp 
port and sends a message to the other tasktracker to send that chunk to the 
given port. This way both sides can do on-the-fly work on the stream like 
sorting or something else and a connection loss is detected instantly when 
one side closes the port.

Another big archtectural change I tried at that time was using an osgi server 
and implement namenode, datanode, jobtracker and tasktracker as osgi 
services. The basic idea behind this was to improve maintainability and using 
an environment with good tool support (Eclipse 3.2 is based on an osgi kernel 
itself and includes some good tools for osgi service development and 
testing). The nice thing about osgi services is that you can define 
dependencies between them, versioning is supported and you can 
enable/disable/update any service at runtime. Other nice things like 
configuration service, http service and so on are also available. 

A hadoop based on osgi services would work like this. You deploy a basic 
configuration of the osgi server on all of your machines (e.g. 
namenode/jobtracker enabled on one machine, datanode/tasktracker enabled on 
the other machines). The actual service jars don't have to be copied to every 
machine at that time because you can specify an url where the osgi server 
downloads it from. This feature is also used when distributing new versions 
or tasktrackers need certain jar files for a job. Since the osgi server 
supports versioning and caches already downloaded plugins jar files will only 
be downloaded once and you can specify the min/max version of the plugin for 
each job. All service and plugin jars will be hosted on one distribution 
server so you have only to put the correct jar on this one server and the 
actual deploying is automaticaly done by the system.

To test if this idea works I'd ported namenode, datanode and jobtracker to 
osgi services and deployed them. This was not much work and worked fine. 
Porting the tasktracker was more complicated because of the osgi classloader 
but I got it running more or less, too. Then I tried to split nutch (hadoop 
wasn't created at that time yet) into more smaller services and things became 
really complicated because in nutch classes different packages depend on each 
other and so on. My goal was to put certain basic classes like RPC or 
configuration into seperate osgi services so jobtracker, namenode and every 
other server that would need them just uses one defined interface and those 
services could easily be replaced. 

I think the current documentation of hadoop and nutch is quite sparse. The 
best souce of information is currently the mailinglist, the source code and 
there are some articles in the wiki but there is nothing like user guide, 
administrators guide and developer guide in pdf format. This makes the 
learning curve for beginners unecessary steep. My proposal is to create such 
guides in docbook format. From docbook you can create pdfs and html pages, it 
can be versioned in svn, building pdf and html pages can be included in the 
ant script and last but not least everybody can use his favourite editor ;)  
If there are other who would work on such documents I'd be glad to write some 
chapters, too.

Best regards,
Dominik


Mime
View raw message