gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject svn commit: r1560408 - in /gora/trunk: gora-core/src/main/java/org/apache/gora/filter/ gora-core/src/test/java/org/apache/gora/filter/ gora-hbase/src/main/java/org/apache/gora/hbase/util/
Date Wed, 22 Jan 2014 16:29:06 GMT
Author: lewismc
Date: Wed Jan 22 16:29:05 2014
New Revision: 1560408

URL: http://svn.apache.org/r1560408
Log:
GORA-119 implement a filter enabled scan in gora

Added:
    gora/trunk/gora-core/src/main/java/org/apache/gora/filter/
    gora/trunk/gora-core/src/main/java/org/apache/gora/filter/Filter.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterList.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/filter/
    gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java

Added: gora/trunk/gora-core/src/main/java/org/apache/gora/filter/Filter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/filter/Filter.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/filter/Filter.java (added)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/filter/Filter.java Wed Jan 22 16:29:05
2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Defines filtering (possibly including modification) of rows. By default
+ * all filtering is done client side. (In generic Gora classes). Datastore
+ * implementations can decide if they install remote filters, when possible.
+ *  
+ * @param <K>
+ * @param <T>
+ */
+public interface Filter<K, T extends Persistent> extends Writable{
+  
+  /**
+   * Filter the key and persistent. Modification is possible.
+   * 
+   * @param key
+   * @param persistent
+   * @return <code>true</code> if the row is filtered out (excluded), 
+   * <code>false</code> otherwise.
+   */
+  public boolean filter(K key, T persistent);
+}

