helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject git commit: HELIX-107 rb: 13193 Add support to set custom objects into ZNRecord
Date Fri, 02 Aug 2013 23:11:48 GMT
Updated Branches:
  refs/heads/master 6e7b34acc -> d4e448bea


HELIX-107 rb: 13193 Add support to set custom objects into ZNRecord


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d4e448be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d4e448be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d4e448be

Branch: refs/heads/master
Commit: d4e448bea69f56d7dbb6724206ec76e372cad247
Parents: 6e7b34a
Author: kishoreg <kishoreg@apache.org>
Authored: Fri Aug 2 16:11:36 2013 -0700
Committer: kishoreg <kishoreg@apache.org>
Committed: Fri Aug 2 16:11:36 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/ZNRecord.java    | 202 +++++++++++++++++++
 .../manager/zk/ZNRecordStreamingSerializer.java |  10 +
 .../zk/serializer/JacksonPayloadSerializer.java |  90 +++++++++
 .../zk/serializer/PayloadSerializer.java        |  41 ++++
 .../java/org/apache/helix/model/HealthStat.java |  21 +-
 .../java/org/apache/helix/model/IdealState.java |  72 +------
 .../org/apache/helix/model/InstanceConfig.java  |   5 +-
 .../java/org/apache/helix/model/Message.java    | 108 ++--------
 .../TestJacksonPayloadSerializer.java           | 181 +++++++++++++++++
 9 files changed, 557 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index c5cb4a6..4e6080a 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.manager.zk.serializer.PayloadSerializer;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonIgnore;
@@ -54,6 +55,9 @@ public class ZNRecord
   private Map<String, String> simpleFields;
   private Map<String, Map<String, String>> mapFields;
   private Map<String, List<String>> listFields;
+  private byte[] rawPayload;
+
+  private PayloadSerializer _serializer;
 
   // the version field of zookeeper Stat
   private int _version;
@@ -69,6 +73,13 @@ public class ZNRecord
     simpleFields = new TreeMap<String, String>();
     mapFields = new TreeMap<String, Map<String, String>>();
     listFields = new TreeMap<String, List<String>>();
+    rawPayload = new byte[0];
+  }
+
+  public ZNRecord(String id, PayloadSerializer serializer)
+  {
+    this(id);
+    _serializer = serializer;
   }
 
   public ZNRecord(ZNRecord record)
@@ -82,6 +93,8 @@ public class ZNRecord
     simpleFields.putAll(record.getSimpleFields());
     mapFields.putAll(record.getMapFields());
     listFields.putAll(record.getListFields());
+    rawPayload = new byte[record.rawPayload.length];
+    System.arraycopy(record.rawPayload, 0, rawPayload, 0, record.rawPayload.length);
     _version = record.getVersion();
     _creationTime = record.getCreationTime();
     _modifiedTime = record.getModifiedTime();
@@ -93,6 +106,24 @@ public class ZNRecord
     _version = version;
   }
 
+  public ZNRecord(ZNRecord record, String id, PayloadSerializer serializer)
+  {
+    this(record, id);
+    _serializer = serializer;
+  }
+
+  public ZNRecord(ZNRecord record, int version, PayloadSerializer serializer)
+  {
+    this(record, version);
+    _serializer = serializer;
+  }
+
+  @JsonIgnore(true)
+  public void setPayloadSerializer(PayloadSerializer serializer)
+  {
+    _serializer = serializer;
+  }
+
   @JsonIgnore(true)
   public void setDeltaList(List<ZNRecordDelta> deltaList)
   {
@@ -153,6 +184,44 @@ public class ZNRecord
     return id;
   }
 
