pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "LoadStoreMigrationGuide" by PradeepKamath
Date Wed, 10 Feb 2010 01:32:56 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The "LoadStoreMigrationGuide" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreMigrationGuide?action=diff&rev1=1&rev2=2

--------------------------------------------------

- This page describes how to migrate from the old LoadFunc and StoreFunc interface (as of
Pig 0.6.0) to the new interfaces proposed in http://wiki.apache.org/pig/LoadStoreRedesignProposal
and planned to be released in Pig 0.7.0.
+ This page describes how to migrate from the old !LoadFunc and !StoreFunc interfaces (as of
Pig 0.6.0) to the new interfaces proposed in http://wiki.apache.org/pig/LoadStoreRedesignProposal,
which are planned for release in Pig 0.7.0.
  
  = LoadFunc Migration =
- The methods in the old LoadFunc have been split among a LoadFunc abstract class and 3 new
interfaces - LoadMetadata (methods to deal with metadata), LoadPushDown (methods to push operations
from pig runtime into loader implementations) and LoadCaster with methods to convert byte
arrays to specific types. An example of how a simple LoadFunc implementation based on old
interface can be converted to the new interfaces will be shown below. The loader implementation
in the example is a loader for text data with line delimiter as '\n' and '\t' as default field
delimiter (which can be overridden by passing a different field delimiter in the constructor)
- this is similar to current PigStorage loader in Pig.
+ The methods in the old !LoadFunc have been split among a !LoadFunc abstract class and three
new interfaces: !LoadMetadata (methods to deal with metadata), !LoadPushDown (methods to
push operations from the Pig runtime into loader implementations) and !LoadCaster (methods
to convert byte arrays to specific types). The example below shows how a simple !LoadFunc
implementation based on the old interface can be converted to the new interfaces. The loader
in the example reads text data, using '\n' as the line delimiter and '\t' as the default field
delimiter (a different field delimiter can be passed to the constructor); this is similar to
the current !PigStorage loader in Pig.
  
  == Old Implementation ==
