ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [14/50] [abbrv] ignite git commit: Restored services compatibility.
Date Thu, 17 Nov 2016 10:51:37 GMT
Restored services compatibility.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92fff630
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92fff630
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92fff630

Branch: refs/heads/ignite-2693
Commit: 92fff630fbf36c82f93bbd9ddd53d11bed44e772
Parents: 61ab650
Author: devozerov <vozerov@gridgain.com>
Authored: Wed Nov 2 17:50:51 2016 +0300
Committer: thatcoach <ppozerov@list.ru>
Committed: Wed Nov 2 17:51:06 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/job/GridJobWorker.java  |  10 +-
 .../service/GridServiceProcessor.java           | 197 +++++++++++--------
 .../internal/util/SerializableTransient.java    |  58 ++++++
 .../ignite/marshaller/MarshallerUtils.java      |  22 +++
 .../optimized/OptimizedClassDescriptor.java     |  90 ++++++++-
 5 files changed, 296 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 8169eb1..5f38b29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -57,6 +57,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
@@ -421,7 +422,14 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject
{
 
         try {
             if (job == null) {
-                job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(),
ctx.config()));
+                MarshallerUtils.jobSenderVersion(taskNode.version());
+
+                try {
+                    job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(),
ctx.config()));
+                }
+                finally {
+                    MarshallerUtils.jobSenderVersion(null);
+                }
 
                 // No need to hold reference any more.
                 jobBytes = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 527d360..8489875 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.service;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -87,6 +90,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.internal.util.SerializableTransient;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.JobContextResource;
 import org.apache.ignite.resources.LoggerResource;
@@ -115,6 +119,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** */
     public static final IgniteProductVersion LAZY_SERVICES_CFG_SINCE = IgniteProductVersion.fromString("1.5.22");
 
+    /** Versions that are only compatible with each other, and from 1.5.33. */
+    private static final Set<IgniteProductVersion> SERVICE_TOP_CALLABLE_VER1;
+
     /** */
     private final Boolean srvcCompatibilitySysProp;
 
@@ -162,6 +169,31 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Topology listener. */
     private GridLocalEventListener topLsnr = new TopologyListener();
 
+    static {
+        Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>()
{
+            @Override public int compare(final IgniteProductVersion o1, final IgniteProductVersion
o2) {
+                return o1.compareToIgnoreTimestamp(o2);
+            }
+        });
+
+        versions.add(IgniteProductVersion.fromString("1.5.30"));
+        versions.add(IgniteProductVersion.fromString("1.5.31"));
+        versions.add(IgniteProductVersion.fromString("1.5.32"));
+        versions.add(IgniteProductVersion.fromString("1.6.3"));
+        versions.add(IgniteProductVersion.fromString("1.6.4"));
+        versions.add(IgniteProductVersion.fromString("1.6.5"));
+        versions.add(IgniteProductVersion.fromString("1.6.6"));
+        versions.add(IgniteProductVersion.fromString("1.6.7"));
+        versions.add(IgniteProductVersion.fromString("1.6.8"));
+        versions.add(IgniteProductVersion.fromString("1.6.9"));
+        versions.add(IgniteProductVersion.fromString("1.6.10"));
+        versions.add(IgniteProductVersion.fromString("1.7.0"));
+        versions.add(IgniteProductVersion.fromString("1.7.1"));
+        versions.add(IgniteProductVersion.fromString("1.7.2"));
+
+        SERVICE_TOP_CALLABLE_VER1 = Collections.unmodifiableSet(versions);
+    }
+
     /**
      * @param ctx Kernal context.
      */
@@ -668,9 +700,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         ClusterNode node = cache.affinity().mapKeyToNode(name);
 
         if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) {
+            final ServiceTopologyCallable call = new ServiceTopologyCallable(name);
+
+            call.serialize = SERVICE_TOP_CALLABLE_VER1.contains(node.version());
+
             return ctx.closure().callAsyncNoFailover(
                 GridClosureCallMode.BROADCAST,
-                new ServiceTopologyCallable(name),
+                call,
                 Collections.singletonList(node),
                 false
             ).get();
@@ -815,7 +851,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             }
         }
 
-        return new GridServiceProxy<T>(prj, name, svcItf, sticky, ctx).proxy();
+        return new GridServiceProxy<>(prj, name, svcItf, sticky, ctx).proxy();
     }
 
     /**
@@ -868,7 +904,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      * @param topVer Topology version.
      * @throws IgniteCheckedException If failed.
      */