+  @JsonProperty
+  public void setRawPayload(byte[] payload)
+  {
+    rawPayload = payload;
+  }
+
+  @JsonProperty
+  public byte[] getRawPayload()
+  {
+    return rawPayload;
+  }
+
+  @JsonIgnore(true)
+  public <T> void setPayload(T payload)
+  {
+    if (_serializer != null && payload != null)
+    {
+      rawPayload = _serializer.serialize(payload);
+    }
+    else
+    {
+      rawPayload = null;
+    }
+  }
+
+  @JsonIgnore(true)
+  public <T> T getPayload(Class<T> clazz)
+  {
+    if (_serializer != null && rawPayload != null)
+    {
+      return _serializer.deserialize(clazz, rawPayload);
+    }
+    else
+    {
+      return null;
+    }
+  }
+
   public void setMapField(String k, Map<String, String> v)
   {
     mapFields.put(k, v);
@@ -178,6 +247,139 @@ public class ZNRecord
     return listFields.get(k);
   }
 
+  public void setIntField(String k, int v)
+  {
+    setSimpleField(k, Integer.toString(v));
+  }
+
+  public int getIntField(String k, int defaultValue)
+  {
+    int v = defaultValue;
+    String valueStr = getSimpleField(k);
+    if (valueStr != null)
+    {
+      try
+      {
+        v = Integer.parseInt(valueStr);
+      }
+      catch (NumberFormatException e)
+      {
+        _logger.warn("", e);
+      }
+    }
+    return v;
+  }
+
+  public void setLongField(String k, long v)
+  {
+    setSimpleField(k, Long.toString(v));
+  }
+
+  public long getLongField(String k, long defaultValue)
+  {
+    long v = defaultValue;
+    String valueStr = getSimpleField(k);
+    if (valueStr != null)
+    {
+      try
+      {
+        v = Long.parseLong(valueStr);
+      }
+      catch (NumberFormatException e)
+      {
+        _logger.warn("", e);
+      }
+    }
+    return v;
+  }
+
+  public void setDoubleField(String k, double v)
+  {
+    setSimpleField(k, Double.toString(v));
+  }
+
+  public double getDoubleField(String k, double defaultValue)
+  {
+    double v = defaultValue;
+    String valueStr = getSimpleField(k);
+    if (valueStr != null)
+    {
+      try
+      {
+        v = Double.parseDouble(valueStr);
+      }
+      catch (NumberFormatException e)
+      {
+        _logger.warn("", e);
+      }
+    }
+    return v;
+  }
+
+  public void setBooleanField(String k, boolean v)
+  {
+    setSimpleField(k, Boolean.toString(v));
+  }
+
+  public boolean getBooleanField(String k, boolean defaultValue)
+  {
+    boolean v = defaultValue;
+    String valueStr = getSimpleField(k);
+    if (valueStr != null)
+    {
+      // Boolean.parseBoolean() doesn't throw an exception if the string isn't a valid boolean.
+      // Thus, a direct comparison is necessary to make sure the value is actually "true"
or
+      // "false"
+      if (valueStr.equalsIgnoreCase(Boolean.TRUE.toString()))
+      {
+        v = true;
+      }
+      else if (valueStr.equalsIgnoreCase(Boolean.FALSE.toString()))
+      {
+        v = false;
+      }
+    }
+    return v;
+  }
+
+  public <T extends Enum<T>> void setEnumField(String k, T v)
+  {
+    setSimpleField(k, v.toString());
+  }
+
+  public <T extends Enum<T>> T getEnumField(String k, Class<T> enumType,
T defaultValue)
+  {
+    T v = defaultValue;
+    String valueStr = getSimpleField(k);
+    if (valueStr != null)
+    {
+      try
+      {
+        v = Enum.valueOf(enumType, valueStr);
+      }
+      catch (NullPointerException e)
+      {
+        _logger.warn("", e);
+      }
+      catch (IllegalArgumentException e)
+      {
+        _logger.warn("", e);
+      }
+    }
+    return v;
+  }
+
+  public String getStringField(String k, String defaultValue)
+  {
+    String v = defaultValue;
+    String valueStr = getSimpleField(k);
+    if (valueStr != null)
+    {
+      v = valueStr;
+    }
+    return v;
+  }
+
   @Override
   public String toString()
   {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index 674afa4..1504ff8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -36,6 +36,8 @@ import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
 
+import com.noelios.restlet.util.Base64;
+
 
 public class ZNRecordStreamingSerializer implements ZkSerializer
 {
@@ -151,6 +153,10 @@ public class ZNRecordStreamingSerializer implements ZkSerializer
       g.writeRaw("\n  ");
       g.writeEndObject(); // for mapFields
 
+      // write rawPayload
+      g.writeRaw("\n  ");
+      g.writeStringField("rawPayload", Base64.encode(record.getRawPayload(), false));
+
       g.writeRaw("\n");
       g.writeEndObject(); // for whole znrecord
 
@@ -246,6 +252,10 @@ public class ZNRecordStreamingSerializer implements ZkSerializer
           }
 
         }