Added: gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterList.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterList.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterList.java (added)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterList.java Wed Jan 22 16:29:05
2014
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.filter;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.ReflectionUtils;
+import org.apache.hadoop.io.Text;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FilterList<K, T extends PersistentBase> implements Filter<K, T>
{
+  /** set operator */
+  public static enum Operator {
+    /** !AND */
+    MUST_PASS_ALL,
+    /** !OR */
+    MUST_PASS_ONE
+  }
+  
+  private Operator operator = Operator.MUST_PASS_ALL;
+  private List<Filter<K, T>> filters = new ArrayList<Filter<K, T>>();
+  
+  public FilterList() {
+  }
+  
+  public FilterList(final List<Filter<K, T>> rowFilters) {
+    this.filters = rowFilters;
+  }
+  
+  public FilterList(final Operator operator) {
+    this.operator = operator;
+  }
+  
+  public FilterList(final Operator operator, final List<Filter<K, T>> rowFilters)
{
+    this.filters = rowFilters;
+    this.operator = operator;
+  }
+
+  public List<Filter<K, T>> getFilters() {
+    return filters;
+  }
+  
+  public Operator getOperator() {
+    return operator;
+  }
+  
+  public void addFilter(Filter<K, T> filter) {
+    this.filters.add(filter);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte opByte = in.readByte();
+    operator = Operator.values()[opByte];
+    int size = in.readInt();
+    if (size > 0) {
+      filters = new ArrayList<Filter<K, T>>(size);
+      try {
+        for (int i = 0; i < size; i++) {
+          @SuppressWarnings("unchecked")
+          Class<? extends Filter<K, T>> cls = (Class<? extends Filter<K,
T>>) Class.forName(Text.readString(in)).asSubclass(Filter.class);
+          Filter<K, T> filter = ReflectionUtils.newInstance(cls);
+          filter.readFields(in);
+          filters.add(filter);
+        }
+      } catch (Exception e) {
+        throw (IOException)new IOException("Failed filter init").initCause(e);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(operator.ordinal());
+    out.writeInt(filters.size());
+    for (Filter<K, T> filter : filters) {
+      Text.writeString(out, filter.getClass().getName());
+      filter.write(out);
+    }
+  }
+
+  @Override
+  public boolean filter(K key, T persistent) {
+    // TODO not yet implemented
+    return false;
+  }
+
+}

Added: gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java (added)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java Wed Jan 22 16:29:05
2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+/**
+ * Defines a set of common filter compare operations.
+ */
+public enum FilterOp {
+  EQUALS,
+  NOT_EQUALS,
+  LESS,
+  LESS_OR_EQUAL,
+  GREATER,
+  GREATER_OR_EQUAL,
+}

Added: gora/trunk/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java (added)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java Wed
Jan 22 16:29:05 2014
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A filter that checks for a single field in the persistent.
+ * 
+ * @param <K>
+ * @param <T>
+ */
+public class MapFieldValueFilter<K, T extends PersistentBase> implements Filter<K,
T> {
+
+  protected String fieldName;
+  protected Utf8 mapKey;
+  protected FilterOp filterOp;
+  protected List<Object> operands = new ArrayList<Object>();
+  protected boolean filterIfMissing = false;
+
+  private Configuration conf = new Configuration(); // just create empty conf,
+                                                    // needed for ObjectWritable
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, fieldName);
+    Text.writeString(out, mapKey.toString());
+    WritableUtils.writeEnum(out, filterOp);
+    WritableUtils.writeVInt(out, operands.size());
+    for (int i = 0; i < operands.size(); i++) {
+      Object operand = operands.get(i);
+      if (operand instanceof String) {
+        throw new IllegalStateException("Use Utf8 instead of String for operands");
+      }
+      if (operand instanceof Utf8) {
+        operand = operand.toString();
+      }
+      if (operand instanceof Boolean) {
+        ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
+      } else if (operand instanceof Character) {
+        ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
+      } else if (operand instanceof Byte) {
+        ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
+      } else if (operand instanceof Short) {
+        ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
+      } else if (operand instanceof Integer) {
+        ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
+      } else if (operand instanceof Long) {
+        ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
+      } else if (operand instanceof Float) {
+        ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
+      } else if (operand instanceof Double) {
+        ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
+      } else if (operand instanceof Void) {
+        ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
+      } else {
+        ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
+      }
+    }
+    out.writeBoolean(filterIfMissing);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fieldName = Text.readString(in);
+    mapKey = new Utf8(Text.readString(in));
+    filterOp = WritableUtils.readEnum(in, FilterOp.class);
+    operands.clear();
+    int operandsSize = WritableUtils.readVInt(in);
+    for (int i = 0; i < operandsSize; i++) {
+      Object operand = ObjectWritable.readObject(in, conf);
+      if (operand instanceof String) {
+        operand = new Utf8((String) operand);
+      }
+      operands.add(operand);
+    }
+    filterIfMissing = in.readBoolean();
+  }
+
+  @Override
+  public boolean filter(K key, T persistent) {
+    int fieldIndex = persistent.getFieldIndex(fieldName);
+    Map<Utf8, ?> fieldValue = (Map<Utf8, ?>) persistent.get(fieldIndex);
+    if (fieldValue == null) {
+      return filterIfMissing;
+    }
+    Object value = fieldValue.get(mapKey);
+    Object operand = operands.get(0);
+    if (value == null) {
+      return filterIfMissing;
+    }
+    if (filterOp.equals(FilterOp.EQUALS)) {
+      boolean equals = value.equals(operand);
+      return !equals;
+    } else if (filterOp.equals(FilterOp.NOT_EQUALS)) {
+      boolean equals = value.equals(operand);
+      return equals;
+    } else {
+      throw new IllegalStateException(filterOp + " not yet implemented");
+    }
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+
+  public void setFieldName(String fieldName) {
+    this.fieldName = fieldName;
+  }
+
+  public Utf8 getMapKey() {
+    return mapKey;
+  }
+
+  public void setMapKey(Utf8 mapKey) {
+    this.mapKey = mapKey;
+  }
+
+  public FilterOp getFilterOp() {
+    return filterOp;
+  }
+
+  public void setFilterOp(FilterOp filterOp) {
+    this.filterOp = filterOp;
+  }
+
+  public List<Object> getOperands() {
+    return operands;
+  }
+
+  public void setOperands(List<Object> operands) {
+    this.operands = operands;
+  }
+
+  public void setFilterIfMissing(boolean filterIfMissing) {
+    this.filterIfMissing = filterIfMissing;
+  }
+
+  public boolean isFilterIfMissing() {
+    return filterIfMissing;
+  }
+
+  @Override
+  public String toString() {
+    return "SingleFieldValueFilter [fieldName=" + fieldName + ",mapKey=" + mapKey + ", filterOp="
+ filterOp + ", operands=" + operands
+        + ", filterIfMissing=" + filterIfMissing + "]";
+  }
+}

Added: gora/trunk/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
(added)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
Wed Jan 22 16:29:05 2014
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A filter that checks for a single field in the persistent.
+ * 
+ * @param <K>
+ * @param <T>
+ */
+public class SingleFieldValueFilter<K, T extends PersistentBase> implements Filter<K,
T>{
+
+  protected String fieldName;
+  protected FilterOp filterOp;
+  protected List<Object> operands = new ArrayList<Object>();
+  protected boolean filterIfMissing = false;
+
+  private Configuration conf = new Configuration(); //just create empty conf,
+                                                    //needed for ObjectWritable
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, fieldName);
+    WritableUtils.writeEnum(out, filterOp);
+    WritableUtils.writeVInt(out, operands.size());
+    for (int i = 0; i < operands.size(); i++) {
+      Object operand = operands.get(i);
+      if (operand instanceof String) {
+        throw new IllegalStateException("Use Utf8 instead of String for operands");
+      }
+      if (operand instanceof Utf8) {
+        operand=operand.toString();
+      }
+      if (operand instanceof Boolean) {
+        ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
+      } else if (operand instanceof Character) {
+        ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
+      } else if (operand instanceof Byte) {
+        ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
+      } else if (operand instanceof Short) {
+        ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
+      } else if (operand instanceof Integer) {
+        ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
+      } else if (operand instanceof Long) {
+        ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
+      } else if (operand instanceof Float) {
+        ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
+      } else if (operand instanceof Double) {
+        ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
+      } else if (operand instanceof Void) {
+        ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
+      } else {
+        ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
+      }
+    }
+    out.writeBoolean(filterIfMissing);
+  }  
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fieldName = Text.readString(in);
+    filterOp = WritableUtils.readEnum(in, FilterOp.class);
+    operands.clear();
+    int operandsSize = WritableUtils.readVInt(in);
+    for (int i = 0; i < operandsSize; i++) {
+      Object operand = ObjectWritable.readObject(in, conf);
+      if (operand instanceof String) {
+        operand=new Utf8((String) operand);
+      }
+      operands.add(operand);
+    }
+    filterIfMissing = in.readBoolean();
+  }
+
+  @Override
+  public boolean filter(K key, T persistent) {
+    int fieldIndex = persistent.getFieldIndex(fieldName);
+    Object fieldValue = persistent.get(fieldIndex);
+    Object operand = operands.get(0);
+    if (fieldValue == null) {
+      return filterIfMissing;
+    }
+    if (filterOp.equals(FilterOp.EQUALS)) {
+      boolean equals = operand.equals(fieldValue);
+      return !equals;
+    } else if (filterOp.equals(FilterOp.NOT_EQUALS)) {
+      boolean equals = operand.equals(fieldValue);
+      return equals;
+    // TODO implement other FilterOp operators
+    } else {
+      throw new IllegalStateException(filterOp + " not yet implemented");
+    }
+  }
+  
+  public String getFieldName() {
+    return fieldName;
+  }
+  public void setFieldName(String fieldName) {
+    this.fieldName = fieldName;
+  }
+  
+  public FilterOp getFilterOp() {
+    return filterOp;
+  }
+  public void setFilterOp(FilterOp filterOp) {
+    this.filterOp = filterOp;
+  }
+
+  public List<Object> getOperands() {
+    return operands;
+  }
+  public void setOperands(List<Object> operands) {
+    this.operands = operands;
+  }
+  
+  public void setFilterIfMissing(boolean filterIfMissing) {
+    this.filterIfMissing = filterIfMissing;
+  }
+  
+  public boolean isFilterIfMissing() {
+    return filterIfMissing;
+  }
+
+  @Override
+  public String toString() {
+    return "SingleFieldValueFilter [fieldName=" + fieldName + ", filterOp="
+        + filterOp + ", operands=" + operands + ", filterIfMissing="
+        + filterIfMissing + "]";
+  }
+}

