Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0B76E200B67 for ; Tue, 2 Aug 2016 01:49:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 05652160AA7; Mon, 1 Aug 2016 23:49:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4D492160A6C for ; Tue, 2 Aug 2016 01:49:46 +0200 (CEST) Received: (qmail 21373 invoked by uid 500); 1 Aug 2016 23:49:45 -0000 Mailing-List: contact dev-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list dev@apex.apache.org Received: (qmail 21359 invoked by uid 99); 1 Aug 2016 23:49:45 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Aug 2016 23:49:45 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id D49F01A5FB1 for ; Mon, 1 Aug 2016 23:49:44 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -5.446 X-Spam-Level: X-Spam-Status: No, score=-5.446 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id rRIYHOGZJ53n for ; Mon, 1 Aug 2016 23:49:43 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id A7EF85F4E9 for ; Mon, 1 Aug 2016 23:49:42 +0000 (UTC) Received: (qmail 21351 invoked by uid 99); 1 Aug 2016 23:49:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Aug 2016 23:49:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1FFCDE058E; Mon, 1 Aug 2016 23:49:42 +0000 (UTC) From: davidyan74 To: dev@apex.incubator.apache.org Reply-To: dev@apex.incubator.apache.org References: In-Reply-To: Subject: [GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only Content-Type: text/plain Message-Id: <20160801234942.1FFCDE058E@git1-us-west.apache.org> Date: Mon, 1 Aug 2016 23:49:42 +0000 (UTC) archived-at: Mon, 01 Aug 2016 23:49:47 -0000 Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73073791 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. + */ +@DefaultSerializer(FieldSerializer.class) +public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap, + Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private transient WindowBoundedMapCache> cache = new WindowBoundedMapCache<>(); + private transient boolean isRunning = false; + private transient boolean isInWindow = false; + + private int batchSize = DEFAULT_BATCH_SIZE; + @NotNull + private SpillableByteMapImpl map; + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde serdeKey; + private Serde serdeValue; + + private SpillableByteArrayListMultimapImpl() + { + // for kryo + } + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde serdeKey, + Serde serdeValue) + { + this.store = Preconditions.checkNotNull(store); + this.identifier = Preconditions.checkNotNull(identifier); + this.bucket = bucket; + this.serdeKey = Preconditions.checkNotNull(serdeKey); + this.serdeValue = Preconditions.checkNotNull(serdeValue); + + map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); + } + + public SpillableStateStore getStore() + { + return store; + } + + @Override + public List get(@Nullable K key) + { + return getHelper(key); + } + + private SpillableArrayListImpl getHelper(@Nullable K key) + { + SpillableArrayListImpl spillableArrayList = cache.get(key); + + if (spillableArrayList == null) { + Slice keySlice = serdeKey.serialize(key); + Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray()); --- End diff -- I see that you removed the identifier as part of the key that stores the size. Would this create a conflict with different identifiers? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---