+        else if ("rawPayload".equals(fieldname))
+        {
+          record.setRawPayload(Base64.decode(jp.getText()));
+        }
         else
         {
           throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/JacksonPayloadSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/JacksonPayloadSerializer.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/JacksonPayloadSerializer.java
new file mode 100644
index 0000000..eedc20c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/JacksonPayloadSerializer.java
@@ -0,0 +1,90 @@
+package org.apache.helix.manager.zk.serializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.serializer.PayloadSerializer;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/**
+ * Serializes and deserializes data of a generic type using Jackson
+ */
+public class JacksonPayloadSerializer implements PayloadSerializer {
+
+  private static Logger logger = Logger.getLogger(JacksonPayloadSerializer.class);
+
+  @Override
+  public <T> byte[] serialize(final T data) {
+    if (data == null)
+    {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    StringWriter sw = new StringWriter();
+    try
+    {
+      mapper.writeValue(sw, data);
+    }
+    catch (Exception e)
+    {
+      logger.error("Exception during payload data serialization.", e);
+      throw new HelixException(e);
+    }
+    return sw.toString().getBytes();
+  }
+
+  @Override
+  public <T> T deserialize(final Class<T> clazz, final byte[] bytes) {
+    if (bytes == null || bytes.length == 0)
+    {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+    try
+    {
+      T payload = mapper.readValue(bais, clazz);
+      return payload;
+    }
+    catch (Exception e)
+    {
+      logger.error("Exception during deserialization of payload bytes: " + new String(bytes),
e);
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/PayloadSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/PayloadSerializer.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/PayloadSerializer.java
new file mode 100644
index 0000000..a9531bd
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/serializer/PayloadSerializer.java
@@ -0,0 +1,41 @@
+package org.apache.helix.manager.zk.serializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Interface for converting back and forth between raw bytes and generic objects
+ */
+public interface PayloadSerializer {
+
+  /**
+   * Convert a generic object instance to raw bytes
+   * @param data instance of the generic type
+   * @return byte array representing the object
+   */
+  public <T> byte[] serialize(final T data);
+
+  /**
+   * Convert raw bytes to a generic object instance
+   * @param clazz The class represented by the deserialized bytes
+   * @param bytes byte array representing the object
+   * @return instance of the generic type or null if the conversion failed
+   */
+  public <T> T deserialize(final Class<T> clazz, final byte[] bytes);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/HealthStat.java b/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
index 29a4e6c..a8a0ef0 100644
--- a/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
+++ b/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
@@ -49,8 +49,7 @@ public class HealthStat extends HelixProperty
       super(record);
       if(getCreateTimeStamp() == 0)
       {
-        _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(), ""
-            + new Date().getTime());
+        _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), new Date().getTime());
       }
     }
 
@@ -61,17 +60,7 @@ public class HealthStat extends HelixProperty
 
   public long getCreateTimeStamp()
   {
-    if (_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()) == null)
-    {
-      return 0;
-    }
-    try
-    {
-      return Long.parseLong(_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()));
-    } catch (Exception e)
-    {
-      return 0;
-    }
+    return _record.getLongField(Attributes.CREATE_TIMESTAMP.toString(), 0L);
   }
   
   public String getTestField()
@@ -100,11 +89,7 @@ public class HealthStat extends HelixProperty
     for (String key : currMapFields.keySet())
     {
       Map<String, String> currMap = currMapFields.get(key);
-      String timestamp = "-1";
-      if (_record.getSimpleFields().keySet().contains(StatsHolder.TIMESTAMP_NAME))
-      {
-        timestamp = _record.getSimpleField(StatsHolder.TIMESTAMP_NAME);
-      }
+      String timestamp = _record.getStringField(StatsHolder.TIMESTAMP_NAME, "-1");
       for (String subKey : currMap.keySet())
       {
         if (subKey.equals("StatsHolder.TIMESTAMP_NAME"))

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 10d7483..8733cc7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -76,25 +76,13 @@ public class IdealState extends HelixProperty
 
   public void setIdealStateMode(String mode)
   {
-    _record
-        .setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), mode);
+    _record.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), mode);
   }
   
   public int getMaxPartitionsPerInstance()
   {
-    try
-    {
-      String strVal =  _record
-          .getSimpleField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString());
-      if(strVal != null)
-      {
-        return Integer.parseInt(strVal);
-      }
-    } 
-    catch (Exception e)
-    {
-    }
-    return Integer.MAX_VALUE;
+    return _record.getIntField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString(),
+        Integer.MAX_VALUE);
   }
   
   public void setRebalancerClassName(String rebalancerClassName)
