apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sand...@apache.org
Subject apex-core git commit: APEXCORE-648 Unnecessary byte array copy in DefaultStatefulStreamCodec.toDataStatePair()
Date Thu, 23 Feb 2017 21:25:01 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 7106b766a -> 1e3d47bef


APEXCORE-648 Unnecessary byte array copy in DefaultStatefulStreamCodec.toDataStatePair()


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/1e3d47be
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/1e3d47be
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/1e3d47be

Branch: refs/heads/master
Commit: 1e3d47bef8b32d678b2a726fb2aaf6854f4650b9
Parents: 7106b76
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Mon Feb 20 08:03:11 2017 -0800
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Wed Feb 22 22:30:18 2017 -0800

----------------------------------------------------------------------
 .../stram/codec/DefaultStatefulStreamCodec.java | 22 ++++-----
 .../stram/codec/StatefulStreamCodec.java        |  6 +--
 .../codec/DefaultStatefulStreamCodecTest.java   | 50 ++++++++++++--------
 3 files changed, 43 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e3d47be/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java
index 094c2e2..5bb1640 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java
@@ -49,6 +49,7 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea
   private final Output data;
   private final Output state;
   private final Input input;
+  private final DataStatePair dataStatePair;
 
   @SuppressWarnings("OverridableMethodCallInConstructor")
   public DefaultStatefulStreamCodec()
@@ -63,10 +64,11 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea
     classResolver = (ClassResolver)getClassResolver();
     this.pairs = classResolver.pairs;
     classResolver.init();
+    dataStatePair = new DataStatePair();
   }
 
   @Override
