hadoop-hdfs-user mailing list archives

From Todd Lipcon <t...@cloudera.com>
Subject Re: How-to use DFSClient's BlockReader from Java
Date Tue, 10 Jan 2012 18:19:30 GMT
On Tue, Jan 10, 2012 at 4:32 AM, David Pavlis <david.pavlis@javlin.eu> wrote:
> Hi Todd,
>
> Understood - I will re-think the MR approach. Nonetheless, I like the idea
> of getting the block id and accessing the locally stored file directly

You really don't want to do this - what happens when the balancer
moves the block out from under you?

> Is there a way (a public interface) to find out, for a particular datanode,
> where its local storage is rooted (which local subdirs it uses)?
> I found that this info is available in the Storage class, but is there a
> "public" way of getting it - through some protocol or so?

No, it's also a private API. If you use the FileSystem API, and you're
on the local node, all of these optimizations will happen
automatically for you, plus they'll keep working when you upgrade.
Even MapReduce just uses the public APIs.
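
A minimal sketch of that public-API flow (the path is taken from the example
later in this thread; the `hasReplicaOn` helper is made up for illustration,
and this assumes the Hadoop client jars plus a reachable cluster):

```java
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalReadSketch {

    // Hypothetical helper: does any replica of a block live on the given host?
    static boolean hasReplicaOn(String[] blockHosts, String host) {
        return Arrays.asList(blockHosts).contains(host);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/user/hive/warehouse/sample_07/sample_07.csv");
        FileStatus status = fs.getFileStatus(path);

        // Public API: ask the NameNode which hosts hold each block.
        BlockLocation[] blocks =
                fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
            System.out.println(block.getOffset() + " -> "
                    + Arrays.toString(block.getHosts()));
        }

        // A plain open() issued on one of those hosts reads the local
        // replica automatically; no block-level APIs are needed.
        FSDataInputStream in = fs.open(path);
        byte[] buf = new byte[1000];
        int n;
        while ((n = in.read(buf, 0, buf.length)) > 0) {
            // process buf[0..n)
        }
        in.close();
    }
}
```
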

-Todd

>
>
> On 9.1.12 9:30 PM, "Todd Lipcon" <todd@cloudera.com> wrote:
>
>>Hi David,
>>
>>I'd definitely recommend using MapReduce for this. What you've
>>described is essentially identical to MR.
>>
>>Otherwise, you should use the public API
>>FileSystem.getFileBlockLocations(), and then read the host names out
>>of the returned BlockLocation struct. Then just use a normal
>>FileSystem open call from that node - it will automatically pick the
>>local replica for you without any further work.
>>
>>-Todd
>>
>>On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <david.pavlis@javlin.eu>
>>wrote:
>>> Hi Todd,
>>>
>>> Thanks for letting me know. OK - here is what I am trying to do (it is
>>> a POC for now):
>>>
>>> We have an ETL framework which helps with transforming data - parsing
>>> various formats, reading from DBs, aggregating, sorting, etc.
>>> We currently have the concept of a "cluster", which basically allows
>>> input data (say a datafile) to be split/partitioned across several
>>> nodes, with one data transformation then executed on that data. The way
>>> it works is that we analyze what the transformation does, and if it is
>>> supposed to consume data which is spread over the cluster, we execute a
>>> slightly modified copy/instance of that transformation on each node of
>>> the cluster where some piece/partition of the data resides. We do not
>>> have the concept of any "clustered" filesystem - our partitioned data
>>> resides in ordinary files and there is no metadata layer on top. If we
>>> need one single output data file, then we just perform a merge
>>> operation. This is quite limiting, as whenever we need to manipulate
>>> such data, we have to do it piece by piece (on each participating
>>> node).
>>>
>>> So we essentially do split-transform-merge, with the merge being
>>> optional (it can be part of the transformation directly, so we don't
>>> create temp files).
>>>
>>> Here is the idea with HDFS - each of our transformation nodes becomes
>>> an HDFS datanode. Then, if we are to process particular input data
>>> split over several datanodes, we just instruct each transformation node
>>> to read the specific block(s) of the file which happen to reside on the
>>> same physical machine (each transformation node is also a datanode) -
>>> hence my interest in BlockReader.
>>>
>>> I was also considering wrapping our transformation job into a Hadoop
>>> map-reduce job, but that seems a bit limiting, and we would also need
>>> to "take" the whole Hadoop stack and give it control over our jobs. But
>>> that still might be the right way.
>>> Also, I need to solve writing of partitioned data - here I would like
>>> to control the block allocation somehow, as ideally a transformation
>>> running on a particular node would read locally stored blocks and
>>> output data to a locally allocated block of the HDFS file.
>>>
>>> Well, I hope I explained the situation clearly enough.
>>>
>>> I will be thankful for any comments.
>>>
>>> Regards,
>>>
>>> David.
>>>
>>>
>>>
>>>
>>> On 9.1.12 6:59 PM, "Todd Lipcon" <todd@cloudera.com> wrote:
>>>
>>>>Hi David,
>>>>
>>>>For what it's worth, you should be aware that you're calling internal
>>>>APIs that have no guarantee of stability between versions. I can
>>>>practically guarantee that your code will have to be modified for any
>>>>HDFS upgrade you do. That's why these APIs are undocumented.
>>>>
>>>>Perhaps you can explain what your high-level goal is, here, and we can
>>>>suggest a supported mechanism for achieving it.
>>>>
>>>>-Todd
>>>>
>>>>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <david.pavlis@javlin.eu>
>>>>wrote:
>>>>> Hi Denny,
>>>>>
>>>>> Thanks a lot. I was able to make my code work.
>>>>>
>>>>> I am posting a small example below - in case somebody in the future
>>>>> has a similar need ;-)
>>>>> (it does not handle replica datablocks).
>>>>>
>>>>> David.
>>>>>
>>>>>
>>>>>*************************************************************************
>>>>> public static void main(String args[]) {
>>>>>     String filename = "/user/hive/warehouse/sample_07/sample_07.csv";
>>>>>     int DATANODE_PORT = 50010;
>>>>>     int NAMENODE_PORT = 8020;
>>>>>     String HOST_IP = "192.168.1.230";
>>>>>
>>>>>     byte[] buf = new byte[1000];
>>>>>
>>>>>     try {
>>>>>         ClientProtocol client = DFSClient.createNamenode(
>>>>>             new InetSocketAddress(HOST_IP, NAMENODE_PORT),
>>>>>             new Configuration());
>>>>>
>>>>>         LocatedBlocks located =
>>>>>             client.getBlockLocations(filename, 0, Long.MAX_VALUE);
>>>>>
>>>>>         for (LocatedBlock block : located.getLocatedBlocks()) {
>>>>>             Socket sock = SocketFactory.getDefault().createSocket();
>>>>>             InetSocketAddress targetAddr =
>>>>>                 new InetSocketAddress(HOST_IP, DATANODE_PORT);
>>>>>             NetUtils.connect(sock, targetAddr, 10000);
>>>>>             sock.setSoTimeout(10000);
>>>>>
>>>>>             BlockReader reader = BlockReader.newBlockReader(sock,
>>>>>                 filename, block.getBlock().getBlockId(),
>>>>>                 block.getBlockToken(),
>>>>>                 block.getBlock().getGenerationStamp(), 0,
>>>>>                 block.getBlockSize(), 1000);
>>>>>
>>>>>             int length;
>>>>>             while ((length = reader.read(buf, 0, 1000)) > 0) {
>>>>>                 // System.out.print(new String(buf, 0, length, "UTF-8"));
>>>>>                 if (length < 1000) break;
>>>>>             }
>>>>>             reader.close();
>>>>>             sock.close();
>>>>>         }
>>>>>     } catch (IOException ex) {
>>>>>         ex.printStackTrace();
>>>>>     }
>>>>> }
>>>>>
>>>>>*************************************************************************
>>>>>
>>>>>
>>>>>
>>>>> From:  Denny Ye <dennyy99@gmail.com>
>>>>> Reply-To:  <hdfs-user@hadoop.apache.org>
>>>>> Date:  Mon, 9 Jan 2012 16:29:18 +0800
>>>>> To:  <hdfs-user@hadoop.apache.org>
>>>>> Subject:  Re: How-to use DFSClient's BlockReader from Java
>>>>>
>>>>>
>>>>> hi David,
>>>>>     Please refer to the method "DFSInputStream#blockSeekTo" - it has
>>>>> the same purpose as yours.
>>>>>
>>>>>
>>>>>*************************************************************************
>>>>>        LocatedBlock targetBlock = getBlockAt(target, true);
>>>>>        assert (target == this.pos) : "Wrong position " + pos +
>>>>>            " expect " + target;
>>>>>        long offsetIntoBlock = target - targetBlock.getStartOffset();
>>>>>
>>>>>        DNAddrPair retval = chooseDataNode(targetBlock);
>>>>>        chosenNode = retval.info;
>>>>>        InetSocketAddress targetAddr = retval.addr;
>>>>>
>>>>>        try {
>>>>>          s = socketFactory.createSocket();
>>>>>          NetUtils.connect(s, targetAddr, socketTimeout);
>>>>>          s.setSoTimeout(socketTimeout);
>>>>>          Block blk = targetBlock.getBlock();
>>>>>          Token<BlockTokenIdentifier> accessToken =
>>>>>              targetBlock.getBlockToken();
>>>>>
>>>>>          blockReader = BlockReader.newBlockReader(s, src,
>>>>>              blk.getBlockId(), accessToken,
>>>>>              blk.getGenerationStamp(),
>>>>>              offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
>>>>>              buffersize, verifyChecksum, clientName);
>>>>>*************************************************************************
>>>>>
>>>>>
>>>>> -Regards
>>>>> Denny Ye
>>>>>
>>>>> 2012/1/6 David Pavlis <david.pavlis@javlin.eu>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am relatively new to Hadoop, and I am trying to utilize HDFS for
>>>>> our own application, where I want to take advantage of the data
>>>>> partitioning HDFS performs.
>>>>>
>>>>> The idea is that I get the list of individual blocks - the
>>>>> BlockLocations of a particular file - and then read those directly
>>>>> (go to the individual DataNodes). So far I found
>>>>> org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way to go.
>>>>>
>>>>> However, I am struggling with instantiating the BlockReader() class,
>>>>> namely creating the "Token<BlockTokenIdentifier>".
>>>>>
>>>>> Is there an example Java code showing how to access individual blocks
>>>>> of a particular file stored on HDFS?
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> David.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>--
>>>>Todd Lipcon
>>>>Software Engineer, Cloudera
>>>
>>>
>>
>>
>>
>>--
>>Todd Lipcon
>>Software Engineer, Cloudera
>
>



-- 
Todd Lipcon
Software Engineer, Cloudera
