hadoop-hdfs-user mailing list archives

From David Pavlis <david.pav...@javlin.eu>
Subject Re: How-to use DFSClient's BlockReader from Java
Date Tue, 10 Jan 2012 12:32:32 GMT
Hi Todd,

Understood - I will re-think the MR approach. Nonetheless, I like the idea
of getting the block ID and accessing the locally stored file directly.
Is there a way (a public interface) to find out, for a particular datanode,
where its local storage is rooted (i.e. which local subdirectories it uses)?
I found that this info is available in the Storage class, but is there a
"public" way of getting to it - through some protocol or so?

Thanks in advance,

David.



On 9.1.12 9:30 PM, "Todd Lipcon" <todd@cloudera.com> wrote:

>Hi David,
>
>I'd definitely recommend using MapReduce for this. What you've
>described is essentially identical to MR.
>
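>For example, a map-only job (zero reducers, hence no merge step) already
>gives the "run the transformation where its data lives" behaviour you
>described. A rough, untested sketch against the 1.x "new" MR API - class
>and path names are made up:
>
>import java.io.IOException;
>import org.apache.hadoop.conf.Configuration;
>import org.apache.hadoop.fs.Path;
>import org.apache.hadoop.io.LongWritable;
>import org.apache.hadoop.io.NullWritable;
>import org.apache.hadoop.io.Text;
>import org.apache.hadoop.mapreduce.Job;
>import org.apache.hadoop.mapreduce.Mapper;
>import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>
>public class TransformJob {
>    public static class TransformMapper
>            extends Mapper<LongWritable, Text, NullWritable, Text> {
>        @Override
>        protected void map(LongWritable offset, Text line, Context ctx)
>                throws IOException, InterruptedException {
>            // the framework hands each mapper a block-sized, usually
>            // node-local split; the per-partition transform goes here
>            ctx.write(NullWritable.get(), line);
>        }
>    }
>
>    public static void main(String[] args) throws Exception {
>        Job job = new Job(new Configuration(), "transform");
>        job.setJarByClass(TransformJob.class);
>        job.setMapperClass(TransformMapper.class);
>        job.setNumReduceTasks(0); // map-only: skip the merge entirely
>        job.setOutputKeyClass(NullWritable.class);
>        job.setOutputValueClass(Text.class);
>        FileInputFormat.addInputPath(job, new Path(args[0]));
>        FileOutputFormat.setOutputPath(job, new Path(args[1]));
>        System.exit(job.waitForCompletion(true) ? 0 : 1);
>    }
>}
>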
>Otherwise, you should use the public API
>FileSystem.getFileBlockLocations(), and then read the host names out
>of the returned BlockLocation struct. Then just use a normal
>FileSystem open call from that node - it will automatically pick the
>local replica for you without any further work.
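>
>Roughly, untested (Hadoop 1.x API; the namenode URI and the path are
>placeholders):
>
>import java.net.URI;
>import java.util.Arrays;
>import org.apache.hadoop.conf.Configuration;
>import org.apache.hadoop.fs.BlockLocation;
>import org.apache.hadoop.fs.FileStatus;
>import org.apache.hadoop.fs.FileSystem;
>import org.apache.hadoop.fs.Path;
>
>public class ListBlockHosts {
>    public static void main(String[] args) throws Exception {
>        FileSystem fs = FileSystem.get(
>                URI.create("hdfs://namenode:8020/"), new Configuration());
>        Path path = new Path("/user/hive/warehouse/sample_07/sample_07.csv");
>        FileStatus status = fs.getFileStatus(path);
>        // one BlockLocation per block, listing the hosts holding replicas
>        BlockLocation[] blocks =
>                fs.getFileBlockLocations(status, 0, status.getLen());
>        for (BlockLocation block : blocks) {
>            System.out.println(block.getOffset() + "+" + block.getLength()
>                    + " on " + Arrays.toString(block.getHosts()));
>        }
>    }
>}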
>
>-Todd
>
>On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <david.pavlis@javlin.eu>
>wrote:
>> Hi Todd,
>>
>> Thanks for letting me know. OK - here is what I am trying to do (it is a
>> POC for now):
>>
>> We have an ETL framework which helps with transforming data - parsing
>> various formats, reading from DBs, aggregating, sorting, etc.
>> We currently have a concept of a "cluster" which basically allows input
>> data (say, a datafile) to be split/partitioned across several nodes and
>> then one data transformation to be executed on that data. The way it
>> works is that we analyze what the transformation does, and if it is
>> supposed to consume data which is spread over the cluster, we execute a
>> slightly modified copy/instance of that transformation on each node of
>> the cluster where some piece/partition of the data resides. We do not
>> have a concept of any "clustered" filesystem - our partitioned data
>> reside in ordinary files and there is no metadata layer on top. If we
>> need one single output data file, then we just perform a merge operation.
>> This is quite limiting, as whenever we need to manipulate such data, we
>> have to do it piece by piece (on each participating node).
>>
>> So we essentially do split-transform-merge, with the merge being optional
>> (it can be part of the transformation directly, so we don't create temp
>> files).
>>
>> Here is the idea with HDFS: each of our transformation nodes becomes an
>> HDFS datanode. Then, if we are to process particular input data split
>> over several datanodes, we just instruct each of our transformation nodes
>> to read the specific block(s) of the file which happen to live on the
>> same physical machine (our transformation node also being a datanode) -
>> hence my interest in BlockReader.
>>
>> I was also considering wrapping our transformation job into a Hadoop
>> map-reduce job, but that seems a bit limiting, and we would also need to
>> "take" the whole Hadoop stack and give it control over our jobs. But that
>> still might be the right way.
>> Also, I need to solve writing of partitioned data - here I would like to
>> control the block allocation somehow, as ideally a transformation running
>> on a particular node would read locally stored blocks and output data to
>> locally allocated blocks of an HDFS file.
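>>
>> If I understand HDFS's default block placement correctly, a client that
>> writes while running on a datanode gets the first replica of each block
>> stored on that same datanode - so the write side may come for free. A
>> rough sketch (Hadoop 1.x FileSystem API; URI and path are placeholders):
>>
>> import java.net.URI;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FSDataOutputStream;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>>
>> public class LocalWrite {
>>     public static void main(String[] args) throws Exception {
>>         FileSystem fs = FileSystem.get(
>>                 URI.create("hdfs://namenode:8020/"), new Configuration());
>>         // when this runs on a datanode, the default placement policy puts
>>         // the first replica of every block written here on that datanode
>>         FSDataOutputStream out = fs.create(new Path("/data/partition-00"));
>>         out.writeBytes("partition contents...\n");
>>         out.close();
>>     }
>> }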
>>
>> Well, I hope I explained the situation clearly enough.
>>
>> I will be thankful for any comments.
>>
>> Regards,
>>
>> David.
>>
>>
>>
>>
>> On 9.1.12 6:59 PM, "Todd Lipcon" <todd@cloudera.com> wrote:
>>
>>>Hi David,
>>>
>>>For what it's worth, you should be aware that you're calling internal
>>>APIs that have no guarantee of stability between versions. I can
>>>practically guarantee that your code will have to be modified for any
>>>HDFS upgrade you do. That's why these APIs are undocumented.
>>>
>>>Perhaps you can explain what your high-level goal is, here, and we can
>>>suggest a supported mechanism for achieving it.
>>>
>>>-Todd
>>>
>>>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <david.pavlis@javlin.eu>
>>>wrote:
>>>> Hi Denny,
>>>>
>>>> Thanks a lot. I was able to make my code work.
>>>>
>>>> I am posting a small example below, in case somebody has a similar need
>>>> in the future ;-) (it does not handle replica datablocks).
>>>>
>>>> David.
>>>>
>>>>
>>>>*******************************************************************************
>>>> import java.io.IOException;
>>>> import java.net.InetSocketAddress;
>>>> import java.net.Socket;
>>>> import javax.net.SocketFactory;
>>>> import org.apache.hadoop.conf.Configuration;
>>>> import org.apache.hadoop.hdfs.DFSClient;
>>>> import org.apache.hadoop.hdfs.DFSClient.BlockReader;
>>>> import org.apache.hadoop.hdfs.protocol.ClientProtocol;
>>>> import org.apache.hadoop.hdfs.protocol.LocatedBlock;
>>>> import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
>>>> import org.apache.hadoop.net.NetUtils;
>>>>
>>>> public static void main(String args[]) {
>>>>     String filename = "/user/hive/warehouse/sample_07/sample_07.csv";
>>>>     int DATANODE_PORT = 50010;
>>>>     int NAMENODE_PORT = 8020;
>>>>     String HOST_IP = "192.168.1.230";
>>>>
>>>>     byte[] buf = new byte[1000];
>>>>
>>>>     try {
>>>>         // RPC proxy to the namenode (internal API)
>>>>         ClientProtocol client = DFSClient.createNamenode(
>>>>                 new InetSocketAddress(HOST_IP, NAMENODE_PORT),
>>>>                 new Configuration());
>>>>
>>>>         // locations of all blocks of the file
>>>>         LocatedBlocks located = client.getBlockLocations(filename, 0,
>>>>                 Long.MAX_VALUE);
>>>>
>>>>         for (LocatedBlock block : located.getLocatedBlocks()) {
>>>>             // connect straight to the datanode's data-transfer port
>>>>             Socket sock = SocketFactory.getDefault().createSocket();
>>>>             InetSocketAddress targetAddr =
>>>>                     new InetSocketAddress(HOST_IP, DATANODE_PORT);
>>>>             NetUtils.connect(sock, targetAddr, 10000);
>>>>             sock.setSoTimeout(10000);
>>>>
>>>>             // the block token comes back from the namenode with the
>>>>             // located block - no need to build it by hand
>>>>             BlockReader reader = BlockReader.newBlockReader(sock,
>>>>                     filename, block.getBlock().getBlockId(),
>>>>                     block.getBlockToken(),
>>>>                     block.getBlock().getGenerationStamp(), 0,
>>>>                     block.getBlockSize(), 1000);
>>>>
>>>>             // drain the block; read() returns -1 at end of block (a
>>>>             // short read does not mean end-of-block, so do not break
>>>>             // early on length < buffer size)
>>>>             int length;
>>>>             while ((length = reader.read(buf, 0, 1000)) > 0) {
>>>>                 //System.out.print(new String(buf, 0, length, "UTF-8"));
>>>>             }
>>>>             reader.close();
>>>>             sock.close();
>>>>         }
>>>>     } catch (IOException ex) {
>>>>         ex.printStackTrace();
>>>>     }
>>>> }
>>>>
>>>>*******************************************************************************
>>>>
>>>>
>>>>
>>>> From:  Denny Ye <dennyy99@gmail.com>
>>>> Reply-To:  <hdfs-user@hadoop.apache.org>
>>>> Date:  Mon, 9 Jan 2012 16:29:18 +0800
>>>> To:  <hdfs-user@hadoop.apache.org>
>>>> Subject:  Re: How-to use DFSClient's BlockReader from Java
>>>>
>>>>
>>>> hi David
>>>>
>>>> Please refer to the method "DFSInputStream#blockSeekTo" - it has the
>>>> same purpose as yours.
>>>>
>>>>
>>>>*******************************************************************************
>>>>     LocatedBlock targetBlock = getBlockAt(target, true);
>>>>     assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
>>>>     long offsetIntoBlock = target - targetBlock.getStartOffset();
>>>>
>>>>     DNAddrPair retval = chooseDataNode(targetBlock);
>>>>     chosenNode = retval.info;
>>>>     InetSocketAddress targetAddr = retval.addr;
>>>>
>>>>     try {
>>>>       s = socketFactory.createSocket();
>>>>       NetUtils.connect(s, targetAddr, socketTimeout);
>>>>       s.setSoTimeout(socketTimeout);
>>>>       Block blk = targetBlock.getBlock();
>>>>       Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
>>>>
>>>>       blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
>>>>           accessToken,
>>>>           blk.getGenerationStamp(),
>>>>           offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
>>>>           buffersize, verifyChecksum, clientName);
>>>>
>>>>
>>>>
>>>>*******************************************************************************
>>>>
>>>>
>>>> -Regards
>>>> Denny Ye
>>>>
>>>> 2012/1/6 David Pavlis <david.pavlis@javlin.eu>
>>>>
>>>> Hi,
>>>>
>>>> I am relatively new to Hadoop and I am trying to utilize HDFS for my own
>>>> application, where I want to take advantage of the data partitioning
>>>> HDFS performs.
>>>>
>>>> The idea is that I get the list of individual blocks - the
>>>> BlockLocations - of a particular file and then read those directly
>>>> (going straight to the individual DataNodes). So far I have found
>>>> org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way to go.
>>>>
>>>> However, I am struggling with instantiating the BlockReader() class,
>>>> namely with creating the "Token<BlockTokenIdentifier>".
>>>>
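>>>> As the working example further up this page shows, the token does not
>>>> need to be constructed by hand - the namenode returns one with every
>>>> located block, e.g.:
>>>>
>>>>     LocatedBlocks located = client.getBlockLocations(filename, 0,
>>>>             Long.MAX_VALUE);
>>>>     Token<BlockTokenIdentifier> token = located.get(0).getBlockToken();
>>>>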
>>>> Is there any example Java code showing how to access the individual
>>>> blocks of a particular file stored on HDFS?
>>>>
>>>> Thanks in advance,
>>>>
>>>> David.
>>>>
>>>>
>>>
>>>
>>>
>>>--
>>>Todd Lipcon
>>>Software Engineer, Cloudera
>>
>>
>
>
>
>-- 
>Todd Lipcon
>Software Engineer, Cloudera