@@ -110,21 +98,13 @@ public class IdealState extends HelixProperty
   
   public void setMaxPartitionsPerInstance(int max)
   {
-    _record
-    .setSimpleField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString(), Integer.toString(max));
+    _record.setIntField(IdealStateProperty.MAX_PARTITONS_PER_INSTANCE.toString(), max);
   }
 
   public IdealStateModeProperty getIdealStateMode()
   {
-    String mode = _record.getSimpleField(IdealStateProperty.IDEAL_STATE_MODE
-        .toString());
-    try
-    {
-      return IdealStateModeProperty.valueOf(mode);
-    } catch (Exception e)
-    {
-      return IdealStateModeProperty.AUTO;
-    }
+    return _record.getEnumField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
+        IdealStateModeProperty.class, IdealStateModeProperty.AUTO);
   }
 
   public void setPartitionState(String partitionName, String instanceName,
@@ -219,23 +199,12 @@ public class IdealState extends HelixProperty
 
   public void setNumPartitions(int numPartitions)
   {
-    _record.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
-        String.valueOf(numPartitions));
+    _record.setIntField(IdealStateProperty.NUM_PARTITIONS.toString(), numPartitions);
   }
 
   public int getNumPartitions()
   {
-    String numPartitionStr = _record
-        .getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString());
-
-    try
-    {
-      return Integer.parseInt(numPartitionStr);
-    } catch (Exception e)
-    {
-      logger.error("Can't parse number of partitions: " + numPartitionStr, e);
-      return -1;
-    }
+    return _record.getIntField(IdealStateProperty.NUM_PARTITIONS.toString(), -1);
   }
 
   public void setReplicas(String replicas)
