From user-return-25432-archive-asf-public=cust-asf.ponee.io@flink.apache.org Fri Jan 18 11:49:03 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id B7BEC180647 for ; Fri, 18 Jan 2019 11:49:02 +0100 (CET) Received: (qmail 68267 invoked by uid 500); 18 Jan 2019 10:49:01 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 68256 invoked by uid 99); 18 Jan 2019 10:49:01 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Jan 2019 10:49:01 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 038831808A3 for ; Fri, 18 Jan 2019 10:49:01 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.798 X-Spam-Level: * X-Spam-Status: No, score=1.798 tagged_above=-999 required=6.31 tests=[DKIMWL_WL_MED=-0.001, DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd3-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 4kVWzrBzltMt for ; Fri, 18 Jan 2019 10:48:59 +0000 (UTC) Received: from mail-qt1-f174.google.com (mail-qt1-f174.google.com [209.85.160.174]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTPS id CF4D2610FF for ; Fri, 18 Jan 2019 10:48:58 +0000 (UTC) Received: by mail-qt1-f174.google.com with SMTP id l11so14739966qtp.0 for ; Fri, 18 Jan 2019 02:48:58 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:references:in-reply-to:from:date:message-id:subject:to; bh=9kXNiAPymLp0jrpPgLrMk8JMt8Wyb3FQl70bJypoKy4=; b=s9Cw7OtALI54tgtP/dCBRPtMoehgFAPoF1DgOKGdxXqYq6Gr7SeWar2iX5rWP/hxx1 fzXhJ/XqyPbAjdfROAVhAeQ741a9Tp4xN+B9eEkx/ugV8erhIZ6z7Y7VO6S/mexZGWO2 y10wzI9m5qEQR5Lk/YVh78+IZsRQpBNEkoscw/hCFdGXrXyCzZRGPPGJV0YPeNY32BYl WckPKTfemjHtIqFZfji2tWf/j+43H79vc7COaTz0g8DooAD/tLHROrUDX5ASHQ99eTp2 vn3rh0Z+qJbtuhMlBZgfp0HEE/pO0mt3+xljQ+FyDzGjABAUQlpLL3L5n5q8QoL6THbG l0Aw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to; bh=9kXNiAPymLp0jrpPgLrMk8JMt8Wyb3FQl70bJypoKy4=; b=AUxa1+FhAF6QU843SUqH5xSI2Swobm/8ID7kD5a3xvGUUs/XktHRR8FEH6jRbCvlcD WTc34v0mXHX3OKPWq8kGTFOfW4EBXLy2R8WjN6LDY0UAEfwKi86ozV01c0PXi6S0U+Wd eyGjdsBO8rCmxcvvkafnT7tDhJGpbAToK3X203yA5qSX47uyLZF4JCmn6zcdy2Ae1IbG 79uTuOabrOwisnOgG+OPIFvhCOBUixmfQqRKazPHrdx0JTSekfHrIB49Qg8DebWY9/1A 6GrrP8wpl2+iinjIoeO9093JLHkei+taotm8w2jcGbK6SSosDj482GrrBgTm0BKiJaEO zHWA== X-Gm-Message-State: AJcUukcL+mktwnMGCKxEDTg4BcGPIEm7qfSIMxu8uP1vGG1UBDD4UvMz ijEbbKQZSyATxu3H3jsgx0XCJrlJUm8fsNBk77G+vQ== X-Google-Smtp-Source: ALg8bN7jUdB7WRxOFU+ZIC+R/2iqaltmRd1M0YwI8FBJLSXG7Z/vW5x2dfM8G2O/LUG3PqdyWQpGjCIDZEuuPc85En0= X-Received: by 2002:a0c:9418:: with SMTP id h24mr15172591qvh.216.1547808532236; Fri, 18 Jan 2019 02:48:52 -0800 (PST) MIME-Version: 1.0 References: In-Reply-To: From: madan Date: Fri, 18 Jan 2019 16:18:17 +0530 Message-ID: Subject: Re: NPE when using spring bean in custom input format To: user@flink.apache.org Content-Type: multipart/alternative; boundary="000000000000be8c0d057fb941db" --000000000000be8c0d057fb941db Content-Type: text/plain; charset="UTF-8" Suggestions please. Thinking of options 1. Initilizing spring application context in the 'open' method. Instead of loading entire context, move service related beans to one/multiple packages and scan only those packages. Requires code refactoring. 2. Direct database query - direct query cannot be used since business logic is around while fetching records 3. Write initially to csv and do transformation on csv. Last possible option. Please share your thoughts. Thank you. On Wed, Jan 16, 2019 at 2:50 PM madan wrote: > Hi, > > Need help in the below scenario, > > I have CustomInputFormat which loads the records using a bean, > > public class CustomInputFormat extends GenericInputFormat { > > private Iterator> recordsIterator; > > @Override > > public void open(GenericInputSplit split) throws IOException { > > ServiceX serviceX = SpringBeanFinder.getBean(ServiceX.class); > > recordsIterator = serviceX.getRecords(..); > > } > > } > > The above input format works fine when using Flink LocalEnvironment in > spring application. Problem is when running flink in a cluster mode and > trying to connect to it using RemoveEnvironment. Since Spring applicaiton > context will not be initialized, NPE is thrown. Please suggest what could > be the solution in this scenario. > > > > -- > Thank you, > Madan. > -- Thank you, Madan. --000000000000be8c0d057fb941db Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Suggestions please.

Th= inking of options
1. Initilizing spring application context in th= e 'open' method. Instead of loading entire context, move service re= lated beans to one/multiple packages=C2=A0 and scan only those packages. Re= quires code refactoring.
2. Direct database query - direct query = cannot be used since business logic is around while fetching records
<= div>3. Write initially to csv and do transformation on csv. Last possible o= ption.

Please share your thoughts.

<= /div>
Thank you.

On Wed, Jan 16, 2019 at 2:50 PM madan <madan.yellanki@gmail.com> wrote= :
Hi,

Need help in the below scenario,

I have CustomInputFormat which loads the records using a bean,

public class CustomInput=
Format extends G=
enericInputFormat {
     =
 private Iterator<Map<String, Object>=
;> recordsIterator;
@Override
        public void=
 open(GenericInputSplit spli=
t) throws IOException {=
               ServiceX serviceX =3D SpringBeanFinder.getBean(=
ServiceX.class);
                recordsIterator =3D serviceX.g=
etRecords(..); 
         }
}
The above input format works fine whe= n using Flink LocalEnvironment in spring application. Problem is when runni= ng flink in a cluster mode and trying to connect to it using RemoveEnvironm= ent. Since Spring applicaiton context will not be initialized, NPE is throw= n. Please suggest what could be the solution in this scenario.


--
Thank you,
Mad= an.


--
Thank you,
Madan.
--000000000000be8c0d057fb941db--