crunch-user mailing list archives

From Marcin Michalski <>
Subject Re: Best way to pass GenericData.Record from one fn to the next one
Date Wed, 24 Feb 2016 22:39:34 GMT
Actually, this doesn't work, since there seems to be an issue with passing
multiple inputs that have different schemas to *avroFile(paths)*. This is
because From only reads the schema from the first of the input paths, which
I think is a bit misleading. Can I extend this code to return a
List<Source<GenericData.Record>>, with one source for every schema that I
am working with?

   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
   * at the given paths using the {@code FileSystem} information contained in the given
   * {@code Configuration} instance. If the first path is a directory, the schema of a file in
   * the directory will be used to determine the schema to use.
   * @param paths The path to the data on the filesystem
   * @param conf The configuration information
   * @return A new {@code Source<GenericData.Record>} instance
  public static Source<GenericData.Record> avroFile(List<Path> paths, Configuration conf) {
    Preconditions.checkArgument(!paths.isEmpty(), "At least one path must be supplied");
    return avroFile(paths, Avros.generics(getSchemaFromPath(*paths.get(0)*,
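
A minimal sketch of the "one source per schema" idea, in plain Java so it
runs without Crunch on the classpath. Everything here is an assumption for
illustration: SchemaGrouping and groupBySchema are hypothetical names, paths
and schemas are plain strings, and schemaOf stands in for whatever reads the
schema from a path (getSchemaFromPath in the snippet above).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of Crunch: groups input paths by schema so
// that one source could be created per group instead of one for all paths.
final class SchemaGrouping {
    private SchemaGrouping() {}

    // schemaOf is a stand-in for a real schema lookup on the filesystem.
    static Map<String, List<String>> groupBySchema(List<String> paths,
                                                   Function<String, String> schemaOf) {
        // LinkedHashMap keeps groups in the order their schema was first seen.
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String path : paths) {
            groups.computeIfAbsent(schemaOf.apply(path), k -> new ArrayList<>()).add(path);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Assumption for the demo only: the parent directory names the schema.
        Function<String, String> schemaOf = p -> p.substring(0, p.lastIndexOf('/'));
        Map<String, List<String>> groups = groupBySchema(
            Arrays.asList("/data/clicks/a.avro", "/data/views/b.avro", "/data/clicks/c.avro"),
            schemaOf);
        groups.forEach((schema, ps) -> System.out.println(schema + " -> " + ps));
    }
}
```

Each group could then presumably be handed to the two-argument
avroFile(paths, Avros.generics(schema)) overload that the snippet above
delegates to, giving one Source per schema.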

I see that there is another method that I could potentially use,
avroTableFile, but I am not sure whether it is intended to be used as an
input.

   * Creates a {@code TableSource<K,V>} for reading an Avro key/value file at the given paths.
   * @param paths list of paths to be read by the returned source
   * @param tableType Avro table type for deserializing the table data
   * @return a new {@code TableSource<K,V>} instance for reading Avro key/value data
  public static <K, V> TableSource<K, V> avroTableFile(List<Path> paths, PTableType<K, V> tableType) {
    return new AvroTableFileSource<K, V>(paths,

On Wed, Feb 24, 2016 at 12:51 PM, Marcin Michalski <> wrote:

> I am going to try to write the generic input Record into a new Avro schema
> that has some new elements plus the original input record as a JSON string.
> Then I will dedup the records based on the grouping key and invoke another
> fn that will reconstruct the original Avro schema based on the JSON value
> and finally write that using FSDataOutputStream. I think that should work,
> but it is a bit hacky.
> On Wed, Feb 24, 2016 at 7:52 AM, Josh Wills <> wrote:
>> In theory, any PType that supports GenericRecord will work, even a dummy
>> one that defines a schema that isn't the same as the one you're using.
>> I don't recommend doing that, of course, but it will work.
>> On Wed, Feb 24, 2016 at 12:18 AM Marcin Michalski <>
>> wrote:
>>> Hi, is there an easy way to pass GenericData.Record between Fns in
>>> Crunch without specifically stating the schema? I want to pass multiple
>>> Avro files that have various schemas as input to a single DoFn, which
>>> will enhance the data into a Pair. Later I want to run an aggregation
>>> (deduping) Fn on that data, but I don't want to specify the schema in
>>> between (I just want to work with GenericData.Record instances). Here is
>>> an example:
>>> PCollection<Record> messages =
>>> // I don't want to pass the schema instance but rather just work with
>>> // GenericData.Record. Is that possible? Or do I need to use Avros.bytes
>>> // instead and then reconstruct the Record later in the next Fn?
>>> messages.parallelDo(new EventEnhancerDoFn(),
>>> Avros.generics(messageSchema)).groupByKey...
>>> Thanks,
>>> Marcin
> --
> Marcin Michalski | Big Data Engineer
> <> | (917) 478-9422 (c)
> <>
> Tagged, Inc. is now if(we). Learn more at
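
A rough sketch of the envelope-and-dedup plan quoted above, again in plain
Java so it runs on its own. All names are hypothetical: Envelope carries the
grouping key, the original schema, and the original record as a JSON string,
and dedupByKey keeps the first envelope seen per key. In a real pipeline the
grouping would be done by Crunch (groupByKey) rather than an in-memory map,
and rebuilding the Avro record from the JSON is left out here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical envelope: the grouping key plus the original record as JSON,
// with its schema kept alongside so the record could be rebuilt later.
final class Envelope {
    final String key;
    final String schemaJson;
    final String recordJson;

    Envelope(String key, String schemaJson, String recordJson) {
        this.key = key;
        this.schemaJson = schemaJson;
        this.recordJson = recordJson;
    }
}

final class EnvelopeDedup {
    private EnvelopeDedup() {}

    // Keeps the first envelope seen for each key, preserving encounter order.
    static List<Envelope> dedupByKey(List<Envelope> input) {
        Map<String, Envelope> firstByKey = new LinkedHashMap<>();
        for (Envelope e : input) {
            firstByKey.putIfAbsent(e.key, e);
        }
        return new ArrayList<>(firstByKey.values());
    }
}
```

Because the envelope type is the same for every input, the stage between the
two Fns only needs one schema, whatever the schemas of the wrapped records.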

