From: daijy@apache.org
To: commits@pig.apache.org
Subject: svn commit: r1199668 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/builtin/ test/e2e/pig/tests/ test/org/apache/pig/test/ test/org/apache/pig/test/data/
Date: Wed, 09 Nov 2011 09:08:40 -0000
Message-Id: <20111109090840.C2ACA23889DE@eris.apache.org>

Author: daijy
Date: Wed Nov  9 09:08:39 2011
New Revision: 1199668

URL: http://svn.apache.org/viewvc?rev=1199668&view=rev
Log:
PIG-2332: JsonLoader/JsonStorage

Added:
    pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
    pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
    pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java
    pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig
    pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result
    pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/LoadFunc.java
    pig/trunk/src/org/apache/pig/StoreFunc.java
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Nov  9 09:08:39 2011
@@ -44,6 +44,8 @@ INCOMPATIBLE CHANGES

 IMPROVEMENTS

+PIG-2332: JsonLoader/JsonStorage (daijy)
+
 PIG-2334: Set default number of reducers for S3N filesystem (ddaniels888 via daijy)

 PIG-1387: Syntactical Sugar for PIG-1385 (azaroth)

Modified: pig/trunk/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadFunc.java (original)
+++ pig/trunk/src/org/apache/pig/LoadFunc.java Wed Nov  9 09:08:39 2011
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -37,6 +38,7 @@ import org.apache.pig.builtin.Utf8Storag
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;

 /**
@@ -292,5 +294,15 @@ public abstract class LoadFunc {
     public void setUDFContextSignature(String signature) {
         // default implementation is a no-op
     }
-
+
+    /**
+     * Issue a warning.  Warning messages are aggregated and reported to
+     * the user.
+     * @param msg String message of the warning
+     * @param warningEnum type of warning
+     */
+    public final void warn(String msg, Enum warningEnum) {
+        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
+        counter.increment(1);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Wed Nov  9 09:08:39 2011
@@ -21,6 +21,7 @@ import java.io.IOException;

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -29,6 +30,7 @@ import org.apache.pig.classification.Int
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;

 /**
@@ -173,4 +175,15 @@ public abstract class StoreFunc implemen
             fs.delete(path, true);
         }
     }
+
+    /**
+     * Issue a warning.  Warning messages are aggregated and reported to
+     * the user.
+     * @param msg String message of the warning
+     * @param warningEnum type of warning
+     */
+    public final void warn(String msg, Enum warningEnum) {
+        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
+        counter.increment(1);
+    }
 }
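The new warn() hook on LoadFunc and StoreFunc lets a UDF report data-quality problems as aggregated warning counters (one counter per enum value, reported at the end of the job) instead of one log line per bad record; note that the message string itself is not persisted per record, only the count. A minimal sketch of a store function built on this hook, assuming the post-commit API above (the class itself is hypothetical and not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import org.apache.pig.PigWarning;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.data.Tuple;

    // Hypothetical UDF, not part of this commit: stores tuples as
    // tab-delimited text and uses the new warn() hook for bad records.
    public class WarningDemoStorage extends StoreFunc {
        private RecordWriter writer;

        @SuppressWarnings("unchecked")
        @Override
        public OutputFormat getOutputFormat() {
            return new TextOutputFormat();
        }

        @Override
        public void setStoreLocation(String location, Job job) throws IOException {
            FileOutputFormat.setOutputPath(job, new Path(location));
        }

        @Override
        public void prepareToWrite(RecordWriter writer) {
            this.writer = writer;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void putNext(Tuple t) throws IOException {
            if (t == null || t.size() == 0) {
                // Aggregated into one counter for this warning enum instead
                // of flooding the task log with one line per record.
                warn("Skipping empty tuple", PigWarning.UDF_WARNING_1);
                return;
            }
            try {
                writer.write(null, new Text(t.toDelimitedString("\t")));
            } catch (InterruptedException ie) {
                throw new IOException(ie);
            }
        }
    }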
Added: pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonLoader.java?rev=1199668&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonLoader.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/JsonLoader.java Wed Nov  9 09:08:39 2011
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.apache.pig.Expression;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+/**
+ * A loader for data stored using {@link JsonStorage}.  This is not a generic
+ * JSON loader.  It depends on the schema being stored with the data, though
+ * conceivably one could write a loader that determines the schema from the
+ * JSON.
+ */
+public class JsonLoader extends LoadFunc implements LoadMetadata {
+
+    protected RecordReader reader = null;
+    protected ResourceSchema schema = null;
+
+    private String udfcSignature = null;
+    private JsonFactory jsonFactory = null;
+    private TupleFactory tupleFactory = TupleFactory.getInstance();
+    private BagFactory bagFactory = BagFactory.getInstance();
+
+    private static final String SCHEMA_SIGNATURE = "pig.jsonloader.schema";
+
+    public JsonLoader() {
+    }
+
+    public JsonLoader(String schemaString) throws IOException {
+        schema = new ResourceSchema(Utils.parseSchema(schemaString));
+    }
+
+    public void setLocation(String location, Job job) throws IOException {
+        // Tell our input format where we will be reading from
+        FileInputFormat.setInputPaths(job, location);
+    }
+
+    @SuppressWarnings("unchecked")
+    public InputFormat getInputFormat() throws IOException {
+        // We will use TextInputFormat, the default Hadoop input format for
+        // text.  It has a LongWritable key that we will ignore, and the value
+        // is a Text (a string writable) that the JSON data is in.
+        return new TextInputFormat();
+    }
+
+    public LoadCaster getLoadCaster() throws IOException {
+        // We do not expect to do casting of byte arrays, because we will be
+        // returning typed data.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    throws IOException {
+        this.reader = reader;
+
+        // Get the schema string from the UDFContext object.
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+        if (strSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+
+        // Parse the schema from the string stored in the properties object.
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+
+        jsonFactory = new JsonFactory();
+    }
+
+    public Tuple getNext() throws IOException {
+        Text val = null;
+        try {
+            // Read the next key-value pair from the record reader.  If it's
+            // finished, return null.
+            if (!reader.nextKeyValue()) return null;
+
+            // Get the current value.  We don't use the key.
+            val = (Text)reader.getCurrentValue();
+        } catch (InterruptedException ie) {
+            throw new IOException(ie);
+        }
+
+        // Create a parser specific for this input line.  This may not be the
+        // most efficient approach.
+        ByteArrayInputStream bais = new ByteArrayInputStream(val.getBytes());
+        JsonParser p = jsonFactory.createJsonParser(bais);
+
+        // Create the tuple we will be returning.  We create it with the right
+        // number of fields, as the Tuple object is optimized for this case.
+        ResourceFieldSchema[] fields = schema.getFields();
+        Tuple t = tupleFactory.newTuple(fields.length);
+
+        // Read the start object marker.  Throughout this file, if the parsing
+        // isn't what we expect we return a tuple with null fields rather than
+        // throwing an exception.  That way a few mangled lines don't fail the
+        // job.
+        if (p.nextToken() != JsonToken.START_OBJECT) {
+            warn("Bad record, could not find start of record " +
+                val.toString(), PigWarning.UDF_WARNING_1);
+            return t;
+        }
+
+        // Read each field in the record
+        for (int i = 0; i < fields.length; i++) {
+            t.set(i, readField(p, fields[i], i));
+        }
+
+        if (p.nextToken() != JsonToken.END_OBJECT) {
+            warn("Bad record, could not find end of record " +
+                val.toString(), PigWarning.UDF_WARNING_1);
+            return t;
+        }
+        p.close();
+        return t;
+    }
+
+    private Object readField(JsonParser p,
+                             ResourceFieldSchema field,
+                             int fieldnum) throws IOException {
+        // Read the next token
+        JsonToken tok = p.nextToken();
+        if (tok == null) {
+            warn("Early termination of record, expected " + schema.getFields().length
+                + " fields but found " + fieldnum, PigWarning.UDF_WARNING_1);
+            return null;
+        }
+
+        // Check to see if this value was null
+        if (tok == JsonToken.VALUE_NULL) return null;
+
+        // Read based on our expected type
+        switch (field.getType()) {
+        case DataType.INTEGER:
+            // Move past the field name to the field's value
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getIntValue();
+
+        case DataType.LONG:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getLongValue();
+
+        case DataType.FLOAT:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getFloatValue();
+
+        case DataType.DOUBLE:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getDoubleValue();
+
+        case DataType.BYTEARRAY:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            byte[] b = p.getText().getBytes();
+            // Use the DBA constructor that copies the bytes so that we own
+            // the memory
+            return new DataByteArray(b, 0, b.length);
+
+        case DataType.CHARARRAY:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getText();
+
+        case DataType.MAP:
+            // Should be a start of the map object
+            if (p.nextToken() != JsonToken.START_OBJECT) {
+                warn("Bad map field, could not find start of object, field "
+                    + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+            Map<String, String> m = new HashMap<String, String>();
+            while (p.nextToken() != JsonToken.END_OBJECT) {
+                String k = p.getCurrentName();
+                String v = p.getText();
+                m.put(k, v);
+            }
+            return m;
+
+        case DataType.TUPLE:
+            if (p.nextToken() != JsonToken.START_OBJECT) {
+                warn("Bad tuple field, could not find start of object, " +
+                    "field " + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
of object, " + + "field " + fieldnum, PigWarning.UDF_WARNING_1); + return null; + } + + ResourceSchema s = field.getSchema(); + ResourceFieldSchema[] fs = s.getFields(); + Tuple t = tupleFactory.newTuple(fs.length); + + for (int j = 0; j < fs.length; j++) { + t.set(j, readField(p, fs[j], j)); + } + + if (p.nextToken() != JsonToken.END_OBJECT) { + warn("Bad tuple field, could not find end of object, " + + "field " + fieldnum, PigWarning.UDF_WARNING_1); + return null; + } + return t; + + case DataType.BAG: + if (p.nextToken() != JsonToken.START_ARRAY) { + warn("Bad bag field, could not find start of array, " + + "field " + fieldnum, PigWarning.UDF_WARNING_1); + return null; + } + + s = field.getSchema(); + fs = s.getFields(); + // Drill down the next level to the tuple's schema. + s = fs[0].getSchema(); + fs = s.getFields(); + + DataBag bag = bagFactory.newDefaultBag(); + + JsonToken innerTok; + while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) { + if (innerTok != JsonToken.START_OBJECT) { + warn("Bad bag tuple field, could not find start of " + + "object, field " + fieldnum, PigWarning.UDF_WARNING_1); + return null; + } + + t = tupleFactory.newTuple(fs.length); + for (int j = 0; j < fs.length; j++) { + t.set(j, readField(p, fs[j], j)); + } + + if (p.nextToken() != JsonToken.END_OBJECT) { + warn("Bad bag tuple field, could not find end of " + + "object, field " + fieldnum, PigWarning.UDF_WARNING_1); + return null; + } + bag.add(t); + } + return bag; + default: + throw new IOException("Unknown type in input schema: " + + field.getType()); + } + + } + + //------------------------------------------------------------------------ + + public void setUDFContextSignature(String signature) { + udfcSignature = signature; + } + + public ResourceSchema getSchema(String location, Job job) + throws IOException { + + ResourceSchema s; + if (schema!=null) { + s = schema; + } else { + // Parse the schema + s = (new JsonMetadata()).getSchema(location, job, true); + + if (s == null) { + throw new IOException("Unable to parse schema found in file in " + location); + } + } + + // Now that we have determined the schema, store it in our + // UDFContext properties object so we have it when we need it on the + // backend + UDFContext udfc = UDFContext.getUDFContext(); + Properties p = + udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature}); + p.setProperty(SCHEMA_SIGNATURE, s.toString()); + + return s; + } + + public ResourceStatistics getStatistics(String location, Job job) + throws IOException { + // We don't implement this one. + return null; + } + + public String[] getPartitionKeys(String location, Job job) + throws IOException { + // We don't have partitions + return null; + } + + public void setPartitionFilter(Expression partitionFilter) + throws IOException { + // We don't have partitions + } +} Added: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1199668&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (added) +++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Wed Nov 9 09:08:39 2011 @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Added: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1199668&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Wed Nov  9 09:08:39 2011
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * A JSON Pig store function.  Each Pig tuple is stored on one line (as one
+ * value for TextOutputFormat) so that it can be read easily using
+ * TextInputFormat.  Pig tuples are mapped to JSON objects.  Pig bags are
+ * mapped to JSON arrays.  Pig maps are also mapped to JSON objects.  Maps are
+ * assumed to be string to string.  A schema is stored in a side file to deal
+ * with mapping between JSON and Pig types.  The schema file shares the same
+ * format as the one we use in PigStorage.
+ */
+public class JsonStorage extends StoreFunc implements StoreMetadata {
+
+    protected RecordWriter writer = null;
+    protected ResourceSchema schema = null;
+
+    private String udfcSignature = null;
+    private JsonFactory jsonFactory = null;
+
+    // Default size for the byte buffer, should fit most tuples.
+    private static final int BUF_SIZE = 4 * 1024;
+
+    private static final String SCHEMA_SIGNATURE = "pig.jsonstorage.schema";
+
+    /*
+     * Methods called on the front end
+     */
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        // We will use TextOutputFormat, the default Hadoop output format for
+        // text.  The key is unused and the value will be a
+        // Text (a string writable type) that we store our JSON data in.
+        return new TextOutputFormat<LongWritable, Text>();
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        // FileOutputFormat has a utility method for setting up the output
+        // location.
+        FileOutputFormat.setOutputPath(job, new Path(location));
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        // store the signature so we can use it later
+        udfcSignature = signature;
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        // We won't really check the schema here, we'll store it in our
+        // UDFContext properties object so we have it when we need it on the
+        // backend
+
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        p.setProperty(SCHEMA_SIGNATURE, s.toString());
+    }
+
+    /*
+     * Methods called on the back end
+     */
+
+    @Override
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        // Store the record writer reference so we can use it when it's time
+        // to write tuples
+        this.writer = writer;
+
+        // Get the schema string from the UDFContext object.
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+        if (strSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+
+        // Parse the schema from the string stored in the properties object.
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+
+        // Build a Json factory
+        jsonFactory = new JsonFactory();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void putNext(Tuple t) throws IOException {
+        // Build a ByteArrayOutputStream to write the JSON into
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
+        // Build the generator
+        JsonGenerator json =
+            jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);
+
+        // Write the beginning of the top level tuple object
+        json.writeStartObject();
+
+        ResourceFieldSchema[] fields = schema.getFields();
+        for (int i = 0; i < fields.length; i++) {
+            writeField(json, fields[i], t.get(i));
+        }
+        json.writeEndObject();
+        json.close();
+
+        // Hand a null key and our string to Hadoop
+        try {
+            writer.write(null, new Text(baos.toByteArray()));
+        } catch (InterruptedException ie) {
+            throw new IOException(ie);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void writeField(JsonGenerator json,
+                            ResourceFieldSchema field,
+                            Object d) throws IOException {
+
+        // If the field is missing or the value is null, write a null
+        if (d == null) {
+            json.writeNullField(field.getName());
+            return;
+        }
+
+        // Based on the field's type, write it out
+        switch (field.getType()) {
+        case DataType.INTEGER:
+            json.writeNumberField(field.getName(), (Integer)d);
+            return;

+        case DataType.LONG:
+            json.writeNumberField(field.getName(), (Long)d);
+            return;
+
+        case DataType.FLOAT:
+            json.writeNumberField(field.getName(), (Float)d);
+            return;
+
+        case DataType.DOUBLE:
+            json.writeNumberField(field.getName(), (Double)d);
+            return;
+
+        case DataType.BYTEARRAY:
+            json.writeStringField(field.getName(), d.toString());
+            return;
+
+        case DataType.CHARARRAY:
+            json.writeStringField(field.getName(), (String)d);
+            return;
+
+        case DataType.MAP:
+            json.writeFieldName(field.getName());
+            json.writeStartObject();
+            for (Map.Entry<String, Object> e : ((Map<String, Object>)d).entrySet()) {
+                json.writeStringField(e.getKey(), e.getValue().toString());
+            }
+            json.writeEndObject();
+            return;
+
+        case DataType.TUPLE:
+            json.writeFieldName(field.getName());
+            json.writeStartObject();
+
+            ResourceSchema s = field.getSchema();
+            if (s == null) {
+                throw new IOException("Schemas must be fully specified to use "
+                    + "this storage function.  No schema found for field "
+                    + field.getName());
+            }
specified to use " + + "this storage function. No schema found for field " + + field.getName()); + } + ResourceFieldSchema[] fs = s.getFields(); + + for (int j = 0; j < fs.length; j++) { + writeField(json, fs[j], ((Tuple)d).get(j)); + } + json.writeEndObject(); + return; + + case DataType.BAG: + json.writeFieldName(field.getName()); + json.writeStartArray(); + s = field.getSchema(); + if (s == null) { + throw new IOException("Schemas must be fully specified to use " + + "this storage function. No schema found for field " + + field.getName()); + } + fs = s.getFields(); + if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) { + throw new IOException("Found a bag without a tuple " + + "inside!"); + } + // Drill down the next level to the tuple's schema. + s = fs[0].getSchema(); + if (s == null) { + throw new IOException("Schemas must be fully specified to use " + + "this storage function. No schema found for field " + + field.getName()); + } + fs = s.getFields(); + for (Tuple t : (DataBag)d) { + json.writeStartObject(); + for (int j = 0; j < fs.length; j++) { + writeField(json, fs[j], t.get(j)); + } + json.writeEndObject(); + } + json.writeEndArray(); + return; + } + } + + public void storeStatistics(ResourceStatistics stats, + String location, + Job job) throws IOException { + // We don't implement this method + } + + public void storeSchema(ResourceSchema schema, String location, Job job) + throws IOException { + // Store the schema in a side file in the same directory. MapReduce + // does not include files starting with "_" when reading data for a job. + JsonMetadata metadataWriter = new JsonMetadata(); + byte recordDel = '\n'; + byte fieldDel = '\t'; + metadataWriter.setFieldDel(fieldDel); + metadataWriter.setRecordDel(recordDel); + metadataWriter.storeSchema(schema, location, job); + } + +} \ No newline at end of file Modified: pig/trunk/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1199668&r1=1199667&r2=1199668&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/nightly.conf (original) +++ pig/trunk/test/e2e/pig/tests/nightly.conf Wed Nov 9 09:08:39 2011 @@ -3993,6 +3993,37 @@ store E into ':OUTPATH:';\, store e into ':OUTPATH:';?, } ], + },{ + 'name' => 'JsonLoaderStorage', + 'tests' => [ + { + 'num' => 1, + 'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + store A into ':OUTPATH:.intermediate' using JsonStorage(); + exec + A = LOAD ':OUTPATH:.intermediate' using JsonLoader(); + store A into ':OUTPATH:';?, + 'notmq' => 1, + 'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double); + store A into ':OUTPATH:';?, + }, { + 'num' => 2, + 'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + store A into ':OUTPATH:.intermediate1' using JsonStorage(); + B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, age:int, registration:chararray, contributions:double); + store B into ':OUTPATH:.intermediate2' using JsonStorage(); + exec + A = LOAD ':OUTPATH:.intermediate1' using JsonLoader(); + B = LOAD ':OUTPATH:.intermediate2' using JsonLoader(); + C = JOIN A by name, B by name; + store C into ':OUTPATH:';?, + 'notmq' => 1, + 'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double); + B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, 
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Wed Nov  9 09:08:39 2011
@@ -3993,6 +3993,37 @@
 store E into ':OUTPATH:';\,
 store e into ':OUTPATH:';?,
             }
         ],
+        },{
+        'name' => 'JsonLoaderStorage',
+        'tests' => [
+            {
+            'num' => 1,
+            'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+store A into ':OUTPATH:.intermediate' using JsonStorage();
+exec
+A = LOAD ':OUTPATH:.intermediate' using JsonLoader();
+store A into ':OUTPATH:';?,
+            'notmq' => 1,
+            'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+store A into ':OUTPATH:';?,
+            }, {
+            'num' => 2,
+            'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+store A into ':OUTPATH:.intermediate1' using JsonStorage();
+B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, age:int, registration:chararray, contributions:double);
+store B into ':OUTPATH:.intermediate2' using JsonStorage();
+exec
+A = LOAD ':OUTPATH:.intermediate1' using JsonLoader();
+B = LOAD ':OUTPATH:.intermediate2' using JsonLoader();
+C = JOIN A by name, B by name;
+store C into ':OUTPATH:';?,
+            'notmq' => 1,
+            'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, age:int, registration:chararray, contributions:double);
+C = JOIN A by name, B by name;
+store C into ':OUTPATH:';?,
+            }
+        ],
        },
      ],
    },

Added: pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java?rev=1199668&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java Wed Nov  9 09:08:39 2011
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestJsonLoaderStorage {
+    private static PigServer pigServer;
+    File jsonFile;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        removeOutput();
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+
+    private static void removeOutput() {
+        File outputDir = new File("jsonStorage1.json");
+        if (outputDir.exists()) {
+            for (File c : outputDir.listFiles())
+                c.delete();
+            outputDir.delete();
+        }
+    }
+
+    @Test
+    public void testJsonStorage1() throws Exception {
+        removeOutput();
+
+        pigServer.registerScript("test/org/apache/pig/test/data/jsonStorage1.pig");
+
+        File resultFile = new File("jsonStorage1.json/part-m-00000");
+
+        String result = Util.readFile(resultFile);
+        String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.result"));
+        Assert.assertTrue(result.equals(expected));
+
+        File schemaFile = new File("jsonStorage1.json/.pig_schema");
+        result = Util.readFile(schemaFile);
+        expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.schema"));
+        Assert.assertTrue(result.equals(expected));
+    }
+
+    @Test
+    public void testJsonLoader1() throws Exception {
+
+        File tmpFile = File.createTempFile("tmp", null);
+        tmpFile.delete();
+
+        pigServer.registerQuery("a = load 'jsonStorage1.json' using JsonLoader();");
+        pigServer.store("a", tmpFile.getCanonicalPath());
+
+        String result = Util.readFile(new File(tmpFile.getCanonicalPath() + "/part-m-00000"));
+        String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
+        Assert.assertTrue(result.equals(expected));
+    }
+
+    @Test
+    public void testJsonLoader2() throws Exception {
+
+        File tmpFile = File.createTempFile("tmp", null);
+        tmpFile.delete();
+
+        File schemaFile = new File("test/org/apache/pig/test/data/jsonStorage1.schema");
+        schemaFile.delete();
+
+        pigServer.registerQuery("a = load 'jsonStorage1.json' using"
+            + " JsonLoader('a0:int,a1:{(a10:int,a11:chararray)},a2:(a20:double,a21:bytearray),a3:[chararray]');");
+        pigServer.store("a", tmpFile.getCanonicalPath());
+
+        String result = Util.readFile(new File(tmpFile.getCanonicalPath() + "/part-m-00000"));
+        String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
+        Assert.assertTrue(result.equals(expected));
+    }
+
+    @Test
+    public void testJsonStorage2() throws Exception {
+
+        File inputFile = File.createTempFile("tmp", null);
+        PrintWriter pw = new PrintWriter(new FileWriter(inputFile));
+        pw.println("\t\t\t");
+        pw.close();
+
+        File interFile = File.createTempFile("tmp", null);
+        interFile.delete();
+
+        pigServer.registerQuery("a = load '" + inputFile.getCanonicalPath()
+            + "' as (a0:int, a1:chararray, a2, a3:(a30:int));");
+        pigServer.store("a", interFile.getCanonicalPath(), "JsonStorage");
+
+        pigServer.registerQuery("b = load '" + interFile.getCanonicalPath() + "' using JsonLoader();");
+        Iterator<Tuple> iter = pigServer.openIterator("b");
+
+        Tuple t = iter.next();
+
+        Assert.assertTrue(t.size() == 4);
+        Assert.assertTrue(t.get(0) == null);
+        Assert.assertTrue(t.get(1) == null);
+        Assert.assertTrue(t.get(2) == null);
+        Assert.assertTrue(t.get(3) == null);
+
+        Assert.assertFalse(iter.hasNext());
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        removeOutput();
+    }
+}
\ No newline at end of file

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Wed Nov  9 09:08:39 2011
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
@@ -1129,6 +1130,15 @@ public class Util {
         Assert.assertEquals("Comparing actual and expected results. ",
                 expectedResList, actualResList);
", expectedResList, actualResList); } - + public static String readFile(File file) throws IOException { + BufferedReader reader = new BufferedReader(new FileReader(file)); + String result = ""; + String line; + while ((line=reader.readLine())!=null) { + result += line; + result += "\n"; + } + return result; + } } Added: pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig?rev=1199668&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig (added) +++ pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig Wed Nov 9 09:08:39 2011 @@ -0,0 +1,2 @@ +a = load 'test/org/apache/pig/test/data/jsonStorage1.txt' as (a0:int, a1:{t:(a10:int, a11:chararray)},a2:(a20:double, a21), a3:map[chararray]); +store a into 'jsonStorage1.json' using JsonStorage(); Added: pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result?rev=1199668&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result (added) +++ pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result Wed Nov 9 09:08:39 2011 @@ -0,0 +1,2 @@ +{"a0":1,"a1":[{"a10":1,"a11":"tom"},{"a10":2,"a11":"jerry"}],"a2":{"a20":1.01,"a21":"sun"},"a3":{"key3":"c","key2":"b","key1":"a"}} +{"a0":2,"a1":[{"a10":6,"a11":"cat"},{"a10":7,"a11":"dog"},{"a10":8,"a11":"pig"}],"a2":{"a20":2.3,"a21":"moon"},"a3":{"key4":"value4","key1":"value1"}} Added: pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt?rev=1199668&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt (added) +++ pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt Wed Nov 9 09:08:39 2011 @@ -0,0 +1,2 @@ +1 {(1,tom),(2,jerry)} (1.01,sun) [key3#c,key2#b,key1#a] +2 {(6,cat),(7,dog),(8,pig)} (2.3,moon) [key4#value4,key1#value1]