@@ -303,32 +272,13 @@ public class IdealState extends HelixProperty
 
   public String getStateModelFactoryName()
   {
-    String ftyName = _record
-        .getSimpleField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString());
-    if (ftyName == null)
-    {
-      ftyName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
-    }
-    return ftyName;
+    return _record.getStringField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString(),
+        HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
   }
 
   public int getRebalanceTimerPeriod()
   {
-    if (_record.getSimpleFields().containsKey(
-        IdealStateProperty.REBALANCE_TIMER_PERIOD.toString()))
-    {
-      try
-      {
-        int result = Integer.parseInt(_record
-            .getSimpleField(IdealStateProperty.REBALANCE_TIMER_PERIOD
-                .toString()));
-        return result;
-      } catch (Exception e)
-      {
-        logger.error("", e);
-      }
-    }
-    return -1;
+    return _record.getIntField(IdealStateProperty.REBALANCE_TIMER_PERIOD.toString(), -1);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 9ac594a..6c9ac24 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -125,13 +125,12 @@ public class InstanceConfig extends HelixProperty
 
   public boolean getInstanceEnabled()
   {
-    String isEnabled = _record.getSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString());
-    return isEnabled == null || Boolean.parseBoolean(isEnabled);
+    return _record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), true);
   }
 
   public void setInstanceEnabled(boolean enabled)
   {
-    _record.setSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString(), Boolean.toString(enabled));
+    _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), enabled);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 9d07404..fc4df4c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -113,8 +113,7 @@ public class Message extends HelixProperty
     _record.setSimpleField(Attributes.MSG_TYPE.toString(), type);
     setMsgId(msgId);
     setMsgState(MessageState.NEW);
-    _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
-                           "" + new Date().getTime());
+    _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), new Date().getTime());
   }
 
   public Message(ZNRecord record)
@@ -126,14 +125,13 @@ public class Message extends HelixProperty
     }
     if (getCreateTimeStamp() == 0)
     {
-      _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
-                             "" + new Date().getTime());
+      _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), new Date().getTime());
     }
   }
 
   public void setCreateTimeStamp(long timestamp)
   {
-    _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(), "" + timestamp);
+    _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), timestamp);
   }
 
   public Message(ZNRecord record, String id)
@@ -199,16 +197,13 @@ public class Message extends HelixProperty
 
   public void setSrcInstanceType(InstanceType type)
   {
-    _record.setSimpleField(Attributes.SRC_INSTANCE_TYPE.toString(), type.toString());
+    _record.setEnumField(Attributes.SRC_INSTANCE_TYPE.toString(), type);
   }
 
   public InstanceType getSrcInstanceType()
   {
-    if (_record.getSimpleFields().containsKey(Attributes.SRC_INSTANCE_TYPE.toString()))
-    {
-      return InstanceType.valueOf(_record.getSimpleField(Attributes.SRC_INSTANCE_TYPE.toString()));
-    }
-    return InstanceType.PARTICIPANT;
+    return _record.getEnumField(Attributes.SRC_INSTANCE_TYPE.toString(),
+        InstanceType.class, InstanceType.PARTICIPANT);
   }
 
   public void setSrcName(String msgSrc)
@@ -311,64 +306,27 @@ public class Message extends HelixProperty
 
   public void setReadTimeStamp(long time)
   {
-    _record.setSimpleField(Attributes.READ_TIMESTAMP.toString(), "" + time);
+    _record.setLongField(Attributes.READ_TIMESTAMP.toString(), time);
   }
 
   public void setExecuteStartTimeStamp(long time)
   {
-    _record.setSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString(), "" + time);
+    _record.setLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), time);
   }
 
   public long getReadTimeStamp()
   {
-    String timestamp = _record.getSimpleField(Attributes.READ_TIMESTAMP.toString());
-    if (timestamp == null)
-    {
-      return 0;
-    }
-    try
-    {
-      return Long.parseLong(timestamp);
-    }
-    catch (Exception e)
-    {
-      return 0;
-    }
-
+    return _record.getLongField(Attributes.READ_TIMESTAMP.toString(), 0L);
   }
 
   public long getExecuteStartTimeStamp()
   {
-    String timestamp =
-        _record.getSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString());
-    if (timestamp == null)
-    {
-      return 0;
-    }
-    try
-    {
-      return Long.parseLong(timestamp);
-    }
-    catch (Exception e)
-    {
-      return 0;
-    }
+    return _record.getLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), 0L);
   }
 
   public long getCreateTimeStamp()
   {
-    if (_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()) == null)
-    {
-      return 0;
-    }
-    try
-    {
-      return Long.parseLong(_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()));
-    }
-    catch (Exception e)
-    {
-      return 0;
-    }
+    return _record.getLongField(Attributes.CREATE_TIMESTAMP.toString(), 0L);
   }
 
   public void setCorrelationId(String correlationId)
