geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [geode] branch feature/transcoding_experiments updated: Adding a value serializer that sends protobuf structs but caches strings
Date Wed, 16 May 2018 00:46:19 GMT
This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/transcoding_experiments
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/transcoding_experiments by this push:
     new e953f39  Adding a value serializer that sends protobuf structs but caches strings
e953f39 is described below

commit e953f39555a762568a43f8b53056d2de973be038
Author: Dan Smith <upthewaterspout@apache.org>
AuthorDate: Tue May 15 17:42:49 2018 -0700

    Adding a value serializer that sends protobuf structs but caches strings
    
    This CachingProtobufStructSerializer is just like the
    ProtobufStructSerializer, but metadata like field names and type ids are
    cached.
---
 .../src/main/proto/v1/string_caching_struct.proto  |  39 +++++
 .../CachingProtobufStructSerializer.java           | 174 +++++++++++++++++++++
 ...he.geode.protocol.serialization.ValueSerializer |   3 +-
 .../serialization/CachingStructSerializerTest.java |  74 +++++++++
 4 files changed, 289 insertions(+), 1 deletion(-)

diff --git a/geode-protobuf-messages/src/main/proto/v1/string_caching_struct.proto b/geode-protobuf-messages/src/main/proto/v1/string_caching_struct.proto
new file mode 100644
index 0000000..fd0fff2
--- /dev/null
+++ b/geode-protobuf-messages/src/main/proto/v1/string_caching_struct.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+syntax = "proto3";
+
+option java_multiple_files = true;
+
+package org.apache.geode.internal.protocol.protobuf.v1;
+
+import "v1/basicTypes.proto";
+
+message CachedString {
+  int32 id = 1;
+  string value = 2;
+}
+
+message CachingStruct {
+  CachedString typeName = 1;
+  repeated Field fields = 2;
+}
+
+message Field {
+  CachedString fieldName = 1;
+  oneof value {
+    EncodedValue encodedValue = 2;
+    CachingStruct structValue = 3;
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/serialization/CachingProtobufStructSerializer.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/serialization/CachingProtobufStructSerializer.java
new file mode 100644
index 0000000..0249848
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/serialization/CachingProtobufStructSerializer.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.serialization;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.NullValue;
+import com.google.protobuf.UnsafeByteOperations;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.CachedString;
+import org.apache.geode.internal.protocol.protobuf.v1.CachingStruct;
+import org.apache.geode.internal.protocol.protobuf.v1.Field;
+import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+
+public class CachingProtobufStructSerializer implements ValueSerializer {
+  static final String PROTOBUF_STRUCT = "__PROTOBUF_STRUCT_AS_PDX";
+  private Cache cache;
+  private final Map<String, CachedString> writeCache = new HashMap<>();
+  private final Map<Integer, String> readCache = new HashMap<>();
+
+  @Override
+  public ByteString serialize(Object object) throws IOException {
+    return serializeStruct(object).toByteString();
+  }
+
+  CachingStruct serializeStruct(Object object) {
+
+    PdxInstance pdxInstance = (PdxInstance) object;
+
+    CachingStruct.Builder structBuilder = CachingStruct.newBuilder();
+    for (String fieldName : pdxInstance.getFieldNames()) {
+      Object value = pdxInstance.getField(fieldName);
+      Field serialized = serializeField(fieldName, value);
+      structBuilder.addFields(serialized);
+    }
+
+    structBuilder.setTypeName(cacheWrite(pdxInstance.getClassName()));
+
+    return structBuilder.build();
+  }
+
+  private Field serializeField(String fieldName, Object value) {
+    Field.Builder builder = Field.newBuilder();
+    builder.setFieldName(cacheWrite(fieldName));
+    if (value instanceof String) {
+      builder.setEncodedValue(
+          BasicTypes.EncodedValue.newBuilder().setStringResult((String) value).build());
+    } else if (value instanceof Boolean) {
+      builder.setEncodedValue(
+          BasicTypes.EncodedValue.newBuilder().setBooleanResult((Boolean) value).build());
+    } else if (value instanceof Integer) {
+      builder.setEncodedValue(
+          BasicTypes.EncodedValue.newBuilder().setIntResult((Integer) value).build());
+    } else if (value instanceof Byte) {
+      builder.setEncodedValue(
+          BasicTypes.EncodedValue.newBuilder().setByteResult((Byte) value).build());
+    } else if (value instanceof Long) {
+      builder.setEncodedValue(
+          BasicTypes.EncodedValue.newBuilder().setLongResult((Long) value).build());
+    } else if (value instanceof Double) {
+      builder.setEncodedValue(
+          BasicTypes.EncodedValue.newBuilder().setDoubleResult((Double) value).build());
+    } else if (value instanceof byte[]) {
+      builder.setEncodedValue(BasicTypes.EncodedValue.newBuilder()
+          .setBinaryResult(UnsafeByteOperations.unsafeWrap((byte[]) value)).build());
+    } else if (value instanceof PdxInstance) {
+      builder.setStructValue(serializeStruct(value));
+    } else if (value == null) {
+      builder.setEncodedValue(
+          BasicTypes.EncodedValue.newBuilder().setNullResult(NullValue.NULL_VALUE).build());
+    } else {
+      throw new IllegalStateException(
+          "Don't know how to translate object of type " + value.getClass() + ": " + value);
+    }
+    return builder.build();
+  }
+
+  private CachedString cacheWrite(final String string) {
+    return writeCache.computeIfAbsent(string,
+        name -> CachedString.newBuilder().setId(writeCache.size() + 1).setValue(name).build());
+  }
+
+  @Override
+  public Object deserialize(ByteString bytes) throws IOException, ClassNotFoundException {
+    CachingStruct struct = CachingStruct.parseFrom(bytes);
+    return deserialize(struct);
+  }
+
+  private Object deserialize(CachingStruct struct) {
+
+    String typeName = cacheRead(struct.getTypeName());
+    PdxInstanceFactory pdxInstanceFactory = cache.createPdxInstanceFactory(typeName);
+
+    for (Field field : struct.getFieldsList()) {
+      String fieldName = cacheRead(field.getFieldName());
+      Object value = deserializeField(field);
+      if (value instanceof String) {
+        pdxInstanceFactory.writeString(fieldName, (String) value);
+      } else if (value instanceof Boolean) {
+        pdxInstanceFactory.writeBoolean(fieldName, (Boolean) value);
+      } else if (value instanceof Integer) {
+        pdxInstanceFactory.writeInt(fieldName, (Integer) value);
+      } else if (value instanceof Byte) {
+        pdxInstanceFactory.writeByte(fieldName, (Byte) value);
+      } else if (value instanceof Long) {
+        pdxInstanceFactory.writeLong(fieldName, (Long) value);
+      } else if (value instanceof byte[]) {
+        pdxInstanceFactory.writeByteArray(fieldName, (byte[]) value);
+      } else if (value instanceof Double) {
+        pdxInstanceFactory.writeDouble(fieldName, (Double) value);
+      } else if (value instanceof PdxInstance) {
+        pdxInstanceFactory.writeObject(fieldName, value);
+      } else if (value instanceof List) {
+        pdxInstanceFactory.writeObject(fieldName, value);
+      } else if (value == null) {
+        pdxInstanceFactory.writeObject(fieldName, null);
+      } else {
+        throw new IllegalStateException(
+            "Don't know how to translate object of type " + value.getClass() + ": " + value);
+      }
+    }
+
+    return pdxInstanceFactory.create();
+  }
+
+  private String cacheRead(final CachedString fieldName) {
+    String value = fieldName.getValue();
+    int id = fieldName.getId();
+    if (value == null) {
+      value = readCache.get(id);
+    } else if (id != 0) {
+      readCache.put(id, value);
+    }
+    return value;
+  }
+
+  private Object deserializeField(Field value) {
+    switch (value.getValueCase()) {
+      case ENCODEDVALUE:
+        return new ProtobufSerializationService().decode(value.getEncodedValue());
+      case STRUCTVALUE:
+        return deserialize(value.getStructValue());
+      default:
+        throw new IllegalStateException(
+            "Don't know how to translate object of type " + value.getValueCase() + ": " + value);
+    }
+  }
+
+  @Override
+  public void init(Cache cache) {
+    this.cache = cache;
+
+  }
+}
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer
index 5d487dd..0866211 100644
--- a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer
+++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.protocol.serialization.ValueSerializer
@@ -1,3 +1,4 @@
 org.apache.geode.protocol.serialization.ProtobufStructSerializer
 org.apache.geode.protocol.serialization.CompressingProtobufStructSerializer
-org.apache.geode.protocol.serialization.PdxPassThroughSerializer
\ No newline at end of file
+org.apache.geode.protocol.serialization.PdxPassThroughSerializer
+org.apache.geode.protocol.serialization.CachingProtobufStructSerializer
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/serialization/CachingStructSerializerTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/serialization/CachingStructSerializerTest.java
new file mode 100644
index 0000000..05b5342
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/serialization/CachingStructSerializerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.serialization;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import com.google.protobuf.ByteString;
+import com.pholser.junit.quickcheck.From;
+import com.pholser.junit.quickcheck.Property;
+import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@RunWith(JUnitQuickcheck.class)
+@Category(IntegrationTest.class)
+public class CachingStructSerializerTest {
+
+  private CachingProtobufStructSerializer serializer;
+  private static Cache cache;
+
+  @BeforeClass
+  public static void createCache() {
+    cache = new CacheFactory().set(ConfigurationProperties.LOG_LEVEL, "error")
+        .setPdxReadSerialized(true).create();
+  }
+
+  @Before
+  public void createSerializer() {
+    serializer = new CachingProtobufStructSerializer();
+    serializer.init(cache);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cache.close();
+  }
+
+
+  @Property(trials = 10)
+  public void testSymmetry(
+      @PdxInstanceGenerator.ClassName("someclass") @PdxInstanceGenerator.FieldTypes({String.class,
+          int.class, long.class, byte.class, byte[].class, double.class,
+          PdxInstance.class}) @From(PdxInstanceGenerator.class) PdxInstance original)
+      throws IOException, ClassNotFoundException {
+    ByteString bytes = serializer.serialize(original);
+    PdxInstance actual = (PdxInstance) serializer.deserialize(bytes);
+    assertThat(original).isEqualTo(actual);
+    assertEquals(actual, original);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
upthewaterspout@apache.org.

Mime
View raw message