pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "LoadStoreMigrationGuide" by PradeepKamath
Date Wed, 10 Feb 2010 22:59:19 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The "LoadStoreMigrationGuide" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreMigrationGuide?action=diff&rev1=12&rev2=13

--------------------------------------------------

  
  An example of how a simple !LoadFunc implementation based on the old interface can be converted
to the new interfaces is shown in the Examples section below. 
  
+ = StoreFunc Migration =
  
+ The main change is that the new !StoreFunc API is based on an !OutputFormat to write the data.
Implementations can choose to use an existing !OutputFormat like !TextOutputFormat or implement
a new one.
+ 
+ An example of how a simple !StoreFunc implementation based on the old interface can be converted
to the new interfaces is shown in the Examples section below. 
  
  = Examples =
  
@@ -159, +163 @@

      private TupleFactory mTupleFactory = TupleFactory.getInstance();
      private static final int BUFFER_SIZE = 1024;
      
-     public PigStorage() {
+     public SimpleTextLoader() {
      }
      
      /**
@@ -169, +173 @@

       *            the single byte character that is used to separate fields.
       *            ("\t" is the default.)
       */
-     public PigStorage(String delimiter) {
+     public SimpleTextLoader(String delimiter) {
          this();
          if (delimiter.length() == 1) {
              this.fieldDel = (byte)delimiter.charAt(0);
@@ -263, +267 @@

  The storer implementation in the example is a storer for text data with '\n' as the line
delimiter and '\t' as the default field delimiter (which can be overridden by passing a different
field delimiter in the constructor) - this is similar to the current !PigStorage storer in Pig. The
new implementation uses an existing Hadoop-supported !OutputFormat - !TextOutputFormat - as
the underlying !OutputFormat.
  
  === Old Implementation ===
+ {{{
+ public class SimpleTextStorer implements StoreFunc {
+         
+     // Byte written after the last field of each tuple (see putNext).
+     protected byte recordDel = '\n';
+     // Byte written between fields; the one-argument constructor can replace it.
+     protected byte fieldDel = '\t';
+     
+     // Charset name passed to String.getBytes when serializing text.
+     protected static final String UTF8 = "UTF-8";
+     
+     /** Creates a storer that keeps the default tab field delimiter. */
+     public SimpleTextStorer() {}
  
+     /**
+      * Constructs a Pig storer that uses the specified character as a field delimiter.
+      * <p>
+      * Accepted forms are: a single character; the two-character escape backslash-t
+      * (tab); backslash-x followed by a hexadecimal byte value; or backslash-u
+      * followed by a byte value that is parsed as a decimal number.
+      * 
+      * @param delimiter
+      *            the single byte character that is used to separate fields.
+      *            ("\t" is the default.)
+      * @throws RuntimeException
+      *             if the delimiter is empty, uses an unknown escape, or is
+      *             longer than one character without a leading backslash
+      * @throws NumberFormatException
+      *             if the digits after a backslash-x or backslash-u escape
+      *             cannot be parsed
+      */
+     public SimpleTextStorer(String delimiter) {
+         this();
+         if (delimiter.length() == 1) {
+             this.fieldDel = (byte)delimiter.charAt(0);
+         } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
+             switch (delimiter.charAt(1)) {
+             case 't':
+                 this.fieldDel = (byte)'\t';
+                 break;
+ 
+             case 'x':
+                 // A backslash-x escape denotes a hexadecimal byte value, so
+                 // parse with radix 16 (previously it was read as decimal).
+                 this.fieldDel =
+                     Integer.valueOf(delimiter.substring(2), 16).byteValue();
+                 break;
+ 
+             case 'u':
+                 // Kept as a decimal parse to preserve existing behavior.
+                 // NOTE(review): confirm whether hexadecimal was intended here too.
+                 this.fieldDel =
+                     Integer.valueOf(delimiter.substring(2)).byteValue();
+                 break;
+ 
+             default:
+                 throw new RuntimeException("Unknown delimiter " + delimiter);
+             }
+         } else {
+             throw new RuntimeException("SimpleTextStorer delimiter must be a single character");
+         }
+     }
+ 
+     // Destination stream for serialized tuples; assigned by bindTo().
+     OutputStream mOut;
+ 
+     /**
+      * Remembers the stream that putNext and putField write to.
+      *
+      * @param os the output stream to bind this storer to
+      * @throws IOException declared for callers; this implementation only stores the reference
+      */
+     public void bindTo(OutputStream os) throws IOException {
+         this.mOut = os;
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private void putField(Object field) throws IOException {
+         //string constants for each delimiter
+         String tupleBeginDelim = "(";
+         String tupleEndDelim = ")";
+         String bagBeginDelim = "{";
+         String bagEndDelim = "}";
+         String mapBeginDelim = "[";
+         String mapEndDelim = "]";
+         String fieldDelim = ",";
+         String mapKeyValueDelim = "#";
+ 
+         switch (DataType.findType(field)) {
+         case DataType.NULL:
+             break; // just leave it empty
+ 
+         case DataType.BOOLEAN:
+             mOut.write(((Boolean)field).toString().getBytes());
+             break;
+ 
+         case DataType.INTEGER:
+             mOut.write(((Integer)field).toString().getBytes());
+             break;
+ 
+         case DataType.LONG:
+             mOut.write(((Long)field).toString().getBytes());
+             break;
+ 
+         case DataType.FLOAT:
+             mOut.write(((Float)field).toString().getBytes());
+             break;
+ 
+         case DataType.DOUBLE:
+             mOut.write(((Double)field).toString().getBytes());
+             break;
+ 
+         case DataType.BYTEARRAY: {
+             byte[] b = ((DataByteArray)field).get();
+             mOut.write(b, 0, b.length);
+             break;
+                                  }
+ 
+         case DataType.CHARARRAY:
+             // oddly enough, writeBytes writes a string
+             mOut.write(((String)field).getBytes(UTF8));
+             break;
+ 
+         case DataType.MAP:
+             boolean mapHasNext = false;
+             Map<String, Object> m = (Map<String, Object>)field;
+             mOut.write(mapBeginDelim.getBytes(UTF8));
+             for(Map.Entry<String, Object> e: m.entrySet()) {
+                 if(mapHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     mapHasNext = true;
+                 }
+                 putField(e.getKey());
+                 mOut.write(mapKeyValueDelim.getBytes(UTF8));
+                 putField(e.getValue());
+             }
+             mOut.write(mapEndDelim.getBytes(UTF8));
+             break;
+ 
+         case DataType.TUPLE:
+             boolean tupleHasNext = false;
+             Tuple t = (Tuple)field;
+             mOut.write(tupleBeginDelim.getBytes(UTF8));
+             for(int i = 0; i < t.size(); ++i) {
+                 if(tupleHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     tupleHasNext = true;
+                 }
+                 try {
+                     putField(t.get(i));
+                 } catch (ExecException ee) {
+                     throw ee;
+                 }
+             }
+             mOut.write(tupleEndDelim.getBytes(UTF8));
+             break;
+ 
+         case DataType.BAG:
+             boolean bagHasNext = false;
+             mOut.write(bagBeginDelim.getBytes(UTF8));
+             Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
+             while(tupleIter.hasNext()) {
+                 if(bagHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     bagHasNext = true;
+                 }
+                 putField((Object)tupleIter.next());
+             }
+             mOut.write(bagEndDelim.getBytes(UTF8));
+             break;
+             
+         default: {
+             int errCode = 2108;
+             String msg = "Could not determine data type of field: " + field;
+             throw new ExecException(msg, errCode, PigException.BUG);
+         }
+         
+         }
+     }
+ 
+     /**
+      * Serializes one tuple as a single text record on the bound stream.
+      * Each field is rendered by putField; fields are separated by fieldDel
+      * and the final field is followed by recordDel. A tuple with no fields
+      * writes nothing.
+      *
+      * @param f the tuple to write
+      * @throws IOException if a field cannot be read from the tuple or written to the stream
+      */
+     public void putNext(Tuple f) throws IOException {
+         // Fields are rendered as text (not binary) so the output stays human-readable.
+         final int lastIndex = f.size() - 1;
+         for (int pos = 0; pos <= lastIndex; pos++) {
+             putField(f.get(pos));
+ 
+             // Field separator between fields, record separator after the last one.
+             mOut.write(pos < lastIndex ? fieldDel : recordDel);
+         }
+     }
+ 
+     /**
+      * Does nothing: this storer performs no work in finish(); in particular it
+      * does not flush or close the bound stream here.
+      */
+     public void finish() throws IOException {
+     }
+ 
+     /**
+      * Always returns null; this storer supplies no store preparation class.
+      *
+      * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+      */
+    
+     public Class getStorePreparationClass() throws IOException {
+         return null;
+     }
+ 
+ }
+ }}}
  === New Implementation ===
  
+ {{{
+ 
+ }}}
+ 

Mime
View raw message