pig-commits mailing list archives

From: pradeep...@apache.org
Subject: svn commit: r903430 - in /hadoop/pig/branches/load-store-redesign: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Date: Tue, 26 Jan 2010 21:38:13 GMT
Author: pradeepkth
Date: Tue Jan 26 21:38:13 2010
New Revision: 903430

URL: http://svn.apache.org/viewvc?rev=903430&view=rev
Log:
PIG-1200: Using TableInputFormat in HBaseStorage (zjffdu via pradeepkth)
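
In short, this patch moves HBaseStorage off Pig's old Slicer/Slice API and onto
the Hadoop InputFormat-based load path: input splitting is delegated to HBase's
TableInputFormat (one split per table region), and the loader itself only turns
each scanned Result into a Tuple. The sketch below approximates the call
sequence Pig's MapReduce layer drives against the rewritten class; it is
illustrative only, and the table name "mytable" and columns "cf:a cf:b" are
invented:

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;

public class LoadPathSketch {
    public static void main(String[] args) throws Exception {
        HBaseStorage storage = new HBaseStorage("cf:a cf:b");
        Job job = new Job();                            // 0.20-era constructor
        storage.setLocation("hbase://mytable", job);    // fills INPUT_TABLE and SCAN
        InputFormat format = storage.getInputFormat();  // a configured TableInputFormat
        // Pig then asks the InputFormat for splits (one per region), builds a
        // RecordReader per split, calls prepareToRead(reader, split), and
        // loops on getNext() until it returns null.
    }
}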

Removed:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=903430&r1=903429&r2=903430&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Tue Jan 26 21:38:13 2010
@@ -22,6 +22,8 @@
 
 INCOMPATIBLE CHANGES
 
+PIG-1200: Using TableInputFormat in HBaseStorage (zjffdu via pradeepkth)
+
 PIG-1148: Move splitable logic from pig latin to InputFormat (zjffdu via
 pradeepkth)
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=903430&r1=903429&r2=903430&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Jan 26 21:38:13 2010
@@ -16,155 +16,124 @@
  */
 package org.apache.pig.backend.hadoop.hbase;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.Slice;
-import org.apache.pig.Slicer;
-import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 /**
  * A <code>Slicer</code> that splits the HBase table into {@link HBaseSlice}s.
  * A load function is also provided; it performs no load operations itself, as
  * the actual load operations are done in {@link HBaseSlice}.
  */
-public class HBaseStorage extends LoadFunc implements Slicer {
+public class HBaseStorage extends LoadFunc {
 
-    private byte[][] m_cols;
-    private HTable m_table;
-    private HBaseConfiguration m_conf;
-
-    private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
-
-    // HBase Slicer
-    // Creates a slice per region of a specified table.
-
-    /**
-     * Constructor. Construct a HBase Table loader to load the cells of the
-     * provided columns.
-     * 
-     * @param columnList
-     *            columnlist that is a presented string delimited by space.
-     */
-    public HBaseStorage(String columnList) {
-        String[] colNames = columnList.split(" ");
-        m_cols = new byte[colNames.length][];
-        for (int i = 0; i < m_cols.length; i++) {
-            m_cols[i] = Bytes.toBytes(colNames[i]);
-        }
-
-        m_conf = new HBaseConfiguration();
-    }
-
-    @Override
-    public Slice[] slice(DataStorage store, String tablename)
-            throws IOException {
-        validate(store, tablename);
-
-        byte[][] startKeys = m_table.getStartKeys();
-        if (startKeys == null || startKeys.length == 0) {
-            throw new IOException("Expecting at least one region");
-        }
-        if (m_cols == null || m_cols.length == 0) {
-            throw new IOException("Expecting at least one column");
-        }
-        // one region one slice
-        Slice[] slices = new Slice[startKeys.length];
-        for (int i = 0; i < startKeys.length; i++) {
-            String regionLocation = m_table.getRegionLocation(startKeys[i])
-                    .getServerAddress().getHostname();
-            slices[i] = new HBaseSlice(m_table.getTableName(), startKeys[i],
-                    ((i + 1) < startKeys.length) ? startKeys[i + 1]
-                            : HConstants.EMPTY_START_ROW, m_cols,
-                    regionLocation);
-            LOG.info("slice: " + i + "->" + slices[i]);
-        }
+	private byte[][] m_cols;
+	private HTable m_table;
+	private Configuration m_conf=new Configuration();
+	private RecordReader reader;
+	private Scan scan=new Scan();
+	
+	private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
+
+	// HBase loader
+	// TableInputFormat creates one input split per region of the specified table.
+
+	/**
+	 * Constructor. Constructs an HBase table loader that loads the cells of
+	 * the provided columns.
+	 * 
+	 * @param columnList
+	 *            a space-delimited list of column names.
+	 */
+	public HBaseStorage(String columnList) {
+		String[] colNames = columnList.split(" ");
+		m_cols = new byte[colNames.length][];
+		for (int i = 0; i < m_cols.length; i++) {
+			m_cols[i] = Bytes.toBytes(colNames[i]);
+			scan.addColumn(m_cols[i]);
+		}		
+	}
+
+
+	@Override
+	public Tuple getNext() throws IOException {
+		try {
+			if (reader.nextKeyValue()) {
+				ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
+						.getCurrentKey();
+				Result result = (Result) reader.getCurrentValue();
+				Tuple tuple=TupleFactory.getInstance().newTuple(m_cols.length);
+				for (int i=0;i<m_cols.length;++i){
+					tuple.set(i, new DataByteArray(result.getValue(m_cols[i])));
+				}
+				return tuple;
+			}
+		} catch (InterruptedException e) {
+			throw new IOException(e);
+		}
+		return null;
+	}
+
+	@Override
+	public InputFormat getInputFormat() {
+		TableInputFormat inputFormat = new TableInputFormat();
+		inputFormat.setConf(m_conf);
+		return inputFormat;
+	}
+
+	@Override
+	public void prepareToRead(RecordReader reader, PigSplit split) {
+		this.reader = reader;
+	}
 
-        return slices;
-    }
-
-    @Override
-    public void validate(DataStorage store, String tablename)
-            throws IOException {
-        ensureTable(tablename);
-    }
-
-    private void ensureTable(String tablename) throws IOException {
-        LOG.info("tablename: "+tablename);
-
-        // We're looking for the right scheme here (actually, we don't
-        // care what the scheme is as long as it is one and it's
-        // different from hdfs and file. If the user specified to use
-        // the multiquery feature and did not specify a scheme we will
-        // have transformed it to an absolute path. In that case we'll
-        // take the last component and guess that's what was
-        // meant. We'll print a warning in that case.
-        int index;
-        if(-1 != (index = tablename.indexOf("://"))) {
-            if (tablename.startsWith("hdfs:") 
-                || tablename.startsWith("file:")) {
-                index = tablename.lastIndexOf("/");
-                if (-1 == index) {
-                    index = tablename.lastIndexOf("\\");
-                }
-
-                if (-1 == index) {
-                    throw new IOException("Got tablename: "+tablename
-                        +". Either turn off multiquery (-no_multiquery)"
-                        +" or specify load path as \"hbase://<tablename>\".");
-                } else {
-                    String in = tablename;
-                    tablename = tablename.substring(index+1);
-                    LOG.warn("Got tablename: "+in+" Assuming you meant table: "
-                             +tablename+". Either turn off multiquery (-no_multiquery) "
-                             +"or specify load path as \"hbase://<tablename>\" "
-                             +"to avoid this warning.");
-                }
-            } else {
-                tablename = tablename.substring(index+3);
-            }
-        }
-
-        if (m_table == null) {
-            m_table = new HTable(m_conf, tablename);
-        }
-    }
-
-    // HBase LoadFunc
-    // It is just a mock class to let the UDF be casted to a LOADFUNC during
-    // parsing.
-    
-
-    @Override
-    public Tuple getNext() throws IOException {
-        // do nothing
-        return null;
-    }
-
-    @Override
-    public InputFormat getInputFormat() {
-        return null;
-    }
-
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
+	@Override
     public void setLocation(String location, Job job) throws IOException {
-        throw new UnsupportedOperationException();
-    }
+        if (location.startsWith("hbase://")){
+        	m_conf.set(TableInputFormat.INPUT_TABLE, location.substring(8));
+        }else{
+        	m_conf.set(TableInputFormat.INPUT_TABLE, location);
+        }
+        m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+    }
+
+	@Override
+	public String relativeToAbsolutePath(String location, Path curDir)
+			throws IOException {
+		return location;
+	}
+	
+	private static String convertScanToString(Scan scan) {
+
+		try {
+			ByteArrayOutputStream out = new ByteArrayOutputStream();
+			DataOutputStream dos = new DataOutputStream(out);
+			scan.write(dos);
+			return Base64.encodeBytes(out.toByteArray());
+		} catch (IOException e) {
+			LOG.error(e);
+			return "";
+		}
+
+	}
 }
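
A note on the Scan hand-off above: a Hadoop Configuration can only carry
strings, so convertScanToString() serializes the Scan through its Writable
interface and Base64-encodes the bytes before storing them under
TableInputFormat.SCAN. TableInputFormat reverses the process when it is
configured on the task side; HBase of this vintage shipped the same helper
pair in TableMapReduceUtil (convertScanToString / convertStringToScan). The
decode direction, sketched under the assumption of the 0.20-era API where
Scan still implements Writable:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Base64;

public class ScanCodecSketch {
    static Scan convertStringToScan(String base64) throws IOException {
        DataInputStream dis = new DataInputStream(
                new ByteArrayInputStream(Base64.decode(base64)));
        Scan scan = new Scan();
        scan.readFields(dis);   // Writable deserialization
        return scan;
    }
}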


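One behavioral detail worth noting: getNext() emits one DataByteArray field per
column named in the constructor, in constructor order, and the row key is read
but not included in the tuple. A hypothetical script such as
raw = LOAD 'hbase://mytable' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('cf:a cf:b');
therefore sees two-field bytearray tuples. Spelled out as a construction, with
invented cell values:

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class TupleShapeSketch {
    // Hypothetical: the tuple built for one row whose cells are cf:a -> "1"
    // and cf:b -> "x"; Pig sees the raw bytes as bytearray fields.
    public static Tuple example() throws Exception {
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, new DataByteArray(Bytes.toBytes("1"))); // value of cf:a
        t.set(1, new DataByteArray(Bytes.toBytes("x"))); // value of cf:b
        return t;
    }
}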