Added: gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
(added)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
Wed Jan 22 16:29:05 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+public class TestMapFieldValueFilter {
+
+  @Test
+  public void testSerialization() throws IOException {
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String,
WebPage>();
+    filter.setFieldName(WebPage.Field.METADATA.toString());
+    filter.setMapKey(new Utf8("fetchTime"));
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("http://example.org"));
+    byte[] byteArray = WritableUtils.toByteArray(filter);
+    MapFieldValueFilter<String, WebPage> filter2 = new MapFieldValueFilter<String,
WebPage>();
+    filter2.readFields(new DataInputStream(new ByteArrayInputStream(byteArray)));
+    assertEquals(filter.getFieldName(), filter2.getFieldName());
+    assertEquals(filter.getMapKey(), filter2.getMapKey());
+    assertEquals(filter.getFilterOp(), filter2.getFilterOp());
+    assertArrayEquals(filter.getOperands().toArray(), filter2.getOperands().toArray());
+    assertEquals(filter.isFilterIfMissing(), filter2.isFilterIfMissing());
+  }
+  
+  @Test
+  public void testFilterBasics() {
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String,
WebPage>();
+    filter.setFieldName(WebPage.Field.OUTLINKS.toString());
+    filter.setMapKey(new Utf8("example"));
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("http://example.org"));
+    
+    WebPage page = new WebPage();
+    page.putToOutlinks(new Utf8("example"), new Utf8("http://example.org"));
+    assertFalse(filter.filter("irrelevant", page));
+    page.putToOutlinks(new Utf8("example"), new Utf8("http://example2.com"));
+    assertTrue(filter.filter("irrelevant", page));
+    page = new WebPage();
+    assertTrue(filter.filter("irrelevant", page));
+    filter.setFilterIfMissing(false);
+    
+    assertFalse(filter.filter("irrelevant", page));
+  }
+  
+  @Test
+  public void testFilterEntryInMap() {
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String,
WebPage>();
+    filter.setFieldName(WebPage.Field.OUTLINKS.toString());
+    filter.setMapKey(new Utf8("foobar.whatever"));
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("Click here for foobar!"));
+    
+    WebPage page = new WebPage();
+    assertTrue(filter.filter("irrelevant", page));
+    page.putToOutlinks(new Utf8("foobar.whatever"), new Utf8("Mismatch!"));
+    assertTrue(filter.filter("irrelevant", page));
+    page.putToOutlinks(new Utf8("foobar.whatever"), new Utf8("Click here for foobar!"));
+    assertFalse(filter.filter("irrelevant", page));
+  }
+
+}

