Updated Branches:
refs/heads/trunk 8df1027b4 -> d419f8f4f
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
deleted file mode 100644
index 33196f5..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveWritableRecord;
-
-/**
- * Interface for writing Vertex IDs from Hive records
- *
- * @param <I> Vertex ID
- */
-public interface HiveVertexIdWriter<I extends WritableComparable> {
- /**
- * Write Vertex ID to record
- *
- * @param id Vertex ID
- * @param record Hive record
- */
- void writeId(I id, HiveWritableRecord record);
-
- /**
- * Null implementation that does nothing
- *
- * @param <W> Writable type
- */
- public static class Null<W extends WritableComparable>
- implements HiveVertexIdWriter<W> {
- /** Singleton */
- private static final Null INSTANCE = new Null();
-
- /**
- * Get singleton
- *
- * @return singleton instance
- */
- public static Null get() {
- return INSTANCE;
- }
-
- @Override
- public void writeId(W id, HiveWritableRecord record) { }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
deleted file mode 100644
index e9b0687..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableWrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableWrappers.lookup;
-
-/**
- * Reader for Vertex/Edge Values from Hive with known types
- *
- * @param <W> Vertex/Edge Value
- */
-public class TypedValueReader<W extends Writable>
- implements HiveValueReader<W> {
- /** Hive column index */
- private final int columnIndex;
- /** {@link WritableWrapper} for Hive column to Giraph Writable */
- private final WritableWrapper<W, Object> writableWrapper;
-
- /**
- * Constructor
- *
- * @param columnIndex column index
- * @param writableWrapper {@link WritableWrapper}
- */
- public TypedValueReader(int columnIndex,
- WritableWrapper<W, Object> writableWrapper) {
- this.columnIndex = columnIndex;
- this.writableWrapper = writableWrapper;
- }
-
- /**
- * Create from Configuration with column name and Schema
- *
- * @param conf Configuration
- * @param columnOption StrConfOption for column name
- * @param schema HiveTableSchema
- * @param <V> Vertex/Edge Value
- * @return TypedVertexValueReader
- */
- public static <V extends Writable> HiveValueReader<V>
- createValueReader(
- ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
- HiveTableSchema schema) {
- Class<V> vertexValueClass = conf.getVertexValueClass();
- if (NullWritable.class.isAssignableFrom(vertexValueClass)) {
- return HiveValueReader.Null.get();
- }
- int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
- Class hiveClass = schema.columnType(columnIndex).javaClass();
- WritableWrapper wrapper = lookup(vertexValueClass, hiveClass);
- return new TypedValueReader(columnIndex, wrapper);
- }
-
- @Override
- public W readValue(HiveReadableRecord record) {
- Object object = record.get(columnIndex);
- return writableWrapper.wrap(object);
- }
-
- public int getColumnIndex() {
- return columnIndex;
- }
-
- public WritableWrapper<W, Object> getWritableWrapper() {
- return writableWrapper;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
deleted file mode 100644
index 8841bda..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableUnwrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.facebook.hiveio.record.HiveWritableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableUnwrappers.lookup;
-
-/**
- * Writer for Vertex/Edge Values from Hive with known types
- *
- * @param <W> Vertex/Edge Value
- */
-public class TypedValueWriter<W extends Writable>
- implements HiveValueWriter<W> {
- /** Hive column index */
- private final int columnIndex;
- /** {@link WritableUnwrapper} for Hive column to Giraph Writable */
- private final WritableUnwrapper<W, Object> writableUnwrapper;
-
- /**
- * Constructor
- *
- * @param columnIndex column index
- * @param writableUnwrapper JavaWritableConverter
- */
- public TypedValueWriter(int columnIndex,
- WritableUnwrapper<W, Object> writableUnwrapper) {
- this.columnIndex = columnIndex;
- this.writableUnwrapper = writableUnwrapper;
- }
-
- /**
- * Create from Configuration with column name and Schema
- *
- * @param conf Configuration
- * @param columnOption StrConfOption for column name
- * @param schema HiveTableSchema
- * @param <V> Vertex/Edge Value
- * @return TypedVertexValueReader
- */
- public static <V extends Writable> HiveValueWriter<V>
- createValueWriter(
- ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
- HiveTableSchema schema) {
- Class<V> vertexValueClass = conf.getVertexValueClass();
- if (NullWritable.class.isAssignableFrom(vertexValueClass)) {
- return HiveValueWriter.Null.get();
- }
- int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
- Class hiveClass = schema.columnType(columnIndex).javaClass();
- WritableUnwrapper unwrapper = lookup(vertexValueClass, hiveClass);
- return new TypedValueWriter(columnIndex, unwrapper);
- }
-
- @Override
- public void writeValue(W value, HiveWritableRecord record) {
- Object object = writableUnwrapper.unwrap(value);
- record.set(columnIndex, object);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
deleted file mode 100644
index 6d80d37..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableWrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableWrappers.lookup;
-
-/**
- * Reader for Vertex IDs from Hive with known types
- *
- * @param <I> Vertex ID
- */
-public class TypedVertexIdReader<I extends WritableComparable>
- implements HiveVertexIdReader<I> {
- /** Hive column index */
- private final int columnIndex;
- /** {@link WritableWrapper} for Hive column to Giraph Writable */
- private final WritableWrapper<I, Object> writableWrapper;
-
- /**
- * Constructor
- *
- * @param columnIndex column index
- * @param writableWrapper {@link WritableWrapper}
- */
- public TypedVertexIdReader(int columnIndex,
- WritableWrapper<I, Object> writableWrapper) {
- this.columnIndex = columnIndex;
- this.writableWrapper = writableWrapper;
- }
-
- /**
- * Create from Configuration with column name and Schema
- *
- * @param conf {@link org.apache.hadoop.conf.Configuration}
- * @param columnOption {@link StrConfOption} for column name
- * @param schema {@link HiveTableSchema}
- * @param <I> Vertex ID
- * @return {@link TypedVertexIdReader}
- */
- public static <I extends WritableComparable> HiveVertexIdReader<I>
- createIdReader(
- ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
- HiveTableSchema schema) {
- Class<I> vertexIdClass = conf.getVertexIdClass();
- if (NullWritable.class.isAssignableFrom(vertexIdClass)) {
- return HiveVertexIdReader.Null.get();
- }
- int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
- Class hiveClass = schema.columnType(columnIndex).javaClass();
- WritableWrapper wrapper = lookup(vertexIdClass, hiveClass);
- return new TypedVertexIdReader<I>(columnIndex, wrapper);
- }
-
- @Override
- public I readId(HiveReadableRecord record) {
- Object object = record.get(columnIndex);
- return writableWrapper.wrap(object);
- }
-
- public int getColumnIndex() {
- return columnIndex;
- }
-
- public WritableWrapper<I, Object> getWritableWrapper() {
- return writableWrapper;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
deleted file mode 100644
index aece54b..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.types;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.types.WritableUnwrapper;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveWritableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
-import static org.apache.giraph.types.WritableUnwrappers.lookup;
-
-/**
- * Writer for Vertex IDs from Hive with known types
- *
- * @param <I> Vertex ID
- */
-public class TypedVertexIdWriter<I extends WritableComparable>
- implements HiveVertexIdWriter<I> {
- /** Hive column index */
- private final int columnIndex;
- /** {@link WritableUnwrapper} for Hive column to Giraph Writable */
- private final WritableUnwrapper<I, Object> writableUnwrapper;
-
- /**
- * Constructor
- *
- * @param columnIndex column index
- * @param writableUnwrapper {@link WritableUnwrapper}
- */
- public TypedVertexIdWriter(int columnIndex,
- WritableUnwrapper<I, Object> writableUnwrapper) {
- this.columnIndex = columnIndex;
- this.writableUnwrapper = writableUnwrapper;
- }
-
- /**
- * Create from Configuration with column name and Schema
- *
- * @param conf {@link org.apache.hadoop.conf.Configuration}
- * @param columnOption {@link StrConfOption} for column name
- * @param schema {@link HiveTableSchema}
- * @param <I> Vertex ID
- * @return {@link TypedVertexIdWriter}
- */
- public static <I extends WritableComparable> HiveVertexIdWriter<I>
- createIdWriter(
- ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
- HiveTableSchema schema) {
- Class<I> vertexIdClass = conf.getVertexIdClass();
- if (NullWritable.class.isAssignableFrom(vertexIdClass)) {
- return HiveVertexIdWriter.Null.get();
- }
- int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
- Class hiveClass = schema.columnType(columnIndex).javaClass();
- WritableUnwrapper unwrapper = lookup(vertexIdClass, hiveClass);
- return new TypedVertexIdWriter<I>(columnIndex, unwrapper);
- }
-
- @Override
- public void writeId(I value, HiveWritableRecord record) {
- Object object = writableUnwrapper.unwrap(value);
- record.set(columnIndex, object);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
deleted file mode 100644
index 7004637..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * HiveIO type-based things.
- */
-package org.apache.giraph.hive.types;
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java
new file mode 100644
index 0000000..e989e28
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.values;
+
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * Interface for reading Vertex / Edge values from Hive records
+ *
+ * @param <T> Value type
+ */
+public interface HiveValueReader<T extends Writable> {
+ /**
+ * Read value from record
+ *
+ * @param value graph value to read into
+ * @param record Hive record
+ */
+ void readFields(T value, HiveReadableRecord record);
+
+ /**
+ * Null implementation that does nothing
+ *
+ * @param <W> Writable type
+ */
+ public class Null<W extends Writable> implements HiveValueReader<W> {
+ /** Singleton */
+ private static final Null INSTANCE = new Null();
+
+ /**
+ * Get singleton
+ *
+ * @return singleton instance
+ */
+ public static Null get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void readFields(W value, HiveReadableRecord record) { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java
new file mode 100644
index 0000000..4b624f6
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.values;
+
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveWritableRecord;
+
+/**
+ * Interface for writing Vertex / Edge values to Hive records
+ *
+ * @param <T> Value type
+ */
+public interface HiveValueWriter<T extends Writable> {
+ /**
+ * Write value to record
+ *
+ * @param value the value
+ * @param record Hive record
+ */
+ void write(T value, HiveWritableRecord record);
+
+ /**
+ * Null implementation that does nothing
+ *
+ * @param <W> Writable type
+ */
+ public static class Null<W extends Writable> implements HiveValueWriter<W> {
+ /** Singleton */
+ private static final Null INSTANCE = new Null();
+
+ /**
+ * Get singleton
+ *
+ * @return singleton instance
+ */
+ public static Null get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void write(W value, HiveWritableRecord record) { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java
new file mode 100644
index 0000000..1e8f497
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hive type (vertex ID, vertex value, edge value, etc.) related things.
+ */
+package org.apache.giraph.hive.values;
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
index 49a36d0..ff932b3 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java
@@ -2,9 +2,11 @@ package org.apache.giraph.hive;
import org.junit.BeforeClass;
+import com.facebook.hiveio.log.LogHelpers;
+
public class GiraphHiveTestBase {
@BeforeClass
public static void silenceLoggers() {
- com.facebook.hiveio.testing.Helpers.silenceLoggers();
+ LogHelpers.silenceLoggers();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
index 00c00ca..d2103a7 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java
@@ -32,11 +32,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.collect.Maps;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
public class Helpers {
+ public static InputStream getResource(String name) {
+ return Helpers.class.getClassLoader().getResourceAsStream(name);
+ }
+
public static Map<Integer, Double> parseIntDoubleResults(Iterable<String> results) {
Map<Integer, Double> values = Maps.newHashMap();
for (String line : results) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
index d5bbb95..bf5af0f 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java
@@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.thrift.TException;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import com.facebook.hiveio.common.HiveMetastores;
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
index af850d5..c75652c 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
@@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.thrift.TException;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import com.facebook.hiveio.common.HiveMetastores;
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java
new file mode 100644
index 0000000..b664d7d
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java
@@ -0,0 +1,139 @@
+package org.apache.giraph.hive.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.Language;
+import org.apache.giraph.hive.GiraphHiveTestBase;
+import org.apache.giraph.hive.Helpers;
+import org.apache.giraph.jython.JythonJob;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.python.util.PythonInterpreter;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.input.HiveInput;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.testing.LocalHiveServer;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import junit.framework.Assert;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.giraph.hive.Helpers.getResource;
+
+public class TestHiveJythonComplexTypes extends GiraphHiveTestBase {
+ private LocalHiveServer hiveServer = new LocalHiveServer("jython-test");
+
+ @Before
+ public void setUp() throws IOException, TException {
+ hiveServer.init();
+ HiveMetastores.setTestClient(hiveServer.getClient());
+ }
+
+ @Test
+ public void testFakeLabelPropagation() throws Exception {
+ String edgesTable = "flp_edges";
+ hiveServer.createTable("CREATE TABLE " + edgesTable +
+ " (source_id INT, " +
+ " target_id INT," +
+ " value FLOAT) " +
+ " ROW FORMAT DELIMITED " +
+ " FIELDS TERMINATED BY '\t'");
+ String[] edges = new String[] {
+ "1\t2\t0.2",
+ "2\t3\t0.3",
+ "3\t4\t0.4",
+ "4\t1\t0.1",
+ };
+ hiveServer.loadData(edgesTable, edges);
+
+ String vertexesTable = "flp_vertexes";
+ hiveServer.createTable("CREATE TABLE " + vertexesTable +
+ " (id INT, " +
+ " value MAP<INT,FLOAT>) " +
+ " ROW FORMAT DELIMITED " +
+ " FIELDS TERMINATED BY '\t' " +
+ " COLLECTION ITEMS TERMINATED BY ',' " +
+ " MAP KEYS TERMINATED BY ':' ");
+ String[] vertexes = new String[] {
+ "1\t11:0.8,12:0.1",
+ "2\t13:0.3,14:0.2",
+ "3\t15:0.4,16:0.7",
+ "4\t17:0.1,18:0.6",
+ };
+ hiveServer.loadData(vertexesTable, vertexes);
+
+ String outputTable = "flp_output";
+ hiveServer.createTable("CREATE TABLE " + outputTable +
+ " (id INT," +
+ " value MAP<INT,DOUBLE>) " +
+ " ROW FORMAT DELIMITED " +
+ " FIELDS TERMINATED BY '\t'");
+
+ String workerJythonPath =
+ "org/apache/giraph/jython/fake-label-propagation-worker.py";
+
+ InputStream launcher = getResource(
+ "org/apache/giraph/jython/fake-label-propagation-launcher.py");
+ assertNotNull(launcher);
+ InputStream worker = getResource(workerJythonPath);
+ assertNotNull(worker);
+
+ PythonInterpreter interpreter = new PythonInterpreter();
+
+ JythonJob jythonJob =
+ HiveJythonUtils.parseJythonStreams(interpreter, launcher, worker);
+
+ GiraphConfiguration conf = new GiraphConfiguration();
+
+ ScriptLoader.setScriptsToLoad(conf, workerJythonPath, DeployType.RESOURCE,
+ Language.JYTHON);
+
+ HiveJythonUtils.writeJythonJobToConf(jythonJob, conf, interpreter);
+
+ InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+ Helpers.commitJob(conf);
+
+ HiveInputDescription inputDesc = new HiveInputDescription();
+ inputDesc.getTableDesc().setTableName(outputTable);
+
+ Iterator<HiveReadableRecord> records = HiveInput.readTable(inputDesc).iterator();
+
+ printRecords(HiveInput.readTable(inputDesc));
+
+ final int rows = 4;
+
+ Set<Integer>[] expected = new Set[rows+1];
+ expected[1] = ImmutableSet.of(11,12,15,16,17,18);
+ expected[2] = ImmutableSet.of(13,14,17,18,11,12);
+ expected[3] = ImmutableSet.of(15,16,11,12,13,14);
+ expected[4] = ImmutableSet.of(17,18,13,14,15,16);
+
+ for (int i = 0; i < rows; ++i) {
+ assertTrue(records.hasNext());
+ HiveReadableRecord record = records.next();
+ assertEquals(expected[record.getInt(0)], record.getMap(1).keySet());
+ }
+
+ assertFalse(records.hasNext());
+ }
+
+ private void printRecords(Iterable<HiveReadableRecord> records) {
+ for (HiveReadableRecord record : records) {
+ System.out.println("record: " + record);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java
new file mode 100644
index 0000000..c1db151
--- /dev/null
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java
@@ -0,0 +1,110 @@
+package org.apache.giraph.hive.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.Language;
+import org.apache.giraph.hive.GiraphHiveTestBase;
+import org.apache.giraph.hive.Helpers;
+import org.apache.giraph.jython.JythonJob;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.thrift.TException;
+import org.junit.Before;
+import org.junit.Test;
+import org.python.util.PythonInterpreter;
+
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.input.HiveInput;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.testing.LocalHiveServer;
+import junit.framework.Assert;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.giraph.hive.Helpers.getResource;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Runs the "count-edges" Jython job against tables in a local Hive server
+ * and checks the per-vertex edge counts written to the output table.
+ */
+public class TestHiveJythonPrimitives extends GiraphHiveTestBase {
+  /** Local Hive server holding the input and output tables */
+  private final LocalHiveServer hiveServer =
+      new LocalHiveServer("jython-test");
+
+  /**
+   * Initialize the local Hive server and register its client as the test
+   * metastore client.
+   *
+   * @throws IOException propagated from the Hive server setup calls
+   * @throws TException propagated from the Hive server setup calls
+   */
+  @Before
+  public void setUp() throws IOException, TException {
+    hiveServer.init();
+    HiveMetastores.setTestClient(hiveServer.getClient());
+  }
+
+  /**
+   * Load four edges, run the CountEdges Jython computation and verify that
+   * the output table holds exactly one row per expected vertex with the
+   * expected edge count.
+   *
+   * @throws Exception on any failure while preparing or running the job
+   */
+  @Test
+  public void testCountEdges() throws Exception {
+    String edgesTable = "count_edges_edge_input";
+    hiveServer.createTable("CREATE TABLE " + edgesTable +
+        " (source_edge_id INT, " +
+        " target_edge_id INT) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t'");
+    String[] edges = new String[] {
+      "1\t2",
+      "2\t3",
+      "2\t4",
+      "4\t1"
+    };
+    hiveServer.loadData(edgesTable, edges);
+
+    String outputTable = "count_edges_output";
+    hiveServer.createTable("CREATE TABLE " + outputTable +
+        " (vertex_id INT," +
+        " num_edges INT) " +
+        " ROW FORMAT DELIMITED " +
+        " FIELDS TERMINATED BY '\t'");
+
+    String workerJythonPath = "org/apache/giraph/jython/count-edges.py";
+
+    InputStream launcher = null;
+    InputStream worker = null;
+    try {
+      launcher = getResource(
+          "org/apache/giraph/jython/count-edges-launcher.py");
+      assertNotNull(launcher);
+      worker = getResource(workerJythonPath);
+      assertNotNull(worker);
+
+      PythonInterpreter interpreter = new PythonInterpreter();
+
+      JythonJob jythonJob =
+          HiveJythonUtils.parseJythonStreams(interpreter, launcher, worker);
+
+      GiraphConfiguration conf = new GiraphConfiguration();
+
+      ScriptLoader.setScriptsToLoad(conf, workerJythonPath,
+          DeployType.RESOURCE, Language.JYTHON);
+
+      HiveJythonUtils.writeJythonJobToConf(jythonJob, conf, interpreter);
+
+      InternalVertexRunner.run(conf, new String[0], new String[0]);
+
+      Helpers.commitJob(conf);
+    } finally {
+      // The script streams are only released once the whole job has run, so
+      // closing them cannot interfere with anything that still reads them.
+      closeIfOpened(launcher);
+      closeIfOpened(worker);
+    }
+
+    HiveInputDescription inputDesc = new HiveInputDescription();
+    inputDesc.getTableDesc().setTableName(outputTable);
+
+    Iterator<HiveReadableRecord> records =
+        HiveInput.readTable(inputDesc).iterator();
+
+    // Indexed by vertex id. Ids 1, 2 and 4 are the edge sources loaded
+    // above; ids 0 and 3 are never a source and hold -1 as a placeholder
+    // (presumably no real count can equal it).
+    int[] expected = { -1, 1, 2, -1, 1 };
+    // Guards against the same vertex being written more than once, which
+    // the per-row count check alone would not notice.
+    boolean[] seen = new boolean[expected.length];
+    final int expectedRows = 3;
+
+    for (int i = 0; i < expectedRows; ++i) {
+      assertTrue(records.hasNext());
+      HiveReadableRecord record = records.next();
+      int vertexId = record.getInt(0);
+      assertFalse("duplicate row for vertex " + vertexId, seen[vertexId]);
+      seen[vertexId] = true;
+      Assert.assertEquals(expected[vertexId], record.getInt(1));
+    }
+
+    assertFalse(records.hasNext());
+  }
+
+  /**
+   * Close a stream if it was successfully opened.
+   *
+   * @param stream stream to close, may be null
+   * @throws IOException if closing the stream fails
+   */
+  private static void closeIfOpened(InputStream stream) throws IOException {
+    if (stream != null) {
+      stream.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
index 4d4d976..39390f1 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
@@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.thrift.TException;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import com.facebook.hiveio.common.HiveMetastores;
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
new file mode 100644
index 0000000..a87f030
--- /dev/null
+++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.giraph.combiner import DoubleSumCombiner
+from org.apache.giraph.edge import ByteArrayEdges
+from org.apache.giraph.jython import JythonJob
+from org.apache.hadoop.io import IntWritable
+from org.apache.hadoop.io import NullWritable
+
+
+def prepare(job):
+ job.hive_database = "default"
+ job.workers = 3
+
+ job.computation_name = "CountEdges"
+
+ job.vertex_id.type = IntWritable
+ job.vertex_value.type = IntWritable
+ job.edge_value.type = NullWritable
+ job.message_value.type = NullWritable
+
+ edge_input = JythonJob.EdgeInput()
+ edge_input.table = "count_edges_edge_input"
+ edge_input.source_id_column = "source_edge_id"
+ edge_input.target_id_column = "target_edge_id"
+ job.edge_inputs.add(edge_input)
+
+ job.vertex_output.table = "count_edges_output"
+ job.vertex_output.id_column = "vertex_id"
+ job.vertex_output.value_column = "num_edges"
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py
new file mode 100644
index 0000000..2d1c381
--- /dev/null
+++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.hadoop.io import IntWritable
+from org.apache.giraph.jython import JythonJob
+
+
+def prepare(job):
+ job.hive_database = "default"
+ job.workers = 5
+
+ job.computation_name = "FakeLabelPropagation"
+
+ job.vertex_id.type = IntWritable
+
+ job.vertex_value.type = FakeLPVertexValue
+ job.vertex_value.hive_io = FakeLPVertexValueHive
+
+ job.edge_value.type = FakeLPEdgeValue
+ job.edge_value.hive_reader = FakeLPEdgeReader
+
+ job.message_value.type = "FakeLPMessageValue"
+
+ edge_input = JythonJob.EdgeInput()
+ edge_input.table = "flp_edges"
+ edge_input.source_id_column = "source_id"
+ edge_input.target_id_column = "target_id"
+ edge_input.value_column = "value"
+ job.edge_inputs.add(edge_input)
+
+ vertex_input = JythonJob.VertexInput()
+ vertex_input.table = "flp_vertexes"
+ vertex_input.id_column = "id"
+ vertex_input.value_column = "value"
+ job.vertex_inputs.add(vertex_input)
+
+ job.vertex_output.table = "flp_output"
+ job.vertex_output.id_column = "id"
+ job.vertex_output.value_column = "value"
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py
new file mode 100644
index 0000000..9499a35
--- /dev/null
+++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.hadoop.io import Writable
+from org.apache.giraph.jython import JythonComputation
+from org.apache.giraph.hive.jython import JythonHiveIO
+from org.apache.giraph.hive.jython import JythonHiveReader
+
+
+# Plain Python vertex value; its Hive reads/writes are done by FakeLPVertexValueHive
+class FakeLPVertexValue:
+ def __init__(self):
+ self.labels = {}
+ self.dog = 'cat'
+
+ def add(self, message):
+ for label, weight in message.labels.iteritems():
+ if label in self.labels:
+ self.labels[label] += weight
+ else:
+ self.labels[label] = weight
+
+
+# Hive reader/writer for vertexes
+class FakeLPVertexValueHive(JythonHiveIO):
+ def readFromHive(self, vertex_value, column):
+ vertex_value.labels = column.getMap()
+
+ def writeToHive(self, vertex_value, column):
+ column.setMap(vertex_value.labels)
+
+
+# Implements Writable to override default Jython serialization which grabs all
+# of the data in an object.
+# Reading from Hive is handled separately by FakeLPEdgeReader below.
+class FakeLPEdgeValue(Writable):
+ def __init__(self):
+ self.value = 2.13
+ self.foo = "bar"
+
+ def readFields(self, data_input):
+ self.value = data_input.readFloat()
+ self.foo = "read_in"
+
+ def write(self, data_output):
+ data_output.writeFloat(self.value)
+ self.foo = "wrote_out"
+
+
+# Hive reader for edges
+class FakeLPEdgeReader(JythonHiveReader):
+ def readFromHive(self, edge_value, column):
+ edge_value.value = column.getFloat()
+
+
+# Doesn't implement anything. Use default Jython serialization.
+# No need for I/O with Hive
+class FakeLPMessageValue:
+ def __init__(self):
+ self.labels = {}
+
+
+# Extends JythonComputation to be a Computation Giraph can use
+class FakeLabelPropagation(JythonComputation):
+ def compute(self, vertex, messages):
+ if self.superstep == 0:
+ self.send_msg(vertex)
+ elif self.superstep < self.conf.getInt("supersteps", 3):
+ for message in messages:
+ vertex.value.add(message)
+ self.send_msg(vertex)
+ else:
+ vertex.voteToHalt()
+
+ def send_msg(self, vertex):
+ msg = FakeLPMessageValue()
+ msg.labels = vertex.value.labels
+ self.sendMessageToAllEdges(vertex, msg)
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8aa1f45..41b6bb1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -254,6 +254,7 @@ under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dep.accumulo.version>1.4.0</dep.accumulo.version>
+ <dep.airline.version>0.5</dep.airline.version>
<dep.base64.version>2.3.8</dep.base64.version>
<dep.cli-parser.version>1.1</dep.cli-parser.version>
<dep.codehaus-jackson.version>1.8.0</dep.codehaus-jackson.version>
@@ -271,7 +272,7 @@ under the License.
<dep.guava.version>12.0</dep.guava.version>
<dep.hcatalog.version>0.5.0-incubating</dep.hcatalog.version>
<dep.hive.version>0.11.0</dep.hive.version>
- <dep.hiveio.version>0.15</dep.hiveio.version>
+ <dep.hiveio.version>0.16</dep.hiveio.version>
<dep.json.version>20090211</dep.json.version>
<dep.junit.version>4.8</dep.junit.version>
<dep.jython.version>2.5.3</dep.jython.version>
@@ -1049,6 +1050,11 @@ under the License.
<version>${dep.yourkit-api.version}</version>
</dependency>
<dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>airline</artifactId>
+ <version>${dep.airline.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>${dep.netty.version}</version>
|