apex-users mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: Information Needed
Date Tue, 02 Aug 2016 21:48:00 GMT
The problem is likely in the parser. Please check its documentation to 
see whether a parser instance can be reused across tuples. If it cannot 
and you need to construct a new parser for each tuple, read the 
configuration file into a byte array once and construct a new input 
stream from that byte array for each parser.
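
A minimal sketch of the byte-array approach, using only the JDK. The class 
name CachedConfigExample and the methods readFully(), newConfReader() and 
drain() are hypothetical names chosen for illustration, and UTF-8 is an 
assumed encoding for the configuration file. In the operator, the stream 
passed to setup() would come from getFS().open(new Path(getInputConfFile())).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class CachedConfigExample {
  // Configuration file contents, read once (for example from setup()).
  private byte[] confBytes;

  // Reads the whole stream into memory and closes it.
  static byte[] readFully(InputStream in) {
    try (InputStream src = in) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[4096];
      int n;
      while ((n = src.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Called once; in the operator the argument would be the HDFS stream.
  void setup(InputStream confStream) {
    confBytes = readFully(confStream);
  }

  // Call once per tuple: each reader starts at the beginning of the config.
  Reader newConfReader() {
    return new InputStreamReader(
        new ByteArrayInputStream(confBytes), StandardCharsets.UTF_8);
  }

  // Demo helper only: consumes a reader to EOF, as a parser would.
  static String drain(Reader reader) {
    try (Reader r = reader) {
      StringBuilder sb = new StringBuilder();
      char[] buf = new char[1024];
      int n;
      while ((n = r.read(buf)) != -1) {
        sb.append(buf, 0, n);
      }
      return sb.toString();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Stand-in for the HDFS configuration file (assumed sample content).
    byte[] sample = "id,5\nname,10\n".getBytes(StandardCharsets.UTF_8);
    CachedConfigExample op = new CachedConfigExample();
    op.setup(new ByteArrayInputStream(sample));
    for (int tuple = 0; tuple < 3; tuple++) {
      // A fresh reader per tuple sees the full configuration every time.
      String conf = drain(op.newConfReader());
      System.out.println("tuple " + tuple + " read " + conf.length() + " config chars");
    }
  }
}
```

In parseTuple(), the reader returned by newConfReader() would take the 
place of new InputStreamReader(inputConfStream), so the file system is 
touched only once while each parser still gets an unread stream.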

Vlad

On 8/2/16 14:05, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
>
> Hi,
>
> Below is the method that gets called inside the emit() method. I am 
> using the input stream to parse each line. I would like to create 
> inputConfStream only once, but the stream gets closed if I create it 
> inside the setup() or beginWindow() method.
>
> private KeyValue<String, String> parseTuple(KeyValue<String, String> tuple) {
>   KeyValue<String, String> newTuple = new KeyValue<String, String>();
>   try {
>     Parser parser;
>     inputConfStream = getFS().open(new Path(getInputConfFile()));
>     if (inputDelimiter != null) {
>       LOG.debug("parseTuple:sourceId = {},delimiter = {},inputConf = {}",
>           getSourceId(), getInputDelimiter(), getInputConfFile());
>       parser = DefaultParserFactory.getInstance().newDelimitedParser(
>           new InputStreamReader(inputConfStream),
>           new StringReader(tuple.value), getInputDelimiter().charAt(0), '"', false);
>     } else {
>       parser = DefaultParserFactory.getInstance().newFixedLengthParser(
>           new InputStreamReader(inputConfStream),
>           new StringReader(tuple.value));
>     }
>     parser.setIgnoreExtraColumns(true);
>     final DataSet ds = parser.parse();
>     if (ds == null || ds.getRowCount() == 0) {
>       throw new RuntimeException("Could not parse record:" + tuple.value);
>     }
>     if (ds.next()) {
>       StringBuilder sb = new StringBuilder();
>       for (String col : ds.getColumns()) {
>         LOG.debug("parseTuple: Col: {}", col);
>       }
>       List<Field> fields = outputFields.getFields();
>       String oldValue;
>       String adjustedValue;
>       for (Field field : fields) {
>         if (field.getValue().equals("")) {
>           oldValue = ds.getString(field.getName());
>           adjustedValue = Helper.adjustValue(oldValue,
>               Integer.parseInt(field.getLength()), outputDelimiter, inputDelimiter);
>           sb.append(adjustedValue);
>         } else {
>           oldValue = field.getValue();
>           adjustedValue = Helper.adjustValue(oldValue,
>               Integer.parseInt(field.getLength()), outputDelimiter, inputDelimiter);
>           sb.append(adjustedValue);
>         }
>         if (outputDelimiter != null) {
>           sb.append(outputDelimiter);
>         }
>       }
>       sb.append("\n");
>       newTuple.key = tuple.key;
>       newTuple.value = sb.toString();
>     }
>   } catch (Exception e) {
>     LOG.error("parseTuple:error while parsing the sourceID : {},line:{},Error Message : {}",
>         getSourceId(), tuple.value, e.getMessage());
>     e.printStackTrace();
>     return new KeyValue<String, String>(tuple.key, null);
>   }
>   LOG.debug("parseTuple: The old tuple is:{} ## The new tuple is:{}", tuple, newTuple);
>   return newTuple;
> }
>
> Regards,
>
> Surya Vamshi
>
> From: Mukkamula, Suryavamshivardhan (CWM-NR) 
> [mailto:suryavamshivardhan.mukkamula@rbc.com]
> Sent: 2016, August, 02 4:48 PM
> To: users@apex.apache.org
> Subject: RE: Information Needed
>
> Hi,
>
> inputConfStream is used to parse each input line from the feed; the 
> same stream is used for all the lines. I am not sure why the stream is 
> getting closed.
>
> Regards,
>
> Surya Vamshi
>
> From: Vlad Rozov [mailto:v.rozov@datatorrent.com]
> Sent: 2016, August, 02 4:26 PM
> To: users@apex.apache.org
> Subject: Re: Information Needed
>
> Both setup() and beginWindow() should work. It is more correct to 
> open the configuration stream and parse the configuration file in 
> setup(), as you tried in the initial implementation, as long as the 
> configuration path does not depend on the window ID. Where is 
> inputConfStream used? Most likely it is reaching EOF unexpectedly.
>
> Vlad
>
> On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
>
>     Hi Team,
>
>     When I read an input line from the feed, I parse it using a
>     configuration file that I read from HDFS. To avoid reading the
>     configuration file for every line, I would like to open it in
>     the beginWindow() method. However, the input stream is getting
>     closed, and the operator does not hold the stream for all the
>     tuples.
>
>     Can I open the input stream once for all the tuples? (I tried
>     the setup() method as well, but no luck.)
>
>     @Override
>     public void beginWindow(long windowId)
>     {
>       super.beginWindow(windowId);
>       try {
>         inputConfStream = getFS().open(new Path(getInputConfFile()));
>       } catch (Exception e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>         LOG.error("beginWindow: Error while streaming the input Configuration File = {}",
>             getInputConfFile());
>       }
>     }
>
>     Regards,
>
>     Surya Vamshi
>
>     _______________________________________________________________________
>
>     If you received this email in error, please advise the sender (by
>     return email or otherwise) immediately. You have consented to
>     receive the attached electronically at the above-noted email
>     address; please retain a copy of this confirmation for future
>     reference.
>

