hadoop-common-user mailing list archives

From Enis Soztutar <enis....@gmail.com>
Subject Re: How to use DBInputFormat?
Date Thu, 05 Feb 2009 16:26:41 GMT
Please see below,

Stefan Podkowinski wrote:
> As far as I understand, the main problem is that you need to create
> splits from streaming data with an unknown number of records and
> offsets. It's just the same problem as with externally compressed data
> (.gz). You need to go through the complete stream (or do a table scan)
> to create logical splits. Afterwards each map task needs to seek to
> the appropriate offset on a new stream all over again. Very expensive. As
> with compressed files, no wonder only one map task is started for each
> .gz file and consumes the complete file.
I cannot see an easy way to split the JDBC stream and pass the splits to nodes.
> IMHO the DBInputFormat
> should follow this behavior and just create 1 split whatsoever.
>   
Why would we want to limit ourselves to one split, which effectively 
reduces to sequential computation?
> Maybe a future version of hadoop will allow to create splits/map tasks
> on the fly dynamically?
>   
It is obvious that input residing in a single database is not optimal for 
Hadoop, and in any case (even with sharding)
DB I/O would be the bottleneck. I guess the DBInput/OutputFormats should be 
used when the data is small but the computation is costly.

> Stefan
>
> On Thu, Feb 5, 2009 at 3:28 PM, Fredrik Hedberg <fredrik@avafan.com> wrote:
>   
>> Indeed sir.
>>
>> The implementation was designed like you describe for two reasons. First and
>> foremost, to make it as simple as possible for the user to use a JDBC
>> database as input and output for Hadoop. Secondly, because of the specific
>> requirements the MapReduce framework brings to the table (split
>> distribution, split reproducibility etc.).
>>
>> This design will, as you note, never handle the same amount of data as HBase
>> (or HDFS), and was never intended to. That being said, there are a couple of
>> ways the current design could be augmented to perform better (and, in its
>> current form, be tweaked, depending on your data and computational
>> requirements). Shard awareness is one; it would, for example, let each
>> database/tasktracker node run mappers against its local shard, with each
>> split covering a single database server.
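
To make the shard-awareness idea concrete, here is a purely hypothetical
sketch against the 0.19 mapred API (nothing like this exists in the current
code; all names are invented): a split that carries its own JDBC URL plus a
host hint, so the scheduler can place each mapper next to its shard.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputSplit;

    // Hypothetical shard-aware split: one database server per split.
    public class ShardSplit implements InputSplit {
        private String jdbcUrl; // e.g. "jdbc:mysql://shard3.example.com/mydb"
        private String host;    // tasktracker host co-located with the shard

        public ShardSplit() { } // required: splits are rebuilt by reflection
        public ShardSplit(String jdbcUrl, String host) {
            this.jdbcUrl = jdbcUrl;
            this.host = host;
        }
        public String getJdbcUrl() { return jdbcUrl; }
        public long getLength() throws IOException { return 0; } // unknown up front
        public String[] getLocations() throws IOException {
            return new String[] { host }; // locality hint: "run me near my shard"
        }
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, jdbcUrl);
            Text.writeString(out, host);
        }
        public void readFields(DataInput in) throws IOException {
            jdbcUrl = Text.readString(in);
            host = Text.readString(in);
        }
    }
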
>>
>> If you have any ideas on how the current design can be improved, please do
>> share.
>>
>>
>> Fredrik
>>
>> On Feb 5, 2009, at 11:37 AM, Stefan Podkowinski wrote:
>>
>>     
>>> The 0.19 DBInputFormat class implementation is IMHO only suitable for
>>> very simple queries working on only a few rows. That's due to the
>>> fact that it tries to create splits from the query by
>>> 1) getting a count of all rows using the specified count query (a huge
>>> performance impact on large tables), and
>>> 2) creating splits by issuing an individual query for each split, with
>>> "limit" and "offset" parameters appended to the input SQL query.
>>>
>>> Effectively, your input query "select * from orders" would become
>>> "select * from orders limit <splitlength> offset <splitstart>" and be
>>> executed repeatedly until the count has been reached. I guess this is
>>> not valid SQL syntax for Oracle.
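
Concretely, the per-split query that 0.19 builds, and the ROWNUM rewrite that
Oracle would need instead, look roughly like this (a paraphrased sketch, not
the actual DBInputFormat source; the variable names are invented). This would
also explain the ORA-00933 Amandeep reports below: Oracle rejects LIMIT/OFFSET
outright.

    // Roughly what 0.19 executes for one split (sketch, names invented):
    String inputQuery = "select * from orders";
    long start = 0, length = 1000; // this split's window of rows

    // MySQL-style paging; Oracle has no LIMIT/OFFSET clause and fails
    // with ORA-00933 "SQL command not properly ended":
    String perSplit = inputQuery + " LIMIT " + length + " OFFSET " + start;

    // The same window via Oracle's ROWNUM pseudo-column:
    String oraclePerSplit =
        "SELECT * FROM (SELECT a.*, ROWNUM rnum FROM (" + inputQuery + ") a"
        + " WHERE ROWNUM <= " + (start + length) + ") WHERE rnum > " + start;
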
>>>
>>> Stefan
>>>
>>>
>>> 2009/2/4 Amandeep Khurana <amansk@gmail.com>:
>>>       
>>>> Adding a semicolon gives me the error "ORA-00911: Invalid character"
>>>>
>>>> Amandeep
>>>>
>>>>
>>>> Amandeep Khurana
>>>> Computer Science Graduate Student
>>>> University of California, Santa Cruz
>>>>
>>>>
>>>> On Wed, Feb 4, 2009 at 6:46 AM, Rasit OZDAS <rasitozdas@gmail.com> wrote:
>>>>
>>>>         
>>>>> Amandeep,
>>>>> "SQL command not properly ended"
>>>>> I get this error whenever I forget the semicolon at the end.
>>>>> I know, it doesn't make sense, but I recommend giving it a try.
>>>>>
>>>>> Rasit
>>>>>
>>>>> 2009/2/4 Amandeep Khurana <amansk@gmail.com>:
>>>>>           
>>>>>> The same query is working if I write a simple JDBC client and query the
>>>>>> database. So, I'm probably doing something wrong in the connection
>>>>>> settings. But the error looks to be on the query side more than the
>>>>>> connection side.
>>>>>>
>>>>>> Amandeep
>>>>>>
>>>>>>
>>>>>> Amandeep Khurana
>>>>>> Computer Science Graduate Student
>>>>>> University of California, Santa Cruz
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 3, 2009 at 7:25 PM, Amandeep Khurana <amansk@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Kevin
>>>>>>>
>>>>>>> I couldn't get it to work. Here's the error I get:
>>>>>>>
>>>>>>> bin/hadoop jar ~/dbload.jar LoadTable1
>>>>>>> 09/02/03 19:21:17 INFO jvm.JvmMetrics: Initializing JVM Metrics with
>>>>>>> processName=JobTracker, sessionId=
>>>>>>> 09/02/03 19:21:20 INFO mapred.JobClient: Running job: job_local_0001
>>>>>>> 09/02/03 19:21:21 INFO mapred.JobClient:  map 0% reduce 0%
>>>>>>> 09/02/03 19:21:22 INFO mapred.MapTask: numReduceTasks: 0
>>>>>>> 09/02/03 19:21:24 WARN mapred.LocalJobRunner: job_local_0001
>>>>>>> java.io.IOException: ORA-00933: SQL command not properly ended
>>>>>>>
>>>>>>>       at org.apache.hadoop.mapred.lib.db.DBInputFormat.getRecordReader(DBInputFormat.java:289)
>>>>>>>       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:321)
>>>>>>>       at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:138)
>>>>>>> java.io.IOException: Job failed!
>>>>>>>       at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1217)
>>>>>>>       at LoadTable1.run(LoadTable1.java:130)
>>>>>>>       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>>>>>>>       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
>>>>>>>       at LoadTable1.main(LoadTable1.java:107)
>>>>>>>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>       at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>       at java.lang.reflect.Method.invoke(Unknown Source)
>>>>>>>       at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
>>>>>>>       at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
>>>>>>>       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>>>>>>>       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
>>>>>>>       at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
>>>>>>>
>>>>>>> Exception closing file
>>>>>>> /user/amkhuran/contract_table/_temporary/_attempt_local_0001_m_000000_0/part-00000
>>>>>>> java.io.IOException: Filesystem closed
>>>>>>>       at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:198)
>>>>>>>       at org.apache.hadoop.hdfs.DFSClient.access$600(DFSClient.java:65)
>>>>>>>       at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3084)
>>>>>>>       at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3053)
>>>>>>>       at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.close(DFSClient.java:942)
>>>>>>>       at org.apache.hadoop.hdfs.DFSClient.close(DFSClient.java:210)
>>>>>>>       at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:243)
>>>>>>>       at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:1413)
>>>>>>>       at org.apache.hadoop.fs.FileSystem.closeAll(FileSystem.java:236)
>>>>>>>       at org.apache.hadoop.fs.FileSystem$ClientFinalizer.run(FileSystem.java:221)
>>>>>>>
>>>>>>>
>>>>>>> Here's my code:
>>>>>>>
>>>>>>> import java.io.*;
>>>>>>> import java.sql.*;
>>>>>>>
>>>>>>> import org.apache.hadoop.conf.Configured;
>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>> import org.apache.hadoop.io.*;
>>>>>>> import org.apache.hadoop.mapred.*;
>>>>>>> import org.apache.hadoop.mapred.lib.db.*;
>>>>>>> import org.apache.hadoop.util.Tool;
>>>>>>> import org.apache.hadoop.util.ToolRunner;
>>>>>>>
>>>>>>> public class LoadTable1 extends Configured implements Tool {
>>>>>>>
>>>>>>>     // data destination on hdfs
>>>>>>>     private static final String CONTRACT_OUTPUT_PATH = "contract_table";
>>>>>>>
>>>>>>>     // the JDBC connection URL and driver implementation class
>>>>>>>     private static final String CONNECT_URL =
>>>>>>>         "jdbc:oracle:thin:@dbhost:1521:PSEDEV";
>>>>>>>     private static final String DB_USER = "user";
>>>>>>>     private static final String DB_PWD = "pass";
>>>>>>>     private static final String DATABASE_DRIVER_CLASS =
>>>>>>>         "oracle.jdbc.driver.OracleDriver";
>>>>>>>
>>>>>>>     // table metadata (currently unused; run() passes literal queries)
>>>>>>>     private static final String CONTRACT_INPUT_TABLE = "OSE_EPR_CONTRACT";
>>>>>>>     private static final String[] CONTRACT_INPUT_TABLE_FIELDS = {
>>>>>>>         "PORTFOLIO_NUMBER", "CONTRACT_NUMBER" };
>>>>>>>     private static final String ORDER_CONTRACT_BY_COL = "CONTRACT_NUMBER";
>>>>>>>
>>>>>>>     // one row of the input table, readable by DBInputFormat
>>>>>>>     static class ose_epr_contract implements Writable, DBWritable {
>>>>>>>
>>>>>>>         String CONTRACT_NUMBER;
>>>>>>>
>>>>>>>         public void readFields(DataInput in) throws IOException {
>>>>>>>             this.CONTRACT_NUMBER = Text.readString(in);
>>>>>>>         }
>>>>>>>
>>>>>>>         public void write(DataOutput out) throws IOException {
>>>>>>>             Text.writeString(out, this.CONTRACT_NUMBER);
>>>>>>>         }
>>>>>>>
>>>>>>>         public void readFields(ResultSet in_set) throws SQLException {
>>>>>>>             this.CONTRACT_NUMBER = in_set.getString(1);
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void write(PreparedStatement prep_st) throws SQLException {
>>>>>>>             // not needed: this job only reads from the database
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     public static class LoadMapper extends MapReduceBase
>>>>>>>             implements Mapper<LongWritable, ose_epr_contract,
>>>>>>>                               Text, NullWritable> {
>>>>>>>
>>>>>>>         public void map(LongWritable arg0, ose_epr_contract arg1,
>>>>>>>                 OutputCollector<Text, NullWritable> arg2, Reporter arg3)
>>>>>>>                 throws IOException {
>>>>>>>             arg2.collect(new Text(arg1.CONTRACT_NUMBER), NullWritable.get());
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>         Class.forName("oracle.jdbc.driver.OracleDriver");
>>>>>>>         System.exit(ToolRunner.run(new LoadTable1(), args));
>>>>>>>     }
>>>>>>>
>>>>>>>     public int run(String[] arg0) throws Exception {
>>>>>>>         JobConf conf = new JobConf(getConf(), LoadTable1.class);
>>>>>>>
>>>>>>>         conf.setInputFormat(DBInputFormat.class);
>>>>>>>         DBConfiguration.configureDB(conf, DATABASE_DRIVER_CLASS,
>>>>>>>             CONNECT_URL, DB_USER, DB_PWD);
>>>>>>>
>>>>>>>         // query-based setInput: 0.19 appends limit/offset per split,
>>>>>>>         // which Oracle rejects (see Stefan's note above)
>>>>>>>         DBInputFormat.setInput(conf, ose_epr_contract.class,
>>>>>>>             "select CONTRACT_NUMBER from OSE_EPR_CONTRACT",
>>>>>>>             "select COUNT(CONTRACT_NUMBER) from OSE_EPR_CONTRACT");
>>>>>>>         FileOutputFormat.setOutputPath(conf, new Path(CONTRACT_OUTPUT_PATH));
>>>>>>>
>>>>>>>         conf.setMapperClass(LoadMapper.class);
>>>>>>>         conf.setNumReduceTasks(0);
>>>>>>>
>>>>>>>         conf.setOutputKeyClass(Text.class);
>>>>>>>         conf.setOutputValueClass(NullWritable.class);
>>>>>>>
>>>>>>>         JobClient.runJob(conf);
>>>>>>>
>>>>>>>         return 0;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> -Amandeep
>>>>>>>
>>>>>>> Amandeep Khurana
>>>>>>> Computer Science Graduate Student
>>>>>>> University of California, Santa Cruz
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 3, 2009 at 6:51 PM, Kevin Peterson <kpeterson@biz360.com> wrote:
>>>>>>>
>>>>>>>> On Tue, Feb 3, 2009 at 5:49 PM, Amandeep Khurana <amansk@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In the setInput(...) function in DBInputFormat, there are two sets of
>>>>>>>>> arguments that one can use.
>>>>>>>>>
>>>>>>>>> 1. public static void *setInput*(JobConf
>>>>>>>>>
>>>>>>>>> a) In this, do we necessarily have to give all the fieldNames (which
>>>>>>>>> are the column names, right?) that the table has, or do we need to
>>>>>>>>> specify only the ones that we want to extract?
>>>>>>>> You may specify only those columns that you are interested in.
>>>>>>>>
>>>>>>>>> b) Do we have to have an orderBy or not necessarily? Does this relate
>>>>>>>>> to the primary key in the table in any way?
>>>>>>>> Conditions and order by are not necessary.
>>>>>>>>
>>>>>>>>> a) Is there any restriction on the kind of queries that this function
>>>>>>>>> can take in the inputQuery string?
>>>>>>>> I don't think so, but I don't use this method -- I just use the
>>>>>>>> fieldNames and tableName method.
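
For reference, a minimal sketch of the tableName/fieldNames overload Kevin
describes, against the 0.19 mapred.lib.db API (the record class, table, and
column names are invented; note the generated SQL still pages with
LIMIT/OFFSET, so by itself this would not fix the Oracle problem):

    // MyRecord implements Writable and DBWritable, like ose_epr_contract above.
    JobConf job = new JobConf(MyJob.class);
    job.setInputFormat(DBInputFormat.class);
    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
            "jdbc:mysql://dbhost/mydb", "user", "pass");
    DBInputFormat.setInput(job, MyRecord.class,
            "orders",                        // tableName
            null,                            // conditions (optional)
            "id",                            // orderBy (optional)
            new String[] { "id", "total" }); // fieldNames
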
>>>>>>>>
>>>>>>>>
>>>>>>>>                 
>>>>>>>>> I am facing issues in getting this to work with an Oracle database
>>>>>>>>> and have no idea of how to debug it (an email sent earlier).
>>>>>>>>> Can anyone give me some inputs on this, please?
>>>>>>>> Create a new table that has one column, put about five entries into
>>>>>>>> that table, then try to get a map job working that outputs the values
>>>>>>>> to a text file. If that doesn't work, post your code and errors.
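
As a sketch, Kevin's sanity check could be a throwaway JDBC program like the
following (the table and column names are invented; the connection settings
are Amandeep's from the code above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    // Creates the five-row, one-column test table Kevin suggests.
    public class MakeTestTable {
        public static void main(String[] args) throws Exception {
            Class.forName("oracle.jdbc.driver.OracleDriver");
            Connection conn = DriverManager.getConnection(
                    "jdbc:oracle:thin:@dbhost:1521:PSEDEV", "user", "pass");
            Statement st = conn.createStatement();
            st.executeUpdate("CREATE TABLE HADOOP_TEST (VAL VARCHAR2(32))");
            for (int i = 0; i < 5; i++) {
                st.executeUpdate("INSERT INTO HADOOP_TEST VALUES ('row" + i + "')");
            }
            st.close();
            conn.close();
        }
    }
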
>>>>>>>>
>>>>>>>>                 
>>>>>>>               
>>>>>
>>>>> --
>>>>> M. Raşit ÖZDAŞ
>>>>>
>>>>>           
>>     
>
>   
