Hi,
I am trying to make Storm read a file and output its content. The problem is that this works
only with very small data (a few lines), but my data has about 300,000 lines. I tried
writing the output to a file because I thought the problem might be the time it takes to print
all the lines on the terminal. I really don't know what else to do. Does anybody have an idea?
Here is the code, maybe something is wrong:
Topology.main:
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
WordReader.java:
package spouts;

import spouts.WriteIntoFile;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        //System.out.println("OK:"+msgId);
        String os = msgId.toString();
        WriteIntoFile data = new WriteIntoFile("/home/me/testoutput.txt", true);
        try {
            data.writeToFile(os);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }

    /**
     * The only thing this method does is emit each
     * line of the file
     */
    public void nextTuple() {
        /**
         * nextTuple is called forever, so once the file has been read
         * we wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * For each line, emit a new tuple containing the line,
                 * using the line itself as the message id
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }

    /**
     * Open the file and keep the collector object
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordsFile")+"]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
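The nextTuple() above emits every line of the file inside a single call. As far as I understand, Storm calls nextTuple(), ack() and fail() on the same spout thread, so no ack can be handled until that one call returns. A variation that might be worth trying is to read and emit at most one line per call. The following is only a minimal sketch of that idea under this assumption, not a confirmed fix. It uses plain java.io so it runs without Storm, and LineAtATimeReader, nextLine() and isCompleted() are hypothetical names, not part of the Storm API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical helper (not part of Storm): hands out one line per call.
class LineAtATimeReader {
    // Created once and reused, so the read position survives between calls.
    private final BufferedReader reader;
    private boolean completed = false;

    LineAtATimeReader(Reader source) {
        this.reader = new BufferedReader(source);
    }

    // Returns the next line, or null once the input is exhausted.
    String nextLine() {
        if (completed) {
            return null;
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true;
            }
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException("Error reading tuple", e);
        }
    }

    boolean isCompleted() {
        return completed;
    }

    // Small demonstration: each loop iteration stands in for one nextTuple() call.
    public static void main(String[] args) {
        LineAtATimeReader lines =
                new LineAtATimeReader(new StringReader("one\ntwo\nthree\n"));
        String line;
        while ((line = lines.nextLine()) != null) {
            System.out.println(line);
        }
    }
}
```

In WordReader, the helper would be created once in open() from the FileReader, and nextTuple() would call nextLine() once and, if the result is not null, pass it to this.collector.emit(new Values(line), line) before returning.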
Best Regards,
Züleyha