giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [1/5] GIRAPH-717: HiveJythonRunner with support for pure Jython value types (nitay)
Date Wed, 31 Jul 2013 18:55:46 GMT
Updated Branches:
  refs/heads/trunk 8df1027b4 -> d419f8f4f


http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
deleted file mode 100644
index 33196f5..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveWritableRecord;
-
-/**
- * Interface for writing Vertex IDs from Hive records
- *
- * @param <I> Vertex ID
- */
-public interface HiveVertexIdWriter<I extends WritableComparable> {
-  /**
-   * Write Vertex ID to record
-   *
-   * @param id Vertex ID
-   * @param record Hive record
-   */
-  void writeId(I id, HiveWritableRecord record);
-
-  /**
-   * Null implementation that does nothing
-   *
-   * @param <W> Writable type
-   */
-  public static class Null<W extends WritableComparable>
-      implements HiveVertexIdWriter<W> {
-    /** Singleton */
-    private static final Null INSTANCE = new Null();
-
-    /**
-     * Get singleton
-     *
-     * @return singleton instance
-     */
-    public static Null get() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void writeId(W id, HiveWritableRecord record) { }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
deleted file mode 100644
index e9b0687..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableWrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableWrappers.lookup;
-
-/**
- * Reader for Vertex/Edge Values from Hive with known types
- *
- * @param <W> Vertex/Edge Value
- */
-public class TypedValueReader<W extends Writable>
-    implements HiveValueReader<W> {
-  /** Hive column index */
-  private final int columnIndex;
-  /** {@link WritableWrapper} for Hive column to Giraph Writable */
-  private final WritableWrapper<W, Object> writableWrapper;
-
-  /**
-   * Constructor
-   *
-   * @param columnIndex column index
-   * @param writableWrapper {@link WritableWrapper}
-   */
-  public TypedValueReader(int columnIndex,
-      WritableWrapper<W, Object> writableWrapper) {
-    this.columnIndex = columnIndex;
-    this.writableWrapper = writableWrapper;
-  }
-
-  /**
-   * Create from Configuration with column name and Schema
-   *
-   * @param conf Configuration
-   * @param columnOption StrConfOption for column name
-   * @param schema HiveTableSchema
-   * @param <V> Vertex/Edge Value
-   * @return TypedVertexValueReader
-   */
-  public static <V extends Writable> HiveValueReader<V>
-  createValueReader(
-        ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
-        HiveTableSchema schema) {
-    Class<V> vertexValueClass = conf.getVertexValueClass();
-    if (NullWritable.class.isAssignableFrom(vertexValueClass)) {
-      return HiveValueReader.Null.get();
-    }
-    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
-    Class hiveClass = schema.columnType(columnIndex).javaClass();
-    WritableWrapper wrapper = lookup(vertexValueClass, hiveClass);
-    return new TypedValueReader(columnIndex, wrapper);
-  }
-
-  @Override
-  public W readValue(HiveReadableRecord record) {
-    Object object = record.get(columnIndex);
-    return writableWrapper.wrap(object);
-  }
-
-  public int getColumnIndex() {
-    return columnIndex;
-  }
-
-  public WritableWrapper<W, Object> getWritableWrapper() {
-    return writableWrapper;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
deleted file mode 100644
index 8841bda..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableUnwrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.facebook.hiveio.record.HiveWritableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableUnwrappers.lookup;
-
-/**
- * Writer for Vertex/Edge Values from Hive with known types
- *
- * @param <W> Vertex/Edge Value
- */
-public class TypedValueWriter<W extends Writable>
-    implements HiveValueWriter<W> {
-  /** Hive column index */
-  private final int columnIndex;
-  /** {@link WritableUnwrapper} for Hive column to Giraph Writable */
-  private final WritableUnwrapper<W, Object> writableUnwrapper;
-
-  /**
-   * Constructor
-   *
-   * @param columnIndex column index
-   * @param writableUnwrapper JavaWritableConverter
-   */
-  public TypedValueWriter(int columnIndex,
-      WritableUnwrapper<W, Object> writableUnwrapper) {
-    this.columnIndex = columnIndex;
-    this.writableUnwrapper = writableUnwrapper;
-  }
-
-  /**
-   * Create from Configuration with column name and Schema
-   *
-   * @param conf Configuration
-   * @param columnOption StrConfOption for column name
-   * @param schema HiveTableSchema
-   * @param <V> Vertex/Edge Value
-   * @return TypedVertexValueReader
-   */
-  public static <V extends Writable> HiveValueWriter<V>
-  createValueWriter(
-        ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
-        HiveTableSchema schema) {
-    Class<V> vertexValueClass = conf.getVertexValueClass();
-    if (NullWritable.class.isAssignableFrom(vertexValueClass)) {
-      return HiveValueWriter.Null.get();
-    }
-    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
-    Class hiveClass = schema.columnType(columnIndex).javaClass();
-    WritableUnwrapper unwrapper = lookup(vertexValueClass, hiveClass);
-    return new TypedValueWriter(columnIndex, unwrapper);
-  }
-
-  @Override
-  public void writeValue(W value, HiveWritableRecord record) {
-    Object object = writableUnwrapper.unwrap(value);
-    record.set(columnIndex, object);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
deleted file mode 100644
index 6d80d37..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableWrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableWrappers.lookup;
-
-/**
- * Reader for Vertex IDs from Hive with known types
- *
- * @param <I> Vertex ID
- */
-public class TypedVertexIdReader<I extends WritableComparable>
-    implements HiveVertexIdReader<I> {
-  /** Hive column index */
-  private final int columnIndex;
-  /** {@link WritableWrapper} for Hive column to Giraph Writable */
-  private final WritableWrapper<I, Object> writableWrapper;
-
-  /**
-   * Constructor
-   *
-   * @param columnIndex column index
-   * @param writableWrapper {@link WritableWrapper}
-   */
-  public TypedVertexIdReader(int columnIndex,
-      WritableWrapper<I, Object> writableWrapper) {
-    this.columnIndex = columnIndex;
-    this.writableWrapper = writableWrapper;
-  }
-
-  /**
-   * Create from Configuration with column name and Schema
-   *
-   * @param conf {@link org.apache.hadoop.conf.Configuration}
-   * @param columnOption {@link StrConfOption} for column name
-   * @param schema {@link HiveTableSchema}
-   * @param <I> Vertex ID
-   * @return {@link TypedVertexIdReader}
-   */
-  public static <I extends WritableComparable> HiveVertexIdReader<I>
-  createIdReader(
-      ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
-      HiveTableSchema schema) {
-    Class<I> vertexIdClass = conf.getVertexIdClass();
-    if (NullWritable.class.isAssignableFrom(vertexIdClass)) {
-      return HiveVertexIdReader.Null.get();
-    }
-    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
-    Class hiveClass = schema.columnType(columnIndex).javaClass();
-    WritableWrapper wrapper = lookup(vertexIdClass, hiveClass);
-    return new TypedVertexIdReader<I>(columnIndex, wrapper);
-  }
-
-  @Override
-  public I readId(HiveReadableRecord record) {
-    Object object = record.get(columnIndex);
-    return writableWrapper.wrap(object);
-  }
-
-  public int getColumnIndex() {
-    return columnIndex;
-  }
-
-  public WritableWrapper<I, Object> getWritableWrapper() {
-    return writableWrapper;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
deleted file mode 100644
index aece54b..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableUnwrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveWritableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableUnwrappers.lookup;
-
-/**
- * Writer for Vertex IDs from Hive with known types
- *
- * @param <I> Vertex ID
- */
-public class TypedVertexIdWriter<I extends WritableComparable>
-    implements HiveVertexIdWriter<I> {
-  /** Hive column index */
-  private final int columnIndex;
-  /** {@link WritableUnwrapper} for Hive column to Giraph Writable */
-  private final WritableUnwrapper<I, Object> writableUnwrapper;
-
-  /**
-   * Constructor
-   *
-   * @param columnIndex column index
-   * @param writableUnwrapper {@link WritableUnwrapper}
-   */
-  public TypedVertexIdWriter(int columnIndex,
-      WritableUnwrapper<I, Object> writableUnwrapper) {
-    this.columnIndex = columnIndex;
-    this.writableUnwrapper = writableUnwrapper;
-  }
-
-  /**
-   * Create from Configuration with column name and Schema
-   *
-   * @param conf {@link org.apache.hadoop.conf.Configuration}
-   * @param columnOption {@link StrConfOption} for column name
-   * @param schema {@link HiveTableSchema}
-   * @param <I> Vertex ID
-   * @return {@link TypedVertexIdWriter}
-   */
-  public static <I extends WritableComparable> HiveVertexIdWriter<I>
-  createIdWriter(
-      ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
-      HiveTableSchema schema) {
-    Class<I> vertexIdClass = conf.getVertexIdClass();
-    if (NullWritable.class.isAssignableFrom(vertexIdClass)) {
-      return HiveVertexIdWriter.Null.get();
-    }
-    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
-    Class hiveClass = schema.columnType(columnIndex).javaClass();
-    WritableUnwrapper unwrapper = lookup(vertexIdClass, hiveClass);
-    return new TypedVertexIdWriter<I>(columnIndex, unwrapper);
-  }
-
-  @Override
-  public void writeId(I value, HiveWritableRecord record) {
-    Object object = writableUnwrapper.unwrap(value);
-    record.set(columnIndex, object);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
deleted file mode 100644
index 7004637..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * HiveIO type-based things.
- */
-package org.apache.giraph.hive.types;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java
new file mode 100644
index 0000000..e989e28
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.values;
+
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * Interface for reading Vertex / Edge values from Hive records
+ *
+ * @param <T> Value type
+ */
+public interface HiveValueReader<T extends Writable> {
+  /**
+   * Read value from record
+   *
+   * @param value graph value to read into
+   * @param record Hive record
+   */
+  void readFields(T value, HiveReadableRecord record);
+
+  /**
+   * Null implementation that return NullWritable
+   *
+   * @param <W> Writable type
+   */
+  public class Null<W extends Writable> implements HiveValueReader<W> {
+    /** Singleton */
+    private static final Null INSTANCE = new Null();
+
+    /**
+     * Get singleton
+     *
+     * @return singleton instance
+     */
+    public static Null get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void readFields(W value, HiveReadableRecord record) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java
new file mode 100644
index 0000000..4b624f6
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.values;
+
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveWritableRecord;
+
+/**
+ * Interface for writing Vertex / Edge values from Hive records
+ *
+ * @param <T> Value type
+ */
+public interface HiveValueWriter<T extends Writable> {
+  /**
+   * Write value to record
+   *
+   * @param value the value
+   * @param record Hive record
+   */
+  void write(T value, HiveWritableRecord record);
+
+  /**
+   * Null implementation that does nothing
+   *
+   * @param <W> Writable type
+   */
+  public static class Null<W extends Writable> implements HiveValueWriter<W> {
+    /** Singleton */
+    private static final Null INSTANCE = new Null();
+
+    /**
+     * Get singleton
+     *
+     * @return singleton instance
+     */
+    public static Null get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void write(W value, HiveWritableRecord record) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java
new file mode 100644
index 0000000..1e8f497
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive type (vertex ID, vertex value, edge value, etc) related things.
+ */
+package org.apache.giraph.hive.values;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
index 49a36d0..ff932b3 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
@@ -2,9 +2,11 @@ package org.apache.giraph.hive;
 
 import org.junit.BeforeClass;
 
+import com.facebook.hiveio.log.LogHelpers;
+
 public class GiraphHiveTestBase {
   @BeforeClass
   public static void silenceLoggers() {
-    com.facebook.hiveio.testing.Helpers.silenceLoggers();
+    LogHelpers.silenceLoggers();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
index 00c00ca..d2103a7 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
@@ -32,11 +32,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import com.google.common.collect.Maps;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 public class Helpers {
+  public static InputStream getResource(String name) {
+    return Helpers.class.getClassLoader().getResourceAsStream(name);
+  }
+
   public static Map<Integer, Double> parseIntDoubleResults(Iterable<String> results) {
     Map<Integer, Double> values = Maps.newHashMap();
     for (String line : results) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
index d5bbb95..bf5af0f 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
@@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.thrift.TException;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.facebook.hiveio.common.HiveMetastores;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
index af850d5..c75652c 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
@@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.thrift.TException;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.facebook.hiveio.common.HiveMetastores;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java
new file mode 100644
index 0000000..b664d7d
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java
@@ -0,0 +1,139 @@
+package org.apache.giraph.hive.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.Language;
+import org.apache.giraph.hive.GiraphHiveTestBase;
+import org.apache.giraph.hive.Helpers;
+import org.apache.giraph.jython.JythonJob;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.python.util.PythonInterpreter;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.input.HiveInput;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.testing.LocalHiveServer;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import junit.framework.Assert;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.giraph.hive.Helpers.getResource;
+
+public class TestHiveJythonComplexTypes extends GiraphHiveTestBase {
+  private LocalHiveServer hiveServer = new LocalHiveServer("jython-test");
+
+  @Before
+  public void setUp() throws IOException, TException {
+    hiveServer.init();
+    HiveMetastores.setTestClient(hiveServer.getClient());
+  }
+
+  @Test
+  public void testFakeLabelPropagation() throws Exception {
+    String edgesTable = "flp_edges";
+    hiveServer.createTable("CREATE TABLE " + edgesTable +
+        " (source_id INT, " +
+        "  target_id INT," +
+        "  value FLOAT) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t'");
+    String[] edges = new String[] {
+        "1\t2\t0.2",
+        "2\t3\t0.3",
+        "3\t4\t0.4",
+        "4\t1\t0.1",
+    };
+    hiveServer.loadData(edgesTable, edges);
+
+    String vertexesTable = "flp_vertexes";
+    hiveServer.createTable("CREATE TABLE " + vertexesTable +
+        " (id INT, " +
+        "  value MAP<INT,FLOAT>) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t' " +
+        " COLLECTION ITEMS TERMINATED BY ',' " +
+        " MAP KEYS TERMINATED BY ':' ");
+    String[] vertexes = new String[] {
+        "1\t11:0.8,12:0.1",
+        "2\t13:0.3,14:0.2",
+        "3\t15:0.4,16:0.7",
+        "4\t17:0.1,18:0.6",
+    };
+    hiveServer.loadData(vertexesTable, vertexes);
+
+    String outputTable = "flp_output";
+    hiveServer.createTable("CREATE TABLE " + outputTable +
+        " (id INT," +
+        "  value MAP<INT,DOUBLE>) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t'");
+
+    String workerJythonPath =
+        "org/apache/giraph/jython/fake-label-propagation-worker.py";
+
+    InputStream launcher = getResource(
+        "org/apache/giraph/jython/fake-label-propagation-launcher.py");
+    assertNotNull(launcher);
+    InputStream worker = getResource(workerJythonPath);
+    assertNotNull(worker);
+
+    PythonInterpreter interpreter = new PythonInterpreter();
+
+    JythonJob jythonJob =
+        HiveJythonUtils.parseJythonStreams(interpreter, launcher, worker);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+
+    ScriptLoader.setScriptsToLoad(conf, workerJythonPath, DeployType.RESOURCE,
+        Language.JYTHON);
+
+    HiveJythonUtils.writeJythonJobToConf(jythonJob, conf, interpreter);
+
+    InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Helpers.commitJob(conf);
+
+    HiveInputDescription inputDesc = new HiveInputDescription();
+    inputDesc.getTableDesc().setTableName(outputTable);
+
+    Iterator<HiveReadableRecord> records = HiveInput.readTable(inputDesc).iterator();
+
+    printRecords(HiveInput.readTable(inputDesc));
+
+    final int rows = 4;
+
+    Set<Integer>[] expected = new Set[rows+1];
+    expected[1] = ImmutableSet.of(11,12,15,16,17,18);
+    expected[2] = ImmutableSet.of(13,14,17,18,11,12);
+    expected[3] = ImmutableSet.of(15,16,11,12,13,14);
+    expected[4] = ImmutableSet.of(17,18,13,14,15,16);
+
+    for (int i = 0; i < rows; ++i) {
+      assertTrue(records.hasNext());
+      HiveReadableRecord record = records.next();
+      assertEquals(expected[record.getInt(0)], record.getMap(1).keySet());
+    }
+
+    assertFalse(records.hasNext());
+  }
+
+  private void printRecords(Iterable<HiveReadableRecord> records) {
+    for (HiveReadableRecord record : records) {
+      System.out.println("record: " + record);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java
new file mode 100644
index 0000000..c1db151
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java
@@ -0,0 +1,110 @@
+package org.apache.giraph.hive.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.Language;
+import org.apache.giraph.hive.GiraphHiveTestBase;
+import org.apache.giraph.hive.Helpers;
+import org.apache.giraph.jython.JythonJob;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.python.util.PythonInterpreter;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.input.HiveInput;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.testing.LocalHiveServer;
+import junit.framework.Assert;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.giraph.hive.Helpers.getResource;
+import static org.junit.Assert.assertEquals;
+
+public class TestHiveJythonPrimitives extends GiraphHiveTestBase {
+  private LocalHiveServer hiveServer = new LocalHiveServer("jython-test");
+
+  @Before
+  public void setUp() throws IOException, TException {
+    hiveServer.init();
+    HiveMetastores.setTestClient(hiveServer.getClient());
+  }
+
+  @Test
+  public void testCountEdges() throws Exception {
+    String edgesTable = "count_edges_edge_input";
+    hiveServer.createTable("CREATE TABLE " + edgesTable +
+        " (source_edge_id INT, " +
+        "  target_edge_id INT) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t'");
+    String[] edges = new String[] {
+        "1\t2",
+        "2\t3",
+        "2\t4",
+        "4\t1"
+    };
+    hiveServer.loadData(edgesTable, edges);
+
+    String outputTable = "count_edges_output";
+    hiveServer.createTable("CREATE TABLE " + outputTable +
+        " (vertex_id INT," +
+        "  num_edges INT) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t'");
+
+    String workerJythonPath = "org/apache/giraph/jython/count-edges.py";
+
+    InputStream launcher = getResource(
+        "org/apache/giraph/jython/count-edges-launcher.py");
+    assertNotNull(launcher);
+    InputStream worker = getResource(workerJythonPath);
+    assertNotNull(worker);
+
+    PythonInterpreter interpreter = new PythonInterpreter();
+
+    JythonJob jythonJob =
+        HiveJythonUtils.parseJythonStreams(interpreter, launcher, worker);
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+
+    ScriptLoader.setScriptsToLoad(conf, workerJythonPath,
+        DeployType.RESOURCE, Language.JYTHON);
+
+    HiveJythonUtils.writeJythonJobToConf(jythonJob, conf, interpreter);
+
+    InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+    Helpers.commitJob(conf);
+
+    HiveInputDescription inputDesc = new HiveInputDescription();
+    inputDesc.getTableDesc().setTableName(outputTable);
+
+    Iterator<HiveReadableRecord> records = HiveInput.readTable(inputDesc).iterator();
+
+    int expected[] = { -1, 1, 2, -1, 1 };
+
+    assertTrue(records.hasNext());
+    HiveReadableRecord record = records.next();
+    Assert.assertEquals(expected[record.getInt(0)], record.getInt(1));
+
+    assertTrue(records.hasNext());
+    record = records.next();
+    Assert.assertEquals(expected[record.getInt(0)], record.getInt(1));
+
+    assertTrue(records.hasNext());
+    record = records.next();
+    Assert.assertEquals(expected[record.getInt(0)], record.getInt(1));
+
+    assertFalse(records.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
index 4d4d976..39390f1 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
@@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.thrift.TException;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.facebook.hiveio.common.HiveMetastores;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
new file mode 100644
index 0000000..a87f030
--- /dev/null
+++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.giraph.combiner import DoubleSumCombiner
+from org.apache.giraph.edge import ByteArrayEdges
+from org.apache.giraph.jython import JythonJob
+from org.apache.hadoop.io import IntWritable
+from org.apache.hadoop.io import NullWritable
+
+
+def prepare(job):
+    job.hive_database = "default"
+    job.workers = 3
+
+    job.computation_name = "CountEdges"
+
+    job.vertex_id.type = IntWritable
+    job.vertex_value.type = IntWritable
+    job.edge_value.type = NullWritable
+    job.message_value.type = NullWritable
+
+    edge_input = JythonJob.EdgeInput()
+    edge_input.table = "count_edges_edge_input"
+    edge_input.source_id_column = "source_edge_id"
+    edge_input.target_id_column = "target_edge_id"
+    job.edge_inputs.add(edge_input)
+
+    job.vertex_output.table = "count_edges_output"
+    job.vertex_output.id_column = "vertex_id"
+    job.vertex_output.value_column = "num_edges"

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py
new file mode 100644
index 0000000..2d1c381
--- /dev/null
+++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.hadoop.io import IntWritable
+from org.apache.giraph.jython import JythonJob
+
+
+def prepare(job):
+    job.hive_database = "default"
+    job.workers = 5
+
+    job.computation_name = "FakeLabelPropagation"
+
+    job.vertex_id.type = IntWritable
+
+    job.vertex_value.type = FakeLPVertexValue
+    job.vertex_value.hive_io = FakeLPVertexValueHive
+
+    job.edge_value.type = FakeLPEdgeValue
+    job.edge_value.hive_reader = FakeLPEdgeReader
+
+    job.message_value.type = "FakeLPMessageValue"
+
+    edge_input = JythonJob.EdgeInput()
+    edge_input.table = "flp_edges"
+    edge_input.source_id_column = "source_id"
+    edge_input.target_id_column = "target_id"
+    edge_input.value_column = "value"
+    job.edge_inputs.add(edge_input)
+
+    vertex_input = JythonJob.VertexInput()
+    vertex_input.table = "flp_vertexes"
+    vertex_input.id_column = "id"
+    vertex_input.value_column = "value"
+    job.vertex_inputs.add(vertex_input)
+
+    job.vertex_output.table = "flp_output"
+    job.vertex_output.id_column = "id"
+    job.vertex_output.value_column = "value"

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py
new file mode 100644
index 0000000..9499a35
--- /dev/null
+++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.hadoop.io import Writable
+from org.apache.giraph.jython import JythonComputation
+from org.apache.giraph.hive.jython import JythonHiveIO
+from org.apache.giraph.hive.jython import JythonHiveReader
+
+
+# Implements HiveColumnIO to tell Giraph how to read/write from Hive
+class FakeLPVertexValue:
+    def __init__(self):
+        self.labels = {}
+        self.dog = 'cat'
+
+    def add(self, message):
+        for label, weight in message.labels.iteritems():
+            if label in self.labels:
+                self.labels[label] += weight
+            else:
+                self.labels[label] = weight
+
+
+# Hive reader/writer for vertexes
+class FakeLPVertexValueHive(JythonHiveIO):
+    def readFromHive(self, vertex_value, column):
+        vertex_value.labels = column.getMap()
+
+    def writeToHive(self, vertex_value, column):
+        column.setMap(vertex_value.labels)
+
+
+# Implements Writable to override default Jython serialization which grabs all
+# of the data in an object.
+# Also implements HiveColumnReadable to read from Hive.
+class FakeLPEdgeValue(Writable):
+    def __init__(self):
+        self.value = 2.13
+        self.foo = "bar"
+
+    def readFields(self, data_input):
+        self.value = data_input.readFloat()
+        self.foo = "read_in"
+
+    def write(self, data_output):
+        data_output.writeFloat(self.value)
+        self.foo = "wrote_out"
+
+
+# Hive reader for edges
+class FakeLPEdgeReader(JythonHiveReader):
+    def readFromHive(self, edge_value, column):
+        edge_value.value = column.getFloat()
+
+
+# Doesn't implement anything. Use default Jython serialization.
+# No need for I/O with Hive
+class FakeLPMessageValue:
+    def __init__(self):
+        self.labels = {}
+
+
+# Implements BasicComputation to be a Computation Giraph can use
+class FakeLabelPropagation(JythonComputation):
+    def compute(self, vertex, messages):
+        if self.superstep == 0:
+            self.send_msg(vertex)
+        elif self.superstep < self.conf.getInt("supersteps", 3):
+            for message in messages:
+                vertex.value.add(message)
+            self.send_msg(vertex)
+        else:
+            vertex.voteToHalt()
+
+    def send_msg(self, vertex):
+        msg = FakeLPMessageValue()
+        msg.labels = vertex.value.labels
+        self.sendMessageToAllEdges(vertex, msg)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8aa1f45..41b6bb1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -254,6 +254,7 @@ under the License.
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
     <dep.accumulo.version>1.4.0</dep.accumulo.version>
+    <dep.airline.version>0.5</dep.airline.version>
     <dep.base64.version>2.3.8</dep.base64.version>
     <dep.cli-parser.version>1.1</dep.cli-parser.version>
     <dep.codehaus-jackson.version>1.8.0</dep.codehaus-jackson.version>
@@ -271,7 +272,7 @@ under the License.
     <dep.guava.version>12.0</dep.guava.version>
     <dep.hcatalog.version>0.5.0-incubating</dep.hcatalog.version>
     <dep.hive.version>0.11.0</dep.hive.version>
-    <dep.hiveio.version>0.15</dep.hiveio.version>
+    <dep.hiveio.version>0.16</dep.hiveio.version>
     <dep.json.version>20090211</dep.json.version>
     <dep.junit.version>4.8</dep.junit.version>
     <dep.jython.version>2.5.3</dep.jython.version>
@@ -1049,6 +1050,11 @@ under the License.
         <version>${dep.yourkit-api.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>airline</artifactId>
+        <version>${dep.airline.version}</version>
+      </dependency>
+      <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty</artifactId>
         <version>${dep.netty.version}</version>


Mime
View raw message