+ {{{
+ /**
+  * A load function that parses a line of input into fields using a delimiter to set the
fields. 
+  */
+ public class SimpleTextLoader extends Utf8StorageConverter {
+     protected BufferedPositionedInputStream in = null;
+     protected final Log mLog = LogFactory.getLog(getClass());
+         
+     long                end            = Long.MAX_VALUE;
+     private byte recordDel = '\n';
+     private byte fieldDel = '\t';
+     private ByteArrayOutputStream mBuf = null;
+     private ArrayList<Object> mProtoTuple = null;
+     private static final String UTF8 = "UTF-8";
+     
+     public SimpleTextLoader() {
+     }
+ 
+     /**
+      * Constructs a Pig loader that uses specified character as a field delimiter.
+      * 
+      * @param delimiter
+      *            the single byte character that is used to separate fields.
+      *            ("\t" is the default.)
+      */
+     public SimpleTextLoader(String delimiter) {
+         this();
+         if (delimiter.length() == 1) {
+             this.fieldDel = (byte)delimiter.charAt(0);
+         } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
+             switch (delimiter.charAt(1)) {
+             case 't':
+                 this.fieldDel = (byte)'\t';
+                 break;
+ 
+             case 'x':
+             case 'u':
+                 this.fieldDel =
+                     Integer.valueOf(delimiter.substring(2)).byteValue();
+                 break;
+ 
+             default:                
+                 throw new RuntimeException("Unknown delimiter " + delimiter);
+             }
+         } else {            
+             throw new RuntimeException("PigStorage delimeter must be a single character");
+         }
+     }
+ 
+     public long getPosition() throws IOException {
+         return in.getPosition();
+     }
+ 
+     public long skip(long n) throws IOException {
+         
+         long skipped = in.skip(n-1);
+         prevByte = (byte)in.read();
+         if(prevByte == -1) // End of stream.
+             return skipped;
+         else
+             return skipped+1;
+     }
+ 
+     public Tuple getNext() throws IOException {
+         if (in == null || in.getPosition() > end) {
+             return null;
+         }
+ 
+         if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
+         mBuf.reset();
+         while (true) {
+             // BufferedPositionedInputStream is buffered, so I don't need
+             // to buffer.
+             int b = in.read();
+             prevByte = (byte)b;
+             
+             if (b == fieldDel) {
+                 readField();
+             } else if (b == recordDel) {
+                 readField();
+                 //Tuple t =  mTupleFactory.newTuple(mProtoTuple);
+                 Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
+                 mProtoTuple = null;
+                 return t;
+             } else if (b == -1) {
+                 // hit end of file
+                 return null;
+             } else {
+                 mBuf.write(b);
+             }
+         }
+     }
+ 
+     public Tuple getSampledTuple() throws IOException {
+        
+         if(prevByte == null || prevByte == recordDel) 
+             // prevByte = null when this is called for the first time, in that case bindTo
would have already
+             // called getNext() if it was required.
+         return getNext();
+         
+         else{   // We are in middle of record. So, we skip this and return the next one.
+             getNext();
+             return getNext();            
+         }
+     }
+ 
+     public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long
end) throws IOException {
+         this.in = in;
+         this.end = end;
+         
+         // Since we are not block aligned we throw away the first
+         // record and cound on a different instance to read it
+         if (offset != 0) {
+             getNext();
+         }
+     }
+     
+     OutputStream mOut;
+     public void bindTo(OutputStream os) throws IOException {
+         mOut = os;
+     }
+ 
+     private void putField(Object field) throws IOException {
+         //string constants for each delimiter
+         String tupleBeginDelim = "(";
+         String tupleEndDelim = ")";
+         String bagBeginDelim = "{";
+         String bagEndDelim = "}";
+         String mapBeginDelim = "[";
+         String mapEndDelim = "]";
+         String fieldDelim = ",";
+         String mapKeyValueDelim = "#";
+ 
+         switch (DataType.findType(field)) {
+         case DataType.NULL:
+             break; // just leave it empty
+ 
+         case DataType.BOOLEAN:
+             mOut.write(((Boolean)field).toString().getBytes());
+             break;
+ 
+         case DataType.INTEGER:
+             mOut.write(((Integer)field).toString().getBytes());
+             break;
+ 
+         case DataType.LONG:
+             mOut.write(((Long)field).toString().getBytes());
+             break;
+ 
+         case DataType.FLOAT:
+             mOut.write(((Float)field).toString().getBytes());
+             break;
+ 
+         case DataType.DOUBLE:
+             mOut.write(((Double)field).toString().getBytes());
+             break;
+ 
+         case DataType.BYTEARRAY: {
+             byte[] b = ((DataByteArray)field).get();
+             mOut.write(b, 0, b.length);
+             break;
+                                  }
+ 
+         case DataType.CHARARRAY:
+             // oddly enough, writeBytes writes a string
+             mOut.write(((String)field).getBytes(UTF8));
+             break;
+ 
+         case DataType.MAP:
+             boolean mapHasNext = false;
+             Map<String, Object> m = (Map<String, Object>)field;
+             mOut.write(mapBeginDelim.getBytes(UTF8));
+             for(String s: m.keySet()) {
+                 if(mapHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     mapHasNext = true;
+                 }
+                 putField(s);
+                 mOut.write(mapKeyValueDelim.getBytes(UTF8));
+                 putField(m.get(s));
+             }
+             mOut.write(mapEndDelim.getBytes(UTF8));
+             break;
+ 
+         case DataType.TUPLE:
+             boolean tupleHasNext = false;
+             Tuple t = (Tuple)field;
+             mOut.write(tupleBeginDelim.getBytes(UTF8));
+             for(int i = 0; i < t.size(); ++i) {
+                 if(tupleHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     tupleHasNext = true;
+                 }
+                 try {
+                     putField(t.get(i));
+                 } catch (ExecException ee) {
+                     throw ee;
+                 }
+             }
+             mOut.write(tupleEndDelim.getBytes(UTF8));
+             break;
+ 
+         case DataType.BAG:
+             boolean bagHasNext = false;
+             mOut.write(bagBeginDelim.getBytes(UTF8));
+             Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
+             while(tupleIter.hasNext()) {
+                 if(bagHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     bagHasNext = true;
+                 }
+                 putField((Object)tupleIter.next());
+             }
+             mOut.write(bagEndDelim.getBytes(UTF8));
+             break;
+             
+         default: {
+             int errCode = 2108;
+             String msg = "Could not determine data type of field: " + field;
+             throw new ExecException(msg, errCode, PigException.BUG);
+         }
+         
+         }
+     }
+ 
+     public void putNext(Tuple f) throws IOException {
+         // I have to convert integer fields to string, and then to bytes.
+         // If I use a DataOutputStream to convert directly from integer to
+         // bytes, I don't get a string representation.
+         int sz = f.size();
+         for (int i = 0; i < sz; i++) {
+             Object field;
+             try {
+                 field = f.get(i);
+             } catch (ExecException ee) {
+                 throw ee;
+             }
+ 
+             putField(field);
+ 
+             if (i == sz - 1) {
+                 // last field in tuple.
+                 mOut.write(recordDel);
+             } else {
+                 mOut.write(fieldDel);
+             }
+         }
+     }
+ 
+     public void finish() throws IOException {
+     }
+ 
+     private void readField() {
+         if (mProtoTuple == null) mProtoTuple = new ArrayList<Object>();
+         if (mBuf.size() == 0) {
+             // NULL value
+             mProtoTuple.add(null);
+         } else {
+             // TODO, once this can take schemas, we need to figure out
+             // if the user requested this to be viewed as a certain
+             // type, and if so, then construct it appropriately.
+             byte[] array = mBuf.toByteArray();
+             if (array[array.length-1]=='\r' && os==OS_WINDOWS) {
+                 // This is a java 1.6 function.  Until pig officially moves to
+                 // 1.6 we can't use this.
+                 // array = Arrays.copyOf(array, array.length-1);
+                 byte[] tmp = new byte[array.length - 1];
+                 for (int i = 0; i < array.length - 1; i++) tmp[i] = array[i];
+                 array = tmp;
+             }
+                 
+             if (array.length==0)
+                 mProtoTuple.add(null);
+             else
+                 mProtoTuple.add(new DataByteArray(array));
+         }
+         mBuf.reset();
+     }
+ 
+     /* (non-Javadoc)
+      * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType,
org.apache.pig.backend.datastorage.DataStorage)
+      */
+     public Schema determineSchema(String fileName, ExecType execType,
+             DataStorage storage) throws IOException {
+         // TODO Auto-generated method stub
+         return null;
+     }
+ 
+     public void fieldsToRead(Schema schema) {
+         // do nothing
+     }
+     
+     public boolean equals(Object obj) {
+         return equals((PigStorage)obj);
+     }
+ 
+     public boolean equals(PigStorage other) {
+         return this.fieldDel == other.fieldDel;
+     }
+ 
+     /* (non-Javadoc)
+      * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+      */
+    
+     public Class getStorePreparationClass() throws IOException {
+         // TODO Auto-generated method stub
+         return null;
+     }
+  }
+ 
+ }}}
  == New Implementation ==
  

Mime
View raw message