-    private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws
IgniteCheckedException {
+    private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException
{
         ServiceConfiguration cfg = dep.configuration();
 
         Object nodeFilter = cfg.getNodeFilter();
@@ -882,7 +918,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         Object affKey = cfg.getAffinityKey();
 
         while (true) {
-            GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(),
topVer.topologyVersion());
+            GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(),
topVer);
 
              Collection<ClusterNode> nodes;
 
@@ -912,7 +948,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 Map<UUID, Integer> cnts = new HashMap<>();
 
                 if (affKey != null) {
-                    ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer);
+                    ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, new AffinityTopologyVersion(topVer));
 
                     if (n != null) {
                         int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt;
@@ -1144,7 +1180,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (cfg instanceof LazyServiceConfiguration) {
             byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
 
-            Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config()));
+            Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
 
             ctx.resource().inject(srvc);
 
@@ -1154,9 +1190,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             Service svc = cfg.getService();
 
             try {
-                byte[] bytes = U.marshal(m, svc);
+                byte[] bytes = m.marshal(svc);
 
-                Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(),
ctx.config()));
+                Service cp = m.unmarshal(bytes,
+                    U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
 
                 ctx.resource().inject(cp);
 
@@ -1231,8 +1268,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 ClusterNode oldestSrvNode =
                     CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
 
-                if (oldestSrvNode == null)
-                    return new GridEmptyIterator<>();
+            if (oldestSrvNode == null)
+                return F.emptyIterator();
 
                 GridCacheQueryManager qryMgr = cache.context().queries();
 
@@ -1418,7 +1455,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             svcName.set(dep.configuration().getName());
 
             // Ignore other utility cache events.
-            AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+            long topVer = ctx.discovery().topologyVersion();
 
             ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
 
@@ -1469,60 +1506,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         }
     }
 
-        /**
-         * Deployment callback.
-         *
-         * @param dep Service deployment.
-         * @param topVer Topology version.
-         */
-        private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion
topVer) {
-            // Retry forever.
-            try {
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+    /**
+     * Deployment callback.
+     *
+     * @param dep Service deployment.
+     * @param topVer Topology version.
+     */
+    private void onDeployment(final GridServiceDeployment dep, final long topVer) {
+        // Retry forever.
+        try {
+            long newTopVer = ctx.discovery().topologyVersion();
 
-                // If topology version changed, reassignment will happen from topology event.
-                if (newTopVer.equals(topVer))
-                    reassign(dep, topVer);
-            }
-            catch (IgniteCheckedException e) {
-                if (!(e instanceof ClusterTopologyCheckedException))
-                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(),
e);
+            // If topology version changed, reassignment will happen from topology event.
+            if (newTopVer == topVer)
+                reassign(dep, topVer);
+        }
+        catch (IgniteCheckedException e) {
+            if (!(e instanceof ClusterTopologyCheckedException))
+                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(),
e);
 
-                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+            long newTopVer = ctx.discovery().topologyVersion();
 
-                if (!newTopVer.equals(topVer)) {
-                    assert newTopVer.compareTo(topVer) > 0;
+            if (newTopVer != topVer) {
+                assert newTopVer > topVer;
 
-                    // Reassignment will happen from topology event.
-                    return;
-                }
+                // Reassignment will happen from topology event.
+                return;
+            }
 
-                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                    private IgniteUuid id = IgniteUuid.randomUuid();
+            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                private IgniteUuid id = IgniteUuid.randomUuid();
 
-                    private long start = System.currentTimeMillis();
+                private long start = System.currentTimeMillis();
 
-                    @Override public IgniteUuid timeoutId() {
-                        return id;
-                    }
+                @Override public IgniteUuid timeoutId() {
+                    return id;
+                }
 
-                    @Override public long endTime() {
-                        return start + RETRY_TIMEOUT;
-                    }
+                @Override public long endTime() {
+                    return start + RETRY_TIMEOUT;
+                }
 
-                    @Override public void onTimeout() {
-                        if (!busyLock.enterBusy())
-                            return;
+                @Override public void onTimeout() {
+                    if (!busyLock.enterBusy())
+                        return;
 
-                        try {
-                            // Try again.
-                            onDeployment(dep, topVer);
-                        }
-                        finally {
-                            busyLock.leaveBusy();
-                        }
+                    try {
+                        // Try again.
+                        onDeployment(dep, topVer);
                     }
-                });
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            });
         }
     }
 
