Subject: Re: How to apply RDBMS table updates and deletes into Hadoop
From: atreju <n.atreju@gmail.com>
To: hive-user@hadoop.apache.org
Cc: general@hadoop.apache.org
Date: Wed, 9 Jun 2010 15:32:38 -0700

As an ideal solution, I have a suggestion for the Hive contributors to make it look like Hive is doing insert/update/delete.

This would require a special type of table creation syntax. I will call it an "UPDATABLE TABLE". The table will have 3 special columns that are defined in the create script:

1. The primary key column (let's say: col_pk).
2. A BIGINT date column that holds the ms from Jan 1st, 1970 to the actual data-manipulation date/time in the RDBMS (dml_date).
3. A TINYINT or BOOLEAN column that stores 0 if the record is deleted and 1 if it is inserted or updated (dml_action).

This requires the RDBMS table to have a PK and a last-update-date column, and deletes recorded in some other table by pk and date.
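Just to illustrate what I have in mind, a create script for such a table could look something like this. The UPDATABLE keyword is purely hypothetical (nothing like it exists in Hive today), and the customer table and its ordinary columns are made up for the example:

  -- Hypothetical syntax: "UPDATABLE" does not exist in Hive; this only
  -- sketches the proposal above. Table and ordinary columns are made up.
  CREATE UPDATABLE TABLE customer (
    col_pk     BIGINT,    -- 1. the primary key column
    name       STRING,
    city       STRING,
    dml_date   BIGINT,    -- 2. ms since Jan 1st, 1970 of the RDBMS change
    dml_action TINYINT    -- 3. 0 = deleted, 1 = inserted or updated
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE;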
On Day 1, the entire table is put into Hadoop, with the addition of the 2 extra columns: dml_date (bigint) and dml_action. On Day 2, we first find the max of dml_date from the Hive table. Then we query the RDBMS for the inserts/updates/deletes since that date/time and write them into a file with the correct dml_date/dml_action. The file goes into the same folder that our Hive table is in.

Right now, if on Day 1 we had 100 rows and on Day 2 we receive 10 more rows of changes, a regular Hive table would show 110 rows. But since this is a special table (UPDATABLE TABLE), every time it is queried in Hive, Hive first runs a map-reduce that finds the most recent row per pk (max(dml_date), group by col_pk) that is not deleted (dml_action != 0) and uses that output in the user's query. That is the big idea!!

Hive can have Insert/Update/Delete commands that would do nothing but create a file with rows of manipulated data carrying the correct date and action. There can be a special "flush" kind of command that runs the MR and replaces all files in the table directory with a single file. That could run weekly, monthly, or maybe after each time DML data is received from the RDBMS.

Sqoop can have a Hive interface that saves certain table attributes like the pk column, RDBMS connection info,... and with one command from Hive, the Hive table gets updated from the RDBMS.

What do you think?
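To make the read-time part concrete, the hidden map-reduce would be roughly equivalent to a query like this. This is only a sketch using the made-up customer table from above; nothing here happens automatically in Hive today:

  -- Latest surviving version of each row: newest dml_date per col_pk,
  -- then drop rows whose latest action is a delete.
  SELECT c.*
  FROM customer c
  JOIN (
    SELECT col_pk, MAX(dml_date) AS max_dml_date
    FROM customer
    GROUP BY col_pk
  ) latest
    ON c.col_pk = latest.col_pk
   AND c.dml_date = latest.max_dml_date
  WHERE c.dml_action <> 0;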
On Tue, Jun 8, 2010 at 3:58 PM, Aaron Kimball wrote:

> I think that this might be the way to go. In general, folding updates and deletes into datasets is a difficult problem due to the append-only nature of datasets.
>
> Something that might help you here is to partition your tables in Hive based on some well-distributed key. Then, if you have a relatively small number of partitions affected by an incremental import (perhaps more recently-imported records are more likely to be updated? in this case, partition the tables by the month/week you imported them?), you can perform the fold-in of the new deltas on only the affected partitions. This should be much faster than a full table scan.
>
> Have you seen the Sqoop tool? It handles imports and exports between HDFS (and Hive) and RDBMS systems -- but currently it can only import new records (and subsequent INSERTs); it can't handle updates/deletes. Sqoop is available at http://github.com/cloudera/sqoop -- it doesn't run on Apache 0.20.3, but works on CDH (Cloudera's Distribution for Hadoop) and Hadoop 0.21/trunk.
>
> This sort of capability is something I'm really interested in adding to Sqoop. If you've got a well-run process for doing this, I'd really appreciate your help adding this feature :) Send me an email off-list if you're interested. At the very least, I'd urge you to try out the tool.
>
> Cheers,
> - Aaron Kimball
>
> On Tue, Jun 8, 2010 at 8:54 PM, atreju wrote:
>
>> To generate smart output from base data we need to copy some base tables from a relational database into Hadoop. Some of them are big. Dumping the entire table into Hadoop every day is not an option, since there are 30+ tables and each would take several hours.
>>
>> The approach we took is to get the entire table dump first. Then, each day or every 4-6 hours, get only the inserts/updates/deletes since the last copy from the RDBMS (based on a date field in the table). Using Hive, do an outer join + union of the new data with the existing data and write the result into a new file.
>>
>> For example, if there are 100 rows in Hadoop, and in the RDBMS 3 records were inserted, 2 updated and 1 deleted since the last Hadoop copy, then the Hive query will take the 97 rows of unchanged data + the 3 inserts + the 2 updates and write them into a new file. Other applications like Pig or Hive will pick the most recent file to use when selecting/loading data from those base table data files.
>>
>> This logic is working fine in lower environments for small tables. With production data, for a table of about 30GB, the incremental re-generation of the file in Hadoop still takes several hours. I tried using a zipped version and it took even longer. I am not convinced that this is the best we can do to handle updates and deletes, since we had to re-write the 29GB of unchanged data in the 30GB file into a new file again. ...and this is not the biggest table.
>>
>> I am thinking that this should be a problem for many companies. What are the other approaches to apply updates and deletes on base tables to the Hadoop data files?
>>
>> We have 4 data nodes and are using version 0.20.3.
>>
>> Thanks!
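For completeness, the "outer join + union" fold-in quoted above boils down to roughly this kind of Hive query. This is only a sketch: base_current, base_new, delta and the pk/val/action columns are made-up names, and the delta is assumed to carry a flag marking deleted rows:

  -- Rewrite the base data as: untouched rows + latest inserted/updated rows.
  INSERT OVERWRITE TABLE base_new
  SELECT u.pk, u.val
  FROM (
    SELECT b.pk, b.val
    FROM base_current b
    LEFT OUTER JOIN delta d ON (b.pk = d.pk)
    WHERE d.pk IS NULL              -- e.g. the 97 rows untouched since the last copy
    UNION ALL
    SELECT d.pk, d.val
    FROM delta d
    WHERE d.action <> 'D'           -- the 3 inserts + 2 updates; the 1 delete is dropped
  ) u;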