Date: Sun, 20 Mar 2016 03:31:33 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@apex.incubator.apache.org
Subject: [jira] [Commented] (APEXMALHAR-1897) Create ManagedState

    [ https://issues.apache.org/jira/browse/APEXMALHAR-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203071#comment-15203071 ]

ASF GitHub Bot commented on APEXMALHAR-1897:
--------------------------------------------

Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56758813

    --- Diff: library/src/main/java/com/datatorrent/lib/state/managed/Bucket.java ---
    @@ -0,0 +1,534 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.state.managed;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +
    +import com.datatorrent.lib.fileaccess.FileAccess;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A bucket that groups events.
    + */
    +public interface Bucket extends ManagedStateComponent
    +{
    +  /**
    +   * @return bucket id
    +   */
    +  long getBucketId();
    +
    +  /**
    +   *
    +   * @return size of bucket in memory.
    +   */
    +  long getSizeInBytes();
    +
    +  /**
    +   * Get value of a key.
    +   *
    +   * @param key key.
    +   * @param timeBucket time bucket of the key if known; -1 otherwise.
    +   * @param source source to read from
    +   * @return value of the key.
    +   */
    +  Slice get(Slice key, long timeBucket, ReadSource source);
    +
    +  /**
    +   * Set value of a key.
    +   *
    +   * @param key key.
    +   * @param timeBucket timeBucket of the key.
    +   * @param value value of the key.
    +   */
    +  void put(Slice key, long timeBucket, Slice value);
    +
    +  /**
    +   * Triggers the bucket to checkpoint. Returns the non checkpointed data so far.
    +   *
    +   * @return non checkpointed data.
    +   */
    +  Map<Slice, BucketedValue> checkpoint(long windowId);
    +
    +  /**
    +   * Triggers the bucket to commit data till provided window id.
    +   *
    +   * @param windowId window id
    +   */
    +  void committed(long windowId);
    +
    +  /**
    +   * Triggers bucket to free memory which is already persisted in bucket data files.
    +   *
    +   * @return amount of memory freed in bytes.
    +   * @throws IOException
    +   */
    +  long freeMemory() throws IOException;
    +
    +  /**
    +   * Allows the bucket to process/cache data which is recovered (from window files) after failure.
    +   *
    +   * @param windowId largest recovery window
    +   * @param recoveredData recovered data
    +   */
    +  void recoveredData(long windowId, Map<Slice, BucketedValue> recoveredData);
    +
    +  enum ReadSource
    +  {
    +    MEMORY, //state in memory in key/value form
    +    READERS, //these are streams in which the key will be searched and serialized.
    +    ALL //both the above states.
    +  }
    +
    +  class BucketedValue
    +  {
    +    private long timeBucket;
    +    private Slice value;
    +
    +    protected BucketedValue()
    +    {
    +    }
    +
    +    protected BucketedValue(long timeBucket, Slice value)
    +    {
    +      this.timeBucket = timeBucket;
    +      this.value = value;
    +    }
    +
    +    protected long getTimeBucket()
    +    {
    +      return timeBucket;
    +    }
    +
    +    protected void setTimeBucket(long timeBucket)
    +    {
    +      this.timeBucket = timeBucket;
    +    }
    +
    +    public Slice getValue()
    +    {
    +      return value;
    +    }
    +
    +    public void setValue(Slice value)
    +    {
    +      this.value = value;
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +      if (this == o) {
    +        return true;
    +      }
    +      if (!(o instanceof BucketedValue)) {
    +        return false;
    +      }
    +
    +      BucketedValue that = (BucketedValue)o;
    +
    +      return timeBucket == that.timeBucket && value.equals(that.value);
    +
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +      return Objects.hash(timeBucket, value);
    +    }
    +  }
    +
    +  /**
    +   * Default bucket.
    +   * Not thread-safe.
    +   */
    +  class DefaultBucket implements Bucket
    +  {
    +    private final long bucketId;
    +
    +    //Key -> Ordered values
    +    private Map<Slice, BucketedValue> flash = Maps.newHashMap();
    +
    +    //Data persisted in write ahead logs. window -> bucket
    +    private final transient TreeMap<Long, Map<Slice, BucketedValue>> checkpointedData = Maps.newTreeMap();
    +
    +    //Data persisted in bucket data files
    +    private final transient Map<Slice, BucketedValue> committedData = Maps.newHashMap();
    +
    +    //Data recovered
    +    private final transient TreeMap<Long, Map<Slice, BucketedValue>> recoveredData = Maps.newTreeMap();
    --- End diff --

    Yes. I think it can be. I will make the change.


> Create ManagedState
> -------------------
>
>                 Key: APEXMALHAR-1897
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1897
>             Project: Apache Apex Malhar
>          Issue Type: Sub-task
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>             Fix For: 3.4.0
>
>
> ManagedState is described in the document below:
> https://docs.google.com/document/d/1gRWN9ufKSZSZD0N-pthlhpC9TZ8KwJ6hJlAX6nxl5f8/edit#heading=h.z87ti1fwyt0t



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)