hadoop-hdfs-user mailing list archives

From java8964 java8964 <java8...@hotmail.com>
Subject RE: Question related to Decompressor interface
Date Tue, 12 Feb 2013 20:21:04 GMT

Can someone share some idea of what the Hadoop source code of class org.apache.hadoop.io.compress.BlockDecompressorStream,
method rawReadInt(), is trying to do here?
There is a comment in the code that this method shouldn't return a negative number, but in my
testing file the input stream contains the following bytes: 248, 19, 20, 116, which
correspond to b1, b2, b3, b4.
After the 4 bytes are read from the input stream, the return result will be a negative
number here, as:

(b1 << 24) = -134217728
(b2 << 16) = 1245184
(b3 << 8)  = 5120
(b4 << 0)  = 116

I am not sure what the logic of this method is trying to do here. Can anyone share some idea about it?
Thanks

  private int rawReadInt() throws IOException {
    // Read four bytes; in.read() returns 0-255, or -1 at end of stream.
    int b1 = in.read();
    int b2 = in.read();
    int b3 = in.read();
    int b4 = in.read();
    // If any read returned -1, the bitwise OR is negative: end of stream.
    if ((b1 | b2 | b3 | b4) < 0)
      throw new EOFException();
    // Assemble the four bytes into a big-endian 32-bit int.
    return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
  }
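For reference, here is a small worked example of my own (not part of the Hadoop source) showing why those four bytes come out negative: 248 has its high bit set, so once it is shifted into the top byte of a 32-bit int the sign bit is set, and the assembled value is -132967308, which is exactly the negative length that later shows up in the setInput() call in the stack trace quoted below.

  // Standalone illustration (my own code), showing the arithmetic rawReadInt()
  // performs on the bytes 248, 19, 20, 116 from my test file.
  public class RawReadIntDemo {
    public static void main(String[] args) {
      int b1 = 248, b2 = 19, b3 = 20, b4 = 116;   // values returned by in.read()
      System.out.println(b1 << 24);               // -134217728 (sign bit set by the shift)
      System.out.println(b2 << 16);               //    1245184
      System.out.println(b3 << 8);                //       5120
      System.out.println(b4 << 0);                //        116
      int assembled = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0);
      System.out.println(assembled);              // -132967308, the length later passed to setInput()
    }
  }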
From: java8964@hotmail.com
To: user@hadoop.apache.org
Subject: Question related to Decompressor interface
Date: Sat, 9 Feb 2013 15:49:31 -0500

Hi,
Currently I am researching options for encrypting data in MapReduce, as we plan
to use the Amazon EMR or EC2 services for our data.
I am thinking that the compression codec is a good place to integrate the encryption logic,
and I found that some people have had the same idea.
I googled around and found this code:
https://github.com/geisbruch/HadoopCryptoCompressor/
It doesn't seem to be maintained any more, but it gave me a starting point. I downloaded the source
code and tried to do some tests with it.
It doesn't work out of the box; there are some bugs I had to fix to make it work. I believe it
contains 'AES' as an example algorithm.
But right now I am facing a problem when I try to use it in my test MapReduce program. Here
is the stack trace I got:
2013-02-08 23:16:47,038 INFO org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor:
buf length = 512, and offset = 0, length = -132967308
java.lang.IndexOutOfBoundsException
  at java.nio.ByteBuffer.wrap(ByteBuffer.java:352)
  at org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor.setInput(CryptoBasicDecompressor.java:100)
  at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:97)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:83)
  at java.io.InputStream.read(InputStream.java:82)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114)
  at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458)
  at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
  at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
  at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
  at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
  at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
  at org.apache.hadoop.mapred.Child.main(Child.java:262)
I know the error is thrown out of this custom CryptoBasicDecompressor class, but I really
have questions related to the interface it implements: Decompressor.
There is limited documentation about this interface, for example, when and how the method setInput()
will be invoked. If I want to write my own Decompressor, what do the methods of this
interface mean? In the case above, I enabled some debug output; you can see that the
byte[] array passed to the setInput() method only has a length of 512, but the third parameter
(length) passed in is a negative number: -132967308. That caused the IndexOutOfBoundsException.
If I check the same method in Hadoop's GzipDecompressor class, that code would also throw an
IndexOutOfBoundsException in this case, so this is a RuntimeException case. Why does it happen
in my test case?
Here is my test case:
I have a simple log text file of about 700k. I encrypted it with the above code using 'AES'. I can
encrypt and decrypt it and get my original content back. The file name is foo.log.crypto, and this
file extension is registered to invoke the CryptoBasicDecompressor in my test Hadoop setup, which
uses the CDH4.1.2 release (Hadoop 2.0). Everything works as I expected. The CryptoBasicDecompressor
is invoked when the input file is foo.log.crypto, as you can see in the above stack trace.
But I don't know why the third parameter (length) in setInput() is a negative number at runtime.
In addition, I also have further questions about using a Compressor/Decompressor
to handle encrypting/decrypting files. Ideally, I wonder whether the encryption/decryption can
support file splits. This may depend on the algorithm we are using, is that right? If so,
what kind of algorithm can do that? I am not sure whether it is like the compression codecs, most of
which do not support file splits; if so, that may not be good for my requirements.
If we have a 1G file, encrypted in Amazon S3, after it is copied to the HDFS of Amazon EMR,
can each block of the data be decrypted independently by each mapper, then passed to the underlying
RecordReader to be processed fully concurrently? Has anyone done this before? If so, what
encryption algorithm supports it? Any ideas?
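One well-known option that would allow splits is a stream-cipher mode such as AES/CTR, where the keystream position can be recomputed from the byte offset, so a mapper can start decrypting at an arbitrary split boundary. Below is a minimal, self-contained sketch of that idea using the standard javax.crypto API; it is not part of HadoopCryptoCompressor, and the names (CtrSeekDemo, decryptAt, baseIv, etc.) are my own placeholders.

  import java.security.GeneralSecurityException;
  import javax.crypto.Cipher;
  import javax.crypto.spec.IvParameterSpec;
  import javax.crypto.spec.SecretKeySpec;

  public class CtrSeekDemo {
    // Decrypt a chunk of ciphertext that starts at absolute byte offset 'pos' in the
    // file, without reading any earlier bytes. This works because in CTR mode the
    // keystream for 16-byte block i depends only on (baseIv + i), which can be
    // computed directly from pos.
    static byte[] decryptAt(byte[] key, byte[] baseIv, long pos, byte[] chunk)
        throws GeneralSecurityException {
      byte[] iv = baseIv.clone();
      addCounter(iv, pos / 16);                            // jump to the right counter block
      Cipher c = Cipher.getInstance("AES/CTR/NoPadding");
      c.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
      int skip = (int) (pos % 16);
      if (skip > 0) {
        c.update(new byte[skip]);                          // discard output, just advance the keystream
      }
      return c.doFinal(chunk);
    }

    // Add 'delta' to the big-endian 128-bit counter held in iv (in place).
    static void addCounter(byte[] iv, long delta) {
      for (int i = iv.length - 1; i >= 0 && delta != 0; i--) {
        long sum = (iv[i] & 0xFF) + (delta & 0xFF);
        iv[i] = (byte) sum;
        delta = (delta >>> 8) + (sum >>> 8);
      }
    }
  }

In practice the base IV (nonce) would have to be stored alongside the file, and record boundaries at a split would still need to be handled the way TextInputFormat already does for plain text, but the decryption itself would no longer force sequential processing.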
Thanks
Yong