From: bhupeshchawda
To: dev@apex.incubator.apache.org
Reply-To: dev@apex.incubator.apache.org
Subject: [GitHub] apex-malhar pull request #358: [review only]APEXMALHAR-2172: Updates to JDBC...
Content-Type: text/plain
Message-Id: <20160808052836.3EFD6E2C1A@git1-us-west.apache.org>
Date: Mon, 8 Aug 2016 05:28:36 +0000 (UTC)
archived-at: Mon, 08 Aug 2016 05:28:41 -0000

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r73823756

    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -80,201 +91,45 @@
      * @tags database, sql, jdbc, partitionable,exactlyOnce
      */
     @Evolving
    -public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator
    -    implements ActivationListener, IdleTimeHandler, Partitioner>
    +public abstract class AbstractJdbcPollInputOperator extends AbstractStoreInputOperator implements
    +    ActivationListener, Partitioner>
     {
    -  /**
    -   * poll interval in milliseconds
    -   */
    -  private static int pollInterval = 10000;
    +  private static int DEFAULT_QUEUE_CAPACITY = 4 * 1024 * 1024;
    +  private static int DEFAULT_POLL_INTERVAL = 10 * 1000;
    +  private static int DEFAULT_FETCH_SIZE = 20000;
    +  private static int DEFAULT_BATCH_SIZE = 2000;
    +  private int pollInterval = DEFAULT_POLL_INTERVAL; //in miliseconds
    +  private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    +  private int fetchSize = DEFAULT_FETCH_SIZE;
       @Min(1)
       private int partitionCount = 1;
    -  protected transient int operatorId;
    -  protected transient boolean isReplayed;
    -  protected transient boolean isPollable;
    -  protected int batchSize;
    -  protected static int fetchSize = 20000;
    -  /**
    -   * Map of windowId to of the range key
    -   */
    -  protected transient MutablePair currentWindowRecoveryState;
    -
    -  /**
    -   * size of the emit queue used to hold polled records before emit
    -   */
    -  private static int queueCapacity = 4 * 1024 * 1024;
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  @NotNull
    +  private String tableName;
    +  @NotNull
    +  private String columnsExpression;
    +  @NotNull
    +  private String key;
    +  private String whereCondition = null;
    +  private long currentWindowId;
    +  private WindowDataManager windowManager;
    +
    +  protected KeyValPair rangeQueryPair;
    +  protected Integer lowerBound;
    +  private transient int operatorId;
    +  private transient DSLContext create;
       private transient volatile boolean execute;
    -  private transient AtomicReference cause;
    -  protected transient int spinMillis;
    -  private transient OperatorContext context;
    -  protected String tableName;
    -  protected String key;
    -  protected long currentWindowId;
    -  protected KeyValPair rangeQueryPair;
    -  protected String lower;
    -  protected String upper;
    -  protected boolean recovered;
    -  protected boolean isPolled;
    -  protected String whereCondition = null;
    -  protected String previousUpperBound;
    -  protected String highestPolled;
    -  private static final String user = "";
    -  private static final String password = "";
    -  /**
    -   * thread to poll database
    -   */
    -  private transient Thread dbPoller;
    -  protected transient ArrayBlockingQueue> emitQueue;
    +  private transient ScheduledExecutorService scanService;
    +  protected transient boolean isPolled;
    +  protected transient Integer lastPolledBound;
    +  protected transient Integer lastEmittedRecord;
    --- End diff --

    Can you make `lowerBound`, `lastPolledBound` and `lastEmittedRecord` `int` instead of `Integer`?
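
The review comment does not give a reason for preferring `int`. One plausible motivation, offered here only as an assumption, is that a boxed `Integer` field starts out as `null` and throws on auto-unboxing if it is used in arithmetic before being assigned, whereas a primitive `int` field starts at 0. The sketch below illustrates that difference with hypothetical stand-in classes (`Boxed`, `Primitive`); it is not code from the operator under review.

```java
public class BoundFieldsSketch {
    // Hypothetical stand-ins for the fields named in the review,
    // not the real AbstractJdbcPollInputOperator.
    static class Boxed {
        Integer lowerBound; // defaults to null until assigned
    }

    static class Primitive {
        int lowerBound; // defaults to 0 until assigned
    }

    // Returns true if advancing an unassigned boxed bound fails on auto-unboxing.
    static boolean boxedAdvanceThrows() {
        Boxed b = new Boxed();
        try {
            b.lowerBound = b.lowerBound + 1; // unboxing null throws here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Advances an unassigned primitive bound; it starts from the default 0.
    static int primitiveAdvance() {
        Primitive p = new Primitive();
        p.lowerBound = p.lowerBound + 1;
        return p.lowerBound;
    }

    public static void main(String[] args) {
        System.out.println("boxed advance throws: " + boxedAdvanceThrows());
        System.out.println("primitive advance result: " + primitiveAdvance());
    }
}
```

The trade-off cuts both ways: with `int` there is no `null` to signal "not yet polled", so code that relies on that distinction would need a sentinel value or a separate flag.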