pig-commits mailing list archives

From: dvrya...@apache.org
Subject: svn commit: r1147934 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/ src/org/apache/pig/builtin/ test/org/apache/pig/test/
Date: Mon, 18 Jul 2011 15:39:37 GMT
Author: dvryaboy
Date: Mon Jul 18 15:39:35 2011
New Revision: 1147934

URL: http://svn.apache.org/viewvc?rev=1147934&view=rev
Log:
PIG-2143: Make PigStorage optionally store schema; improve docs.

Added:
    pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/test/org/apache/pig/test/TestPigStorage.java
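
In short, the piggybank PigStorageSchema behavior (persisting a relation's schema in metafiles next to the data) moves into the built-in PigStorage behind a new optional '-schema' constructor argument. A minimal usage sketch in the same PigServer style as the tests touched below; the paths and aliases are hypothetical, not taken from this commit:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class PigStorageSchemaOptionExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);

            // Storing with '-schema' writes the data plus hidden .pig_schema and .pig_header files.
            pig.registerQuery("a = LOAD 'input.csv' USING PigStorage(',') AS (name:chararray, val:int);");
            pig.registerQuery("STORE a INTO 'out' USING PigStorage(',', '-schema');");

            // A later load can omit the AS clause; the stored schema is picked up automatically
            // unless '-noschema' is passed.
            pig.registerQuery("b = LOAD 'out' USING PigStorage(',');");
            pig.dumpSchema("b");
        }
    }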

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1147934&r1=1147933&r2=1147934&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul 18 15:39:35 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2143: Make PigStorage optionally store schema; improve docs. (dvryaboy) 
+
 PIG-1973: UDFContext.getUDFContext usage of ThreadLocal pattern 
  is not typical (woody via thejas)
 

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java?rev=1147934&r1=1147933&r2=1147934&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/JsonMetadata.java Mon Jul 18 15:39:35 2011
@@ -18,274 +18,5 @@
 
 package org.apache.pig.piggybank.storage;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.pig.Expression;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.StoreMetadata;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.backend.datastorage.ContainerDescriptor;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.datastorage.HDirectory;
-import org.apache.pig.backend.hadoop.datastorage.HFile;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-/**
- * Reads and Writes metadata using JSON in metafiles next to the data.
- * 
- *
- */
-public class JsonMetadata implements LoadMetadata, StoreMetadata {
-
-    private static final Log log = LogFactory.getLog(JsonMetadata.class);
-
-    // These are not static+final because we may want to make these adjustable by users.
-    private String schemaFileName = ".pig_schema";
-    private String headerFileName = ".pig_header";
-    private String statFileName = ".pig_stats";
-    
-    private boolean printHeaders = true;
-
-    private byte fieldDel;
-    private byte recordDel;
-
-    public JsonMetadata() {
-
-    }
-
-    /**.
-     * Given a path, which may represent a glob pattern, a directory, or a file, this method
-     * finds the set of relevant metadata files on the storage system. The algorithm for finding the
-     * metadata file is as follows:
-     * <p>
-     * For each file represented by the path (either directly, or via a glob):
-     *   If parentPath/prefix.fileName exists, use that as the metadata file.
-     *   Else if parentPath/prefix exists, use that as the metadata file.
-     * <p>
-     * Resolving conflicts, merging the metadata, etc, is not handled by this method and should be
-     * taken care of by downstream code.
-     * 
-     * This can go into a util package if metadata files are considered a general enough pattern
-     * 
-     * @param path      Path, as passed in to a LoadFunc (may be a Hadoop glob)
-     * @param prefix    Metadata file designation, such as .pig_schema or .pig_stats
-     * @param conf      configuration object
-     * @return Set of element descriptors for all metadata files associated with the files on the path.
-     */
-    protected Set<ElementDescriptor> findMetaFile(String path, String prefix, Configuration conf) 
-        throws IOException {
-        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
-        String fullPath = FileLocalizer.fullPath(path, storage);
-        Set<ElementDescriptor> metaFileSet = new HashSet<ElementDescriptor>();
-        if(storage.isContainer(fullPath)) {
-            ElementDescriptor metaFilePath = storage.asElement(fullPath, prefix);
-            if (metaFilePath.exists()) {
-                metaFileSet.add(metaFilePath); 
-            }
-        } else {
-            ElementDescriptor[] descriptors = storage.asCollection(path);
-            for(ElementDescriptor descriptor : descriptors) {
-                String fileName = null, parentName = null;
-                ContainerDescriptor parentContainer = null;
-                if (descriptor instanceof HFile) {
-                    Path descriptorPath = ((HFile) descriptor).getPath();
-                    fileName = descriptorPath.getName();
-                    Path parent = descriptorPath.getParent();
-                    parentName = parent.toString();
-                    parentContainer = new HDirectory((HDataStorage)storage,parent);
-                }
-                ElementDescriptor metaFilePath = storage.asElement(parentName, prefix+"."+fileName);
-
-                // if the file has a custom schema, use it
-                if (metaFilePath.exists()) {
-                    metaFileSet.add(metaFilePath);
-                    continue;
-                }
-
-                // if no custom schema, try the parent directory
-                metaFilePath = storage.asElement(parentContainer, prefix);
-                if (metaFilePath.exists()) {
-                    metaFileSet.add(metaFilePath);
-                }
-            }
-        }
-        return metaFileSet;
-    }
-    
-    //------------------------------------------------------------------------
-    // Implementation of LoadMetaData interface
-    
-    @Override
-    public String[] getPartitionKeys(String location, Job job) {
-        return null;
-    }
-
-    @Override
-    public void setPartitionFilter(Expression partitionFilter)
-            throws IOException {        
-    }
-    
-    /**
-     * For JsonMetadata schema is considered optional
-     * This method suppresses (and logs) errors if they are encountered.
-     * 
-     */
-    @Override
-    public ResourceSchema getSchema(String location, Job job) throws IOException {     
-        Configuration conf = job.getConfiguration();
-        Set<ElementDescriptor> schemaFileSet = null;
-        try {
-            schemaFileSet = findMetaFile(location, schemaFileName, conf);
-        } catch (IOException e) {
-            log.warn("Could not find schema file for "+ location); 
-            return null;
-        }
-              
-        // TODO we assume that all schemas are the same. The question of merging schemas is left open for now. 
-        ElementDescriptor schemaFile = null;
-        if (!schemaFileSet.isEmpty()) {            
-            schemaFile = schemaFileSet.iterator().next();
-        } else {
-            log.warn("Could not find schema file for "+location); 
-            return null;
-        }
-        log.debug("Found schema file: "+schemaFile.toString());
-        ResourceSchema resourceSchema = null;
-        try {
-            resourceSchema = new ObjectMapper().readValue(schemaFile.open(), ResourceSchema.class);
-        } catch (JsonParseException e) {
-            log.warn("Unable to load Resource Schema for "+location);
-            e.printStackTrace();
-        } catch (JsonMappingException e) {
-            log.warn("Unable to load Resource Schema for "+location);
-            e.printStackTrace();
-        } catch (IOException e) {
-            log.warn("Unable to load Resource Schema for "+location);
-            e.printStackTrace();
-        }
-        return resourceSchema;
-    }
-
-    /**
-     * For JsonMetadata stats are considered optional
-     * This method suppresses (and logs) errors if they are encountered.
-     * @see org.apache.pig.LoadMetadata#getStatistics(String, Configuration)
-     */
-    @Override
-    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
-        Configuration conf = job.getConfiguration();
-        Set<ElementDescriptor> statFileSet = null;
-        try {
-            statFileSet = findMetaFile(location, statFileName, conf);
-        } catch (IOException e) {
-            log.warn("could not fine stat file for "+location);
-            return null;
-        }
-        ElementDescriptor statFile = null;
-        if (!statFileSet.isEmpty()) {
-            statFile = statFileSet.iterator().next();
-        } else {
-            log.warn("Could not find stat file for "+location);
-            return null;
-        }
-        log.debug("Found stat file "+statFile.toString());
-        ResourceStatistics resourceStats = null;        
-        try {
-            resourceStats = new ObjectMapper().readValue(statFile.open(), ResourceStatistics.class);
-        } catch (JsonParseException e) {
-            log.warn("Unable to load Resource Statistics for "+location);
-            e.printStackTrace();
-        } catch (JsonMappingException e) {
-            log.warn("Unable to load Resource Statistics for "+location);
-            e.printStackTrace();
-        } catch (IOException e) {
-            log.warn("Unable to load Resource Statistics for "+location);
-            e.printStackTrace();
-        }
-        return resourceStats;
-    }
-
-    //------------------------------------------------------------------------
-    // Implementation of StoreMetaData interface
-    
-    @Override
-    public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException {
-        Configuration conf = job.getConfiguration();
-        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
-        ElementDescriptor statFilePath = storage.asElement(location, statFileName);
-        if(!statFilePath.exists() && stats != null) {
-            try {
-                new ObjectMapper().writeValue(statFilePath.create(), stats);                    
-            } catch (JsonGenerationException e) {
-                log.warn("Unable to write Resource Statistics for "+location);
-                e.printStackTrace();
-            } catch (JsonMappingException e) {
-                log.warn("Unable to write Resource Statistics for "+location);
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @Override
-    public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException {
-        Configuration conf = job.getConfiguration();
-        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
-        ElementDescriptor schemaFilePath = storage.asElement(location, schemaFileName);
-        if(!schemaFilePath.exists() && schema != null) {
-            try {
-                new ObjectMapper().writeValue(schemaFilePath.create(), schema);
-            } catch (JsonGenerationException e) {
-                log.warn("Unable to write Resource Statistics for "+location);
-                e.printStackTrace();
-            } catch (JsonMappingException e) {
-                log.warn("Unable to write Resource Statistics for "+location);
-                e.printStackTrace();
-            }
-        }
-        if (printHeaders) {
-            ElementDescriptor headerFilePath = storage.asElement(location, headerFileName);
-            if (!headerFilePath.exists()) {
-                OutputStream os = headerFilePath.create();
-                try {
-                    String[] names = schema.fieldNames();
-
-                    for (int i=0; i < names.length; i++) {
-                        os.write(names[i].getBytes("UTF-8"));
-                        if (i <names.length-1) {
-                            os.write(fieldDel);
-                        } else {
-                            os.write(recordDel);
-                        }
-                    }
-                } finally {
-                    os.close();
-                }
-            }
-        }
-    }
-
-    public void setFieldDel(byte fieldDel) {
-        this.fieldDel = fieldDel;
-    }
-
-    public void setRecordDel(byte recordDel) {
-        this.recordDel = recordDel;
-    }
-
-}
+@Deprecated
+public class JsonMetadata extends org.apache.pig.builtin.JsonMetadata {}

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java?rev=1147934&r1=1147933&r2=1147934&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java Mon Jul 18 15:39:35 2011
@@ -18,25 +18,9 @@
 
 package org.apache.pig.piggybank.storage;
 
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.pig.Expression;
-import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadMetadata;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreMetadata;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.parser.ParserException;
-import org.apache.pig.impl.util.CastUtils;
-import org.apache.pig.impl.util.StorageUtil;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
 
 /**
  *  This Load/Store Func reads/writes metafiles that allow the schema and
@@ -46,115 +30,15 @@ import org.apache.pig.impl.util.Utils;
  *  It also creates a ".pig_headers" file that simply lists the delimited aliases.
  *  This is intended to make export to tools that can read files with header
  *  lines easier (just cat the header to your data).
- *
+ * @deprecated Use PigStorage with a -schema option instead
  */
+@Deprecated
 public class PigStorageSchema extends PigStorage implements LoadMetadata, StoreMetadata {
-
-    private byte delim = '\t';
-    private ResourceSchema schema;
-    LoadCaster caster;
-
     public PigStorageSchema() {
-        super();
+        this("\t");
     }
 
     public PigStorageSchema(String delim) {
-        super(delim);
-        this.delim = StorageUtil.parseFieldDel(delim);
-    }
-
-    @Override
-    public Tuple getNext() throws IOException {
-        Tuple tup = super.getNext();
-        if (tup == null) return null;
-
-        if ( caster == null) {
-            caster = getLoadCaster();
-        }
-        if (signature != null) {
-            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
-                    new String[] {signature});
-            String serializedSchema = p.getProperty(signature+".schema");
-            if (serializedSchema == null) return tup;
-            try {
-                schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
-            } catch (ParserException e) {
-                mLog.error("Unable to parse serialized schema " + serializedSchema, e);
-            }
-        }
-
-        if (schema != null) {
-
-            ResourceFieldSchema[] fieldSchemas = schema.getFields();
-            int tupleIdx = 0;
-            // If some fields have been projected out, the tuple
-            // only contains required fields.
-            // We walk the requiredColumns array to find required fields,
-            // and cast those.
-            for (int i = 0; i < fieldSchemas.length; i++) {
-                if (mRequiredColumns == null || (mRequiredColumns.length>i && mRequiredColumns[i])) {
-                    Object val = null;
-                    if(tup.get(tupleIdx) != null){
-                        byte[] bytes = ((DataByteArray) tup.get(tupleIdx)).get();
-                        val = CastUtils.convertToType(caster, bytes,
-                                fieldSchemas[i], fieldSchemas[i].getType());
-                    }
-                    tup.set(tupleIdx, val);
-                    tupleIdx++;
-                }
-            }
-        }
-        return tup;
-    }
-
-    //------------------------------------------------------------------------
-    // Implementation of LoadMetaData interface
-
-    @Override
-    public ResourceSchema getSchema(String location,
-            Job job) throws IOException {
-        schema = (new JsonMetadata()).getSchema(location, job);
-        if (signature != null && schema != null) {
-            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
-                    new String[] {signature});
-            p.setProperty(signature + ".schema", schema.toString());
-    }
-        return schema;
-    }
-
-    @Override
-    public ResourceStatistics getStatistics(String location,
-            Job job) throws IOException {
-        return null;
-    }
-
-    @Override
-    public void setPartitionFilter(Expression partitionFilter)
-            throws IOException {
-    }
-
-    @Override
-    public String[] getPartitionKeys(String location, Job job)
-            throws IOException {
-        return null;
-    }
-
-    //------------------------------------------------------------------------
-    // Implementation of StoreMetadata
-
-    @Override
-    public void storeSchema(ResourceSchema schema, String location,
-            Job job) throws IOException {
-        JsonMetadata metadataWriter = new JsonMetadata();
-        byte recordDel = '\n';
-        metadataWriter.setFieldDel(delim);
-        metadataWriter.setRecordDel(recordDel);
-        metadataWriter.storeSchema(schema, location, job);
-    }
-
-    @Override
-    public void storeStatistics(ResourceStatistics stats, String location,
-            Job job) throws IOException {
-
+        super(delim, "-schema");
     }
 }
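
The class above is now only a deprecated shim: PigStorageSchema(delim) forwards to PigStorage(delim, "-schema"). A small illustrative sketch of the equivalence for code being migrated (class and method names here are hypothetical):

    import org.apache.pig.builtin.PigStorage;
    import org.apache.pig.piggybank.storage.PigStorageSchema;

    public class StorageMigrationSketch {
        // Both storers below now behave identically; the second is the preferred spelling.
        // In Pig Latin terms: "USING org.apache.pig.piggybank.storage.PigStorageSchema(',')"
        // becomes "USING PigStorage(',', '-schema')".
        static PigStorageSchema legacy()   { return new PigStorageSchema(","); }      // deprecated shim
        static PigStorage     preferred()  { return new PigStorage(",", "-schema"); } // built-in form
    }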

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java?rev=1147934&r1=1147933&r2=1147934&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/TestPigStorageSchema.java Mon Jul 18 15:39:35 2011
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.pig.piggybank.test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.test.MiniCluster;
-import org.apache.pig.test.Util;
-import org.apache.pig.test.utils.TypeCheckingTestUtil;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestPigStorageSchema {
-
-    protected ExecType execType = ExecType.LOCAL;
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    static PigServer pig;
-    static final String datadir = "build/test/tmpdata/";
-
-    PigContext pigContext = new PigContext(execType, new Properties());
-    Map<String, String> fileNameMap = new HashMap<String, String>();
-
-    @Before
-    public void setUp() throws Exception {
-        pig = new PigServer(execType, cluster.getProperties());
-        Util.deleteDirectory(new File(datadir));
-        try {
-            pig.mkdirs(datadir);
-        } catch (IOException e) {};
-        Util.createLocalInputFile(datadir + "originput",
-                new String[] {"A,1", "B,2", "C,3", "D,2",
-                              "A,5", "B,5", "C,8", "A,8",
-                              "D,8", "A,9"});
-
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        Util.deleteDirectory(new File(datadir));
-        pig.shutdown();
-    }
-
-    @Test
-    public void testPigStorageSchema() throws Exception {
-        pigContext.connect();
-        String query = "a = LOAD '" + datadir + "originput' using org.apache.pig.piggybank.storage.PigStorageSchema() " +
-        "as (f1:chararray, f2:int);";
-        pig.registerQuery(query);
-        Schema origSchema = pig.dumpSchema("a");
-        pig.store("a", datadir + "aout", "org.apache.pig.piggybank.storage.PigStorageSchema()");
-
-        // aout now has a schema.
-
-        // Verify that loading a-out with no given schema produces
-        // the original schema.
-
-        pig.registerQuery("b = LOAD '" + datadir + "aout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
-        Schema genSchema = pig.dumpSchema("b");
-        Assert.assertTrue("generated schema equals original" ,
-                Schema.equals(genSchema, origSchema, true, false));
-
-        // Verify that giving our own schema works
-        String [] aliases ={"foo", "bar"};
-        byte[] types = {DataType.INTEGER, DataType.LONG};
-        Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
-                aliases,types);
-        pig.registerQuery("c = LOAD '" + datadir + "aout' using org.apache.pig.piggybank.storage.PigStorageSchema() "+
-        "as (foo:int, bar:long);");
-        Schema newGenSchema = pig.dumpSchema("c");
-        Assert.assertTrue("explicit schema overrides metadata",
-                Schema.equals(newSchema, newGenSchema, true, false));
-    }
-
-    @Test
-    public void testSchemaConversion() throws Exception {
-
-        Util.createLocalInputFile(datadir + "originput2",
-                new String[] {"1", "2", "3", "2",
-                              "5", "5", "8", "8",
-                              "8", "9"});
-
-        pig.registerQuery("A = LOAD '" + datadir + "originput2' using org.apache.pig.piggybank.storage.PigStorageSchema() " +
-        "as (f:int);");
-        pig.registerQuery("B = group A by f;");
-        Schema origSchema = pig.dumpSchema("B");
-        ResourceSchema rs1 = new ResourceSchema(origSchema);
-        pig.registerQuery("STORE B into '" + datadir + "bout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
-
-        pig.registerQuery("C = LOAD '" + datadir + "bout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
-        Schema genSchema = pig.dumpSchema("C");
-        ResourceSchema rs2 = new ResourceSchema(genSchema);
-        Assert.assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
-
-        pig.registerQuery("C1 = LOAD '" + datadir + "bout' as (a0:int, A: {t: (f:int) } );");
-        pig.registerQuery("D = foreach C1 generate a0, SUM(A);");
-
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
-                        "(1,1L)",
-                        "(2,4L)",
-                        "(3,3L)",
-                        "(5,10L)",
-                        "(8,24L)",
-                        "(9,9L)"
-                });
-
-        Iterator<Tuple> iter = pig.openIterator("D");
-        int counter = 0;
-        while (iter.hasNext()) {
-            Assert.assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-        }
-
-        Assert.assertEquals(expectedResults.size(), counter);
-    }
-
-    @Test
-    public void testSchemaConversion2() throws Exception {
-
-        pig.registerQuery("A = LOAD '" + datadir + "originput' using org.apache.pig.piggybank.storage.PigStorageSchema(',') " +
-        "as (f1:chararray, f2:int);");
-        pig.registerQuery("B = group A by f1;");
-        Schema origSchema = pig.dumpSchema("B");
-        ResourceSchema rs1 = new ResourceSchema(origSchema);
-        pig.registerQuery("STORE B into '" + datadir + "cout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
-
-        pig.registerQuery("C = LOAD '" + datadir + "cout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
-        Schema genSchema = pig.dumpSchema("C");
-        ResourceSchema rs2 = new ResourceSchema(genSchema);
-        Assert.assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
-
-        pig.registerQuery("C1 = LOAD '" + datadir + "cout' as (a0:chararray, A: {t: (f1:chararray, f2:int) } );");
-        pig.registerQuery("D = foreach C1 generate a0, SUM(A.f2);");
-
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
-                        "('A',23L)",
-                        "('B',7L)",
-                        "('C',11L)",
-                        "('D',10L)"
-                });
-
-        Iterator<Tuple> iter = pig.openIterator("D");
-        int counter = 0;
-        while (iter.hasNext()) {
-            Assert.assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
-        }
-
-        Assert.assertEquals(expectedResults.size(), counter);
-    }
-
-    /**
-     * See PIG-1830
-     * @throws IOException
-     */
-    @Test
-    public void testByteArrayConversion() throws IOException {
-        Util.createLocalInputFile(datadir + "originput2",
-                new String[] {"peter\t1", "samir\t2", "michael\t4",
-                "peter\t2", "peter\t4", "samir\t1", "john\t"
-        });
-        Util.createLocalInputFile(datadir + ".pig_schema",
-                new String[] {
-                "{\"fields\":[{\"name\":\"name\",\"type\":55,\"schema\":null," +
-                "\"description\":\"autogenerated from Pig Field Schema\"}," +
-                "{\"name\":\"val\",\"type\":10,\"schema\":null,\"description\":"+
-                "\"autogenerated from Pig Field Schema\"}],\"version\":0," +
-                "\"sortKeys\":[],\"sortKeyOrders\":[]}"
-        });
-        pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING org.apache.pig.piggybank.storage.PigStorageSchema();");
-        pig.registerQuery("Sessions = GROUP Events BY name;");
-        Iterator<Tuple> sessions = pig.openIterator("Sessions");
-        while (sessions.hasNext()) {
-            System.out.println(sessions.next());
-}
-
-
-    }
-
-    // See PIG-1993
-    @Test
-    public void testColumnPrune() throws IOException {
-        Util.createLocalInputFile(datadir + "originput2",
-                new String[] {"peter\t1", "samir\t2", "michael\t4",
-                "peter\t2", "peter\t4", "samir\t1", "john\t"
-        });
-        Util.createLocalInputFile(datadir + ".pig_schema",
-                new String[] {
-                "{\"fields\":[{\"name\":\"name\",\"type\":55,\"schema\":null," +
-                "\"description\":\"autogenerated from Pig Field Schema\"}," +
-                "{\"name\":\"val\",\"type\":10,\"schema\":null,\"description\":"+
-                "\"autogenerated from Pig Field Schema\"}],\"version\":0," +
-                "\"sortKeys\":[],\"sortKeyOrders\":[]}"
-        });
-        pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING org.apache.pig.piggybank.storage.PigStorageSchema();");
-        pig.registerQuery("EventsName = foreach Events generate name;");
-        Iterator<Tuple> sessions = pig.openIterator("EventsName");
-        sessions.next().toString().equals("(1)");
-        sessions.next().toString().equals("(2)");
-        sessions.next().toString().equals("(4)");
-        sessions.next().toString().equals("(2)");
-        sessions.next().toString().equals("(4)");
-        sessions.next().toString().equals("(1)");
-        sessions.next().toString().equals("()");
-        Assert.assertFalse(sessions.hasNext());
-    }
-
-    @Test
-    public void testPigStorageSchemaHeaderDelimiter() throws Exception {
-        pigContext.connect();
-        String query = "a = LOAD '" + datadir + "originput' using org.apache.pig.piggybank.storage.PigStorageSchema(',') " +
-                "as (foo:chararray, bar:int);";
-        pig.registerQuery(query);
-        pig.registerQuery("STORE a into '" + datadir + "dout' using org.apache.pig.piggybank.storage.PigStorageSchema('#');");
-        pig.registerQuery("STORE a into '" + datadir + "eout' using org.apache.pig.piggybank.storage.PigStorageSchema();");
-
-        String outPath = FileLocalizer.fullPath(datadir + "dout/.pig_header",
-                pig.getPigContext());
-        Assert.assertTrue(FileLocalizer.fileExists(outPath,
-                pig.getPigContext()));
-
-        String[] header = Util.readOutput(pig.getPigContext(), outPath);
-        Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo#bar"}, header);
-
-        outPath = FileLocalizer.fullPath(datadir + "eout/.pig_header",
-                pig.getPigContext());
-        Assert.assertTrue(FileLocalizer.fileExists(outPath,
-                pig.getPigContext()));
-
-        header = Util.readOutput(pig.getPigContext(), outPath);
-        Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo\tbar"}, header);
-    }
-
-}
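
The deleted test above also shows the raw JSON that ends up in a .pig_schema file. As a rough sketch, such a file deserializes straight into a ResourceSchema with Jackson, which is what JsonMetadata.getSchema does internally; the file path used here is hypothetical:

    import java.io.File;
    import org.apache.pig.ResourceSchema;
    import org.codehaus.jackson.map.ObjectMapper;

    public class ReadPigSchemaFile {
        public static void main(String[] args) throws Exception {
            // .pig_schema holds a JSON-serialized ResourceSchema, e.g.
            // {"fields":[{"name":"name","type":55,...},{"name":"val","type":10,...}],"version":0,...}
            ResourceSchema schema =
                new ObjectMapper().readValue(new File(".pig_schema"), ResourceSchema.class);
            for (String field : schema.fieldNames()) {
                System.out.println(field);
            }
        }
    }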

Added: pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java?rev=1147934&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java Mon Jul 18 15:39:35 2011
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HDirectory;
+import org.apache.pig.backend.hadoop.datastorage.HFile;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Reads and Writes metadata using JSON in metafiles next to the data.
+ *
+ *
+ */
+public class JsonMetadata implements LoadMetadata, StoreMetadata {
+
+    private static final Log log = LogFactory.getLog(JsonMetadata.class);
+
+    // These are not static+final because we may want to make these adjustable by users.
+    private String schemaFileName = ".pig_schema";
+    private String headerFileName = ".pig_header";
+    private String statFileName = ".pig_stats";
+
+    private boolean printHeaders = true;
+
+    private byte fieldDel;
+    private byte recordDel;
+
+    public JsonMetadata() {
+
+    }
+
+    /**
+     * Given a path, which may represent a glob pattern, a directory, or a file, this method
+     * finds the set of relevant metadata files on the storage system. The algorithm for finding the
+     * metadata file is as follows:
+     * <p>
+     * For each file represented by the path (either directly, or via a glob):
+     *   If parentPath/prefix.fileName exists, use that as the metadata file.
+     *   Else if parentPath/prefix exists, use that as the metadata file.
+     * <p>
+     * Resolving conflicts, merging the metadata, etc, is not handled by this method and should be
+     * taken care of by downstream code.
+     * <p>
+     * This can go into a util package if metadata files are considered a general enough pattern
+     * <p>
+     * @param path      Path, as passed in to a LoadFunc (may be a Hadoop glob)
+     * @param prefix    Metadata file designation, such as .pig_schema or .pig_stats
+     * @param conf      configuration object
+     * @return Set of element descriptors for all metadata files associated with the files on the path.
+     */
+    protected Set<ElementDescriptor> findMetaFile(String path, String prefix, Configuration conf)
+        throws IOException {
+        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
+        String fullPath = FileLocalizer.fullPath(path, storage);
+        Set<ElementDescriptor> metaFileSet = new HashSet<ElementDescriptor>();
+        if(storage.isContainer(fullPath)) {
+            ElementDescriptor metaFilePath = storage.asElement(fullPath, prefix);
+            if (metaFilePath.exists()) {
+                metaFileSet.add(metaFilePath);
+            }
+        } else {
+            ElementDescriptor[] descriptors = storage.asCollection(path);
+            for(ElementDescriptor descriptor : descriptors) {
+                String fileName = null, parentName = null;
+                ContainerDescriptor parentContainer = null;
+                if (descriptor instanceof HFile) {
+                    Path descriptorPath = ((HFile) descriptor).getPath();
+                    fileName = descriptorPath.getName();
+                    Path parent = descriptorPath.getParent();
+                    parentName = parent.toString();
+                    parentContainer = new HDirectory((HDataStorage)storage,parent);
+                }
+                ElementDescriptor metaFilePath = storage.asElement(parentName, prefix+"."+fileName);
+
+                // if the file has a custom schema, use it
+                if (metaFilePath.exists()) {
+                    metaFileSet.add(metaFilePath);
+                    continue;
+                }
+
+                // if no custom schema, try the parent directory
+                metaFilePath = storage.asElement(parentContainer, prefix);
+                if (metaFilePath.exists()) {
+                    metaFileSet.add(metaFilePath);
+                }
+            }
+        }
+        return metaFileSet;
+    }
+
+    //------------------------------------------------------------------------
+    // Implementation of LoadMetaData interface
+
+    @Override
+    public String[] getPartitionKeys(String location, Job job) {
+        return null;
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+            throws IOException {
+    }
+
+    /**
+     * For JsonMetadata schema is considered optional
+     * This method suppresses (and logs) errors if they are encountered.
+     *
+     */
+    @Override
+    public ResourceSchema getSchema(String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        Set<ElementDescriptor> schemaFileSet = null;
+        try {
+            schemaFileSet = findMetaFile(location, schemaFileName, conf);
+        } catch (IOException e) {
+            log.warn("Could not find schema file for "+ location);
+            return null;
+        }
+
+        // TODO we assume that all schemas are the same. The question of merging schemas is left open for now.
+        ElementDescriptor schemaFile = null;
+        if (!schemaFileSet.isEmpty()) {
+            schemaFile = schemaFileSet.iterator().next();
+        } else {
+            log.warn("Could not find schema file for "+location);
+            return null;
+        }
+        log.debug("Found schema file: "+schemaFile.toString());
+        ResourceSchema resourceSchema = null;
+        try {
+            resourceSchema = new ObjectMapper().readValue(schemaFile.open(), ResourceSchema.class);
+        } catch (JsonParseException e) {
+            log.warn("Unable to load Resource Schema for "+location);
+            e.printStackTrace();
+        } catch (JsonMappingException e) {
+            log.warn("Unable to load Resource Schema for "+location);
+            e.printStackTrace();
+        } catch (IOException e) {
+            log.warn("Unable to load Resource Schema for "+location);
+            e.printStackTrace();
+        }
+        return resourceSchema;
+    }
+
+    /**
+     * For JsonMetadata stats are considered optional
+     * This method suppresses (and logs) errors if they are encountered.
+     * @see org.apache.pig.LoadMetadata#getStatistics(String, Job)
+     */
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        Set<ElementDescriptor> statFileSet = null;
+        try {
+            statFileSet = findMetaFile(location, statFileName, conf);
+        } catch (IOException e) {
+            log.warn("Could not find stat file for "+location);
+            return null;
+        }
+        ElementDescriptor statFile = null;
+        if (!statFileSet.isEmpty()) {
+            statFile = statFileSet.iterator().next();
+        } else {
+            log.warn("Could not find stat file for "+location);
+            return null;
+        }
+        log.debug("Found stat file "+statFile.toString());
+        ResourceStatistics resourceStats = null;
+        try {
+            resourceStats = new ObjectMapper().readValue(statFile.open(), ResourceStatistics.class);
+        } catch (JsonParseException e) {
+            log.warn("Unable to load Resource Statistics for "+location);
+            e.printStackTrace();
+        } catch (JsonMappingException e) {
+            log.warn("Unable to load Resource Statistics for "+location);
+            e.printStackTrace();
+        } catch (IOException e) {
+            log.warn("Unable to load Resource Statistics for "+location);
+            e.printStackTrace();
+        }
+        return resourceStats;
+    }
+
+    //------------------------------------------------------------------------
+    // Implementation of StoreMetaData interface
+
+    @Override
+    public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
+        ElementDescriptor statFilePath = storage.asElement(location, statFileName);
+        if(!statFilePath.exists() && stats != null) {
+            try {
+                new ObjectMapper().writeValue(statFilePath.create(), stats);
+            } catch (JsonGenerationException e) {
+                log.warn("Unable to write Resource Statistics for "+location);
+                e.printStackTrace();
+            } catch (JsonMappingException e) {
+                log.warn("Unable to write Resource Statistics for "+location);
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        DataStorage storage = new HDataStorage(ConfigurationUtil.toProperties(conf));
+        ElementDescriptor schemaFilePath = storage.asElement(location, schemaFileName);
+        if(!schemaFilePath.exists() && schema != null) {
+            try {
+                new ObjectMapper().writeValue(schemaFilePath.create(), schema);
+            } catch (JsonGenerationException e) {
+                log.warn("Unable to write Resource Schema for "+location);
+                e.printStackTrace();
+            } catch (JsonMappingException e) {
+                log.warn("Unable to write Resource Schema for "+location);
+                e.printStackTrace();
+            }
+        }
+        if (printHeaders) {
+            ElementDescriptor headerFilePath = storage.asElement(location, headerFileName);
+            if (!headerFilePath.exists()) {
+                OutputStream os = headerFilePath.create();
+                try {
+                    String[] names = schema.fieldNames();
+
+                    for (int i=0; i < names.length; i++) {
+                        os.write(names[i].getBytes("UTF-8"));
+                        if (i <names.length-1) {
+                            os.write(fieldDel);
+                        } else {
+                            os.write(recordDel);
+                        }
+                    }
+                } finally {
+                    os.close();
+                }
+            }
+        }
+    }
+
+    public void setFieldDel(byte fieldDel) {
+        this.fieldDel = fieldDel;
+    }
+
+    public void setRecordDel(byte recordDel) {
+        this.recordDel = recordDel;
+    }
+
+}
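
findMetaFile's javadoc above defines the lookup order per data file: a sibling "prefix.fileName" metafile wins, and only if it is absent does the loader fall back to a "prefix" file in the parent directory. A simplified sketch of that order using plain Hadoop FileSystem calls rather than Pig's DataStorage abstraction (globbing and container handling omitted):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MetaFileLookupSketch {
        // Returns the metadata file for a single data file, or null if neither candidate exists.
        static Path findMetaFileFor(Path dataFile, String prefix, Configuration conf) throws Exception {
            FileSystem fs = dataFile.getFileSystem(conf);
            Path parent = dataFile.getParent();

            Path perFile = new Path(parent, prefix + "." + dataFile.getName()); // e.g. dir/.pig_schema.part-00000
            if (fs.exists(perFile)) {
                return perFile;
            }
            Path perDirectory = new Path(parent, prefix);                       // e.g. dir/.pig_schema
            return fs.exists(perDirectory) ? perDirectory : null;
        }
    }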

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1147934&r1=1147933&r2=1147934&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Mon Jul 18 15:39:35 2011
@@ -23,13 +23,18 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -37,13 +42,19 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
 import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreMetadata;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
@@ -53,46 +64,123 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.CastUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.StorageUtil;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
 
 /**
- * A load function that parses a line of input into fields using a delimiter to
- * set the fields. The delimiter is given as a regular expression. See
- * {@link java.lang.String#split(String)} and {@link java.util.regex.Pattern}
- * for more information.
+ * A load function that parses a line of input into fields using a character delimiter.
+ * The default delimiter is a tab. You can specify any character as a literal ("a"),
+ * a known escape character ("\\t"), or a dec or hex value ("\\u001", "\\x0A").
+ * <p>
+ * An optional second constructor argument is provided that allows one to customize
+ * advanced behaviors. A list of available options is below:
+ * <ul>
+ * <li><code>-schema</code> Stores the schema of the relation using a hidden JSON file.
+ * <li><code>-noschema</code> Ignores a stored schema during loading.
+ * </ul>
+ * <p>
+ * <h3>Schemas</h3>
+ * If <code>-schema</code> is specified, a hidden ".pig_schema" file is created in the output directory
+ * when storing data. It is used by PigStorage (with or without -schema) during loading to determine the
+ * field names and types of the data without the need for a user to explicitly provide the schema in an
+ * <code>as</code> clause, unless <code>-noschema</code> is specified. No attempt to merge conflicting
+ * schemas is made during loading. The first schema encountered during a file system scan is used.
+ * <p>
+ * In addition, using <code>-schema</code> drops a ".pig_headers" file in the output directory.
+ * This file simply lists the delimited aliases. This is intended to make export to tools that can read
+ * files with header lines easier (just cat the header to your data).
+ * <p>
+ * Note that regardless of whether or not you store the schema, you <b>always</b> need to specify
+ * the correct delimiter to read your data. If you store reading delimiter "#" and then load using
+ * the default delimiter, your data will not be parsed correctly.
+ *
+ * <h3>Compression</h3>
+ * Storing to a directory whose name ends in ".bz2" or ".gz" or ".lzo" (if you have installed support
+ * for LZO compression in Hadoop) will automatically use the corresponding compression codec.<br>
+ * <code>output.compression.enabled</code> and <code>output.compression.codec</code> job properties
+ * also work.
+ * <p>
+ * Loading from directories ending in .bz2 or .bz works automatically; other compression formats are not
+ * auto-detected on loading.
+ *
  */
 @SuppressWarnings("unchecked")
-public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface, 
-LoadPushDown {
-    protected RecordReader in = null;    
+public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
+LoadPushDown, LoadMetadata, StoreMetadata {
+    protected RecordReader in = null;
     protected RecordWriter writer = null;
     protected final Log mLog = LogFactory.getLog(getClass());
     protected String signature;
-        
+
     private byte fieldDel = '\t';
     private ArrayList<Object> mProtoTuple = null;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private String loadLocation;
-    
-    public PigStorage() {
-    }
-    
+
+    boolean storeSchema = false;
+    boolean dontLoadSchema = false;
+    protected ResourceSchema schema;
+    protected LoadCaster caster;
+
+    private final CommandLine configuredOptions;
+    private final Options validOptions = new Options();
+    private final static CommandLineParser parser = new GnuParser();
+
     protected boolean[] mRequiredColumns = null;
-    
     private boolean mRequiredColumnsInitialized = false;
 
+    private void populateValidOptions() {
+        validOptions.addOption("schema", false, "Loads / Stores the schema of the relation using a hidden JSON file.");
+        validOptions.addOption("noschema", false, "Disable attempting to load data schema from the filesystem.");
+    }
+
+    public PigStorage() {
+        this("\t", "");
+    }
+
     /**
-     * Constructs a Pig loader that uses specified regex as a field delimiter.
-     * 
+     * Constructs a Pig loader that uses specified character as a field delimiter.
+     *
      * @param delimiter
      *            the single byte character that is used to separate fields.
      *            ("\t" is the default.)
+     * @throws ParseException
      */
     public PigStorage(String delimiter) {
-        this();
-        fieldDel = StorageUtil.parseFieldDel(delimiter);        
+        this(delimiter, "");
+    }
+
+    /**
+     * Constructs a Pig loader that uses specified character as a field delimiter.
+     * <p>
+     * Understands the following options, which can be specified in the second parameter:
+     * <ul>
+     * <li><code>-schema</code> Loads / Stores the schema of the relation using a hidden JSON file.
+     * <li><code>-noschema</code> Ignores a stored schema during loading.
+     * </ul>
+     * @param delimiter the single byte character that is used to separate fields.
+     * @param options a list of options that can be used to modify PigStorage behavior
+     * @throws ParseException
+     */
+    public PigStorage(String delimiter, String options) {
+        populateValidOptions();
+        fieldDel = StorageUtil.parseFieldDel(delimiter);
+        String[] optsArr = options.split(" ");
+        try {
+            configuredOptions = parser.parse(validOptions, optsArr);
+            storeSchema = configuredOptions.hasOption("schema");
+            dontLoadSchema = configuredOptions.hasOption("noschema");
+        } catch (ParseException e) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "PigStorage(',', '[options]')", validOptions);
+            // We wrap this exception in a Runtime exception so that
+            // existing loaders that extend PigStorage don't break
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
@@ -109,7 +197,7 @@ LoadPushDown {
             boolean notDone = in.nextKeyValue();
             if (!notDone) {
                 return null;
-            }                                                                                           
+            }
             Text value = (Text) in.getCurrentValue();
             byte[] buf = value.getBytes();
             int len = value.getLength();
@@ -128,20 +216,60 @@ LoadPushDown {
                 readField(buf, start, len);
             }
             Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
-            return t;
+
+            return dontLoadSchema ? t : applySchema(t);
         } catch (InterruptedException e) {
             int errCode = 6018;
             String errMsg = "Error while reading input";
-            throw new ExecException(errMsg, errCode, 
+            throw new ExecException(errMsg, errCode,
                     PigException.REMOTE_ENVIRONMENT, e);
         }
-      
+    }
+
+    private Tuple applySchema(Tuple tup) throws IOException {
+        if ( caster == null) {
+            caster = getLoadCaster();
+        }
+        if (signature != null && schema == null) {
+            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                    new String[] {signature});
+            String serializedSchema = p.getProperty(signature+".schema");
+            if (serializedSchema == null) return tup;
+            try {
+                schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
+            } catch (ParserException e) {
+                mLog.error("Unable to parse serialized schema " + serializedSchema, e);
+            }
+        }
+
+        if (schema != null) {
+
+            ResourceFieldSchema[] fieldSchemas = schema.getFields();
+            int tupleIdx = 0;
+            // If some fields have been projected out, the tuple
+            // only contains required fields.
+            // We walk the requiredColumns array to find required fields,
+            // and cast those.
+            for (int i = 0; i < fieldSchemas.length; i++) {
+                if (mRequiredColumns == null || (mRequiredColumns.length>i && mRequiredColumns[i])) {
+                    Object val = null;
+                    if(tup.get(tupleIdx) != null){
+                        byte[] bytes = ((DataByteArray) tup.get(tupleIdx)).get();
+                        val = CastUtils.convertToType(caster, bytes,
+                                fieldSchemas[i], fieldSchemas[i].getType());
+                    }
+                    tup.set(tupleIdx, val);
+                    tupleIdx++;
+                }
+            }
+        }
+        return tup;
     }
 
     @Override
     public void putNext(Tuple f) throws IOException {
         try {
-            writer.write(null, f);            
+            writer.write(null, f);
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
@@ -185,7 +313,7 @@ LoadPushDown {
         }
         return new RequiredFieldResponse(true);
     }
-    
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof PigStorage)
@@ -214,7 +342,7 @@ LoadPushDown {
 
     @Override
     public void setLocation(String location, Job job)
-            throws IOException {
+    throws IOException {
         loadLocation = location;
         FileInputFormat.setInputPaths(job, location);
     }
@@ -226,13 +354,14 @@ LoadPushDown {
 
     @Override
     public void prepareToWrite(RecordWriter writer) {
-        this.writer = writer;        
+        this.writer = writer;
     }
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
         job.getConfiguration().set("mapred.textoutputformat.separator", "");
         FileOutputFormat.setOutputPath(job, new Path(location));
+
         if( "true".equals( job.getConfiguration().get( "output.compression.enabled" ) ) ) {
             FileOutputFormat.setCompressOutput( job, true );
             String codec = job.getConfiguration().get( "output.compression.codec" );
@@ -242,15 +371,17 @@ LoadPushDown {
                 throw new RuntimeException("Class not found: " + codec );
             }
         } else {
-            if (location.endsWith(".bz2") || location.endsWith(".bz")) {
-                FileOutputFormat.setCompressOutput(job, true);
-                FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
-            }  else if (location.endsWith(".gz")) {
-                FileOutputFormat.setCompressOutput(job, true);
-                FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
-            } else {
-                FileOutputFormat.setCompressOutput( job, false);
-            }
+            // This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
+            setCompression(new Path(location), job);
+        }
+    }
+
+    private void setCompression(Path path, Job job) {
+        CompressionCodecFactory codecFactory = new CompressionCodecFactory(job.getConfiguration());
+        CompressionCodec codec = codecFactory.getCodec(path);
+        if (codec != null) {
+            FileOutputFormat.setCompressOutput(job, true);
+            FileOutputFormat.setOutputCompressorClass(job, codec.getClass());
         }
     }
 
@@ -261,7 +392,7 @@ LoadPushDown {
 
     @Override
     public String relToAbsPathForStoreLocation(String location, Path curDir)
-            throws IOException {
+    throws IOException {
         return LoadFunc.getAbsolutePath(location, curDir);
     }
 
@@ -270,10 +401,10 @@ LoadPushDown {
         return fieldDel;
     }
 
-    
+
     @Override
     public void setUDFContextSignature(String signature) {
-        this.signature = signature; 
+        this.signature = signature;
     }
 
     @Override
@@ -287,8 +418,64 @@ LoadPushDown {
 
     @Override
     public void cleanupOnFailure(String location, Job job)
-            throws IOException {
+    throws IOException {
         StoreFunc.cleanupOnFailureImpl(location, job);
     }
 
+
+    //------------------------------------------------------------------------
+    // Implementation of the LoadMetadata interface
+
+    @Override
+    public ResourceSchema getSchema(String location,
+            Job job) throws IOException {
+        if (!dontLoadSchema) {
+            schema = (new JsonMetadata()).getSchema(location, job);
+
+            if (signature != null && schema != null) {
+                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                        new String[] {signature});
+                p.setProperty(signature + ".schema", schema.toString());
+            }
+        }
+        return schema;
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location,
+            Job job) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+    throws IOException {
+    }
+
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+    throws IOException {
+        return null;
+    }
+
+    //------------------------------------------------------------------------
+    // Implementation of StoreMetadata
+
+    @Override
+    public void storeSchema(ResourceSchema schema, String location,
+            Job job) throws IOException {
+        if (storeSchema) {
+            JsonMetadata metadataWriter = new JsonMetadata();
+            byte recordDel = '\n';
+            metadataWriter.setFieldDel(fieldDel);
+            metadataWriter.setRecordDel(recordDel);
+            metadataWriter.storeSchema(schema, location, job);
+        }
+    }
+
+    @Override
+    public void storeStatistics(ResourceStatistics stats, String location,
+            Job job) throws IOException {
+
+    }
 }
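
Usage sketch (illustrative only, not part of the commit; the class name SchemaRoundTrip and the /tmp paths are hypothetical): with the new '-schema' option, a STORE also writes a .pig_schema side file next to the data, and a later LOAD with a plain PigStorage picks that schema up without an AS clause, while '-noschema' opts out. The tests below exercise the same round trip.

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class SchemaRoundTrip {
    public static void main(String[] args) throws Exception {
        PigServer pig = new PigServer(ExecType.LOCAL);

        // Store with '-schema': PigStorage also writes /tmp/aout/.pig_schema.
        pig.registerQuery("a = LOAD '/tmp/in' USING PigStorage('\\t') AS (f1:chararray, f2:int);");
        pig.registerQuery("STORE a INTO '/tmp/aout' USING PigStorage('\\t', '-schema');");

        // Load without an AS clause: the stored schema is applied automatically.
        pig.registerQuery("b = LOAD '/tmp/aout' USING PigStorage('\\t');");
        Schema loaded = pig.dumpSchema("b");      // expected to match the schema of 'a'
        System.out.println(loaded);

        // '-noschema' ignores the side file.
        pig.registerQuery("c = LOAD '/tmp/aout' USING PigStorage('\\t', '-noschema');");
        System.out.println(pig.dumpSchema("c"));  // null: no schema is attached
    }
}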

Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1147934&r1=1147933&r2=1147934&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Mon Jul 18 15:39:35 2011
@@ -19,54 +19,87 @@
 package org.apache.pig.test;
 
 import static org.apache.pig.ExecType.MAPREDUCE;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Properties;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-
-import junit.framework.Assert;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.test.utils.TypeCheckingTestUtil;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-@RunWith(JUnit4.class)
+import org.junit.Assert;
+
 public class TestPigStorage  {
-        
+
     protected final Log log = LogFactory.getLog(getClass());
-    
+
     private static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    static PigServer pig;
+    static final String datadir = "build/test/tmpdata/";
+
+    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+    Map<String, String> fileNameMap = new HashMap<String, String>();
+
     @Before
-    public void setup() {
+    public void setup() throws IOException {
         // some tests are in map-reduce mode and some in local - so before
         // each test, we will de-initialize FileLocalizer so that temp files
         // are created correctly depending on the ExecType in the test.
         FileLocalizer.setInitialized(false);
+
+        // Most tests run in local mode, so create a local PigServer up front;
+        // tests that need map-reduce mode can create their own PigServer.
+        pig = new PigServer(ExecType.LOCAL, cluster.getProperties());
+        Util.deleteDirectory(new File(datadir));
+        try {
+            pig.mkdirs(datadir);
+        } catch (IOException e) {}
+        Util.createLocalInputFile(datadir + "originput",
+                new String[] {"A,1", "B,2", "C,3", "D,2",
+                "A,5", "B,5", "C,8", "A,8",
+                "D,8", "A,9"});
+
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Util.deleteDirectory(new File(datadir));
+        pig.shutdown();
     }
-    
+
     @AfterClass
     public static void shutdown() {
         cluster.shutDown();
     }
+
     @Test
     public void testBlockBoundary() throws ExecException {
-        
-        // This tests PigStorage loader with records exectly 
+
+        // This tests PigStorage loader with records exactly
         // on the boundary of the file blocks.
         Properties props = new Properties();
         for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
@@ -75,41 +108,41 @@ public class TestPigStorage  {
         props.setProperty("mapred.max.split.size", "20");
         PigServer pigServer = new PigServer(MAPREDUCE, props);
         String[] inputs = {
-                "abcdefgh1", "abcdefgh2", "abcdefgh3", 
+                "abcdefgh1", "abcdefgh2", "abcdefgh3",
                 "abcdefgh4", "abcdefgh5", "abcdefgh6",
                 "abcdefgh7", "abcdefgh8", "abcdefgh9"
         };
-        
+
         String[] expected = {
-                "(abcdefgh1)", "(abcdefgh2)", "(abcdefgh3)", 
+                "(abcdefgh1)", "(abcdefgh2)", "(abcdefgh3)",
                 "(abcdefgh4)", "(abcdefgh5)", "(abcdefgh6)",
                 "(abcdefgh7)", "(abcdefgh8)", "(abcdefgh9)"
         };
-        
+
         System.setProperty("pig.overrideBlockSize", "20");
-        
+
         String INPUT_FILE = "tmp.txt";
-        
+
         try {
-                                    
+
             PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
             for (String s : inputs) {
                 w.println(s);
             }
             w.close();
-            
+
             Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-            
+
             pigServer.registerQuery("a = load '" + INPUT_FILE + "';");
-            
+
             Iterator<Tuple> iter = pigServer.openIterator("a");
             int counter = 0;
             while (iter.hasNext()){
                 assertEquals(expected[counter++].toString(), iter.next().toString());
             }
-            
+
             assertEquals(expected.length, counter);
-        
+
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
@@ -122,33 +155,226 @@ public class TestPigStorage  {
                 Assert.fail();
             }
         }
-    } 
-    
+    }
+
     /**
      * Test to verify that PigStorage works fine in the following scenario:
      * The column prune optimization determines only columns 2 and 3 are needed
      * and there are records in the data which have only 1 column (malformed data).
      * In this case, PigStorage should return an empty tuple to represent columns
-     * 2 and 3 and {@link POProject} would handle catching any 
+     * 2 and 3 and {@link POProject} would handle catching any
      * {@link IndexOutOfBoundsException} resulting from accessing a field in the
-     * tuple and substitute a null. 
+     * tuple and substitute a null.
      */
     @Test
     public void testPruneColumnsWithMissingFields() throws IOException {
         String inputFileName = "TestPigStorage-testPruneColumnsWithMissingFields-input.txt";
         Util.createLocalInputFile(
-                inputFileName, 
+                inputFileName,
                 new String[] {"1\t2\t3", "4", "5\t6\t7"});
-        PigServer ps = new PigServer(ExecType.LOCAL);
         String script = "a = load '" + inputFileName + "' as (i:int, j:int, k:int);" +
         		"b = foreach a generate j, k;";
-        Util.registerMultiLineQuery(ps, script);
-        Iterator<Tuple> it = ps.openIterator("b");
+        Util.registerMultiLineQuery(pig, script);
+        Iterator<Tuple> it = pig.openIterator("b");
         assertEquals(Util.createTuple(new Integer[] { 2, 3}), it.next());
         assertEquals(Util.createTuple(new Integer[] { null, null}), it.next());
         assertEquals(Util.createTuple(new Integer[] { 6, 7}), it.next());
         assertFalse(it.hasNext());
-                
+
     }
 
+    @Test
+    public void testPigStorageSchema() throws Exception {
+        pigContext.connect();
+        String query = "a = LOAD '" + datadir + "originput' using PigStorage('\\t', '-schema') " +
+        "as (f1:chararray, f2:int);";
+        pig.registerQuery(query);
+        Schema origSchema = pig.dumpSchema("a");
+        pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
+
+        // aout now has a schema.
+
+        // Verify that loading aout with no given schema produces
+        // the original schema.
+
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t');");
+        Schema genSchema = pig.dumpSchema("b");
+        Assert.assertTrue("generated schema equals original" ,
+                Schema.equals(genSchema, origSchema, true, false));
+
+        // Verify that giving our own schema works
+        String[] aliases = {"foo", "bar"};
+        byte[] types = {DataType.INTEGER, DataType.LONG};
+        Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
+                aliases, types);
+        pig.registerQuery("c = LOAD '" + datadir + "aout' using PigStorage('\\t', '-schema') "+
+        "as (foo:int, bar:long);");
+        Schema newGenSchema = pig.dumpSchema("c");
+        Assert.assertTrue("explicit schema overrides metadata",
+                Schema.equals(newSchema, newGenSchema, true, false));
+
+        // Verify that explicitly requesting no schema works
+        pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
+        genSchema = pig.dumpSchema("d");
+        assertNull(genSchema);
+    }
+
+    @Test
+    public void testSchemaConversion() throws Exception {
+
+        Util.createLocalInputFile(datadir + "originput2",
+                new String[] {"1", "2", "3", "2",
+                              "5", "5", "8", "8",
+                              "8", "9"});
+
+        pig.registerQuery("A = LOAD '" + datadir + "originput2' using PigStorage('\\t', '-schema') " +
+        "as (f:int);");
+        pig.registerQuery("B = group A by f;");
+        Schema origSchema = pig.dumpSchema("B");
+        ResourceSchema rs1 = new ResourceSchema(origSchema);
+        pig.registerQuery("STORE B into '" + datadir + "bout' using PigStorage('\\t', '-schema');");
+
+        pig.registerQuery("C = LOAD '" + datadir + "bout' using PigStorage('\\t', '-schema');");
+        Schema genSchema = pig.dumpSchema("C");
+        ResourceSchema rs2 = new ResourceSchema(genSchema);
+        Assert.assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
+
+        pig.registerQuery("C1 = LOAD '" + datadir + "bout' as (a0:int, A: {t: (f:int) } );");
+        pig.registerQuery("D = foreach C1 generate a0, SUM(A);");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "(1,1L)",
+                        "(2,4L)",
+                        "(3,3L)",
+                        "(5,10L)",
+                        "(8,24L)",
+                        "(9,9L)"
+                });
+
+        Iterator<Tuple> iter = pig.openIterator("D");
+        int counter = 0;
+        while (iter.hasNext()) {
+            Assert.assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+        }
+
+        Assert.assertEquals(expectedResults.size(), counter);
+    }
+
+    @Test
+    public void testSchemaConversion2() throws Exception {
+
+        pig.registerQuery("A = LOAD '" + datadir + "originput' using PigStorage(',', '-schema') " +
+        "as (f1:chararray, f2:int);");
+        pig.registerQuery("B = group A by f1;");
+        Schema origSchema = pig.dumpSchema("B");
+        ResourceSchema rs1 = new ResourceSchema(origSchema);
+        pig.registerQuery("STORE B into '" + datadir + "cout' using PigStorage('\\t', '-schema');");
+
+        pig.registerQuery("C = LOAD '" + datadir + "cout' using PigStorage('\\t', '-schema');");
+        Schema genSchema = pig.dumpSchema("C");
+        ResourceSchema rs2 = new ResourceSchema(genSchema);
+        Assert.assertTrue("generated schema equals original" , ResourceSchema.equals(rs1, rs2));
+
+        pig.registerQuery("C1 = LOAD '" + datadir + "cout' as (a0:chararray, A: {t: (f1:chararray, f2:int) } );");
+        pig.registerQuery("D = foreach C1 generate a0, SUM(A.f2);");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "('A',23L)",
+                        "('B',7L)",
+                        "('C',11L)",
+                        "('D',10L)"
+                });
+
+        Iterator<Tuple> iter = pig.openIterator("D");
+        int counter = 0;
+        while (iter.hasNext()) {
+            Assert.assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+        }
+
+        Assert.assertEquals(expectedResults.size(), counter);
+    }
+
+    /**
+     * See PIG-1830
+     * @throws IOException
+     */
+    @Test
+    public void testByteArrayConversion() throws IOException {
+        Util.createLocalInputFile(datadir + "originput2",
+                new String[] {"peter\t1", "samir\t2", "michael\t4",
+                "peter\t2", "peter\t4", "samir\t1", "john\t"
+        });
+        Util.createLocalInputFile(datadir + ".pig_schema",
+                new String[] {
+                "{\"fields\":[{\"name\":\"name\",\"type\":55,\"schema\":null," +
+                "\"description\":\"autogenerated from Pig Field Schema\"}," +
+                "{\"name\":\"val\",\"type\":10,\"schema\":null,\"description\":"+
+                "\"autogenerated from Pig Field Schema\"}],\"version\":0," +
+                "\"sortKeys\":[],\"sortKeyOrders\":[]}"
+        });
+        pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING PigStorage('\\t', '-schema');");
+        pig.registerQuery("Sessions = GROUP Events BY name;");
+        Iterator<Tuple> sessions = pig.openIterator("Sessions");
+        while (sessions.hasNext()) {
+            System.out.println(sessions.next());
+        }
+    }
+
+    // See PIG-1993
+    @Test
+    public void testColumnPrune() throws IOException {
+        Util.createLocalInputFile(datadir + "originput2",
+                new String[] {"peter\t1", "samir\t2", "michael\t4",
+                "peter\t2", "peter\t4", "samir\t1", "john\t"
+        });
+        Util.createLocalInputFile(datadir + ".pig_schema",
+                new String[] {
+                "{\"fields\":[{\"name\":\"name\",\"type\":55,\"schema\":null," +
+                "\"description\":\"autogenerated from Pig Field Schema\"}," +
+                "{\"name\":\"val\",\"type\":10,\"schema\":null,\"description\":"+
+                "\"autogenerated from Pig Field Schema\"}],\"version\":0," +
+                "\"sortKeys\":[],\"sortKeyOrders\":[]}"
+        });
+        pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING PigStorage('\\t', '-schema');");
+        pig.registerQuery("EventsName = foreach Events generate name;");
+        Iterator<Tuple> sessions = pig.openIterator("EventsName");
+        sessions.next().toString().equals("(1)");
+        sessions.next().toString().equals("(2)");
+        sessions.next().toString().equals("(4)");
+        sessions.next().toString().equals("(2)");
+        sessions.next().toString().equals("(4)");
+        sessions.next().toString().equals("(1)");
+        sessions.next().toString().equals("()");
+        Assert.assertFalse(sessions.hasNext());
+    }
+
+    @Test
+    public void testPigStorageSchemaHeaderDelimiter() throws Exception {
+        pigContext.connect();
+        String query = "a = LOAD '" + datadir + "originput' using PigStorage(',', '-schema') " +
+                "as (foo:chararray, bar:int);";
+        pig.registerQuery(query);
+        pig.registerQuery("STORE a into '" + datadir + "dout' using PigStorage('#', '-schema');");
+        pig.registerQuery("STORE a into '" + datadir + "eout' using PigStorage('\\t', '-schema');");
+
+        String outPath = FileLocalizer.fullPath(datadir + "dout/.pig_header",
+                pig.getPigContext());
+        Assert.assertTrue(FileLocalizer.fileExists(outPath,
+                pig.getPigContext()));
+
+        String[] header = Util.readOutput(pig.getPigContext(), outPath);
+        Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo#bar"}, header);
+
+        outPath = FileLocalizer.fullPath(datadir + "eout/.pig_header",
+                pig.getPigContext());
+        Assert.assertTrue(FileLocalizer.fileExists(outPath,
+                pig.getPigContext()));
+
+        header = Util.readOutput(pig.getPigContext(), outPath);
+        Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo\tbar"}, header);
+    }
 }
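
For reference, a sketch of inspecting the side files the tests above check for (the /tmp/dout path and the InspectSideFiles class are hypothetical): .pig_header holds the field names joined by the store delimiter (e.g. "foo#bar"), and .pig_schema holds the JSON-serialized schema written by JsonMetadata.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class InspectSideFiles {
    public static void main(String[] args) throws IOException {
        // After: STORE a INTO '/tmp/dout' USING PigStorage('#', '-schema');
        // .pig_header contains the field names joined by the store delimiter,
        // .pig_schema contains the JSON schema read back by later loads.
        for (String name : new String[] {"/tmp/dout/.pig_header", "/tmp/dout/.pig_schema"}) {
            BufferedReader reader = new BufferedReader(new FileReader(name));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(name + ": " + line);
                }
            } finally {
                reader.close();
            }
        }
    }
}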