@@ -383,41 +341,22 @@ public class Message extends HelixProperty
 
   public int getExecutionTimeout()
   {
-    if (!_record.getSimpleFields().containsKey(Attributes.TIMEOUT.toString()))
-    {
-      return -1;
-    }
-    try
-    {
-      return Integer.parseInt(_record.getSimpleField(Attributes.TIMEOUT.toString()));
-    }
-    catch (Exception e)
-    {
-    }
-    return -1;
+    return _record.getIntField(Attributes.TIMEOUT.toString(), -1);
   }
 
   public void setExecutionTimeout(int timeout)
   {
-    _record.setSimpleField(Attributes.TIMEOUT.toString(), "" + timeout);
+    _record.setIntField(Attributes.TIMEOUT.toString(), timeout);
   }
 
   public void setRetryCount(int retryCount)
   {
-    _record.setSimpleField(Attributes.RETRY_COUNT.toString(), "" + retryCount);
+    _record.setIntField(Attributes.RETRY_COUNT.toString(), retryCount);
   }
 
   public int getRetryCount()
   {
-    try
-    {
-      return Integer.parseInt(_record.getSimpleField(Attributes.RETRY_COUNT.toString()));
-    }
-    catch (Exception e)
-    {
-    }
-    // Default to 0, and there is no retry if timeout happens
-    return 0;
+    return _record.getIntField(Attributes.RETRY_COUNT.toString(), 0);
   }
 
   public Map<String, String> getResultMap()
@@ -444,20 +383,7 @@ public class Message extends HelixProperty
   @Override
   public int getBucketSize()
   {
-    String bucketSizeStr = _record.getSimpleField(Attributes.BUCKET_SIZE.toString());
-    int bucketSize = 0;
-    if (bucketSizeStr != null)
-    {
-      try
-      {
-        bucketSize = Integer.parseInt(bucketSizeStr);
-      }
-      catch (NumberFormatException e)
-      {
-        // OK
-      }
-    }
-    return bucketSize;
+    return _record.getIntField(Attributes.BUCKET_SIZE.toString(), 0);
   }
 
   @Override
