hadoop-hive-dev mailing list archives

From 代志远-dai zhiyuan (JIRA) <j...@apache.org>
Subject [jira] Updated: (HIVE-959) Hive to use new interfaces in Hadoop 0.20
Date Mon, 14 Dec 2009 11:39:18 GMT

     [ https://issues.apache.org/jira/browse/HIVE-959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

代志远-dai zhiyuan updated HIVE-959:
---------------------------------

    Affects Version/s: 0.5.0
         Release Note: 
Index: Hive/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextInputFormat.java
===================================================================
--- Hive/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextInputFormat.java	(revision 890273)
+++ Hive/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextInputFormat.java	(working copy)
@@ -21,155 +21,167 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
+import java.util.List;
 
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.LineRecordReader;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 /**
  * FileInputFormat for base64 encoded text files.
  * 
- * Each line is a base64-encoded record.
- * The key is a LongWritable which is the offset.
- * The value is a BytesWritable containing the base64-decoded bytes.
+ * Each line is a base64-encoded record. The key is a LongWritable which is the offset. The value is a BytesWritable containing the base64-decoded
+ * bytes.
  * 
- * This class accepts a configurable parameter:
- * "base64.text.input.format.signature"
+ * This class accepts a configurable parameter: "base64.text.input.format.signature"
  * 
- * The UTF-8 encoded signature will be compared with the beginning
- * of each decoded bytes.  If they don't match, the record is discarded.
- * If they match, the signature is stripped off the data.
+ * The UTF-8 encoded signature will be compared with the beginning of each record's decoded bytes. If they don't match, the record is discarded. If they match,
+ * the signature is stripped off the data.
  */
-public class Base64TextInputFormat
-  implements InputFormat<LongWritable, BytesWritable>, JobConfigurable {
-  
-  
-  public static class Base64LineRecordReader
-    implements RecordReader<LongWritable, BytesWritable>, JobConfigurable {
+public class Base64TextInputFormat extends InputFormat<LongWritable, BytesWritable> implements Configurable {
 
-    LineRecordReader reader;
-    Text text;
+	public static class Base64LineRecordReader extends RecordReader<LongWritable, BytesWritable> implements Configurable {
 
-    public Base64LineRecordReader(LineRecordReader reader) {
-      this.reader = reader;
-      text = reader.createValue();
-    }
-    
-    @Override
-    public void close() throws IOException {
-      reader.close();
-    }
+		LineRecordReader reader;
+		Text text;
+		BytesWritable value;
+		Configuration job;
+		
+		public Base64LineRecordReader(LineRecordReader reader) {
+			this.reader = reader;
+		}
 
-    @Override
-    public LongWritable createKey() {
-      return reader.createKey();
-    }
+		@Override
+		public void close() throws IOException {
+			reader.close();
+		}
 
-    @Override
-    public BytesWritable createValue() {
-      return new BytesWritable();
-    }
+		@Override
+		public LongWritable getCurrentKey() {
+			return reader.getCurrentKey();
+		}
 
-    @Override
-    public long getPos() throws IOException {
-      return reader.getPos();
-    }
+		@Override
+		public BytesWritable getCurrentValue() {
+			return value;
+		}
 
-    @Override
-    public float getProgress() throws IOException {
-      return reader.getProgress();
-    }
+		@Override
+		public float getProgress() throws IOException {
+			return reader.getProgress();
+		}
 
-    @Override
-    public boolean next(LongWritable key, BytesWritable value)
-        throws IOException {
-      while (reader.next(key, text)) {
-        // text -> byte[] -> value
-        byte[] textBytes = text.getBytes();
-        int length = text.getLength();
-        
-        // Trim additional bytes
-        if (length != textBytes.length) {
-          textBytes = Arrays.copyOf(textBytes, length);
-        }
-        byte[] binaryData = base64.decode(textBytes);
+		@Override
+		public boolean nextKeyValue() throws IOException {
+			value = new BytesWritable();
+			while (reader.nextKeyValue()) {
+				text = reader.getCurrentValue();
+				// text -> byte[] -> value
+				byte[] textBytes = text.getBytes();
+				int length = text.getLength();
 
-        // compare data header with signature
-        int i;
-        for (i = 0; i < binaryData.length && i < signature.length &&
-                        binaryData[i] == signature[i]; ++i);
+				// Trim additional bytes
+				if (length != textBytes.length) {
+					textBytes = Arrays.copyOf(textBytes, length);
+				}
+				byte[] binaryData = base64.decode(textBytes);
 
-        // return the row only if it's not corrupted
-        if (i == signature.length) {
-          value.set(binaryData, signature.length,
-                    binaryData.length - signature.length);
-          return true;
-        }
-      }
-      // no more data
-      return false;
-    }
-    
-    private byte[] signature;
-    private Base64 base64 = new Base64();
+				// compare data header with signature
+				int i;
+				for (i = 0; i < binaryData.length && i < signature.length && binaryData[i] == signature[i]; ++i); // empty body: i advances past the matching prefix
 
-    @Override
-    public void configure(JobConf job) {
-      try {
-        String signatureString = job.get("base64.text.input.format.signature");
-        if (signatureString != null) {
-          signature = signatureString.getBytes("UTF-8");
-        } else {
-          signature = new byte[0];
-        }
-      } catch (UnsupportedEncodingException e) {
-        e.printStackTrace();
-      }      
-    }
-  }
-  
-  TextInputFormat format;
-  JobConf job;
-  
-  public Base64TextInputFormat() {
-    format = new TextInputFormat();
-  }
-  
-  @Override
-  public void configure(JobConf job) {
-    this.job = job;
-    format.configure(job);
-  }
-  
-  public RecordReader<LongWritable, BytesWritable> getRecordReader(
-      InputSplit genericSplit, JobConf job,
-      Reporter reporter) throws IOException {
-    reporter.setStatus(genericSplit.toString());
-    Base64LineRecordReader reader = new Base64LineRecordReader(
-        new LineRecordReader(job, (FileSplit) genericSplit));
-    reader.configure(job);
-    return reader;
-  }
+				// return the row only if it's not corrupted
+				if (i == signature.length) {
+					value.set(binaryData, signature.length, binaryData.length - signature.length);
+					return true;
+				}
+			}
+			// no more data
+			return false;
+		}
 
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return format.getSplits(job, numSplits);
-  }
+		private byte[] signature;
+		private Base64 base64 = new Base64();
 
-  // Cannot put @Override here because hadoop 0.18+ removed this method.
-  public void validateInput(JobConf job) throws IOException {
-    ShimLoader.getHadoopShims().inputFormatValidateInput(format, job);
-  }
+		@Override
+		public void setConf(Configuration job) {
+			this.job = job;
+			try {
+				String signatureString = job.get("base64.text.input.format.signature");
+				if (signatureString != null) {
+					signature = signatureString.getBytes("UTF-8");
+				} else {
+					signature = new byte[0];
+				}
+			} catch (UnsupportedEncodingException e) {
+				e.printStackTrace();
+			}
+		}
 
+		@Override
+		public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+			// no-op: the wrapped LineRecordReader is initialized in createRecordReader()
+		}
+
+		@Override
+		public Configuration getConf() {
+			// configuration captured by setConf()
+			return job;
+		}
+	}
+
+	TextInputFormat format;
+	Configuration job;
+
+	public Base64TextInputFormat() {
+		format = new TextInputFormat();
+	}
+
+	@Override
+	public void setConf(Configuration job) {
+		this.job = job;
+		if (format instanceof Configurable) {
+			((Configurable) format).setConf(job);
+		}
+	}
+
+
+	@Override
+	public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit genericSplit, TaskAttemptContext context)
+			throws IOException, InterruptedException {
+		LineRecordReader lineReader = new LineRecordReader();
+		lineReader.initialize(genericSplit, context);
+		Base64LineRecordReader reader = new Base64LineRecordReader(lineReader);
+		reader.initialize(genericSplit, context);
+		reader.setConf(context.getConfiguration());
+		return reader;
+	}
+
+	@Override
+	public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+		return format.getSplits(context);
+	}
+
+	@Override
+	public Configuration getConf() {
+		// configuration captured by setConf()
+		return job;
+	}
+
+	// Cannot put @Override here because hadoop 0.18+ removed this method.
+	public void validateInput(Configuration job) throws IOException {
+		ShimLoader.getHadoopShims().inputFormatValidateInput(format, job);
+	}
 }
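
For reference, below is a minimal driver sketch (not part of the patch) showing how a job could consume this input format through the new org.apache.hadoop.mapreduce API. The driver and mapper class names, the signature value, and the paths are illustrative assumptions only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.contrib.fileformat.base64.Base64TextInputFormat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical usage sketch, NOT part of the patch.
public class Base64ReadDriver {

  // Map-only pass-through: the key is the byte offset, the value holds the
  // base64-decoded, signature-stripped record bytes.
  public static class PassThroughMapper
      extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
    @Override
    protected void map(LongWritable key, BytesWritable value, Context context)
        throws java.io.IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Decoded records must start with this UTF-8 signature; non-matching
    // lines are discarded, matching ones have the signature stripped.
    conf.set("base64.text.input.format.signature", "HIVE"); // assumed value
    Job job = new Job(conf, "base64-read"); // 0.20-era Job constructor
    job.setJarByClass(Base64ReadDriver.class);
    job.setInputFormatClass(Base64TextInputFormat.class);
    job.setMapperClass(PassThroughMapper.class);
    job.setNumReduceTasks(0); // map-only job
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input dir (assumed)
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir (assumed)
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because the input format implements Configurable, the framework (via ReflectionUtils) hands it the job configuration through setConf() at instantiation time.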

         Hadoop Flags: [Incompatible change]
               Status: Patch Available  (was: Open)

> Hive to use new interfaces in Hadoop 0.20
> -----------------------------------------
>
>                 Key: HIVE-959
>                 URL: https://issues.apache.org/jira/browse/HIVE-959
>             Project: Hadoop Hive
>          Issue Type: Improvement
>    Affects Versions: 0.5.0
>            Reporter: Zheng Shao
>
> Hadoop 0.20 has a lot of new interfaces, including MapReduce. Hive should move to the new interfaces.
> Hive won't be compatible with earlier Hadoop versions after this change, so we might want to wait till Hive 0.5 is branched out first.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

