hadoop-common-user mailing list archives

From: william kinney <william.kin...@gmail.com>
Subject: Re: Map performance with custom binary format
Date: Thu, 30 Jul 2009 04:07:58 GMT
OK:
  Implemented some iotop/iostat monitoring in Ganglia. Looks pretty
standard (the job ran from 23:00 to 23:06):
   - Single Node Disk Read: http://imagebin.org/57716
   - Single Node Disk Write: http://imagebin.org/57717

On each node, I noticed that the two TaskTracker$Child processes were
consuming close to 90% of each core, while the TaskTracker and DataNode
were close to 0%. For the TT children I did jstack dumps, but didn't
see much that popped out other than a lot of time spent in a
SimpleDateFormat section and in the protobuf parse. I swapped the
SimpleDateFormat out for commons.lang FastDateFormat, which reduced the
total time for both the Hadoop job and the local/non-Hadoop test, so
there is still a discrepancy between the local and Hadoop runs.
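
(For reference, the swap was basically the following; the class, field,
and pattern names here are just illustrative, not my exact code.)

import org.apache.commons.lang.time.FastDateFormat;

public class TimestampFormatting {
    // Before: SimpleDateFormat, e.g.
    //   SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //   String ts = sdf.format(new Date(millis));
    // It is not thread-safe and is relatively expensive per call.

    // After: commons-lang FastDateFormat, immutable and cheaper per call.
    private static final FastDateFormat FMT =
        FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    public static String format(long millis) {
        return FMT.format(millis);
    }
}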

"You can look at the logs for an individual task, and see how much data it
read, and how long it took.  It might be hitting your 50MB/sec or close in a
burst, or perhaps not."
  - I decided to log the throughput of each RecordReader within
Hadoop, which is essentially 1:1 with a TaskTracker$Child process since
I have one InputSplit per file (i.e., no splitting), right? I saw:
Example 1) 527639090 bytes in : 18050 ms. (27.8778 MB/s)
Example 2) 533770314 bytes in : 23494 ms. (21.6669 MB/s)
Example 3) 529711886 bytes in : 20092 ms. (25.1429 MB/s)
...etc
For reference, the non-hadoop/local test:
530710906 bytes in : 9133 ms. (55.41721 MB/s)
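
(For what it's worth, those numbers come from a thin timing wrapper
around my reader, roughly like the sketch below; class and variable
names are illustrative, not my exact code.)

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.RecordReader;

// Delegates to the real (protobuf) reader and logs bytes, elapsed time,
// and MB/s when the split is finished.
public class TimedRecordReader<K, V> implements RecordReader<K, V> {
    private static final Log LOG = LogFactory.getLog(TimedRecordReader.class);

    private final RecordReader<K, V> delegate;
    private final long startMs = System.currentTimeMillis();

    public TimedRecordReader(RecordReader<K, V> delegate) {
        this.delegate = delegate;
    }

    public boolean next(K key, V value) throws IOException {
        return delegate.next(key, value);
    }

    public K createKey() { return delegate.createKey(); }
    public V createValue() { return delegate.createValue(); }
    public long getPos() throws IOException { return delegate.getPos(); }
    public float getProgress() throws IOException { return delegate.getProgress(); }

    public void close() throws IOException {
        // The input is not split, so the final position equals bytes read.
        long bytes = delegate.getPos();
        long elapsedMs = Math.max(1, System.currentTimeMillis() - startMs);
        double mbPerSec = (bytes / (1024.0 * 1024.0)) / (elapsedMs / 1000.0);
        LOG.info(bytes + " bytes in : " + elapsedMs + " ms. (" + mbPerSec + " MB/s)");
        delegate.close();
    }
}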

Regarding the JobTracker only scheduling 1 task / node / 2 seconds:
that will definitely hurt, although the above discrepancy takes
priority for me for now.

"What does the web interface tell you about the number of concurrent map
tasks during the run?  Does it approach the max task slots?"
  - Yeah, it definitely does. From the TaskTracker page on each node
I'm almost always seeing 2 "RUNNING" tasks (and an accumulating list
of "COMMIT_PENDING" tasks under Non-Running, which slowly grows as the
job progresses). Is that normal?

Also, I profiled a local/non-Hadoop test of the RecordReader/Map():
  class: %Time
      org.apache.commons.lang.time.FastDateFormat.format(long):  46%
      com......parseFrom(byte[]):  42%
      java.io.FileInputStream.read(byte[], int, int): 5%
      ...rest are 1%'ish
  I guess this doesn't show anything helpful. I'll try to attach it to
Hadoop remotely... does anyone have experience doing this with the
YourKit Java Profiler?
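
(If I do manage to attach a profiler remotely, my rough, untested plan
is to pass the agent to the map task child JVMs via
mapred.child.java.opts, along these lines; the agent path below is just
a placeholder:)

import org.apache.hadoop.mapred.JobConf;

public class ProfiledJobSetup {
    // Sketch only: hand a native profiler agent to the TaskTracker child
    // JVMs. The -agentpath value is a placeholder for wherever the YourKit
    // agent library actually lives on the nodes.
    public static void enableProfiler(JobConf conf) {
        conf.set("mapred.child.java.opts",
                 "-Xmx512m -agentpath:/path/to/yourkit/libyjpagent.so");
    }
}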

Anyways, decided to test the "large files" vs "small files" theory again:
  Small files (1449 files, ranging 10-100 MB; average 32 MB)
    - HDFS bytes read: 49,057,491,374
    - Map input records: 737,850,142
    - Finished in: 7 min, 26 sec
    ... 104.898 MB/s
  Large files (22 files, around 500 MB each; average 514 MB)
    - HDFS bytes read: 11,852,421,152
    - Map input records: 179,657,432
    - Finished in: 1 min, 8 sec
    ... 166.225 MB/s

   Not sure why the large files were taking longer before; perhaps the
SimpleDateFormat -> FastDateFormat change? Anyway, good to see where I
need to take the file sizes to... but 166 MB/s is still not the rate I
was hoping for (given the # of nodes and local performance).

So I guess in summary, the Hadoop TaskTracker$Child processes doing
the Map() and RecordReader work are about 50% slower than the normal,
local non-Hadoop version. In addition, their rate (~25 MB/s) * num
nodes (10) suggests ~250 MB/s total job throughput, but I'm only
seeing ~166 MB/s.

Will

On Tue, Jul 28, 2009 at 6:35 PM, Scott Carey<scott@richrelevance.com> wrote:
> See below:
>
>
> On 7/28/09 12:15 PM, "william kinney" <william.kinney@gmail.com> wrote:
>
>> Sorry, forgot to include that detail.
>>
>> Some data from ganglia:
>>
>>   CPU:
>>     - on all 10 nodes, I am seeing for the life of the job 85-95% CPU
>> usage, with about 10% of that being "System" CPU, vs "User".
>>     - Single node graph: http://imagebin.org/57520
>>     - Cluster graph: http://imagebin.org/57523
>
> Ok, CPU is definitely loaded.  Identify which processes are primarily
> responsible (tasks? DataNode? TaskTracker?). You'll want to make the
> processes eating CPU during a run spit out some stack traces to 'profile'
> the activity.  Use either the 'jstack' utility with the JDK, or do a 'kill
> -3 <pid>' on a java process to spit out the stack trace to stdout.  You'll
> want to do this a handful of times on a single job if possible to identify
> any trends.
>
>>
>>   Memory:
>>     - Memory used before job is about 0.4GB, During job it fluctuates
>> up to 0.6GB and 0.7GB, then back down to 0.4GB. Most of the node
>> memory (8GB) is showing as "Cached".
>>     - Single node graph: http://imagebin.org/57522
>
> So the OS is mostly just caching disk files in RAM.
>
>>
>>   Network:
>>     - IN and OUT: Each node 6-12MB/s, cumulative about 30-44MB/s.
>>     - Single node graph: http://imagebin.org/57521
>>     - Cluster graph: http://imagebin.org/57525
>>
>
> That is not insignificant, but cumulative across the cluster it's not much.
>
>> iostat (disk) (sampled most of the nodes, below values are ranges I saw):
>>     tps: 0.41-1.27
>>     Blk_read/s: 46-58
>>     Blk_wrtn/s: 20-23
>> (have two disks per node, both SAS, 10k RPM)
>>
>
> Did you do iostat with a parameter to have it spit out more than one row?
> By default, it spits out data averaged since boot time, like vmstat.
> My favorite iostat params for monitoring are:
> iostat -mx 5
> iostat -dmx 5
> (or 10 or 15 or 60 second intervals depending on what I'm doing)  Ganglia
> might have some I/O info -- you want both iops and some sort of bytes/sec
> measurement.
>
>> ---
>> Are those Blk_read/wrtn/s as in block size (4096?) = bytes/second?
>>
>
> I think it's the 512-byte block notion, but I always use -m to put it in
> useful units.
>
>> Also, from the job page (different job, same Map method, just more
>> data...~40GB. 781 files):
>> Map input records       629,738,080
>> Map input bytes         41,538,992,880
>>
>> Anything else I can look into?
>
> Based on your other email:
>
> There are almost 800 map tasks, these seem to mostly be data local.  The
> current implementation of the JobTracker schedules rather slowly, and can at
> best place one new task per node per 2 seconds or so on a small cluster.
> So, with 10 servers, it will take at least 80 seconds just to schedule all
> the tasks.
> If each server can run 8 tasks concurrently, then if the average task
> doesn't take somewhat longer than 16 seconds, the system will not reach full
> utilization.
>
> What does the web interface tell you about the number of concurrent map
> tasks during the run?  Does it approach the max task slots?
>
> You can look at the logs for an individual task, and see how much data it
> read, and how long it took.  It might be hitting your 50MB/sec or close in a
> burst, or perhaps not.
>
> Given the sort of bottlenecks I often see, I suspect the scheduling.  But,
> you have almost maxed CPU use, so it's probably not that.  Getting stack
> dumps to see what the processor is doing during your test will help narrow
> it down.
>
>
>>
>> Do my original numbers (only 2x performance) jump out at you as being
>> way off? Or is it common to see that with a setup similar to mine?
>>
>> I should also note that since it's a custom binary format, I do not
>> support splitting (isSplittable() is false). I don't think that would
>> account for such a large discrepancy in expected performance, would it?
>>
>
> If the files are all larger than the block size, it would cause a lot more
> network activity -- but unless your switch or network is broken or not
> gigabit -- there is a lot of capacity left in the network.
>
>> Thanks for the help,
>> Will
>>
>>
>> On Tue, Jul 28, 2009 at 12:58 PM, Scott Carey<scott@richrelevance.com> wrote:
>>> Well, the first thing to do in any performance bottleneck investigation is
>>> to look at the machine hardware resource usage.
>>>
>>> During your test, what is the CPU use and disk usage?  What about network
>>> utilization?
>>> Top, vmstat, iostat, and some network usage monitoring would be useful.  It
>>> could be many things causing your lack of scalability, but without actually
>>> monitoring your machines to see if there is an obvious bottleneck it's just
>>> random guessing and hunches.
>>>
>>>
>>>
>>> On 7/28/09 8:18 AM, "william kinney" <william.kinney@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks in advance for the help!
>>>>
>>>> I have a performance question relating to how fast I can expect Hadoop
>>>> to scale. Running Cloudera's 0.18.3-10.
>>>>
>>>> I have custom binary format, which is just Google Protocol Buffer
>>>> (protobuf) serialized data:
>>>>
>>>>   669 files, ~30GB total size (ranging 10MB to 100MB each).
>>>>   128MB block size.
>>>>   10 Hadoop Nodes.
>>>>
>>>> I tested the InputFormat and RecordReader for my input format, and
>>>> they showed about 56MB/s performance (single thread, no Hadoop, test
>>>> file read via a local FileInputStream instead of an FSDataInputStream)
>>>> on hardware similar to what I have in my cluster.
>>>> I also then tested some simple Map logic along with the above, and got
>>>> around 54MB/s. I believe the difference can be accounted for by parsing
>>>> the protobuf data into Java objects.
>>>>
>>>> Anyways, when I put this logic into a job that has
>>>>   - no reduce (.setNumReduceTasks(0);)
>>>>   - no emit
>>>>   - just protobuf parsing calls (like above)
>>>>
>>>> I get a finish time of 10mins, 25sec, which is about 106.24 MB/s.
>>>>
>>>> So my question: why is the rate only 2x what I see in the
>>>> single-threaded, non-Hadoop test? Would it not be:
>>>>   54MB/s x 10 (num nodes) - small Hadoop overhead?
>>>>
>>>> Is there any area of my configuration I should look into for tuning?
>>>>
>>>> Is there any way I could get more accurate performance monitoring of my job?
>>>>
>>>> On a side note, I tried the same job after combining the files into
>>>> about 11 files (still 30GB in size), and actually saw a decrease in
>>>> performance (~90MB/s).
>>>>
>>>> Any help is appreciated. Thanks!
>>>>
>>>> Will
>>>>
>>>> some hadoop-site.xml values:
>>>> dfs.replication  3
>>>> io.file.buffer.size   65536
>>>> dfs.datanode.handler.count  3
>>>> mapred.tasktracker.map.tasks.maximum  6
>>>> dfs.namenode.handler.count  5
>>>>
>>>
>>>
>>
>
>
