ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [7/7] ignite git commit: ignite-4154 zip
Date Wed, 02 Nov 2016 14:10:47 GMT
ignite-4154 zip


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e59a532e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e59a532e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e59a532e

Branch: refs/heads/ignite-4154-3
Commit: e59a532e59dbeeb089897e4f52a3daa78afc3923
Parents: d5d58f0
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Nov 2 16:07:15 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Nov 2 17:10:26 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 15 +++++-
 .../GridDhtPartitionsAbstractMessage.java       | 34 ++++++++++++-
 .../GridDhtPartitionsExchangeFuture.java        | 14 +++++-
 .../preloader/GridDhtPartitionsFullMessage.java | 53 ++++++++++++++++----
 .../GridDhtPartitionsSingleMessage.java         | 52 ++++++++++++++-----
 .../GridDhtPartitionsSingleRequest.java         |  4 +-
 .../ignite/internal/util/IgniteUtils.java       | 52 +++++++++++++++++++
 7 files changed, 194 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a901e2a..a81bf0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -765,12 +766,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
 
         boolean useOldApi = false;
+        boolean compress = true;
 
         for (ClusterNode node : nodes) {
-            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
                 useOldApi = true;
+                compress = false;
+
+                break;
+            }
+            else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE)
< 0)
+                compress = false;
         }
 
+        m.compress(compress);
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal() && cacheCtx.started()) {
                 GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
@@ -817,7 +827,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId
id) {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             cctx.kernalContext().clientNode(),
-            cctx.versions().last());
+            cctx.versions().last(),
+            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE)
>= 0);
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 4e714ed..a3bb5f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -29,7 +30,13 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Request for single partition info.
  */
-abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+    /** */
+    public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11");
+
+    /** */
+    protected static final byte COMPRESSED_FLAG_MASK = 1;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -39,6 +46,9 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
{
     /** Last used cache version. */
     private GridCacheVersion lastVer;
 
+    /** */
+    private byte flags;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -79,6 +89,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
{
         return lastVer;
     }
 
+    protected final boolean compressed() {
+        return (flags & COMPRESSED_FLAG_MASK) != 0;
+    }
+
+    protected final void compressed(boolean compressed) {
+        flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -101,6 +119,12 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
{
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
                 if (!writer.writeMessage("lastVer", lastVer))
                     return false;
 
@@ -131,6 +155,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
{
                 reader.incrementState();
 
             case 4:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
                 lastVer = reader.readMessage("lastVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 80b3768..6a17583 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -935,7 +935,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         throws IgniteCheckedException {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             clientOnlyExchange,
-            cctx.versions().last());
+            cctx.versions().last(),
+            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE)
>= 0);
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
@@ -974,14 +975,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             topologyVersion());
 
         boolean useOldApi = false;
+        boolean compress = true;
 
         if (nodes != null) {
             for (ClusterNode node : nodes) {
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
                     useOldApi = true;
+                    compress = false;
+
+                    break;
+                }
+                else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE)
< 0)
+                    compress = false;
             }
         }
 
+        m.compress(compress);
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
                 AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a4ff04b..ea51f6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -61,6 +61,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    @GridDirectTransient
+    private boolean compress;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -83,6 +87,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         this.topVer = topVer;
     }
 
+    public void compress(boolean compress) {
+        this.compress = compress;
+    }
+
     /**
      * @return Local partitions.
      */
@@ -137,6 +145,21 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         if (partCntrs != null && partCntrsBytes == null)
             partCntrsBytes = U.marshal(ctx, partCntrs);
