storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Züleyha Toptas <>
Subject Big data in Storm
Date Sat, 04 Jan 2014 12:28:56 GMT
I am trying to make Storm read a file and output its contents. The problem is that this works
only with very small data (a few lines), but I have data with about 300,000 lines. I tried
writing the output to a file because I thought printing all the lines on the terminal might be
too slow. I really don't know what else to do. Does anybody have an idea?

Here is the code, maybe something is wrong:

package spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import spouts.WriteIntoFile;
public class WordReader extends BaseRichSpout {
	private static final long serialVersionUID = 1L;	private SpoutOutputCollector collector;
private FileReader fileReader;	private boolean completed = false;	public void ack(Object msgId)
{		//System.out.println("OK:"+msgId);		String os =msgId.toString();		WriteIntoFile data= new
WriteIntoFile("/home/me/testoutput.txt", true);		try {			data.writeToFile(os);		} catch (IOException
e) {			// TODO Auto-generated catch block			e.printStackTrace();		}			}	public void close()
{}	public void fail(Object msgId) {		System.out.println("FAIL:"+msgId);	}	/**	 * The only
thing that the methods will do It is emit each 	 * file line	 */	public void nextTuple() {
	/**		 * The nextuple it is called forever, so if we have been readed the file		 * we will
wait and then return		 */		if(completed){			try {				Thread.sleep(1000);			} catch (InterruptedException
e) {				//Do nothing			}			return;		}		String str;		//Open the reader		BufferedReader reader
= new BufferedReader(fileReader);
			//Read all lines			while((str = reader.readLine()) != null){				/**				 * By each line
emmit a new value with the line as a their				 */				this.collector.emit(new Values(str),str);
             			}		}catch(Exception e){			throw new RuntimeException("Error reading tuple",e);
	}finally{			completed = true;		} 	}
	/**	 * We will create the file and get the collector object	 */	public void open(Map conf,
TopologyContext context,			SpoutOutputCollector collector) {		try {			this.fileReader = new
FileReader(conf.get("wordsFile").toString());		} catch (FileNotFoundException e) {			throw
new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");		}		this.collector
= collector;	}
	/**	 * Declare the output field "word"	 */	public void declareOutputFields(OutputFieldsDeclarer
declarer) {		declarer.declare(new Fields("line"));	}}
Best regards,
Züleyha
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message