-  public Object fromDataStatePair(DataStatePair dspair)
+  public T fromDataStatePair(DataStatePair dspair)
   {
     if (dspair.state != null) {
       try {
@@ -93,7 +95,7 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea
     // the following code does not need to be in the try-catch block. It can be
     // taken out of it, once the stability of the code is validated by 4/1/2014.
     try {
-      return readClassAndObject(input);
+      return (T)readClassAndObject(input);
     } catch (Throwable th) {
       logger.error("Catastrophic Error: Execution halted due to Kryo exception!", th);
       synchronized (this) {
@@ -110,7 +112,6 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea
   @Override
   public DataStatePair toDataStatePair(T o)
   {
-    DataStatePair pair = new DataStatePair();
     data.setPosition(0);
     writeClassAndObject(data, o);
 
@@ -121,14 +122,13 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea
       }
       pairs.clear();
 
-      // can we optimize this?
-      byte[] bytes = state.toBytes();
-      pair.state = new Slice(bytes, 0, bytes.length);
+      dataStatePair.state = new Slice(state.getBuffer(), 0, state.position());
+    } else {
+      dataStatePair.state = null;
     }
 
-    byte[] bytes = data.toBytes();
-    pair.data = new Slice(bytes, 0, bytes.length);
-    return pair;
+    dataStatePair.data = new Slice(data.getBuffer(), 0, data.position());
+    return dataStatePair;
   }
 
   @Override
@@ -149,13 +149,13 @@ public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStrea
   @Override
   public Object fromByteArray(Slice fragment)
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public Slice toByteArray(T o)
   {
-    throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    throw new UnsupportedOperationException();
   }
 
   static class ClassIdPair

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e3d47be/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java
index e531289..3797830 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/StatefulStreamCodec.java
@@ -63,10 +63,10 @@ public interface StatefulStreamCodec<T> extends StreamCodec<T>
   /**
    * Create POJO from the byte array for consumption by the downstream.
    *
-   * @param dspair
-   * @return plain old java object, the type is intentionally not T since the consumer does not care about it.
+   * @param dspair serialized representation of the object
+   * @return plain old java object
    */
-  Object fromDataStatePair(DataStatePair dspair);
+  T fromDataStatePair(DataStatePair dspair);
 
   /**
    * Serialize the POJO emitted by the upstream node to byte array so that

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e3d47be/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
index 42ac5dc..d1a18ae 100644
--- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java
@@ -33,7 +33,10 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
-import com.datatorrent.netlet.util.Slice;
+import com.datatorrent.bufferserver.packet.DataTuple;
+import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.bufferserver.packet.PayloadTuple;
+import com.datatorrent.bufferserver.packet.Tuple;
 import com.datatorrent.stram.codec.DefaultStatefulStreamCodec.ClassIdPair;
 import com.datatorrent.stram.codec.StatefulStreamCodec.DataStatePair;
 
@@ -121,36 +124,41 @@ public class DefaultStatefulStreamCodecTest
   @Test
   public void testCustomObject()
   {
-    DefaultStatefulStreamCodec<Object> coder = new DefaultStatefulStreamCodec<Object>();
-    DefaultStatefulStreamCodec<Object> decoder = new DefaultStatefulStreamCodec<Object>();
+    DefaultStatefulStreamCodec<TestClass> coder = new DefaultStatefulStreamCodec<>();
+    DefaultStatefulStreamCodec<TestClass> decoder = coder.newInstance();
 
     TestClass tc = new TestClass("hello!", 42);
     //String tc = "hello";
 
-    DataStatePair dsp1 = coder.toDataStatePair(tc);
-    Slice state1 = dsp1.state;
-    DataStatePair dsp2 = coder.toDataStatePair(tc);
-    Slice state2 = dsp2.state;
-    assert (state1 != null);
-    assert (state2 == null);
-    Assert.assertEquals(dsp1.data, dsp2.data);
+    DataStatePair dsp = coder.toDataStatePair(tc);
+    Assert.assertNotNull(dsp.state);
+    byte[] state1 =  DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state);
+    byte[] data1 = PayloadTuple.getSerializedTuple(0, dsp.data);
 
-    Object tcObject1 = decoder.fromDataStatePair(dsp1);
-    assert (tc.equals(tcObject1));
+    dsp = coder.toDataStatePair(tc);
+    Assert.assertNull(dsp.state);
+    byte[] data2 = PayloadTuple.getSerializedTuple(0, dsp.data);
 
-    Object tcObject2 = decoder.fromDataStatePair(dsp2);
-    assert (tc.equals(tcObject2));
+    Assert.assertNotSame(data1, data2);
+    Assert.assertArrayEquals(data1, data2);
+
+    dsp.state = Tuple.getTuple(state1, 0, state1.length).getData();
+    dsp.data = Tuple.getTuple(data1, 0, data1.length).getData();
+    Assert.assertEquals(tc, decoder.fromDataStatePair(dsp));
+
+    dsp.state = null;
+    dsp.data = Tuple.getTuple(data2, 0, data2.length).getData();
+    Assert.assertEquals(tc, decoder.fromDataStatePair(dsp));
 
     coder.resetState();
 
-    dsp2 = coder.toDataStatePair(tc);
-    state2 = dsp2.state;
-    Assert.assertEquals(state1, state2);
+    dsp = coder.toDataStatePair(tc);
+    Assert.assertArrayEquals(state1, DataTuple.getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state));
 
-    dsp1 = coder.toDataStatePair(tc);
-    dsp2 = coder.toDataStatePair(tc);
-    Assert.assertEquals(dsp1.data, dsp2.data);
-    Assert.assertEquals(dsp1.state, dsp2.state);
+    Assert.assertNull(coder.toDataStatePair(tc).state);
+    data1 = PayloadTuple.getSerializedTuple(Integer.MAX_VALUE, coder.toDataStatePair(tc).data);
+    data2 = PayloadTuple.getSerializedTuple(Integer.MAX_VALUE, coder.toDataStatePair(tc).data);
+    Assert.assertArrayEquals(data1, data2);
   }
 
   public static class TestTuple


Mime
View raw message