Added: gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
(added)
+++ gora/trunk/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
Wed Jan 22 16:29:05 2014
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+public class TestSingleFieldValueFilter {
+
+  @Test
+  public void testSerialization() throws IOException {
+    SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String,
WebPage>();
+    filter.setFieldName(WebPage.Field.URL.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("http://example.org"));
+    byte[] byteArray = WritableUtils.toByteArray(filter);
+    SingleFieldValueFilter<String, WebPage> filter2 = new SingleFieldValueFilter<String,
WebPage>();
+    filter2.readFields(new DataInputStream(new ByteArrayInputStream(byteArray)));
+    assertEquals(filter.getFieldName(), filter2.getFieldName());
+    assertEquals(filter.getFilterOp(), filter2.getFilterOp());
+    assertArrayEquals(filter.getOperands().toArray(), filter2.getOperands().toArray());
+    assertEquals(filter.isFilterIfMissing(), filter2.isFilterIfMissing());
+  }
+  
+  @Test
+  public void testFilterBasics() {
+    SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String,
WebPage>();
+    filter.setFieldName(WebPage.Field.URL.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("example.org"));
+    
+    WebPage page = new WebPage();
+    page.setUrl(new Utf8("example.org"));
+    assertFalse(filter.filter("irrelevant", page));
+    page.setUrl(new Utf8("mismatch.whatever"));
+    assertTrue(filter.filter("irrelevant", page));
+    page = new WebPage();
+    assertTrue(filter.filter("irrelevant", page));
+    
+    filter.setFilterIfMissing(false);
+    
+    assertFalse(filter.filter("irrelevant", page));
+  }
+  
+  @Test
+  public void testFilterInequals() {
+    SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String,
WebPage>();
+    filter.setFieldName(WebPage.Field.URL.toString());
+    filter.setFilterOp(FilterOp.NOT_EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("example.org"));
+    
+    WebPage page = new WebPage();
+    page.setUrl(new Utf8("example.org"));
+    assertTrue(filter.filter("irrelevant", page));
+    
+    page.setUrl(new Utf8("something.else"));
+    assertFalse(filter.filter("irrelevant", page));
+  }
+  
+}

