hadoop-pig-dev mailing list archives

From "Dmitriy V. Ryaboy (JIRA)" <j...@apache.org>
Subject [jira] Created: (PIG-1285) Allow SingleTupleBag to be serialized
Date Tue, 09 Mar 2010 04:21:27 GMT
Allow SingleTupleBag to be serialized
-------------------------------------

                 Key: PIG-1285
                 URL: https://issues.apache.org/jira/browse/PIG-1285
             Project: Pig
          Issue Type: Improvement
            Reporter: Dmitriy V. Ryaboy


Currently, in the Combiner optimization, Pig uses a SingleTupleBag for efficiency when a full-blown
spillable bag implementation is not needed.

Unfortunately, this can cause problems. The following Initial.exec() code fails at run time with
an error saying that a SingleTupleBag cannot be serialized:

{code}
@Override
public Tuple exec(Tuple in) throws IOException {
    // Single record: copy each field into a new tuple.
    if (in == null) return null;
    try {
        Tuple resTuple = tupleFactory_.newTuple(in.size());
        for (int i = 0; i < in.size(); i++) {
            // Fields are copied by reference, so a SingleTupleBag in the
            // input ends up unchanged in the output tuple.
            resTuple.set(i, in.get(i));
        }
        return resTuple;
    } catch (IOException e) {
        log.warn(e);
        return null;
    }
}
{code}
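The failure appears to come from the copy loop: each field is copied by reference, so a SingleTupleBag in the input is passed straight through to the output tuple, which is then serialized. The self-contained Java sketch below models this outside Pig. `Bag`, `SingleTupleBagModel`, `shallowCopy`, `serializes`, and the byte format in `writeTuple` are hypothetical stand-ins for illustration, not Pig APIs.

{code}
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShallowCopyDemo {
    // Hypothetical stand-in for a Pig bag: something that writes itself to a DataOutput.
    interface Bag {
        void write(DataOutput out) throws IOException;
    }

    // Hypothetical model of SingleTupleBag: wraps one tuple and refuses to be serialized.
    static final class SingleTupleBagModel implements Bag {
        final List<Object> tuple;

        SingleTupleBagModel(List<Object> tuple) {
            this.tuple = tuple;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            throw new IOException("SingleTupleBagModel cannot be serialized");
        }
    }

    // Copies fields by reference, like the loop in Initial.exec().
    static List<Object> shallowCopy(List<Object> in) {
        List<Object> res = new ArrayList<>(in.size());
        for (int i = 0; i < in.size(); i++) {
            res.add(in.get(i));
        }
        return res;
    }

    // Invented format: field count, then each field; bag fields delegate to Bag.write.
    static void writeTuple(List<Object> t, DataOutput out) throws IOException {
        out.writeInt(t.size());
        for (Object field : t) {
            if (field instanceof Bag) {
                ((Bag) field).write(out);
            } else {
                out.writeUTF(String.valueOf(field));
            }
        }
    }

    // Returns true if the tuple can be written without an IOException.
    static boolean serializes(List<Object> t) {
        try {
            writeTuple(t, new DataOutputStream(new ByteArrayOutputStream()));
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Object> input = Arrays.<Object>asList(
                "key", new SingleTupleBagModel(Arrays.<Object>asList(42)));
        List<Object> copy = shallowCopy(input);
        System.out.println(copy.get(1) == input.get(1)); // true: same bag object
        System.out.println(serializes(copy));            // false: the bag refuses to write
    }
}
{code}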

The modified exec() below works around the problem inside the UDF, but this seems like something
Pig should handle transparently, without requiring UDF authors to know about SingleTupleBags.

{code}
@Override
public Tuple exec(Tuple in) throws IOException {
    // Single record: copy each field into a new tuple.
    if (in == null) return null;

    /*
     * Unfortunately, SingleTupleBags are not serializable, so any SingleTupleBag
     * field is copied into a DefaultBag before the tuple is returned. This avoids
     * serialization exceptions downstream.
     */
    try {
        Tuple resTuple = tupleFactory_.newTuple(in.size());
        for (int i = 0; i < in.size(); i++) {
            Object obj = in.get(i);
            if (obj instanceof SingleTupleBag) {
                // Replace the non-serializable bag with a serializable copy.
                DataBag newBag = bagFactory_.newDefaultBag();
                newBag.addAll((DataBag) obj);
                obj = newBag;
            }
            resTuple.set(i, obj);
        }
        return resTuple;
    } catch (IOException e) {
        log.warn(e);
        return null;
    }
}
{code}
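One way to handle this transparently, in the spirit of the issue title, would be to let the single-tuple bag serialize itself in the same wire format as a regular bag, so the copy in the UDF becomes unnecessary. The Java sketch below models that idea outside Pig. `Bag`, `DefaultBagModel`, `SingleTupleBagModel`, `bytesOf`, and the byte layout (a long tuple count followed by the tuples) are assumptions for illustration, not Pig's actual classes or serialization format.

{code}
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SingleBagWriteDemo {
    // Hypothetical bag abstraction; tuples are modeled as lists of strings.
    interface Bag {
        void write(DataOutput out) throws IOException;
    }

    // Invented tuple format: field count, then each field.
    static void writeTuple(List<String> t, DataOutput out) throws IOException {
        out.writeInt(t.size());
        for (String field : t) {
            out.writeUTF(field);
        }
    }

    // Hypothetical general-purpose bag: tuple count (as a long), then each tuple.
    static final class DefaultBagModel implements Bag {
        final List<List<String>> tuples = new ArrayList<>();

        void add(List<String> t) {
            tuples.add(t);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(tuples.size());
            for (List<String> t : tuples) {
                writeTuple(t, out);
            }
        }
    }

    // Hypothetical single-tuple bag that writes itself in DefaultBagModel's format,
    // so a reader cannot tell it apart from a one-element regular bag.
    static final class SingleTupleBagModel implements Bag {
        final List<String> tuple;

        SingleTupleBagModel(List<String> tuple) {
            this.tuple = tuple;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(1L);
            writeTuple(tuple, out);
        }
    }

    // Serializes a bag to a byte array.
    static byte[] bytesOf(Bag bag) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try {
            bag.write(new DataOutputStream(buf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        List<String> tuple = Arrays.asList("a", "1");
        DefaultBagModel regular = new DefaultBagModel();
        regular.add(tuple);
        byte[] single = bytesOf(new SingleTupleBagModel(tuple));
        System.out.println(Arrays.equals(single, bytesOf(regular))); // true
    }
}
{code}

Because both bags produce identical bytes, the reading side needs no special case. The trade-off in this sketch is that whatever is read back would be a regular bag rather than a SingleTupleBag.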

