Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 89FAD200CE3 for ; Sun, 13 Aug 2017 18:09:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8871D163600; Sun, 13 Aug 2017 16:09:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AB6061634B5 for ; Sun, 13 Aug 2017 18:09:40 +0200 (CEST) Received: (qmail 83842 invoked by uid 500); 13 Aug 2017 16:09:39 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 83827 invoked by uid 99); 13 Aug 2017 16:09:37 -0000 Received: from Unknown (HELO svn01-us-west.apache.org) (209.188.14.144) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 13 Aug 2017 16:09:37 +0000 Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 3C78F3A0167 for ; Sun, 13 Aug 2017 16:09:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1804929 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/data/ Date: Sun, 13 Aug 2017 16:09:32 -0000 To: commits@pig.apache.org From: szita@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20170813160936.3C78F3A0167@svn01-us-west.apache.org> archived-at: Sun, 13 Aug 2017 16:09:41 -0000 Author: szita Date: Sun Aug 13 16:09:32 2017 New Revision: 1804929 URL: http://svn.apache.org/viewvc?rev=1804929&view=rev Log: PIG-5277: Spark mode is writing nulls among tuples to the output (workaround) (szita) Added: pig/trunk/src/org/apache/pig/data/NonWritableTuple.java Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1804929&r1=1804928&r2=1804929&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Sun Aug 13 16:09:32 2017 @@ -44,6 +44,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5277: Spark mode is writing nulls among tuples to the output (workaround) (szita) + PIG-5283: Configuration is not passed to SparkPigSplits on the backend (szita) PIG-5284: Fix flakyness introduced by PIG-3655 (szita) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1804929&r1=1804928&r2=1804929&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Sun Aug 13 16:09:32 2017 @@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.StoreFuncDecorator; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.data.NonWritableTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.util.ObjectSerializer; @@ -141,7 +142,9 @@ public class PigOutputFormat extends Out public void write(WritableComparable key, Tuple value) throws IOException, InterruptedException { if(mode == Mode.SINGLE_STORE) { - storeDecorator.putNext(value); + if (!(value instanceof NonWritableTuple)) { + storeDecorator.putNext(value); + } } else { throw new IOException("Internal Error: Unexpected code path"); } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1804929&r1=1804928&r2=1804929&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Sun Aug 13 16:09:32 2017 @@ -34,6 +34,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark; +import org.apache.pig.data.NonWritableTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -209,7 +210,7 @@ public class JoinGroupSparkConverter imp out = (Tuple) result.result; break; case POStatus.STATUS_NULL: - out = null; + out = NonWritableTuple.INSTANCE; break; default: throw new RuntimeException( Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java?rev=1804929&r1=1804928&r2=1804929&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java Sun Aug 13 16:09:32 2017 @@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.data.NonWritableTuple; import org.apache.pig.data.Tuple; abstract class OutputConsumerIterator implements java.util.Iterator { @@ -59,6 +60,9 @@ abstract class OutputConsumerIterator im return; } Tuple v1 = input.next(); + if (v1 instanceof NonWritableTuple) { + v1 = null; + } attach(v1); } Added: pig/trunk/src/org/apache/pig/data/NonWritableTuple.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/NonWritableTuple.java?rev=1804929&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/data/NonWritableTuple.java (added) +++ pig/trunk/src/org/apache/pig/data/NonWritableTuple.java Sun Aug 13 16:09:32 2017 @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; + +/** + * A singleton Tuple type which is not picked up for writing by PigRecordWriter + */ +public class NonWritableTuple extends AbstractTuple { + + + public static final NonWritableTuple INSTANCE = new NonWritableTuple(); + + private NonWritableTuple(){} + + @Override + public int size() { + return 0; + } + + @Override + public Object get(int fieldNum) throws ExecException { + return null; + } + + @Override + public List getAll() { + throw new RuntimeException("Unimplemented"); + } + + @Override + public void set(int fieldNum, Object val) throws ExecException { + throw new ExecException("Unimplemented"); + } + + @Override + public void append(Object val) { + throw new RuntimeException("Unimplemented"); + } + + @Override + public long getMemorySize() { + throw new RuntimeException("Unimplemented"); + } + + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Unimplemented"); + } + + @Override + public void write(DataOutput arg0) throws IOException { + throw new IOException("Unimplemented"); + } + + @Override + public int compareTo(Object o) { + throw new RuntimeException("Unimplemented"); + } + +}