@@ -465,7 +391,7 @@ public class Message extends HelixProperty
   {
     if (bucketSize > 0)
     {
-      _record.setSimpleField(Attributes.BUCKET_SIZE.toString(), "" + bucketSize);
+      _record.setIntField(Attributes.BUCKET_SIZE.toString(), bucketSize);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d4e448be/helix-core/src/test/java/org/apache/helix/manager/zk/serializer/TestJacksonPayloadSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/serializer/TestJacksonPayloadSerializer.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/serializer/TestJacksonPayloadSerializer.java
new file mode 100644
index 0000000..0c4bc66
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/serializer/TestJacksonPayloadSerializer.java
@@ -0,0 +1,181 @@
+package org.apache.helix.manager.zk.serializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestJacksonPayloadSerializer
+{
+  /**
+   * Ensure that the JacksonPayloadSerializer can serialize and deserialize arbitrary objects
+   */
+  @Test
+  public void testJacksonSerializeDeserialize()
+  {
+    final String RECORD_ID = "testJacksonSerializeDeserialize";
+    SampleDeserialized sample = getSample();
+    ZNRecord znRecord = new ZNRecord(RECORD_ID, new JacksonPayloadSerializer());
+    znRecord.setPayload(sample);
+    SampleDeserialized duplicate = znRecord.getPayload(SampleDeserialized.class);
+    Assert.assertEquals(duplicate, sample);
+  }
+
+  /**
+   * Test that the payload can be deserialized after serializing and deserializing the ZNRecord
+   * that encloses it. This uses ZNRecordSerializer.
+   */
+  @Test
+  public void testFullZNRecordSerializeDeserialize()
+  {
+    final String RECORD_ID = "testFullZNRecordSerializeDeserialize";
+    SampleDeserialized sample = getSample();
+    ZNRecord znRecord = new ZNRecord(RECORD_ID, new JacksonPayloadSerializer());
+    znRecord.setPayload(sample);
+    ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
+    byte[] serialized = znRecordSerializer.serialize(znRecord);
+    ZNRecord deserialized = (ZNRecord) znRecordSerializer.deserialize(serialized);
+    deserialized.setPayloadSerializer(new JacksonPayloadSerializer());
+    SampleDeserialized duplicate = deserialized.getPayload(SampleDeserialized.class);
+    Assert.assertEquals(duplicate, sample);
+  }
+
+  /**
+   * Test that the payload can be deserialized after serializing and deserializing the ZNRecord
+   * that encloses it. This uses ZNRecordStreamingSerializer.
+   */
+  @Test
+  public void testFullZNRecordStreamingSerializeDeserialize()
+  {
+    final String RECORD_ID = "testFullZNRecordStreamingSerializeDeserialize";
+    SampleDeserialized sample = getSample();
+    ZNRecord znRecord = new ZNRecord(RECORD_ID, new JacksonPayloadSerializer());
+    znRecord.setPayload(sample);
+    ZNRecordStreamingSerializer znRecordSerializer = new ZNRecordStreamingSerializer();
+    byte[] serialized = znRecordSerializer.serialize(znRecord);
+    ZNRecord deserialized = (ZNRecord) znRecordSerializer.deserialize(serialized);
+    deserialized.setPayloadSerializer(new JacksonPayloadSerializer());
+    SampleDeserialized duplicate = deserialized.getPayload(SampleDeserialized.class);
+    Assert.assertEquals(duplicate, sample);
+  }
+
+  /**
+   * Get an object which can be tested for serialization success or failure
+   * @return Initialized SampleDeserialized object
+   */
+  private SampleDeserialized getSample()
+  {
+    final int INT_FIELD_VALUE = 12345;
+    final int LIST_FIELD_COUNT = 5;
+    List<Integer> intList = new LinkedList<Integer>();
+    for (int i = 0; i < LIST_FIELD_COUNT; i++)
+    {
+      intList.add(i);
+    }
+    return new SampleDeserialized(INT_FIELD_VALUE, intList);
+  }
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class SampleDeserialized
+  {
+    private int _intField;
+    private List<Integer> _listField;
+
+    public SampleDeserialized()
+    {
+    }
+
+    public SampleDeserialized(int intField, List<Integer> listField)
+    {
+      _intField = intField;
+      _listField = listField;
+    }
+
+    @JsonProperty
+    public void setIntField(int value)
+    {
+      _intField = value;
+    }
+
+    @JsonProperty
+    public int getIntField()
+    {
+      return _intField;
+    }
+
+    @JsonProperty
+    public void setListField(final List<Integer> listField)
+    {
+      _listField = listField;
+    }
+
+    @JsonProperty
+    public List<Integer> getListField()
+    {
+      return _listField;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+      boolean result = true;
+      if (other instanceof SampleDeserialized)
+      {
+        SampleDeserialized that = (SampleDeserialized)other;
+        if (_intField != that._intField)
+        {
+          // ints must match
+          result =  false;
+        }
+        else if (_listField != null)
+        {
+          // lists must match if one is not null
+          if (!_listField.equals(that._listField))
+          {
+            result = false;
+          }
+        }
+        else
+        {
+          // both must be null if one is null
+          if (that._listField != null)
+          {
+            result = false;
+          }
+        }
+      }
+      else
+      {
+        // only compare objects of the same type
+        result = false;
+      }
+      return result;
+    }
+  }
+
+}


Mime
View raw message