Added: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java (added)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java Wed Jan
22 16:29:05 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+
+public abstract class BaseFactory<K, T extends PersistentBase> implements FilterFactory<K,
T> {
+
+  private HBaseFilterUtil<K, T> util;
+
+  @Override
+  public HBaseFilterUtil<K, T> getHbaseFitlerUtil() {
+    return util;
+  }
+
+  @Override
+  public void setHBaseFitlerUtil(HBaseFilterUtil<K, T> util) {
+    this.util = util;
+  }
+  
+}

Added: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java (added)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java Wed
Jan 22 16:29:05 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.FilterList;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.filter.MapFieldValueFilter;
+import org.apache.gora.filter.SingleFieldValueFilter;
+import org.apache.gora.hbase.store.HBaseColumn;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DefaultFactory <K, T extends PersistentBase> extends BaseFactory<K,
T> {
+  private static final Log LOG = LogFactory.getLog(DefaultFactory.class);
+  
+  @Override
+  public List<String> getSupportedFilters() {
+    List<String> filters = new ArrayList<String>();
+    filters.add(SingleFieldValueFilter.class.getCanonicalName());
+    filters.add(MapFieldValueFilter.class.getCanonicalName());
+    filters.add(FilterList.class.getCanonicalName());
+    return filters;
+  }
+  
+  @Override
+  public org.apache.hadoop.hbase.filter.Filter createFilter(Filter<K, T> filter, HBaseStore<K,
T> store) {
+    if (filter instanceof FilterList) {
+      FilterList<K, T> filterList = (FilterList<K, T>) filter;
+      org.apache.hadoop.hbase.filter.FilterList hbaseFilter = new org.apache.hadoop.hbase.filter.FilterList(
+          Operator.valueOf(filterList.getOperator().name()));
+      for (Filter<K, T> rowFitler : filterList.getFilters()) {
+        FilterFactory<K, T> factory = getHbaseFitlerUtil().getFactory(rowFitler);
+        if (factory == null) {
+          LOG.warn("HBase remote filter factory not yet implemented for " + rowFitler.getClass().getCanonicalName());
+          return null;
+        }
+        org.apache.hadoop.hbase.filter.Filter hbaseRowFilter = factory.createFilter(rowFitler,
store);
+        if (hbaseRowFilter != null) {
+          hbaseFilter.addFilter(hbaseRowFilter);
+        }
+      }
+      return hbaseFilter;
+    } else if (filter instanceof SingleFieldValueFilter) {
+      SingleFieldValueFilter<K, T> fieldFilter = (SingleFieldValueFilter<K, T>)
filter;
+
+      HBaseColumn column = store.getMapping().getColumn(fieldFilter.getFieldName());
+      CompareOp compareOp = getCompareOp(fieldFilter.getFilterOp());
+      byte[] family = column.getFamily();
+      byte[] qualifier = column.getQualifier();
+      byte[] value = HBaseByteInterface.toBytes(fieldFilter.getOperands().get(0));
+      SingleColumnValueFilter hbaseFilter = new SingleColumnValueFilter(family, qualifier,
compareOp, value);
+      hbaseFilter.setFilterIfMissing(fieldFilter.isFilterIfMissing());
+
+      return hbaseFilter;
+    } else if (filter instanceof MapFieldValueFilter) {
+      MapFieldValueFilter<K, T> mapFilter = (MapFieldValueFilter<K, T>) filter;
+
+      HBaseColumn column = store.getMapping().getColumn(mapFilter.getFieldName());
+      CompareOp compareOp = getCompareOp(mapFilter.getFilterOp());
+      byte[] family = column.getFamily();
+      byte[] qualifier = HBaseByteInterface.toBytes(mapFilter.getMapKey());
+      byte[] value = HBaseByteInterface.toBytes(mapFilter.getOperands().get(0));
+      SingleColumnValueFilter hbaseFilter = new SingleColumnValueFilter(family, qualifier,
compareOp, value);
+      hbaseFilter.setFilterIfMissing(mapFilter.isFilterIfMissing());
+
+      return hbaseFilter;
+    } else {
+      LOG.warn("HBase remote filter not yet implemented for " + filter.getClass().getCanonicalName());
+      return null;
+    }
+  }
+
+  private CompareOp getCompareOp(FilterOp filterOp) {
+    switch (filterOp) {
+      case EQUALS:
+        return CompareOp.EQUAL;
+      case NOT_EQUALS:
+        return CompareOp.NOT_EQUAL;
+      case LESS:
+        return CompareOp.LESS;
+      case LESS_OR_EQUAL:
+        return CompareOp.LESS_OR_EQUAL;
+      case GREATER:
+        return CompareOp.GREATER;
+      case GREATER_OR_EQUAL:
+        return CompareOp.GREATER_OR_EQUAL;
+      default:
+        throw new IllegalArgumentException(filterOp + " no HBase equivalent yet");
+    }
+  }
+
+
+}