@@ -1531,28 +1568,16 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      */
     private class TopologyListener implements GridLocalEventListener {
         /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
+        @Override public void onEvent(final Event evt) {
             if (!busyLock.enterBusy())
                 return;
 
             try {
-                final AffinityTopologyVersion topVer;
-
-                if (evt instanceof DiscoveryCustomEvent) {
-                    DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
-
-                    topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
-
-                    if (msg instanceof CacheAffinityChangeMessage) {
-                        if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
-                            return;
-                    }
-                }
-                else
-                    topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(),
0);
-
                 depExe.submit(new BusyRunnable() {
                     @Override public void run0() {
+                        AffinityTopologyVersion topVer =
+                            new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion());
+
                         ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(),
topVer);
 
                         if (oldest != null && oldest.isLocal()) {
@@ -1587,7 +1612,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                                         ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity().
                                             affinityReadyFuture(topVer).get();
 
-                                        reassign(dep, topVer);
+                                        reassign(dep, topVer.topologyVersion());
                                     }
                                     catch (IgniteCheckedException ex) {
                                         if (!(e instanceof ClusterTopologyCheckedException))
@@ -1604,7 +1629,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                             }
 
                             if (!retries.isEmpty())
-                                onReassignmentFailed(topVer, retries);
+                                onReassignmentFailed(topVer.topologyVersion(), retries);
                         }
 
                         // Clean up zombie assignments.
@@ -1641,14 +1666,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
          * @param topVer Topology version.
          * @param retries Retries.
          */
-        private void onReassignmentFailed(final AffinityTopologyVersion topVer,
-            final Collection<GridServiceDeployment> retries) {
+        private void onReassignmentFailed(final long topVer, final Collection<GridServiceDeployment>
retries) {
             if (!busyLock.enterBusy())
                 return;
 
             try {
                 // If topology changed again, let next event handle it.
-                if (ctx.discovery().topologyVersionEx().equals(topVer))
+                if (ctx.discovery().topologyVersion() != topVer)
                     return;
 
                 for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext();
) {
@@ -1829,6 +1853,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /**
      */
     @GridInternal
+    @SerializableTransient(methodName = "serializableTransient")
     private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID,
Integer>> {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1837,10 +1862,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7");
 
         /** */
+        private static final String[] SER_FIELDS = {"waitedCacheInit", "jCtx", "log"};
+
+        /** */
         private final String svcName;
 
         /** */
-        private boolean waitedCacheInit;
+        private transient boolean waitedCacheInit;
 
         /** */
         @IgniteInstanceResource
@@ -1848,11 +1876,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         /** */
         @JobContextResource
-        private ComputeJobContext jCtx;
+        private transient ComputeJobContext jCtx;
 
         /** */
         @LoggerResource
-        private IgniteLogger log;
+        private transient IgniteLogger log;
+
+        /** */
+        transient boolean serialize;
 
         /**
          * @param svcName Service name.
@@ -1898,6 +1929,16 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
             return serviceTopology(cache, svcName);
         }
+
+        /**
+         * @param self Instance of current class before serialization.
+         * @param ver Sender job version.
+         * @return List of serializable transient fields.
+         */
+        @SuppressWarnings("unused")
+        private static String[] serializableTransient(ServiceTopologyCallable self, IgniteProductVersion
ver) {
+            return (self != null && self.serialize) || (ver != null && SERVICE_TOP_CALLABLE_VER1.contains(ver))
? SER_FIELDS : null;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
new file mode 100644
index 0000000..14a2f27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a class as having transient fields that should be serialized.
+ * The annotated class must have a method that returns the list of transient
+ * fields that should be serialized.
+ * <p>
+ *     Works only for jobs. For other messages node version is not available.
+ * </p>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface SerializableTransient {
+    /**
+     * Name of the private static method that returns list of transient fields
+     * that should be serialized (String[]), and accepts itself (before serialization)
+     * and {@link IgniteProductVersion}, e.g.
+     * <pre>
+     *     private static String[] fields(Object self, IgniteProductVersion ver){
+     *         return ver.compareTo("1.5.30") > 0 ? SERIALIZABLE_FIELDS : null;
+     *     }
+     * </pre>
+     * <p>
+     *     On serialization version argument <tt>ver</tt> is null, on deserialization - <tt>self</tt> is null.
+     * </p>
+     * <p>
+     *     If it returns empty array or null all transient fields will be normally
+     *     ignored.
+     * </p>
+     *
+     * @return Name of the method.
+     */
+    String methodName();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
index 9668baf..ad63702 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.marshaller;
 
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.jetbrains.annotations.Nullable;
 
@@ -24,6 +25,9 @@ import org.jetbrains.annotations.Nullable;
  * Utility marshaller methods.
  */
 public class MarshallerUtils {
+    /** Job sender node version. */
+    private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER = new ThreadLocal<>();
+
     /**
      * Set node name to marshaller context if possible.
      *
@@ -55,4 +59,22 @@ public class MarshallerUtils {
     private MarshallerUtils() {
         // No-op.
     }
+
+    /**
+     * Sets thread local job sender node version.
+     *
+     * @param ver Thread local job sender node version.
+     */
+    public static void jobSenderVersion(IgniteProductVersion ver) {
+        JOB_SND_NODE_VER.set(ver);
+    }
+
+    /**
+     * Returns thread local job sender node version.
+     *
+     * @return Thread local job sender node version.
+     */
+    public static IgniteProductVersion jobSenderVersion() {
+        return JOB_SND_NODE_VER.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
index 5a5b54d..160f2c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java
@@ -47,8 +47,11 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.marshaller.MarshallerContext;
 import org.apache.ignite.marshaller.MarshallerExclusions;
+import org.apache.ignite.internal.util.SerializableTransient;
+import org.apache.ignite.marshaller.MarshallerUtils;
 
 import static java.lang.reflect.Modifier.isFinal;
 import static java.lang.reflect.Modifier.isPrivate;
@@ -166,6 +169,9 @@ class OptimizedClassDescriptor {
     /** Proxy interfaces. */
     private Class<?>[] proxyIntfs;
 
+    /** Method that returns serializable transient fields. */
+    private Method serTransMtd;
+
     /**
      * Creates descriptor for class.
      *
@@ -441,6 +447,27 @@ class OptimizedClassDescriptor {
 
                         readObjMtds.add(mtd);
 
+                        final SerializableTransient serTransAn = c.getAnnotation(SerializableTransient.class);
+
+                        // Custom serialization policy for transient fields.
+                        if (serTransAn != null) {
+                            try {
+                                serTransMtd = c.getDeclaredMethod(serTransAn.methodName(),
cls, IgniteProductVersion.class);
+
+                                int mod = serTransMtd.getModifiers();
+
+                                if (isStatic(mod) && isPrivate(mod)
+                                    && serTransMtd.getReturnType() == String[].class)
+                                    serTransMtd.setAccessible(true);
+                                else
+                                    // Set method back to null if it has incorrect signature.
+                                    serTransMtd = null;
+                            }
+                            catch (NoSuchMethodException ignored) {
+                                serTransMtd = null;
+                            }
+                        }
+
                         Field[] clsFields0 = c.getDeclaredFields();
 
                         Map<String, Field> fieldNames = new HashMap<>();
@@ -797,7 +824,7 @@ class OptimizedClassDescriptor {
                 writeTypeData(out);
 
                 out.writeShort(checksum);
-                out.writeSerializable(obj, writeObjMtds, fields);
+                out.writeSerializable(obj, writeObjMtds, serializableFields(obj.getClass(),
obj, null));
 
                 break;
 
@@ -807,6 +834,60 @@ class OptimizedClassDescriptor {
     }
 
     /**
+     * Gets list of serializable fields. If {@link #serTransMtd} method
+     * returns list of transient fields, they will be added to other fields.
+     * Transient fields that are not included in that list will be normally
+     * ignored.
+     *
+     * @param cls Class.
+     * @param obj Object.
+     * @param ver Job sender version.
+     * @return Serializable fields.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private Fields serializableFields(Class<?> cls, Object obj, IgniteProductVersion
ver) {
+        if (serTransMtd == null)
+            return fields;
+
+        try {
+            final String[] transFields = (String[])serTransMtd.invoke(cls, obj, ver);
+
+            if (transFields == null || transFields.length == 0)
+                return fields;
+
+            List<FieldInfo> clsFields = new ArrayList<>();
+
+            clsFields.addAll(fields.fields.get(0).fields);
+
+            for (int i = 0; i < transFields.length; i++) {
+                final String fieldName = transFields[i];
+
+                final Field f = cls.getDeclaredField(fieldName);
+
+                FieldInfo fieldInfo = new FieldInfo(f, f.getName(),
+                    GridUnsafe.objectFieldOffset(f), fieldType(f.getType()));
+
+                clsFields.add(fieldInfo);
+            }
+
+            Collections.sort(clsFields, new Comparator<FieldInfo>() {
+                @Override public int compare(FieldInfo t1, FieldInfo t2) {
+                    return t1.name().compareTo(t2.name());
+                }
+            });
+
+            List<ClassFields> fields = new ArrayList<>();
+
+            fields.add(new ClassFields(clsFields));
+
+            return new Fields(fields);
+        }
+        catch (Exception e) {
+            return fields;
+        }
+    }
+
+    /**
      * @param out Output stream.
      * @throws IOException In case of error.
      */
@@ -838,7 +919,12 @@ class OptimizedClassDescriptor {
             case SERIALIZABLE:
                 verifyChecksum(in.readShort());
 
-                return in.readSerializable(cls, readObjMtds, readResolveMtd, fields);
+                // If no serialize method, then unmarshal as usual.
+                if (serTransMtd != null)
+                    return in.readSerializable(cls, readObjMtds, readResolveMtd,
+                        serializableFields(cls, null, MarshallerUtils.jobSenderVersion()));
+                else
+                    return in.readSerializable(cls, readObjMtds, readResolveMtd, fields);
 
             default:
                 assert false : "Unexpected type: " + type;


Mime
View raw message