hadoop-common-user mailing list archives

From Fredrik Hedberg <fred...@avafan.com>
Subject Re: How to use DBInputFormat?
Date Fri, 06 Feb 2009 21:42:31 GMT
Well, that's also implicit by design, and cannot really be solved in a  
generic way. As with any system, it's not foolproof; unless you fully  
understand what you're doing, you won't reliably get the result you're  
seeking.

As I said before, the JDBC interface for Hadoop solves a specific
problem, whereas HBase and HDFS are really the answer to the kind of
problem you're hinting at.


Fredrik


On Feb 6, 2009, at 4:06 PM, Stefan Podkowinski wrote:

> On Fri, Feb 6, 2009 at 2:40 PM, Fredrik Hedberg <fredrik@avafan.com>  
> wrote:
>> Well, that obviously depends on the RDBMS' implementation. And although
>> the case is not as bad as you describe (otherwise you'd better ask your
>> RDBMS vendor for your money back), your point is valid. But then again,
>> an RDBMS is not designed for that kind of work.
>
> Right. Clash of design paradigms. Hey MySQL, I want my money back!!  
> Oh, wait..
> Another scenario I just realized: what about current/"realtime"
> data? E.g. 'select * from logs where date = today()'. Working with
> 'offset' may return different results once the table has been updated
> while tasks are still pending. That condition is pretty ugly to track
> down, after you find out that your results are sometimes just not
> right..
>
>
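
A concrete sketch of the pitfall Stefan describes above (table and numbers
are made up for illustration): the splits are computed once from a row
count, but the OFFSET-based queries run later, against a table that may
have grown in the meantime.

    -- Split boundaries computed at job submission, e.g. 2000 rows -> 2 splits:
    select * from logs where date = today() limit 1000 offset 0;
    select * from logs where date = today() limit 1000 offset 1000;
    -- If new rows are inserted before the second query runs (and there is no
    -- stable ORDER BY), the second task may re-read rows the first task has
    -- already seen, or skip rows entirely.
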
>> What do you mean by "creating splits/map tasks on the fly  
>> dynamically"?
>>
>>
>> Fredrik
>>
>>
>> On Feb 5, 2009, at 4:49 PM, Stefan Podkowinski wrote:
>>
>>> As far as I understand, the main problem is that you need to create
>>> splits from streaming data with an unknown number of records and
>>> offsets. It's the same problem as with externally compressed data
>>> (.gz): you need to go through the complete stream (or do a table scan)
>>> to create logical splits, and afterwards each map task needs to seek
>>> to the appropriate offset on a fresh stream all over again. Very
>>> expensive. As with compressed files, it's no wonder only one map task
>>> is started for each .gz file and consumes the complete file. IMHO
>>> DBInputFormat should follow this behavior and just create a single
>>> split. Maybe a future version of Hadoop will allow creating splits/map
>>> tasks dynamically on the fly?
>>>
>>> Stefan
>>>
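
For what it's worth, the split count in 0.19 appears to follow the
configured number of map tasks, so a single split can already be requested
explicitly. A minimal sketch, assuming that behaviour; note this only
avoids the many per-split queries, it does not change the LIMIT/OFFSET
rewrite itself:

    // Sketch, assuming 0.19 behaviour: DBInputFormat derives its split count
    // from the configured number of map tasks, so this yields one split and
    // one mapper that consumes the whole result set.
    conf.setNumMapTasks(1);
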
>>> On Thu, Feb 5, 2009 at 3:28 PM, Fredrik Hedberg <fredrik@avafan.com>
>>> wrote:
>>>>
>>>> Indeed sir.
>>>>
>>>> The implementation was designed the way you describe for two reasons.
>>>> First and foremost, to make it as simple as possible for the user to
>>>> use a JDBC database as input and output for Hadoop. Secondly, because
>>>> of the specific requirements the MapReduce framework brings to the
>>>> table (split distribution, split reproducibility etc).
>>>>
>>>> This design will, as you note, never handle the same amount of data as
>>>> HBase (or HDFS), and was never intended to. That being said, there are
>>>> a couple of ways that the current design could be augmented to perform
>>>> better (and, in its current form, tweaked, depending on your data and
>>>> computational requirements). Shard awareness is one way, which would
>>>> let each database/tasktracker node execute mappers on local data,
>>>> where each split is a single database server, for example.
>>>>
>>>> If you have any ideas on how the current design can be improved,
>>>> please do share.
>>>>
>>>>
>>>> Fredrik
>>>>
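
A purely hypothetical sketch of what such shard awareness could look like
with the old mapred API; ShardSplit and its fields are invented here for
illustration, nothing like it exists in Hadoop 0.19:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputSplit;

    // Hypothetical: one split per database shard. getLocations() lets the
    // scheduler run the mapper on the tasktracker co-located with the shard.
    public class ShardSplit implements InputSplit {
        private String jdbcUrl;  // e.g. "jdbc:oracle:thin:@shard-03:1521:ORCL"
        private String host;     // host the shard lives on

        public ShardSplit() { }

        public ShardSplit(String jdbcUrl, String host) {
            this.jdbcUrl = jdbcUrl;
            this.host = host;
        }

        public String getJdbcUrl() { return jdbcUrl; }

        public long getLength() { return 0; }  // row count unknown up front

        public String[] getLocations() { return new String[] { host }; }

        public void write(DataOutput out) throws IOException {
            Text.writeString(out, jdbcUrl);
            Text.writeString(out, host);
        }

        public void readFields(DataInput in) throws IOException {
            jdbcUrl = Text.readString(in);
            host = Text.readString(in);
        }
    }
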
>>>> On Feb 5, 2009, at 11:37 AM, Stefan Podkowinski wrote:
>>>>
>>>>> The 0.19 DBInputFormat class implementation is IMHO only suitable for
>>>>> very simple queries working on only a few datasets. That's due to the
>>>>> fact that it tries to create splits from the query by
>>>>> 1) getting a count of all rows using the specified count query (a huge
>>>>> performance impact on large tables)
>>>>> 2) creating splits by issuing an individual query for each split with
>>>>> a "limit" and "offset" parameter appended to the input SQL query
>>>>>
>>>>> Effectively your input query "select * from orders" would become
>>>>> "select * from orders limit <splitlength> offset <splitstart>" and be
>>>>> executed until the count has been reached. I guess this is not valid
>>>>> SQL syntax for Oracle.
>>>>>
>>>>> Stefan
>>>>>
>>>>>
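
This matches the ORA-00933 Amandeep reported below: Oracle's SQL dialect
(at the time of this thread) has no LIMIT/OFFSET clause, so the rewritten
per-split query is rejected. An Oracle-legal equivalent would have to be
phrased with ROWNUM, roughly as sketched here; DBInputFormat does not
generate such a query, this is only an illustration:

    -- What DBInputFormat effectively sends (MySQL-style paging):
    select * from orders limit <splitlength> offset <splitstart>

    -- A ROWNUM-based form Oracle would accept instead:
    select * from (
        select a.*, ROWNUM rnum from (select * from orders) a
        where ROWNUM <= <splitstart> + <splitlength>
    ) where rnum > <splitstart>
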
>>>>> 2009/2/4 Amandeep Khurana <amansk@gmail.com>:
>>>>>>
>>>>>> Adding a semicolon gives me the error "ORA-00911: Invalid  
>>>>>> character"
>>>>>>
>>>>>> Amandeep
>>>>>>
>>>>>>
>>>>>> Amandeep Khurana
>>>>>> Computer Science Graduate Student
>>>>>> University of California, Santa Cruz
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 4, 2009 at 6:46 AM, Rasit OZDAS  
>>>>>> <rasitozdas@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Amandeep,
>>>>>>> "SQL command not properly ended"
>>>>>>> I get this error whenever I forget the semicolon at the end.
>>>>>>> I know it doesn't make sense, but I recommend giving it a try.
>>>>>>>
>>>>>>> Rasit
>>>>>>>
>>>>>>> 2009/2/4 Amandeep Khurana <amansk@gmail.com>:
>>>>>>>>
>>>>>>>> The same query is working if I write a simple JDBC client and query
>>>>>>>> the database. So I'm probably doing something wrong in the
>>>>>>>> connection settings.
>>>>>>>>
>>>>>>>> But the error looks to be on the query side more than the
>>>>>>>> connection side.
>>>>>>>>
>>>>>>>> Amandeep
>>>>>>>>
>>>>>>>>
>>>>>>>> Amandeep Khurana
>>>>>>>> Computer Science Graduate Student
>>>>>>>> University of California, Santa Cruz
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 3, 2009 at 7:25 PM, Amandeep Khurana <amansk@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Kevin
>>>>>>>>>
>>>>>>>>> I couldn't get it to work. Here's the error I get:
>>>>>>>>>
>>>>>>>>> bin/hadoop jar ~/dbload.jar LoadTable1
>>>>>>>>> 09/02/03 19:21:17 INFO jvm.JvmMetrics: Initializing JVM Metrics with
>>>>>>>>> processName=JobTracker, sessionId=
>>>>>>>>> 09/02/03 19:21:20 INFO mapred.JobClient: Running job: job_local_0001
>>>>>>>>> 09/02/03 19:21:21 INFO mapred.JobClient:  map 0% reduce 0%
>>>>>>>>> 09/02/03 19:21:22 INFO mapred.MapTask: numReduceTasks: 0
>>>>>>>>> 09/02/03 19:21:24 WARN mapred.LocalJobRunner: job_local_0001
>>>>>>>>> java.io.IOException: ORA-00933: SQL command not properly ended
>>>>>>>>>     at org.apache.hadoop.mapred.lib.db.DBInputFormat.getRecordReader(DBInputFormat.java:289)
>>>>>>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:321)
>>>>>>>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:138)
>>>>>>>>> java.io.IOException: Job failed!
>>>>>>>>>     at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1217)
>>>>>>>>>     at LoadTable1.run(LoadTable1.java:130)
>>>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>>>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
>>>>>>>>>     at LoadTable1.main(LoadTable1.java:107)
>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>>>     at java.lang.reflect.Method.invoke(Unknown Source)
>>>>>>>>>     at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
>>>>>>>>>     at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
>>>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>>>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
>>>>>>>>>     at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
>>>>>>>>>
>>>>>>>>> Exception closing file
>>>>>>>>> /user/amkhuran/contract_table/_temporary/_attempt_local_0001_m_000000_0/part-00000
>>>>>>>>> java.io.IOException: Filesystem closed
>>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:198)
>>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.access$600(DFSClient.java:65)
>>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3084)
>>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3053)
>>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.close(DFSClient.java:942)
>>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.close(DFSClient.java:210)
>>>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:243)
>>>>>>>>>     at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:1413)
>>>>>>>>>     at org.apache.hadoop.fs.FileSystem.closeAll(FileSystem.java:236)
>>>>>>>>>     at org.apache.hadoop.fs.FileSystem$ClientFinalizer.run(FileSystem.java:221)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Here's my code:
>>>>>>>>>
>>>>>>>>> public class LoadTable1 extends Configured implements Tool {
>>>>>>>>>
>>>>>>>>>   // data destination on hdfs
>>>>>>>>>   private static final String CONTRACT_OUTPUT_PATH = "contract_table";
>>>>>>>>>
>>>>>>>>>   // The JDBC connection URL and driver implementation class
>>>>>>>>>   private static final String CONNECT_URL = "jdbc:oracle:thin:@dbhost:1521:PSEDEV";
>>>>>>>>>   private static final String DB_USER = "user";
>>>>>>>>>   private static final String DB_PWD = "pass";
>>>>>>>>>   private static final String DATABASE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
>>>>>>>>>
>>>>>>>>>   private static final String CONTRACT_INPUT_TABLE = "OSE_EPR_CONTRACT";
>>>>>>>>>
>>>>>>>>>   private static final String[] CONTRACT_INPUT_TABLE_FIELDS = {
>>>>>>>>>       "PORTFOLIO_NUMBER", "CONTRACT_NUMBER" };
>>>>>>>>>
>>>>>>>>>   private static final String ORDER_CONTRACT_BY_COL = "CONTRACT_NUMBER";
>>>>>>>>>
>>>>>>>>>   static class ose_epr_contract implements Writable, DBWritable {
>>>>>>>>>
>>>>>>>>>     String CONTRACT_NUMBER;
>>>>>>>>>
>>>>>>>>>     public void readFields(DataInput in) throws IOException {
>>>>>>>>>       this.CONTRACT_NUMBER = Text.readString(in);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void write(DataOutput out) throws IOException {
>>>>>>>>>       Text.writeString(out, this.CONTRACT_NUMBER);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void readFields(ResultSet in_set) throws SQLException {
>>>>>>>>>       this.CONTRACT_NUMBER = in_set.getString(1);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void write(PreparedStatement prep_st) throws SQLException {
>>>>>>>>>       // TODO Auto-generated method stub
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   public static class LoadMapper extends MapReduceBase
>>>>>>>>>       implements Mapper<LongWritable, ose_epr_contract, Text, NullWritable> {
>>>>>>>>>     private static final char FIELD_SEPARATOR = 1;
>>>>>>>>>
>>>>>>>>>     public void map(LongWritable arg0, ose_epr_contract arg1,
>>>>>>>>>         OutputCollector<Text, NullWritable> arg2, Reporter arg3)
>>>>>>>>>         throws IOException {
>>>>>>>>>
>>>>>>>>>       StringBuilder sb = new StringBuilder();
>>>>>>>>>       sb.append(arg1.CONTRACT_NUMBER);
>>>>>>>>>
>>>>>>>>>       arg2.collect(new Text(sb.toString()), NullWritable.get());
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>>>     Class.forName("oracle.jdbc.driver.OracleDriver");
>>>>>>>>>     int exit = ToolRunner.run(new LoadTable1(), args);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   public int run(String[] arg0) throws Exception {
>>>>>>>>>     JobConf conf = new JobConf(getConf(), LoadTable1.class);
>>>>>>>>>
>>>>>>>>>     conf.setInputFormat(DBInputFormat.class);
>>>>>>>>>     DBConfiguration.configureDB(conf, DATABASE_DRIVER_CLASS, CONNECT_URL, DB_USER, DB_PWD);
>>>>>>>>>
>>>>>>>>>     DBInputFormat.setInput(conf, ose_epr_contract.class,
>>>>>>>>>         "select CONTRACT_NUMBER from OSE_EPR_CONTRACT",
>>>>>>>>>         "select COUNT(CONTRACT_NUMBER) from OSE_EPR_CONTRACT");
>>>>>>>>>     FileOutputFormat.setOutputPath(conf, new Path(CONTRACT_OUTPUT_PATH));
>>>>>>>>>
>>>>>>>>>     conf.setMapperClass(LoadMapper.class);
>>>>>>>>>     conf.setNumReduceTasks(0);
>>>>>>>>>
>>>>>>>>>     conf.setOutputKeyClass(Text.class);
>>>>>>>>>     conf.setOutputValueClass(NullWritable.class);
>>>>>>>>>
>>>>>>>>>     JobClient.runJob(conf);
>>>>>>>>>
>>>>>>>>>     return 0;
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> -Amandeep
>>>>>>>>>
>>>>>>>>> Amandeep Khurana
>>>>>>>>> Computer Science Graduate Student
>>>>>>>>> University of California, Santa Cruz
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 3, 2009 at 6:51 PM, Kevin Peterson <kpeterson@biz360.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Tue, Feb 3, 2009 at 5:49 PM, Amandeep Khurana <amansk@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> In the setInput(...) function in DBInputFormat, there are two sets
>>>>>>>>>>> of arguments that one can use.
>>>>>>>>>>>
>>>>>>>>>>> 1. public static void *setInput*(JobConf
>>>>>>>>>>>
>>>>>>>>>>> a) In this, do we necessarily have to give all the fieldNames
>>>>>>>>>>> (which are the column names, right?) that the table has, or do we
>>>>>>>>>>> need to specify only the ones that we want to extract?
>>>>>>>>>>
>>>>>>>>>> You may specify only those columns that you are interested in.
>>>>>>>>>>
>>>>>>>>>>> b) Do we have to have an orderBy or not necessarily? Does this
>>>>>>>>>>> relate to the primary key in the table in any way?
>>>>>>>>>>
>>>>>>>>>> Conditions and order by are not necessary.
>>>>>>>>>>
>>>>>>>>>>> a) Is there any restriction on the kind of queries that this
>>>>>>>>>>> function can take in the inputQuery string?
>>>>>>>>>>
>>>>>>>>>> I don't think so, but I don't use this method -- I just use the
>>>>>>>>>> fieldNames and tableName method.
>>>>>>>>>>
>>>>>>>>>>> I am facing issues in getting this to work with an Oracle database
>>>>>>>>>>> and have no idea of how to debug it (an email sent earlier).
>>>>>>>>>>> Can anyone give me some inputs on this please?
>>>>>>>>>>
>>>>>>>>>> Create a new table that has one column, put about five entries into
>>>>>>>>>> that table, then try to get a map job working that outputs the
>>>>>>>>>> values to a text file. If that doesn't work, post your code and
>>>>>>>>>> errors.
>>>>>>>>>>
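
For reference, the tableName/fieldNames variant Kevin mentions maps onto
the other setInput overload roughly like this, using the table and columns
from the code above (a sketch against the 0.19 mapred API, assuming null is
accepted for the optional conditions argument):

    // Sketch: no hand-written SELECT or COUNT query; DBInputFormat builds
    // both from the table name, field names, and optional conditions/orderBy.
    DBInputFormat.setInput(conf, ose_epr_contract.class,
            "OSE_EPR_CONTRACT",      // tableName
            null,                    // conditions (WHERE clause), none here
            "CONTRACT_NUMBER",       // orderBy
            "CONTRACT_NUMBER");      // fieldNames to read

Note that in 0.19 the per-split LIMIT/OFFSET clause is still appended to the
generated query, so this variant alone would not make the Oracle error
discussed earlier in the thread go away.
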
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> M. Raşit ÖZDAŞ
>>>>>>>
>>>>>>
>>>>
>>>>
>>
>>