Added: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java (added)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java Wed
Jan 22 16:29:05 2014
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.gora.filter.Filter;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+
+import java.util.List;
+
+public interface FilterFactory <K, T extends PersistentBase> {
+  void setHBaseFitlerUtil(HBaseFilterUtil<K, T> util);
+  HBaseFilterUtil<K, T> getHbaseFitlerUtil();
+  List<String> getSupportedFilters();
+  org.apache.hadoop.hbase.filter.Filter createFilter(Filter<K, T> filter, HBaseStore<K,
T> store);
+}

Added: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java?rev=1560408&view=auto
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java (added)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java Wed
Jan 22 16:29:05 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class HBaseFilterUtil<K, T extends PersistentBase> {
+  private static final Log LOG = LogFactory.getLog(HBaseFilterUtil.class);
+
+  private Map<String, FilterFactory<K, T>> factories = new LinkedHashMap<String,
FilterFactory<K, T>>();
+
+  public HBaseFilterUtil(Configuration conf) throws GoraException {
+    String[] factoryClassNames = conf.getStrings("gora.hbase.filter.factories", "org.apache.gora.hbase.util.DefaultFactory");
+
+    for (String factoryClass : factoryClassNames) {
+      try {
+        @SuppressWarnings("unchecked")
+        FilterFactory<K, T> factory = (FilterFactory<K, T>) ReflectionUtils.newInstance(factoryClass);
+        for (String filterClass : factory.getSupportedFilters()) {
+          factories.put(filterClass, factory);
+        }
+        factory.setHBaseFitlerUtil(this);
+      } catch (Exception e) {
+        throw new GoraException(e);
+      }
+    }
+  }
+
+  public FilterFactory<K, T> getFactory(Filter<K, T> fitler) {
+    return factories.get(fitler.getClass().getCanonicalName());
+  }
+
+  /**
+   * Set a filter on the Scan. It translates a Gora filter to a HBase filter.
+   * 
+   * @param scan
+   * @param filter
+   *          The Gora filter.
+   * @param store
+   *          The HBaseStore.
+   * @return if remote filter is succesfully applied.
+   */
+  public boolean setFilter(Scan scan, Filter<K, T> filter, HBaseStore<K, T> store)
{
+
+    FilterFactory<K, T> factory = getFactory(filter);
+    if (factory != null) {
+      org.apache.hadoop.hbase.filter.Filter hbaseFilter = factory.createFilter(filter, store);
+      if (hbaseFilter != null) {
+        scan.setFilter(hbaseFilter);
+        return true;
+      } else {
+        LOG.warn("HBase remote filter not yet implemented for " + filter.getClass().getCanonicalName());
+        return false;
+      }
+    } else {
+      LOG.warn("HBase remote filter factory not yet implemented for " + filter.getClass().getCanonicalName());
+      return false;
+    }
+  }
+  
+}



Mime
View raw message