Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B87AF18AB9 for ; Wed, 11 Nov 2015 08:24:59 +0000 (UTC) Received: (qmail 5440 invoked by uid 500); 11 Nov 2015 08:24:59 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 5335 invoked by uid 500); 11 Nov 2015 08:24:59 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 4463 invoked by uid 99); 11 Nov 2015 08:24:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 08:24:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A6B2CE0B20; Wed, 11 Nov 2015 08:24:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Wed, 11 Nov 2015 08:25:08 -0000 Message-Id: <17aef0bd704543908990e7815913471d@git.apache.org> In-Reply-To: <455558f17a2e4524819949c3f3e23f37@git.apache.org> References: <455558f17a2e4524819949c3f3e23f37@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/67] [abbrv] ignite git commit: Merged IGNITE-950-new into IGNITE-1282 http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java deleted file mode 100644 index b687666..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java +++ /dev/null @@ -1,1043 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.portable; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import javax.cache.Cache; -import javax.cache.CacheException; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryListenerException; -import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.event.EventType; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.MutableEntry; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgnitePortables; -import org.apache.ignite.cluster.ClusterTopologyException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.portable.GridPortableMarshaller; -import org.apache.ignite.internal.portable.PortableContext; -import org.apache.ignite.internal.portable.PortableMetaDataHandler; -import org.apache.ignite.internal.portable.PortableMetaDataImpl; -import org.apache.ignite.internal.portable.PortableObjectImpl; -import org.apache.ignite.internal.portable.PortableObjectOffheapImpl; -import org.apache.ignite.internal.portable.PortableUtils; -import org.apache.ignite.internal.portable.builder.PortableBuilderImpl; -import org.apache.ignite.internal.portable.streams.PortableInputStream; -import org.apache.ignite.internal.portable.streams.PortableOffheapInputStream; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.CacheObjectImpl; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheUtils; -import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.query.CacheQuery; -import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; -import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.util.lang.GridMapEntry; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.marshaller.portable.PortableMarshaller; -import org.apache.ignite.portable.PortableBuilder; -import org.apache.ignite.portable.PortableException; -import org.apache.ignite.portable.PortableMetadata; -import org.apache.ignite.portable.PortableObject; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import sun.misc.Unsafe; - -import static org.apache.ignite.internal.portable.GridPortableMarshaller.BOOLEAN; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.BOOLEAN_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.BYTE; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.BYTE_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.CHAR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.CHAR_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.CLASS; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.COL; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.DATE; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.DATE_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.DECIMAL; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.DECIMAL_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.LONG; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.LONG_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.MAP; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.MAP_ENTRY; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.PORTABLE_OBJ; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.TIMESTAMP; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.TIMESTAMP_ARR; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID_ARR; - -/** - * Portable processor implementation. - */ -public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessorImpl implements - CacheObjectPortableProcessor { - /** */ - public static final String[] FIELD_TYPE_NAMES; - - /** */ - private static final Unsafe UNSAFE = GridUnsafe.unsafe(); - - /** */ - private final CountDownLatch startLatch = new CountDownLatch(1); - - /** */ - private final boolean clientNode; - - /** */ - private volatile IgniteCacheProxy metaDataCache; - - /** */ - private final ConcurrentHashMap8 clientMetaDataCache; - - /** Predicate to filter portable meta data in utility cache. */ - private final CacheEntryPredicate metaPred = new CacheEntryPredicateAdapter() { - private static final long serialVersionUID = 0L; - - @Override public boolean apply(GridCacheEntryEx e) { - return e.key().value(e.context().cacheObjectContext(), false) instanceof PortableMetaDataKey; - } - }; - - /** */ - private PortableContext portableCtx; - - /** */ - private Marshaller marsh; - - /** */ - private GridPortableMarshaller portableMarsh; - - /** */ - @GridToStringExclude - private IgnitePortables portables; - - /** Metadata updates collected before metadata cache is initialized. */ - private final Map metaBuf = new ConcurrentHashMap<>(); - - /** */ - private UUID metaCacheQryId; - - /** - * - */ - static { - FIELD_TYPE_NAMES = new String[104]; - - FIELD_TYPE_NAMES[BYTE] = "byte"; - FIELD_TYPE_NAMES[SHORT] = "short"; - FIELD_TYPE_NAMES[INT] = "int"; - FIELD_TYPE_NAMES[LONG] = "long"; - FIELD_TYPE_NAMES[BOOLEAN] = "boolean"; - FIELD_TYPE_NAMES[FLOAT] = "float"; - FIELD_TYPE_NAMES[DOUBLE] = "double"; - FIELD_TYPE_NAMES[CHAR] = "char"; - FIELD_TYPE_NAMES[UUID] = "UUID"; - FIELD_TYPE_NAMES[DECIMAL] = "decimal"; - FIELD_TYPE_NAMES[STRING] = "String"; - FIELD_TYPE_NAMES[DATE] = "Date"; - FIELD_TYPE_NAMES[TIMESTAMP] = "Timestamp"; - FIELD_TYPE_NAMES[ENUM] = "Enum"; - FIELD_TYPE_NAMES[OBJ] = "Object"; - FIELD_TYPE_NAMES[PORTABLE_OBJ] = "Object"; - FIELD_TYPE_NAMES[COL] = "Collection"; - FIELD_TYPE_NAMES[MAP] = "Map"; - FIELD_TYPE_NAMES[MAP_ENTRY] = "Entry"; - FIELD_TYPE_NAMES[CLASS] = "Class"; - FIELD_TYPE_NAMES[BYTE_ARR] = "byte[]"; - FIELD_TYPE_NAMES[SHORT_ARR] = "short[]"; - FIELD_TYPE_NAMES[INT_ARR] = "int[]"; - FIELD_TYPE_NAMES[LONG_ARR] = "long[]"; - FIELD_TYPE_NAMES[BOOLEAN_ARR] = "boolean[]"; - FIELD_TYPE_NAMES[FLOAT_ARR] = "float[]"; - FIELD_TYPE_NAMES[DOUBLE_ARR] = "double[]"; - FIELD_TYPE_NAMES[CHAR_ARR] = "char[]"; - FIELD_TYPE_NAMES[UUID_ARR] = "UUID[]"; - FIELD_TYPE_NAMES[DECIMAL_ARR] = "decimal[]"; - FIELD_TYPE_NAMES[STRING_ARR] = "String[]"; - FIELD_TYPE_NAMES[DATE_ARR] = "Date[]"; - FIELD_TYPE_NAMES[TIMESTAMP_ARR] = "Timestamp[]"; - FIELD_TYPE_NAMES[OBJ_ARR] = "Object[]"; - FIELD_TYPE_NAMES[ENUM_ARR] = "Enum[]"; - } - - /** - * @param typeName Field type name. - * @return Field type ID; - */ - @SuppressWarnings("StringEquality") - public static int fieldTypeId(String typeName) { - for (int i = 0; i < FIELD_TYPE_NAMES.length; i++) { - String typeName0 = FIELD_TYPE_NAMES[i]; - - if (typeName.equals(typeName0)) - return i; - } - - throw new IllegalArgumentException("Invalid metadata type name: " + typeName); - } - - /** - * @param typeId Field type ID. - * @return Field type name. - */ - public static String fieldTypeName(int typeId) { - assert typeId >= 0 && typeId < FIELD_TYPE_NAMES.length : typeId; - - String typeName = FIELD_TYPE_NAMES[typeId]; - - assert typeName != null : typeId; - - return typeName; - } - - /** - * @param typeIds Field type IDs. - * @return Field type names. - */ - public static Map fieldTypeNames(Map typeIds) { - Map names = U.newHashMap(typeIds.size()); - - for (Map.Entry e : typeIds.entrySet()) - names.put(e.getKey(), fieldTypeName(e.getValue())); - - return names; - } - - /** - * @param ctx Kernal context. - */ - public CacheObjectPortableProcessorImpl(GridKernalContext ctx) { - super(ctx); - - marsh = ctx.grid().configuration().getMarshaller(); - - clientNode = this.ctx.clientNode(); - - clientMetaDataCache = clientNode ? new ConcurrentHashMap8() : null; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (marsh instanceof PortableMarshaller) { - PortableMetaDataHandler metaHnd = new PortableMetaDataHandler() { - @Override public void addMeta(int typeId, PortableMetadata newMeta) - throws PortableException { - if (metaDataCache == null) { - PortableMetadata oldMeta = metaBuf.get(typeId); - - if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) { - synchronized (this) { - Map fields = new HashMap<>(); - - if (checkMeta(typeId, oldMeta, newMeta, fields)) { - newMeta = new PortableMetaDataImpl(newMeta.typeName(), - fields, - newMeta.affinityKeyFieldName()); - - metaBuf.put(typeId, newMeta); - } - else - return; - } - - if (metaDataCache == null) - return; - else - metaBuf.remove(typeId); - } - else - return; - } - - CacheObjectPortableProcessorImpl.this.addMeta(typeId, newMeta); - } - - @Override public PortableMetadata metadata(int typeId) throws PortableException { - if (metaDataCache == null) - U.awaitQuiet(startLatch); - - return CacheObjectPortableProcessorImpl.this.metadata(typeId); - } - }; - - PortableMarshaller pMarh0 = (PortableMarshaller)marsh; - - portableCtx = new PortableContext(metaHnd, ctx.gridName()); - - IgniteUtils.invoke(PortableMarshaller.class, pMarh0, "setPortableContext", portableCtx); - - portableMarsh = new GridPortableMarshaller(portableCtx); - - portables = new IgnitePortablesImpl(ctx, this); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void onUtilityCacheStarted() throws IgniteCheckedException { - metaDataCache = ctx.cache().jcache(CU.UTILITY_CACHE_NAME); - - if (clientNode) { - assert !metaDataCache.context().affinityNode(); - - metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery( - new MetaDataEntryListener(), - new MetaDataEntryFilter(), - false, - true); - - while (true) { - ClusterNode oldestSrvNode = - CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE); - - if (oldestSrvNode == null) - break; - - GridCacheQueryManager qryMgr = metaDataCache.context().queries(); - - CacheQuery> qry = - qryMgr.createScanQuery(new MetaDataPredicate(), null, false); - - qry.keepAll(false); - - qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); - - try { - CacheQueryFuture> fut = qry.execute(); - - Map.Entry next; - - while ((next = fut.next()) != null) { - assert next.getKey() != null : next; - assert next.getValue() != null : next; - - addClientCacheMetaData(next.getKey(), next.getValue()); - } - } - catch (IgniteCheckedException e) { - if (!ctx.discovery().alive(oldestSrvNode) || !ctx.discovery().pingNode(oldestSrvNode.id())) - continue; - else - throw e; - } - catch (CacheException e) { - if (X.hasCause(e, ClusterTopologyCheckedException.class, ClusterTopologyException.class)) - continue; - else - throw e; - } - - break; - } - } - - startLatch.countDown(); - - for (Map.Entry e : metaBuf.entrySet()) - addMeta(e.getKey(), e.getValue()); - - metaBuf.clear(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - if (metaCacheQryId != null) - metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId); - } - - /** - * @param key Metadata key. - * @param newMeta Metadata. - */ - private void addClientCacheMetaData(PortableMetaDataKey key, final PortableMetadata newMeta) { - clientMetaDataCache.compute(key, - new ConcurrentHashMap8.BiFun() { - @Override public PortableMetadata apply(PortableMetaDataKey key, PortableMetadata oldMeta) { - PortableMetadata res; - - try { - res = checkMeta(key.typeId(), oldMeta, newMeta, null) ? newMeta : oldMeta; - } - catch (PortableException e) { - res = oldMeta; - } - - return res; - } - } - ); - } - - /** {@inheritDoc} */ - @Override public int typeId(String typeName) { - return portableCtx.typeId(typeName); - } - - /** - * @param obj Object. - * @return Bytes. - * @throws PortableException If failed. - */ - public byte[] marshal(@Nullable Object obj) throws PortableException { - byte[] arr = portableMarsh.marshal(obj); - - assert arr.length > 0; - - return arr; - } - - /** - * @param ptr Off-heap pointer. - * @param forceHeap If {@code true} creates heap-based object. - * @return Object. - * @throws PortableException If failed. - */ - public Object unmarshal(long ptr, boolean forceHeap) throws PortableException { - assert ptr > 0 : ptr; - - int size = UNSAFE.getInt(ptr); - - ptr += 4; - - byte type = UNSAFE.getByte(ptr++); - - if (type != CacheObject.TYPE_BYTE_ARR) { - assert size > 0 : size; - - PortableInputStream in = new PortableOffheapInputStream(ptr, size, forceHeap); - - return portableMarsh.unmarshal(in); - } - else - return U.copyMemory(ptr, size); - } - - /** {@inheritDoc} */ - @Override public Object marshalToPortable(@Nullable Object obj) throws PortableException { - if (obj == null) - return null; - - if (PortableUtils.isPortableType(obj.getClass())) - return obj; - - if (obj instanceof Object[]) { - Object[] arr = (Object[])obj; - - Object[] pArr = new Object[arr.length]; - - for (int i = 0; i < arr.length; i++) - pArr[i] = marshalToPortable(arr[i]); - - return pArr; - } - - if (obj instanceof Collection) { - Collection col = (Collection)obj; - - Collection pCol; - - if (col instanceof Set) - pCol = (Collection)PortableUtils.newSet((Set)col); - else - pCol = new ArrayList<>(col.size()); - - for (Object item : col) - pCol.add(marshalToPortable(item)); - - return pCol; - } - - if (obj instanceof Map) { - Map map = (Map)obj; - - Map pMap = PortableUtils.newMap((Map)obj); - - for (Map.Entry e : map.entrySet()) - pMap.put(marshalToPortable(e.getKey()), marshalToPortable(e.getValue())); - - return pMap; - } - - if (obj instanceof Map.Entry) { - Map.Entry e = (Map.Entry)obj; - - return new GridMapEntry<>(marshalToPortable(e.getKey()), marshalToPortable(e.getValue())); - } - - byte[] arr = portableMarsh.marshal(obj); - - assert arr.length > 0; - - Object obj0 = portableMarsh.unmarshal(arr, null); - - assert obj0 instanceof PortableObject; - - ((PortableObjectImpl)obj0).detachAllowed(true); - - return obj0; - } - - /** - * @return Marshaller. - */ - public GridPortableMarshaller marshaller() { - return portableMarsh; - } - - /** {@inheritDoc} */ - @Override public PortableBuilder builder(int typeId) { - return new PortableBuilderImpl(portableCtx, typeId); - } - - /** {@inheritDoc} */ - @Override public PortableBuilder builder(String clsName) { - return new PortableBuilderImpl(portableCtx, clsName); - } - - /** {@inheritDoc} */ - @Override public PortableBuilder builder(PortableObject portableObj) { - return PortableBuilderImpl.wrap(portableObj); - } - - /** {@inheritDoc} */ - @Override public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName, - Map fieldTypeIds) throws PortableException { - portableCtx.updateMetaData(typeId, - new PortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName)); - } - - /** {@inheritDoc} */ - @Override public void addMeta(final int typeId, final PortableMetadata newMeta) throws PortableException { - assert newMeta != null; - - final PortableMetaDataKey key = new PortableMetaDataKey(typeId); - - try { - PortableMetadata oldMeta = metaDataCache.localPeek(key); - - if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) { - PortableException err = metaDataCache.invoke(key, new MetaDataProcessor(typeId, newMeta)); - - if (err != null) - throw err; - } - } - catch (CacheException e) { - throw new PortableException("Failed to update meta data for type: " + newMeta.typeName(), e); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public PortableMetadata metadata(final int typeId) throws PortableException { - try { - if (clientNode) - return clientMetaDataCache.get(new PortableMetaDataKey(typeId)); - - return metaDataCache.localPeek(new PortableMetaDataKey(typeId)); - } - catch (CacheException e) { - throw new PortableException(e); - } - } - - /** {@inheritDoc} */ - @Override public Map metadata(Collection typeIds) - throws PortableException { - try { - Collection keys = new ArrayList<>(typeIds.size()); - - for (Integer typeId : typeIds) - keys.add(new PortableMetaDataKey(typeId)); - - Map meta = metaDataCache.getAll(keys); - - Map res = U.newHashMap(meta.size()); - - for (Map.Entry e : meta.entrySet()) - res.put(e.getKey().typeId(), e.getValue()); - - return res; - } - catch (CacheException e) { - throw new PortableException(e); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Collection metadata() throws PortableException { - if (clientNode) - return new ArrayList<>(clientMetaDataCache.values()); - - return F.viewReadOnly(metaDataCache.entrySetx(metaPred), - new C1, PortableMetadata>() { - private static final long serialVersionUID = 0L; - - @Override public PortableMetadata apply( - Cache.Entry e) { - return e.getValue(); - } - }); - } - - /** {@inheritDoc} */ - @Override public IgnitePortables portables() throws IgniteException { - return portables; - } - - /** {@inheritDoc} */ - @Override public boolean isPortableObject(Object obj) { - return obj instanceof PortableObject; - } - - /** {@inheritDoc} */ - @Override public boolean isPortableEnabled(CacheConfiguration ccfg) { - return marsh instanceof PortableMarshaller; - } - - /** - * @param po Portable object. - * @return Affinity key. - */ - public Object affinityKey(PortableObject po) { - try { - PortableMetadata meta = po.metaData(); - - if (meta != null) { - String affKeyFieldName = meta.affinityKeyFieldName(); - - if (affKeyFieldName != null) - return po.field(affKeyFieldName); - } - } - catch (PortableException e) { - U.error(log, "Failed to get affinity field from portable object: " + po, e); - } - - return po; - } - - /** {@inheritDoc} */ - @Override public int typeId(Object obj) { - if (obj == null) - return 0; - - return isPortableObject(obj) ? ((PortableObject)obj).typeId() : typeId(obj.getClass().getSimpleName()); - } - - /** {@inheritDoc} */ - @Override public Object field(Object obj, String fieldName) { - if (obj == null) - return null; - - return isPortableObject(obj) ? ((PortableObject)obj).field(fieldName) : super.field(obj, fieldName); - } - - /** {@inheritDoc} */ - @Override public boolean hasField(Object obj, String fieldName) { - return obj != null && ((PortableObject)obj).hasField(fieldName); - } - - /** - * @return Portable context. - */ - public PortableContext portableContext() { - return portableCtx; - } - - /** {@inheritDoc} */ - @Override public CacheObjectContext contextForCache(CacheConfiguration cfg) throws IgniteCheckedException { - assert cfg != null; - - boolean portableEnabled = marsh instanceof PortableMarshaller && !GridCacheUtils.isSystemCache(cfg.getName()) && - !GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName()); - - CacheObjectContext ctx0 = super.contextForCache(cfg); - - CacheObjectContext res = new CacheObjectPortableContext(ctx, - ctx0.copyOnGet(), - ctx0.storeValue(), - portableEnabled, - ctx0.addDeploymentInfo()); - - ctx.resource().injectGeneric(res.defaultAffMapper()); - - return res; - } - - /** {@inheritDoc} */ - @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException { - if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null) - return super.marshal(ctx, val); - - byte[] arr = portableMarsh.marshal(val); - - assert arr.length > 0; - - return arr; - } - - /** {@inheritDoc} */ - @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) - throws IgniteCheckedException { - if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null) - return super.unmarshal(ctx, bytes, clsLdr); - - return portableMarsh.unmarshal(bytes, clsLdr); - } - - /** {@inheritDoc} */ - @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) { - if (!((CacheObjectPortableContext)ctx).portableEnabled()) - return super.toCacheKeyObject(ctx, obj, userObj); - - if (obj instanceof KeyCacheObject) - return (KeyCacheObject)obj; - - if (((CacheObjectPortableContext)ctx).portableEnabled()) { - obj = toPortable(obj); - - if (obj instanceof PortableObject) - return (PortableObjectImpl)obj; - } - - return toCacheKeyObject0(obj, userObj); - } - - /** {@inheritDoc} */ - @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, - boolean userObj) { - if (!((CacheObjectPortableContext)ctx).portableEnabled()) - return super.toCacheObject(ctx, obj, userObj); - - if (obj == null || obj instanceof CacheObject) - return (CacheObject)obj; - - obj = toPortable(obj); - - if (obj instanceof PortableObject) - return (PortableObjectImpl)obj; - - return toCacheObject0(obj, userObj); - } - - /** {@inheritDoc} */ - @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) { - if (type == PortableObjectImpl.TYPE_PORTABLE) - return new PortableObjectImpl(portableContext(), bytes, 0); - - return super.toCacheObject(ctx, type, bytes); - } - - /** {@inheritDoc} */ - @Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp) - throws IgniteCheckedException { - if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled()) - return super.toCacheObject(ctx, valPtr, tmp); - - Object val = unmarshal(valPtr, !tmp); - - if (val instanceof PortableObjectOffheapImpl) - return (PortableObjectOffheapImpl)val; - - return new CacheObjectImpl(val, null); - } - - /** {@inheritDoc} */ - @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws PortableException { - if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled()) - return obj; - - if (obj instanceof PortableObjectOffheapImpl) - return ((PortableObjectOffheapImpl)obj).heapCopy(); - - return obj; - } - - /** - * @param obj Object. - * @return Portable object. - * @throws IgniteException In case of error. - */ - @Nullable public Object toPortable(@Nullable Object obj) throws IgniteException { - if (obj == null) - return null; - - if (isPortableObject(obj)) - return obj; - - return marshalToPortable(obj); - } - - /** - * @param typeId Type ID. - * @param oldMeta Old meta. - * @param newMeta New meta. - * @param fields Fields map. - * @return Whether meta is changed. - * @throws PortableException In case of error. - */ - private static boolean checkMeta(int typeId, @Nullable PortableMetadata oldMeta, - PortableMetadata newMeta, @Nullable Map fields) throws PortableException { - assert newMeta != null; - - Map oldFields = oldMeta != null ? ((PortableMetaDataImpl)oldMeta).fieldsMeta() : null; - Map newFields = ((PortableMetaDataImpl)newMeta).fieldsMeta(); - - boolean changed = false; - - if (oldMeta != null) { - if (!oldMeta.typeName().equals(newMeta.typeName())) { - throw new PortableException( - "Two portable types have duplicate type ID [" + - "typeId=" + typeId + - ", typeName1=" + oldMeta.typeName() + - ", typeName2=" + newMeta.typeName() + - ']' - ); - } - - if (!F.eq(oldMeta.affinityKeyFieldName(), newMeta.affinityKeyFieldName())) { - throw new PortableException( - "Portable type has different affinity key fields on different clients [" + - "typeName=" + newMeta.typeName() + - ", affKeyFieldName1=" + oldMeta.affinityKeyFieldName() + - ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() + - ']' - ); - } - - if (fields != null) - fields.putAll(oldFields); - } - else - changed = true; - - for (Map.Entry e : newFields.entrySet()) { - String typeName = oldFields != null ? oldFields.get(e.getKey()) : null; - - if (typeName != null) { - if (!typeName.equals(e.getValue())) { - throw new PortableException( - "Portable field has different types on different clients [" + - "typeName=" + newMeta.typeName() + - ", fieldName=" + e.getKey() + - ", fieldTypeName1=" + typeName + - ", fieldTypeName2=" + e.getValue() + - ']' - ); - } - } - else { - if (fields != null) - fields.put(e.getKey(), e.getValue()); - - changed = true; - } - } - - return changed; - } - - /** - */ - private static class MetaDataProcessor implements - EntryProcessor, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private int typeId; - - /** */ - private PortableMetadata newMeta; - - /** - * For {@link Externalizable}. - */ - public MetaDataProcessor() { - // No-op. - } - - /** - * @param typeId Type ID. - * @param newMeta New metadata. - */ - private MetaDataProcessor(int typeId, PortableMetadata newMeta) { - assert newMeta != null; - - this.typeId = typeId; - this.newMeta = newMeta; - } - - /** {@inheritDoc} */ - @Override public PortableException process( - MutableEntry entry, - Object... args) { - try { - PortableMetadata oldMeta = entry.getValue(); - - Map fields = new HashMap<>(); - - if (checkMeta(typeId, oldMeta, newMeta, fields)) { - PortableMetadata res = new PortableMetaDataImpl(newMeta.typeName(), - fields, - newMeta.affinityKeyFieldName()); - - entry.setValue(res); - - return null; - } - else - return null; - } - catch (PortableException e) { - return e; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(typeId); - out.writeObject(newMeta); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - typeId = in.readInt(); - newMeta = (PortableMetadata)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MetaDataProcessor.class, this); - } - } - - /** - * - */ - class MetaDataEntryListener implements CacheEntryUpdatedListener { - /** {@inheritDoc} */ - @Override public void onUpdated( - Iterable> evts) - throws CacheEntryListenerException { - for (CacheEntryEvent evt : evts) { - assert evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED : evt; - - PortableMetaDataKey key = evt.getKey(); - - final PortableMetadata newMeta = evt.getValue(); - - assert newMeta != null : evt; - - addClientCacheMetaData(key, newMeta); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MetaDataEntryListener.class, this); - } - } - - /** - * - */ - static class MetaDataEntryFilter implements CacheEntryEventSerializableFilter { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException { - return evt.getKey() instanceof PortableMetaDataKey; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MetaDataEntryFilter.class, this); - } - } - - /** - * - */ - static class MetaDataPredicate implements IgniteBiPredicate { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public boolean apply(Object key, Object val) { - return key instanceof PortableMetaDataKey; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MetaDataPredicate.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgniteBinaryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgniteBinaryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgniteBinaryImpl.java new file mode 100644 index 0000000..6a93a53 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgniteBinaryImpl.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable; + +import java.util.Collection; +import org.apache.ignite.IgniteBinary; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.binary.BinaryObject; +import org.jetbrains.annotations.Nullable; + +/** + * {@link org.apache.ignite.IgniteBinary} implementation. + */ +public class IgniteBinaryImpl implements IgniteBinary { + /** */ + private GridKernalContext ctx; + + /** */ + private CacheObjectBinaryProcessor proc; + + /** + * @param ctx Context. + */ + public IgniteBinaryImpl(GridKernalContext ctx, CacheObjectBinaryProcessor proc) { + this.ctx = ctx; + + this.proc = proc; + } + + /** {@inheritDoc} */ + @Override public int typeId(String typeName) { + guard(); + + try { + return proc.typeId(typeName); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public T toBinary(@Nullable Object obj) throws BinaryObjectException { + guard(); + + try { + return (T)proc.marshalToPortable(obj); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public BinaryObjectBuilder builder(int typeId) { + guard(); + + try { + return proc.builder(typeId); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public BinaryObjectBuilder builder(String typeName) { + guard(); + + try { + return proc.builder(typeName); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public BinaryObjectBuilder builder(BinaryObject portableObj) { + guard(); + + try { + return proc.builder(portableObj); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public BinaryType metadata(Class cls) throws BinaryObjectException { + guard(); + + try { + return proc.metadata(proc.typeId(cls.getName())); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public BinaryType metadata(String typeName) throws BinaryObjectException { + guard(); + + try { + return proc.metadata(proc.typeId(typeName)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public BinaryType metadata(int typeId) throws BinaryObjectException { + guard(); + + try { + return proc.metadata(typeId); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public Collection metadata() throws BinaryObjectException { + guard(); + + try { + return proc.metadata(); + } + finally { + unguard(); + } + } + + /** + * @return Portable processor. + */ + public IgniteCacheObjectProcessor processor() { + return proc; + } + + /** + * ctx.gateway().readLock() + */ + private void guard() { + ctx.gateway().readLock(); + } + + /** + * ctx.gateway().readUnlock() + */ + private void unguard() { + ctx.gateway().readUnlock(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java deleted file mode 100644 index 5ed6505..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/IgnitePortablesImpl.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.portable; - -import java.util.Collection; -import org.apache.ignite.IgnitePortables; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; -import org.apache.ignite.portable.PortableBuilder; -import org.apache.ignite.portable.PortableException; -import org.apache.ignite.portable.PortableMetadata; -import org.apache.ignite.portable.PortableObject; -import org.jetbrains.annotations.Nullable; - -/** - * {@link IgnitePortables} implementation. - */ -public class IgnitePortablesImpl implements IgnitePortables { - /** */ - private GridKernalContext ctx; - - /** */ - private CacheObjectPortableProcessor proc; - - /** - * @param ctx Context. - */ - public IgnitePortablesImpl(GridKernalContext ctx, CacheObjectPortableProcessor proc) { - this.ctx = ctx; - - this.proc = proc; - } - - /** {@inheritDoc} */ - @Override public int typeId(String typeName) { - guard(); - - try { - return proc.typeId(typeName); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public T toPortable(@Nullable Object obj) throws PortableException { - guard(); - - try { - return (T)proc.marshalToPortable(obj); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public PortableBuilder builder(int typeId) { - guard(); - - try { - return proc.builder(typeId); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public PortableBuilder builder(String typeName) { - guard(); - - try { - return proc.builder(typeName); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public PortableBuilder builder(PortableObject portableObj) { - guard(); - - try { - return proc.builder(portableObj); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public PortableMetadata metadata(Class cls) throws PortableException { - guard(); - - try { - return proc.metadata(proc.typeId(cls.getName())); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public PortableMetadata metadata(String typeName) throws PortableException { - guard(); - - try { - return proc.metadata(proc.typeId(typeName)); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public PortableMetadata metadata(int typeId) throws PortableException { - guard(); - - try { - return proc.metadata(typeId); - } - finally { - unguard(); - } - } - - /** {@inheritDoc} */ - @Override public Collection metadata() throws PortableException { - guard(); - - try { - return proc.metadata(); - } - finally { - unguard(); - } - } - - /** - * @return Portable processor. - */ - public IgniteCacheObjectProcessor processor() { - return proc; - } - - /** - * ctx.gateway().readLock() - */ - private void guard() { - ctx.gateway().readLock(); - } - - /** - * ctx.gateway().readUnlock() - */ - private void unguard() { - ctx.gateway().readUnlock(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 1c8107e..58a8424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -907,7 +907,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte CacheObject cacheVal = entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null; - val = cacheVal != null ? (V)cacheVal.value(cctx.cacheObjectContext(), false) : null; + // TODO 950 nocopy + val = (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(cacheVal, qry.keepPortable()); } catch (GridCacheEntryRemovedException e) { val = null; @@ -1093,7 +1094,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte next = null; while (it.hasNext()) { - final LazySwapEntry e = new LazySwapEntry(it.next()); + final LazySwapEntry e = new LazySwapEntry(it.next(), keepPortable); if (filter != null) { K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable); @@ -2510,11 +2511,15 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** */ private final Map.Entry e; + /** */ + private boolean keepBinary; + /** * @param e Entry with */ - LazySwapEntry(Map.Entry e) { + LazySwapEntry(Map.Entry e, boolean keepBinary) { this.e = e; + this.keepBinary = keepBinary; } /** {@inheritDoc} */ @@ -2529,7 +2534,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); - return obj.value(cctx.cacheObjectContext(), false); + return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(obj, keepBinary); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index c7feda4..fa2d6b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -165,7 +165,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param incBackups {@code true} if need to include backups. * @param fields Fields query flag. * @param all Whether to load all pages. - * @param keepPortable Whether to keep portables. + * @param keepPortable Whether to keep binary. * @param subjId Subject ID. * @param taskHash Task name hash code. * @param topVer Topology version. @@ -452,7 +452,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache } /** - * @return Whether to keep portables. + * @return Whether to keep binary. */ public boolean keepPortable() { return keepPortable; http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index 7417138..d26be5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -21,7 +21,6 @@ import javax.cache.Cache; import javax.cache.event.CacheEntryEvent; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -60,18 +59,18 @@ class CacheContinuousQueryEvent extends CacheEntryEvent { /** {@inheritDoc} */ @Override public K getKey() { - return e.key().value(cctx.cacheObjectContext(), false); + return (K)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.key(), true, false); } /** {@inheritDoc} */ @Override public V getValue() { - return CU.value(e.value(), cctx, false); + return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.value(), true, false); } /** {@inheritDoc} */ @Override public V getOldValue() { - return CU.value(e.oldValue(), cctx, false); + return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.oldValue(), true, false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 174e1ce..f48ff38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.NotNull; @@ -269,10 +270,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt // Never load internal keys from store as they are never persisted. return null; - Object storeKey = key.value(cctx.cacheObjectContext(), false); - - if (convertPortable()) - storeKey = cctx.unwrapPortableIfNeeded(storeKey, false); + Object storeKey = cctx.unwrapPortableIfNeeded(key, !convertPortable()); if (log.isDebugEnabled()) log.debug("Loading value from store for key: " + storeKey); @@ -396,22 +394,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt return; } - Collection keys0; - - if (convertPortable()) { - keys0 = F.viewReadOnly(keys, new C1() { - @Override public Object apply(KeyCacheObject key) { - return cctx.unwrapPortableIfNeeded(key.value(cctx.cacheObjectContext(), false), false); - } - }); - } - else { - keys0 = F.viewReadOnly(keys, new C1() { + Collection keys0 = F.viewReadOnly(keys, + new C1() { @Override public Object apply(KeyCacheObject key) { - return key.value(cctx.cacheObjectContext(), false); + return cctx.unwrapPortableIfNeeded(key, !convertPortable()); } }); - } if (log.isDebugEnabled()) log.debug("Loading values from store for keys: " + keys0); @@ -532,10 +520,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt if (key instanceof GridCacheInternal) return true; - if (convertPortable()) { - key = cctx.unwrapPortableIfNeeded(key, false); - val = cctx.unwrapPortableIfNeeded(val, false); - } + key = cctx.unwrapPortableIfNeeded(key, !convertPortable()); + val = cctx.unwrapPortableIfNeeded(val, !convertPortable()); if (log.isDebugEnabled()) log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); @@ -637,8 +623,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt if (key instanceof GridCacheInternal) return false; - if (convertPortable()) - key = cctx.unwrapPortableIfNeeded(key, false); + key = cctx.unwrapPortableIfNeeded(key, !convertPortable()); if (log.isDebugEnabled()) log.debug("Removing value from cache store [key=" + key + ']'); @@ -686,7 +671,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } if (store != null) { - Collection keys0 = convertPortable() ? cctx.unwrapPortablesIfNeeded(keys, false) : keys; + Collection keys0 = cctx.unwrapPortablesIfNeeded(keys, !convertPortable()); if (log.isDebugEnabled()) log.debug("Removing values from cache store [keys=" + keys0 + ']'); @@ -773,7 +758,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt assert e != null; if (e.getMessage() != null) { - throw new IgniteCheckedException("Cache store must work with portable objects if portables are " + + throw new IgniteCheckedException("Cache store must work with portable objects if binary are " + "enabled for cache [cacheName=" + cctx.namex() + ']', e); } else @@ -1093,15 +1078,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt Object k = e.getKey(); - if (rmvd != null && rmvd.contains(k)) - continue; - Object v = locStore ? e.getValue() : e.getValue().get1(); - if (convertPortable()) { - k = cctx.unwrapPortableIfNeeded(k, false); - v = cctx.unwrapPortableIfNeeded(v, false); - } + k = cctx.unwrapPortableIfNeeded(k, !convertPortable()); + v = cctx.unwrapPortableIfNeeded(v, !convertPortable()); + + if (rmvd != null && rmvd.contains(k)) + continue; next = new CacheEntryImpl<>(k, v); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 716676f..76f2f77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -142,8 +142,13 @@ public class IgniteTransactionsImpl implements IgniteTransactionsEx { * @return Transaction. */ @SuppressWarnings("unchecked") - private IgniteInternalTx txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, - long timeout, int txSize, @Nullable GridCacheContext sysCacheCtx) { + private IgniteInternalTx txStart0( + TransactionConcurrency concurrency, + TransactionIsolation isolation, + long timeout, + int txSize, + @Nullable GridCacheContext sysCacheCtx + ) { cctx.kernalContext().gateway().readLock(); try { @@ -152,7 +157,6 @@ public class IgniteTransactionsImpl implements IgniteTransactionsEx { if (tx != null) throw new IllegalStateException("Failed to start new transaction " + "(current thread already has a transaction): " + tx); - tx = cctx.tm().newTx( false, false, http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 1c82636..4103af2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1286,7 +1286,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /*subjId*/subjId, /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, resolveTaskName(), - null); + null, + txEntry.keepBinary()); boolean modified = false; @@ -1310,7 +1311,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter for (T2, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val, ver); + txEntry.key(), key, cacheVal, val, ver, txEntry.keepBinary()); try { EntryProcessor processor = t.get1(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 9eb2808..60ff33c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -54,6 +54,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; /** @@ -178,6 +179,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** * Additional flags. * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value. + * GridCacheUtils.KEEP_BINARY_FLAG_MASK - for withKeepBinary flag. */ private byte flags; @@ -212,7 +214,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { long conflictExpireTime, GridCacheEntryEx entry, @Nullable GridCacheVersion conflictVer, - boolean skipStore) { + boolean skipStore, + boolean keepBinary + ) { assert ctx != null; assert tx != null; assert op != null; @@ -227,6 +231,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.conflictVer = conflictVer; skipStore(skipStore); + keepBinary(keepBinary); key = entry.key(); @@ -258,7 +263,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { GridCacheEntryEx entry, CacheEntryPredicate[] filters, GridCacheVersion conflictVer, - boolean skipStore) { + boolean skipStore, + boolean keepBinary + ) { assert ctx != null; assert tx != null; assert op != null; @@ -273,6 +280,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.conflictVer = conflictVer; skipStore(skipStore); + keepBinary(keepBinary); if (entryProcessor != null) addEntryProcessor(entryProcessor, invokeArgs); @@ -436,14 +444,50 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param skipStore Skip store flag. */ public void skipStore(boolean skipStore){ - flags = skipStore ? (byte)(flags | SKIP_STORE_FLAG_MASK) : (byte)(flags & ~SKIP_STORE_FLAG_MASK); + setFlag(skipStore, SKIP_STORE_FLAG_MASK); } /** * @return Skip store flag. */ public boolean skipStore() { - return (flags & SKIP_STORE_FLAG_MASK) == 1; + return isFlag(SKIP_STORE_FLAG_MASK); + } + + /** + * Sets keep binary flag value. + * + * @param keepBinary Keep binary flag value. + */ + public void keepBinary(boolean keepBinary) { + setFlag(keepBinary, KEEP_BINARY_FLAG_MASK); + } + + /** + * @return Keep binary flag value. + */ + public boolean keepBinary() { + return isFlag(KEEP_BINARY_FLAG_MASK); + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; } /** @@ -616,7 +660,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { for (T2, Object[]> t : entryProcessors()) { try { CacheInvokeEntry invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val, - ver); + ver, keepBinary()); EntryProcessor processor = t.get1();