spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iras...@apache.org
Subject [1/2] spark git commit: [SPARK-20641][CORE] Add key-value store abstraction and LevelDB implementation.
Date Tue, 06 Jun 2017 18:39:17 GMT
Repository: spark
Updated Branches:
  refs/heads/master b61a401da -> 0cba49512


http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
new file mode 100644
index 0000000..5e33606
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * A set of small benchmarks for the LevelDB implementation.
+ *
+ * The benchmarks are run over two different types (one with just a natural index, and one
+ * with a ref index), over a set of 2^20 elements, and the following tests are performed:
+ *
+ * - write (then update) elements in sequential natural key order
+ * - write (then update) elements in random natural key order
+ * - iterate over natural index, ascending and descending
+ * - iterate over ref index, ascending and descending
+ */
+@Ignore
+public class LevelDBBenchmark {
+
+  private static final int COUNT = 1024;
+  private static final AtomicInteger IDGEN = new AtomicInteger();
+  private static final MetricRegistry metrics = new MetricRegistry();
+  private static final Timer dbCreation = metrics.timer("dbCreation");
+  private static final Timer dbClose = metrics.timer("dbClose");
+
+  private LevelDB db;
+  private File dbpath;
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    try(Timer.Context ctx = dbCreation.time()) {
+      db = new LevelDB(dbpath);
+    }
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      try(Timer.Context ctx = dbClose.time()) {
+        db.close();
+      }
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @AfterClass
+  public static void report() {
+    if (metrics.getTimers().isEmpty()) {
+      return;
+    }
+
+    int headingPrefix = 0;
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      headingPrefix = Math.max(e.getKey().length(), headingPrefix);
+    }
+    headingPrefix += 4;
+
+    StringBuilder heading = new StringBuilder();
+    for (int i = 0; i < headingPrefix; i++) {
+      heading.append(" ");
+    }
+    heading.append("\tcount");
+    heading.append("\tmean");
+    heading.append("\tmin");
+    heading.append("\tmax");
+    heading.append("\t95th");
+    System.out.println(heading);
+
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      StringBuilder row = new StringBuilder();
+      row.append(e.getKey());
+      for (int i = 0; i < headingPrefix - e.getKey().length(); i++) {
+        row.append(" ");
+      }
+
+      Snapshot s = e.getValue().getSnapshot();
+      row.append("\t").append(e.getValue().getCount());
+      row.append("\t").append(toMs(s.getMean()));
+      row.append("\t").append(toMs(s.getMin()));
+      row.append("\t").append(toMs(s.getMax()));
+      row.append("\t").append(toMs(s.get95thPercentile()));
+
+      System.out.println(row);
+    }
+
+    Slf4jReporter.forRegistry(metrics).outputTo(LoggerFactory.getLogger(LevelDBBenchmark.class))
+      .build().report();
+  }
+
+  private static String toMs(double nanos) {
+    return String.format("%.3f", nanos / 1000 / 1000);
+  }
+
+  @Test
+  public void sequentialWritesNoIndex() throws Exception {
+    List<SimpleType> entries = createSimpleType();
+    writeAll(entries, "sequentialWritesNoIndex");
+    writeAll(entries, "sequentialUpdatesNoIndex");
+    deleteNoIndex(entries, "sequentialDeleteNoIndex");
+  }
+
+  @Test
+  public void randomWritesNoIndex() throws Exception {
+    List<SimpleType> entries = createSimpleType();
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomWritesNoIndex");
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomUpdatesNoIndex");
+
+    Collections.shuffle(entries);
+    deleteNoIndex(entries, "randomDeletesNoIndex");
+  }
+
+  @Test
+  public void sequentialWritesIndexedType() throws Exception {
+    List<IndexedType> entries = createIndexedType();
+    writeAll(entries, "sequentialWritesIndexed");
+    writeAll(entries, "sequentialUpdatesIndexed");
+    deleteIndexed(entries, "sequentialDeleteIndexed");
+  }
+
+  @Test
+  public void randomWritesIndexedTypeAndIteration() throws Exception {
+    List<IndexedType> entries = createIndexedType();
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomWritesIndexed");
+
+    Collections.shuffle(entries);
+    writeAll(entries, "randomUpdatesIndexed");
+
+    // Run iteration benchmarks here since we've gone through the trouble of writing all
+    // the data already.
+    KVStoreView<?> view = db.view(IndexedType.class);
+    iterate(view, "naturalIndex");
+    iterate(view.reverse(), "naturalIndexDescending");
+    iterate(view.index("name"), "refIndex");
+    iterate(view.index("name").reverse(), "refIndexDescending");
+
+    Collections.shuffle(entries);
+    deleteIndexed(entries, "randomDeleteIndexed");
+  }
+
+  private void iterate(KVStoreView<?> view, String name) throws Exception {
+    Timer create = metrics.timer(name + "CreateIterator");
+    Timer iter = metrics.timer(name + "Iteration");
+    KVStoreIterator<?> it = null;
+    {
+      // Create the iterator several times, just to have multiple data points.
+      for (int i = 0; i < 1024; i++) {
+        if (it != null) {
+          it.close();
+        }
+        try(Timer.Context ctx = create.time()) {
+          it = view.closeableIterator();
+        }
+      }
+    }
+
+    for (; it.hasNext(); ) {
+      try(Timer.Context ctx = iter.time()) {
+        it.next();
+      }
+    }
+  }
+
+  private void writeAll(List<?> entries, String timerName) throws Exception {
+    Timer timer = newTimer(timerName);
+    for (Object o : entries) {
+      try(Timer.Context ctx = timer.time()) {
+        db.write(o);
+      }
+    }
+  }
+
+  private void deleteNoIndex(List<SimpleType> entries, String timerName) throws Exception
{
+    Timer delete = newTimer(timerName);
+    for (SimpleType i : entries) {
+      try(Timer.Context ctx = delete.time()) {
+        db.delete(i.getClass(), i.key);
+      }
+    }
+  }
+
+  private void deleteIndexed(List<IndexedType> entries, String timerName) throws Exception
{
+    Timer delete = newTimer(timerName);
+    for (IndexedType i : entries) {
+      try(Timer.Context ctx = delete.time()) {
+        db.delete(i.getClass(), i.key);
+      }
+    }
+  }
+
+  private List<SimpleType> createSimpleType() {
+    List<SimpleType> entries = new ArrayList<>();
+    for (int i = 0; i < COUNT; i++) {
+      SimpleType t = new SimpleType();
+      t.key = IDGEN.getAndIncrement();
+      t.name = "name" + (t.key % 1024);
+      entries.add(t);
+    }
+    return entries;
+  }
+
+  private List<IndexedType> createIndexedType() {
+    List<IndexedType> entries = new ArrayList<>();
+    for (int i = 0; i < COUNT; i++) {
+      IndexedType t = new IndexedType();
+      t.key = IDGEN.getAndIncrement();
+      t.name = "name" + (t.key % 1024);
+      entries.add(t);
+    }
+    return entries;
+  }
+
+  private Timer newTimer(String name) {
+    assertNull("Timer already exists: " + name, metrics.getTimers().get(name));
+    return metrics.timer(name);
+  }
+
+  public static class SimpleType {
+
+    @KVIndex
+    public int key;
+
+    public String name;
+
+  }
+
+  public static class IndexedType {
+
+    @KVIndex
+    public int key;
+
+    @KVIndex("name")
+    public String name;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
new file mode 100644
index 0000000..9340971
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+
+public class LevelDBIteratorSuite extends DBIteratorSuite {
+
+  private static File dbpath;
+  private static LevelDB db;
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Override
+  protected KVStore createStore() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+    return db;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
new file mode 100644
index 0000000..ee1c397
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.commons.io.FileUtils;
+import org.iq80.leveldb.DBIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBSuite {
+
+  private LevelDB db;
+  private File dbpath;
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+  }
+
+  @Test
+  public void testReopenAndVersionCheckDb() throws Exception {
+    db.close();
+    db = null;
+    assertTrue(dbpath.exists());
+
+    db = new LevelDB(dbpath);
+    assertEquals(LevelDB.STORE_VERSION,
+      db.serializer.deserializeLong(db.db().get(LevelDB.STORE_VERSION_KEY)));
+    db.db().put(LevelDB.STORE_VERSION_KEY, db.serializer.serialize(LevelDB.STORE_VERSION
+ 1));
+    db.close();
+    db = null;
+
+    try {
+      db = new LevelDB(dbpath);
+      fail("Should have failed version check.");
+    } catch (UnsupportedStoreVersionException e) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testObjectWriteReadDelete() throws Exception {
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    try {
+      db.read(CustomType1.class, t.key);
+      fail("Expected exception for non-existant object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    db.write(t);
+    assertEquals(t, db.read(t.getClass(), t.key));
+    assertEquals(1L, db.count(t.getClass()));
+
+    db.delete(t.getClass(), t.key);
+    try {
+      db.read(t.getClass(), t.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    // Look into the actual DB and make sure that all the keys related to the type have been
+    // removed.
+    assertEquals(0, countKeys(t.getClass()));
+  }
+
+  @Test
+  public void testMultipleObjectWriteReadDelete() throws Exception {
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.id = "id";
+    t1.name = "name1";
+    t1.child = "child1";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.id = "id";
+    t2.name = "name2";
+    t2.child = "child2";
+
+    db.write(t1);
+    db.write(t2);
+
+    assertEquals(t1, db.read(t1.getClass(), t1.key));
+    assertEquals(t2, db.read(t2.getClass(), t2.key));
+    assertEquals(2L, db.count(t1.getClass()));
+
+    // There should be one "id" index entry with two values.
+    assertEquals(2, db.count(t1.getClass(), "id", t1.id));
+
+    // Delete the first entry; now there should be 3 remaining keys, since one of the "name"
+    // index entries should have been removed.
+    db.delete(t1.getClass(), t1.key);
+
+    // Make sure there's a single entry in the "id" index now.
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+
+    // Delete the remaining entry, make sure all data is gone.
+    db.delete(t2.getClass(), t2.key);
+    assertEquals(0, countKeys(t2.getClass()));
+  }
+
+  @Test
+  public void testMultipleTypesWriteReadDelete() throws Exception {
+    CustomType1 t1 = new CustomType1();
+    t1.key = "1";
+    t1.id = "id";
+    t1.name = "name1";
+    t1.child = "child1";
+
+    IntKeyType t2 = new IntKeyType();
+    t2.key = 2;
+    t2.id = "2";
+    t2.values = Arrays.asList("value1", "value2");
+
+    ArrayKeyIndexType t3 = new ArrayKeyIndexType();
+    t3.key = new int[] { 42, 84 };
+    t3.id = new String[] { "id1", "id2" };
+
+    db.write(t1);
+    db.write(t2);
+    db.write(t3);
+
+    assertEquals(t1, db.read(t1.getClass(), t1.key));
+    assertEquals(t2, db.read(t2.getClass(), t2.key));
+    assertEquals(t3, db.read(t3.getClass(), t3.key));
+
+    // There should be one "id" index with a single entry for each type.
+    assertEquals(1, db.count(t1.getClass(), "id", t1.id));
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
+
+    // Delete the first entry; this should not affect the entries for the second type.
+    db.delete(t1.getClass(), t1.key);
+    assertEquals(0, countKeys(t1.getClass()));
+    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
+    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
+
+    // Delete the remaining entries, make sure all data is gone.
+    db.delete(t2.getClass(), t2.key);
+    assertEquals(0, countKeys(t2.getClass()));
+
+    db.delete(t3.getClass(), t3.key);
+    assertEquals(0, countKeys(t3.getClass()));
+  }
+
+  @Test
+  public void testMetadata() throws Exception {
+    assertNull(db.getMetadata(CustomType1.class));
+
+    CustomType1 t = new CustomType1();
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    db.setMetadata(t);
+    assertEquals(t, db.getMetadata(CustomType1.class));
+
+    db.setMetadata(null);
+    assertNull(db.getMetadata(CustomType1.class));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+    t.child = "child";
+
+    db.write(t);
+
+    t.name = "anotherName";
+
+    db.write(t);
+
+    assertEquals(1, db.count(t.getClass()));
+    assertEquals(1, db.count(t.getClass(), "name", "anotherName"));
+    assertEquals(0, db.count(t.getClass(), "name", "name"));
+  }
+
+  @Test
+  public void testSkip() throws Exception {
+    for (int i = 0; i < 10; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "key" + i;
+      t.id = "id" + i;
+      t.name = "name" + i;
+      t.child = "child" + i;
+
+      db.write(t);
+    }
+
+    KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
+    assertTrue(it.hasNext());
+    assertTrue(it.skip(5));
+    assertEquals("key5", it.next().key);
+    assertTrue(it.skip(3));
+    assertEquals("key9", it.next().key);
+    assertFalse(it.hasNext());
+  }
+
+  private int countKeys(Class<?> type) throws Exception {
+    byte[] prefix = db.getTypeInfo(type).keyPrefix();
+    int count = 0;
+
+    DBIterator it = db.db().iterator();
+    it.seek(prefix);
+
+    while (it.hasNext()) {
+      byte[] key = it.next().getKey();
+      if (LevelDBIterator.startsWith(key, prefix)) {
+        count++;
+      }
+    }
+
+    return count;
+  }
+
+  public static class IntKeyType {
+
+    @KVIndex
+    public int key;
+
+    @KVIndex("id")
+    public String id;
+
+    public List<String> values;
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof IntKeyType) {
+        IntKeyType other = (IntKeyType) o;
+        return key == other.key && id.equals(other.id) && values.equals(other.values);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return id.hashCode();
+    }
+
+  }
+
+  public static class ArrayKeyIndexType {
+
+    @KVIndex
+    public int[] key;
+
+    @KVIndex("id")
+    public String[] id;
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof ArrayKeyIndexType) {
+        ArrayKeyIndexType other = (ArrayKeyIndexType) o;
+        return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return key.hashCode();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
new file mode 100644
index 0000000..8e61965
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBTypeInfoSuite {
+
+  @Test
+  public void testIndexAnnotation() throws Exception {
+    KVTypeInfo ti = new KVTypeInfo(CustomType1.class);
+    assertEquals(5, ti.indices().count());
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key";
+    t1.id = "id";
+    t1.name = "name";
+    t1.num = 42;
+    t1.child = "child";
+
+    assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1));
+    assertEquals(t1.id, ti.getIndexValue("id", t1));
+    assertEquals(t1.name, ti.getIndexValue("name", t1));
+    assertEquals(t1.num, ti.getIndexValue("int", t1));
+    assertEquals(t1.child, ti.getIndexValue("child", t1));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex() throws Exception {
+    newTypeInfo(NoNaturalIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex2() throws Exception {
+    newTypeInfo(NoNaturalIndex2.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDuplicateIndex() throws Exception {
+    newTypeInfo(DuplicateIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyIndexName() throws Exception {
+    newTypeInfo(EmptyIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexName() throws Exception {
+    newTypeInfo(IllegalIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexMethod() throws Exception {
+    newTypeInfo(IllegalIndexMethod.class);
+  }
+
+  @Test
+  public void testKeyClashes() throws Exception {
+    LevelDBTypeInfo ti = newTypeInfo(CustomType1.class);
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.name = "a";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.name = "aa";
+
+    CustomType1 t3 = new CustomType1();
+    t3.key = "key3";
+    t3.name = "aaa";
+
+    // Make sure entries with conflicting names are sorted correctly.
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t2));
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t3));
+    assertBefore(ti.index("name").entityKey(null, t2), ti.index("name").entityKey(null, t3));
+  }
+
+  @Test
+  public void testNumEncoding() throws Exception {
+    LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next();
+
+    assertEquals("+=00000001", new String(idx.toKey(1), UTF_8));
+    assertEquals("+=00000010", new String(idx.toKey(16), UTF_8));
+    assertEquals("+=7fffffff", new String(idx.toKey(Integer.MAX_VALUE), UTF_8));
+
+    assertBefore(idx.toKey(1), idx.toKey(2));
+    assertBefore(idx.toKey(-1), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(-1));
+    assertBefore(idx.toKey(1), idx.toKey(11));
+    assertBefore(idx.toKey(Integer.MIN_VALUE), idx.toKey(Integer.MAX_VALUE));
+
+    assertBefore(idx.toKey(1L), idx.toKey(2L));
+    assertBefore(idx.toKey(-1L), idx.toKey(2L));
+    assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE));
+
+    assertBefore(idx.toKey((short) 1), idx.toKey((short) 2));
+    assertBefore(idx.toKey((short) -1), idx.toKey((short) 2));
+    assertBefore(idx.toKey(Short.MIN_VALUE), idx.toKey(Short.MAX_VALUE));
+
+    assertBefore(idx.toKey((byte) 1), idx.toKey((byte) 2));
+    assertBefore(idx.toKey((byte) -1), idx.toKey((byte) 2));
+    assertBefore(idx.toKey(Byte.MIN_VALUE), idx.toKey(Byte.MAX_VALUE));
+
+    byte prefix = LevelDBTypeInfo.ENTRY_PREFIX;
+    assertSame(new byte[] { prefix, LevelDBTypeInfo.FALSE }, idx.toKey(false));
+    assertSame(new byte[] { prefix, LevelDBTypeInfo.TRUE }, idx.toKey(true));
+  }
+
+  @Test
+  public void testArrayIndices() throws Exception {
+    LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next();
+
+    assertBefore(idx.toKey(new String[] { "str1" }), idx.toKey(new String[] { "str2" }));
+    assertBefore(idx.toKey(new String[] { "str1", "str2" }),
+      idx.toKey(new String[] { "str1", "str3" }));
+
+    assertBefore(idx.toKey(new int[] { 1 }), idx.toKey(new int[] { 2 }));
+    assertBefore(idx.toKey(new int[] { 1, 2 }), idx.toKey(new int[] { 1, 3 }));
+  }
+
+  private LevelDBTypeInfo newTypeInfo(Class<?> type) throws Exception {
+    return new LevelDBTypeInfo(null, type, type.getName().getBytes(UTF_8));
+  }
+
+  private void assertBefore(byte[] key1, byte[] key2) {
+    assertBefore(new String(key1, UTF_8), new String(key2, UTF_8));
+  }
+
+  private void assertBefore(String str1, String str2) {
+    assertTrue(String.format("%s < %s failed", str1, str2), str1.compareTo(str2) <
0);
+  }
+
+  private void assertSame(byte[] key1, byte[] key2) {
+    assertEquals(new String(key1, UTF_8), new String(key2, UTF_8));
+  }
+
+  public static class NoNaturalIndex {
+
+    public String id;
+
+  }
+
+  public static class NoNaturalIndex2 {
+
+    @KVIndex("id")
+    public String id;
+
+  }
+
+  public static class DuplicateIndex {
+
+    @KVIndex
+    public String key;
+
+    @KVIndex("id")
+    public String id;
+
+    @KVIndex("id")
+    public String id2;
+
+  }
+
+  public static class EmptyIndexName {
+
+    @KVIndex("")
+    public String id;
+
+  }
+
+  public static class IllegalIndexName {
+
+    @KVIndex("__invalid")
+    public String id;
+
+  }
+
+  public static class IllegalIndexMethod {
+
+    @KVIndex("id")
+    public String id(boolean illegalParam) {
+      return null;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/resources/log4j.properties b/common/kvstore/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e8da774
--- /dev/null
+++ b/common/kvstore/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=DEBUG, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Silence verbose logs from 3rd-party libraries.
+log4j.logger.io.netty=INFO

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0533a8d..6835ea1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
 
   <modules>
     <module>common/sketch</module>
+    <module>common/kvstore</module>
     <module>common/network-common</module>
     <module>common/network-shuffle</module>
     <module>common/unsafe</module>
@@ -442,6 +443,11 @@
         <version>${commons.httpcore.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.fusesource.leveldbjni</groupId>
+        <artifactId>leveldbjni-all</artifactId>
+        <version>1.8</version>
+      </dependency>
+      <dependency>
         <groupId>org.seleniumhq.selenium</groupId>
         <artifactId>selenium-java</artifactId>
         <version>${selenium.version}</version>
@@ -590,6 +596,11 @@
       </dependency>
       <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-core</artifactId>
+        <version>${fasterxml.jackson.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>
         <version>${fasterxml.jackson.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b5362ec..89b0c7a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -50,10 +50,10 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
-    core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe,
tags, sketch, _*
+    core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe,
tags, sketch, kvstore, _*
   ) = Seq(
     "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle",
"launcher", "unsafe",
-    "tags", "sketch"
+    "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
   val optionallyEnabledProjects@Seq(mesos, yarn, sparkGangliaLgpl,
@@ -310,7 +310,7 @@ object SparkBuild extends PomBuild {
   val mimaProjects = allProjects.filterNot { x =>
     Seq(
       spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
-      unsafe, tags, sqlKafka010
+      unsafe, tags, sqlKafka010, kvstore
     ).contains(x)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message