hadoop-common-user mailing list archives

From Michael Lee <stinkymi...@gmail.com>
Subject Re: Using MapReduce to do table comparing.
Date Tue, 29 Jul 2008 02:35:48 GMT
It is hard to quantify b/c I have several jobs doing this concurrently
and the timing is mixed in with the extraction from the sources (MS-SQL)
before loading into the UDB server.  I'd say it takes a total of 1.5 hours
to extract, load, calculate deltas and update the archive for 130 million
records.

The physical server we are using is a Sun 490 and I believe it has 8 CPUs
and 32 GB of memory.  The storage is EMC.  For some big tables we utilize
MDC ( multi-dimensional clustering ).  I think you can get comparable
numbers with a cheaper Linux server, but we did not have a choice b/c the
company only certified Sun boxes at the time of purchase ( that's 2 years
ago ).

BTW - when you do the comparison in the database, watch out for NULL
comparisons.  A comparison against NULL evaluates to UNKNOWN rather than
TRUE or FALSE, so a compound predicate like

    ( null > 1 ) and ( 2 > 1 )

does not come out TRUE, and rows with NULL columns can silently fall out
of your delta.
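
The same gotcha shows up if you parse the columns and compare typed values
in application code instead of comparing the raw lines; nil has to be
handled explicitly.  A minimal Ruby illustration ( the helper name is made
up for the example ):

    # hypothetical helper: decide whether a column changed when either side
    # may be nil ( i.e. the column was NULL in the extract )
    def column_changed?(old_val, new_val)
      return false if old_val.nil? && new_val.nil?   # NULL -> NULL : unchanged
      return true  if old_val.nil? || new_val.nil?   # NULL <-> value : changed
      old_val != new_val                             # ordinary comparison
    end

    column_changed?(nil, 1)     # => true
    column_changed?(nil, nil)   # => false ( a bare nil > 1 would raise )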

BTW - if you are going to do this, get the *fastest* hard drives and RAID0
them to get the maximum throughput.  It is an IO-bound problem.

Amber wrote:
> I agree with you that this is an acceptable method if the time spent on exporting data from
> the RDBMS, importing the file into HDFS and then importing the data into the RDBMS again is
> considered as well, but it is a single-process/thread method. BTW, can you tell me how long
> your method takes to process those 130 million rows, how large the data volume is, and how
> powerful your physical computers are? Thanks a lot!
> --------------------------------------------------
> From: "Michael Lee" <stinkyminky@gmail.com>
> Sent: Thursday, July 24, 2008 11:51 AM
> To: <core-user@hadoop.apache.org>
> Subject: Re: Using MapReduce to do table comparing.
>
>   
>> Amber wrote:
>>     
>>> We have a 10 million row table exported from an AS400 mainframe every day. The table is
>>> exported as a CSV text file, about 30GB in size, and the CSV file is then imported into an
>>> RDBMS table which is dropped and recreated every day. Now we want to find how many rows are
>>> updated during each export-import interval. The table has a primary key, so deletes and
>>> inserts can be found quickly using RDBMS joins, but we must do a column-to-column comparison
>>> to find the differences between rows with the same primary keys ( about 90% of them ). Our
>>> goal is a comparison process which takes no more than 10 minutes on a 4-node cluster, where
>>> each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB local RAID5 array.
>>>
>>> Below is our current solution:
>>>     The old data is kept in the RDBMS with an index created on the primary key; the new
>>> data is imported into HDFS as the input file of our MapReduce job. Every map task connects
>>> to the RDBMS and selects the old row for each input row; map tasks generate output if
>>> differences are found, and there are no reduce tasks.
>>>
>>> As you can see, as the number of concurrent map tasks increases, the RDBMS will become
>>> the bottleneck, so we want to kick out the RDBMS, but we have no idea how to quickly
>>> retrieve the old row for a given key from HDFS files. Any suggestion is welcome.
>>>       
>> 10 million is not bad.  I do this all the time in UDB 8.1 - multiple key 
>> columns and multiple value columns, calculating deltas - insert, 
>> delete and update.
>>
>> What others have suggested works ( I tried a very crude version of what 
>> James Moore suggested in Hadoop with 70+ million records ), but you have 
>> to remember there are other costs ( dumping out files, putting them into 
>> HDFS, etc. ).  It might be better to process straight in the database or 
>> do straight file processing.  Also, the key is avoiding transactions.
>>
>> If you are doing this outside of the database...
>>
>> you have 'old.csv' and 'new.csv', both sorted by primary key ( when you 
>> extract, make sure you do an ORDER BY ).  In your application, you open 
>> two file handles and read one line at a time.  Build the key from each 
>> line.  If the keys are equal, you compare the two lines to see whether 
>> they are the same.  If the keys are not equal, the sort order tells you 
>> whether it is an insert or a delete.  Once you decide, you read the next 
>> line ( for an insert/delete you only read one line, from one of the files ).
>>
>> Here is the pseudo code
>>
>> oldFile = File.new(oldFilename, "r")
>> newFile = File.new(newFilename, "r")
>> outFile = File.new(outFilename, "w")
>>
>> # prime the loop with the first line of each sorted extract
>> # ( chomp strips the trailing newline so the "," + tag output is clean )
>> oldLine = oldFile.gets&.chomp
>> newLine = newFile.gets&.chomp
>>
>> while true
>>    oldKey = convertToKey(oldLine)
>>    newKey = convertToKey(newLine)
>>
>>    if oldKey < newKey
>>       ## key only in the old file - it is a deletion
>>       outFile.puts oldLine + "," + "DELETE"
>>       oldLine = oldFile.gets&.chomp
>>    elsif oldKey > newKey
>>       ## key only in the new file - it is an insert
>>       outFile.puts newLine + "," + "INSERT"
>>       newLine = newFile.gets&.chomp
>>    else
>>       ## same key - flag an update only if the full lines differ
>>       outFile.puts newLine + "," + "UPDATE" if oldLine != newLine
>>
>>       oldLine = oldFile.gets&.chomp
>>       newLine = newFile.gets&.chomp
>>    end
>> end
>>
>> Okay - I skipped the part where EOF is reached for each file, but you get 
>> the point.
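>>
>> For completeness, the skipped EOF handling could look roughly like this
>> ( continuing the variables above, and assuming the main loop is changed to
>> 'while oldLine && newLine' so it stops when either file runs out ):
>>
>> # drain whatever is left in the old file - those keys were deleted
>> while oldLine
>>    outFile.puts oldLine + "," + "DELETE"
>>    oldLine = oldFile.gets&.chomp
>> end
>>
>> # drain whatever is left in the new file - those keys were inserted
>> while newLine
>>    outFile.puts newLine + "," + "INSERT"
>>    newLine = newFile.gets&.chomp
>> end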
>>
>> If both old and new are in the database, you can open two database 
>> connections and just do the same process without dumping files.
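>>
>> A minimal sketch of that two-connection variant, using Ruby's 'pg' gem and
>> PostgreSQL cursors purely for illustration ( table/column names are made up
>> and your database/driver will differ, but the merge itself is the same as
>> the file version above ):
>>
>> require 'pg'
>>
>> old_db = PG.connect(dbname: 'warehouse')
>> new_db = PG.connect(dbname: 'warehouse')
>>
>> # server-side cursors so both result sets stream instead of loading fully
>> old_db.exec('BEGIN')
>> new_db.exec('BEGIN')
>> old_db.exec("DECLARE old_cur NO SCROLL CURSOR FOR
>>              SELECT * FROM old_table ORDER BY pk")
>> new_db.exec("DECLARE new_cur NO SCROLL CURSOR FOR
>>              SELECT * FROM new_table ORDER BY pk")
>>
>> fetch_old = -> { old_db.exec('FETCH NEXT FROM old_cur').first }  # hash or nil
>> fetch_new = -> { new_db.exec('FETCH NEXT FROM new_cur').first }
>>
>> old_row = fetch_old.call
>> new_row = fetch_new.call
>> # ...then the same compare-by-key loop as above, keyed on old_row['pk'] and
>> # new_row['pk'], writing DELETE / INSERT / UPDATE rows as differences appear.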
>>
>> I journal about 130 million rows every day for a quant financial database...