+
+        if (compress && !compressed()) {
+            try {
+                byte[] partsBytesZip = U.zip(partsBytes);
+                byte[] partCntrsBytesZip = U.zip(partCntrsBytes);
+
+                partsBytes = partsBytesZip;
+                partCntrsBytes = partCntrsBytesZip;
+
+                compressed(true);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(ctx.logger(getClass()), "Failed to compress partitions data: " +
e, e);
+            }
+        }
     }
 
     /**
@@ -157,14 +180,22 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws
IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null && parts == null)
-            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partsBytes != null && parts == null) {
+            if (compressed())
+                parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+            else
+                parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
 
         if (parts == null)
             parts = new HashMap<>();
 
-        if (partCntrsBytes != null && partCntrs == null)
-            partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partCntrsBytes != null && partCntrs == null) {
+            if (compressed())
+                partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+            else
+                partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
 
         if (partCntrs == null)
             partCntrs = new HashMap<>();
@@ -185,19 +216,19 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
-            case 5:
+            case 6:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 7:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 8:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -219,7 +250,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
-            case 5:
+            case 6:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -227,7 +258,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 6:
+            case 7:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -235,7 +266,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 7:
+            case 8:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -255,7 +286,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index e4356b1..fdfc485 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -59,6 +59,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** */
     private boolean client;
 
+    /** */
+    private boolean compress;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -73,10 +76,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      */
     public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
         boolean client,
-        @Nullable GridCacheVersion lastVer) {
+        @Nullable GridCacheVersion lastVer,
+        boolean compress) {
         super(exchId, lastVer);
 
         this.client = client;
+        this.compress = compress;
     }
 
     /**
@@ -141,17 +146,40 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         if (partCntrsBytes == null && partCntrs != null)
             partCntrsBytes = U.marshal(ctx, partCntrs);
+
+        if (compress && !compressed()) {
+            try {
+                byte[] partsBytesZip = U.zip(partsBytes);
+                byte[] partCntrsBytesZip = U.zip(partCntrsBytes);
+
+                partsBytes = partsBytesZip;
+                partCntrsBytes = partCntrsBytesZip;
+
+                compressed(true);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(ctx.logger(getClass()), "Failed to compress partitions data: " +
e, e);
+            }
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws
IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null && parts == null)
-            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partsBytes != null && parts == null) {
+            if (compressed())
+                parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+            else
+                parts =U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
 
-        if (partCntrsBytes != null && partCntrs == null)
-            partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (partCntrsBytes != null && partCntrs == null) {
+            if (compressed())
+                partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+            else
+                partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
     }
 
     /** {@inheritDoc} */
@@ -169,19 +197,19 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         }
 
         switch (writer.state()) {
-            case 5:
+            case 6:
                 if (!writer.writeBoolean("client", client))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 7:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 8:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -203,7 +231,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             return false;
 
         switch (reader.state()) {
-            case 5:
+            case 6:
                 client = reader.readBoolean("client");
 
                 if (!reader.isLastRead())
@@ -211,7 +239,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 6:
+            case 7:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -219,7 +247,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 7:
+            case 8:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -239,7 +267,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index a4106af..850b6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -81,11 +81,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtPartitionsSingleRequest.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e59a532e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1e8d648..da4edc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.util;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.Externalizable;
@@ -128,6 +130,8 @@ import java.util.logging.Logger;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
 import javax.management.DynamicMBean;
 import javax.management.JMException;
 import javax.management.MBeanServer;
@@ -9665,6 +9669,25 @@ public abstract class IgniteUtils {
         }
     }
 
+    public static <T> T unmarshalZip(Marshaller marsh, byte[] zipBytes, @Nullable ClassLoader
clsLdr) throws IgniteCheckedException {
+        assert marsh != null;
+        assert zipBytes != null;
+
+        try {
+            ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes));
+
+            in.getNextEntry();
+
+            return marsh.unmarshal(in, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
     /**
      * Unmarshals object from the input stream using given class loader.
      * This method should not close given input stream.
@@ -9880,4 +9903,33 @@ public abstract class IgniteUtils {
         if (oldName != curName)
             LOC_IGNITE_NAME.set(oldName);
     }
+
+    public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException {
+        try {
+            if (bytes == null)
+                return null;
+
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+            try (ZipOutputStream zos = new ZipOutputStream(bos)) {
+                ZipEntry entry = new ZipEntry("");
+
+                try {
+                    entry.setSize(bytes.length);
+
+                    zos.putNextEntry(entry);
+
+                    zos.write(bytes);
+                }
+                finally {
+                    zos.closeEntry();
+                }
+            }
+
+            return bos.toByteArray();
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
 }


Mime
View raw message