Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+import org.apache.hadoop.chukwa.datacollection.connector.*;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.*;
+
+/**
+ * The local agent daemon that runs on each machine.
+ * This class is designed to be embeddable, for use in testing.
+ *
+ */
+public class ChukwaAgent
+{
+ boolean DO_CHECKPOINT_RESTORE = false;
+ boolean WRITE_CHECKPOINTS = false;
+
+ static Logger log = Logger.getLogger(ChukwaAgent.class);
+
+ //doesn't need an equals(), comparator, etc
+ private static class Offset {
+ public Offset(long l, long id) {
+ offset = l;
+ this.id = id;
+ }
+ private volatile long id;
+ private volatile long offset;
+ }
+
+ public static class AlreadyRunningException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public AlreadyRunningException() {
+ super("Agent already running; aborting");
+ }
+ }
+
+
+ private final Map<Adaptor, Offset> adaptorPositions;
+
+ //basically only used by the control socket thread.
+ private final Map<Long, Adaptor> adaptorsByNumber;
+
+ File checkpointDir; //lock this object to indicate checkpoint in progress
+ File initialAdaptors;
+ String CHECKPOINT_BASE_NAME; //base filename for checkpoint files
+ int CHECKPOINT_INTERVAL_MS ; //min interval at which to write checkpoints
+
+
+ private Timer checkpointer;
+ private volatile boolean needNewCheckpoint = false; //set to true if any event has happened
+ //that should cause a new checkpoint to be written
+
+
+ private long lastAdaptorNumber= 0; //ID number of the last adaptor to be started
+ private int checkpointNumber; //id number of next checkpoint.
+ //should be protected by grabbing lock on checkpointDir
+
+
+ private final AgentControlSocketListener controlSock;
+
+ /**
+ * @param args
+ * @throws AdaptorException
+ */
+ public static void main(String[] args) throws AdaptorException {
+
+ try{
+ System.out.println("usage: LocalAgent [-restore] [default collector URL]");
+ ChukwaAgent agent = new ChukwaAgent();
+ if(agent.anotherAgentIsRunning()) {
+ System.out.println("another agent is running (or port has been usurped). Bailing out now");
+ }
+
+ Connector connector;
+
+ int uriArgNumber= 0;
+ if(args.length > 0) {
+ if(args[0].equals("-restore")) {
+ agent.DO_CHECKPOINT_RESTORE = true;
+ uriArgNumber = 1;
+ }
+ if(args[uriArgNumber].equals("local"))
+ connector = new ConsoleOutConnector(agent);
+ else
+ {
+ if(!args[uriArgNumber].contains("://"))
+ args[uriArgNumber] = "http://" + args[uriArgNumber];
+ connector = new HttpConnector(agent, args[uriArgNumber]);
+ }
+ }
+ else
+ connector = new HttpConnector(agent);
+
+ connector.start();
+
+ log.info("local agent started on port " + agent.getControlSock().portno);
+
+ } catch(AlreadyRunningException e){
+ log.error("agent started already on this machine with same portno ; bailing out");
+ System.out.println("agent started already on this machine with same portno ; bailing out");
+ System.exit(0); //better safe than sorry
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ private boolean anotherAgentIsRunning() {
+ return !controlSock.isBound();
+ }
+ /**
+ * @return the number of running adaptors inside this local agent
+ */
+ public int adaptorCount() {
+ return adaptorPositions.size();
+ }
+
+ public ChukwaAgent() throws AlreadyRunningException
+ {
+ readConfig();
+
+ //almost always just reading this; so use a ConcurrentHM.
+ //since we wrapped the offset, it's not a structural mod.
+ adaptorPositions= new ConcurrentHashMap<Adaptor, Offset>();
+ adaptorsByNumber = new HashMap<Long, Adaptor>();
+ checkpointNumber=0;
+ try{
+ if(DO_CHECKPOINT_RESTORE)
+ restoreFromCheckpoint();
+ } catch(IOException e) {
+ log.warn("failed to restart from checkpoint: ", e);
+ }
+
+ try {
+ if(initialAdaptors != null && initialAdaptors.exists())
+ readAdaptorsFile(initialAdaptors);
+ } catch(IOException e) {
+ log.warn("couldn't read user-specified file "+ initialAdaptors.getAbsolutePath());
+ }
+
+ controlSock = new AgentControlSocketListener(this);
+ try {
+ controlSock.tryToBind(); //do this synchronously; if it fails, we know another agent is running.
+ controlSock.start(); //this sets us up as a daemon
+ log.info("control socket started on port " + controlSock.portno);
+
+ if(CHECKPOINT_INTERVAL_MS > 0) {
+ checkpointer = new Timer();
+ checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
+ }
+ } catch(IOException e) {
+ log.info("failed to bind to socket; aborting agent launch", e);
+ throw new AlreadyRunningException();
+ }
+
+
+ }
+
+ //FIXME: should handle bad lines here
+ public long processCommand(String cmd)
+ {
+ String[] words = cmd.split(" ");
+ if(words[0].equalsIgnoreCase("add"))
+ {
+ //words should contain (space delimited):
+ // 0) command ("add")
+ // 1) AdaptorClassname
+ // 2) dataType (e.g. "hadoop_log")
+ // 3) params <optional>
+ // (e.g. for files, this is filename,
+ // but can be arbitrarily many space
+ // delimited agent specific params )
+ // 4) offset
+
+ long offset;
+ try {
+ offset = Long.parseLong(words[words.length-1]);
+ } catch(NumberFormatException e) {
+ log.warn("malformed line " + cmd);
+ return -1L;
+ }
+ String adaptorName = words[1];
+
+ Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
+ if(adaptor == null) {
+ log.warn("don't recognize adaptor name " + adaptorName);
+ return -1L;
+ }
+
+
+ String dataType = words[2];
+
+ String params = "";
+ if(words.length > 4){ //no argument
+ int begParams = adaptorName.length()+dataType.length()+6;//length("ADD x type ") = length(x) + 5, i.e. count letters & spaces
+ params = cmd.substring(begParams, cmd.length() - words[words.length-1].length() -1);
+ }
+ long adaptorID;
+ synchronized(adaptorsByNumber) {
+ adaptorID = ++lastAdaptorNumber;
+ adaptorsByNumber.put(adaptorID, adaptor);
+ adaptorPositions.put(adaptor, new Offset(offset,adaptorID));
+ }
+
+ try {
+ adaptor.start(dataType, params, offset, DataFactory.getInstance().getEventQueue());
+ log.info("started a new adaptor, id = " +adaptorID);
+ return adaptorID ;
+
+ } catch(AdaptorException e) {
+ log.warn("failed to start adaptor", e);
+ //FIXME: don't we need to clean up the adaptor maps here?
+ }
+ }
+ else
+ log.warn("only 'add' command supported in config files");
+
+ return -1;
+ }
+
+ /**
+ * Tries to restore from a checkpoint file in checkpointDir.
+ * There should usually only be one checkpoint present --
+ * two checkpoints present implies a crash during
+ * writing the higher-numbered one.
+ * As a result, this method chooses the lowest-numbered file present.
+ *
+ * Lines in the checkpoint file are processed one at a time with processCommand();
+ *
+ * @return true if the restore succeeded
+ * @throws IOException
+ */
+ public boolean restoreFromCheckpoint() throws IOException
+ {
+ synchronized(checkpointDir)
+ {
+ String[] checkpointNames = checkpointDir.list(new FilenameFilter()
+ {
+ public boolean accept(File dir, String name) {
+ return name.startsWith(CHECKPOINT_BASE_NAME);
+ }
+ });
+ if(checkpointNames.length == 0)
+ {
+ log.info("No checkpoints found in "+ checkpointDir);
+ return false;
+ }
+
+ if(checkpointNames.length > 2)
+ log.warn("expected at most two checkpoint files in " + checkpointDir + "; saw " + checkpointNames.length);
+ else if(checkpointNames.length == 0)
+ return false;
+
+ String lowestName=null;
+ int lowestIndex=Integer.MAX_VALUE;
+ for(String n: checkpointNames) {
+ int index = Integer.parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
+ if(index < lowestIndex) {
+ lowestName = n;
+ lowestIndex = index;
+ }
+ }
+
+ checkpointNumber = lowestIndex;
+ File checkpoint = new File(checkpointDir, lowestName);
+ readAdaptorsFile(checkpoint);
+ }
+ return true;
+ }
+ private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
+ IOException
+ {
+ BufferedReader br = new BufferedReader( new InputStreamReader(new FileInputStream(checkpoint)));
+ String cmd=null;
+ while((cmd = br.readLine()) != null)
+ processCommand(cmd);
+ br.close();
+ }
+
+ /**
+ * Called periodically to write checkpoints
+ * @throws IOException
+ */
+ public void writeCheckpoint() throws IOException
+ {
+ needNewCheckpoint = false;
+ synchronized(checkpointDir) {
+ log.info("writing checkpoint " + checkpointNumber);
+
+ FileOutputStream fos = new FileOutputStream(
+ new File(checkpointDir, CHECKPOINT_BASE_NAME + checkpointNumber));
+ PrintWriter out = new PrintWriter( new BufferedWriter(
+ new OutputStreamWriter(fos)));
+
+ for(Map.Entry<Adaptor, Offset> stat: adaptorPositions.entrySet()) {
+ try{
+ Adaptor a = stat.getKey();
+ out.print("ADD " + a.getClass().getCanonicalName());
+ out.print(" ");
+ out.print(a.getType());
+ out.print(" " + a.getCurrentStatus() + " ");
+ out.println(stat.getValue().offset);
+ } catch(AdaptorException e) {
+ e.printStackTrace();
+ }//don't try to recover from bad adaptor yet
+ }
+
+ out.close();
+ File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME + (checkpointNumber-1));
+ log.debug("hopefully removing old checkpoint file " + lastCheckpoint.getAbsolutePath());
+ lastCheckpoint.delete();
+ checkpointNumber++;
+ }
+ }
+
+ public void reportCommit(Adaptor src, long uuid)
+ {
+ needNewCheckpoint = true;
+ Offset o = adaptorPositions.get(src);
+ if(o != null) {
+ synchronized(o) { //order writes to offset, in case commits are processed out of order
+ if( uuid > o.offset)
+ o.offset = uuid;
+ }
+
+ log.info("got commit up to " + uuid + " on " + src+ " = "+ o.id);
+ }
+ else {
+ log.warn("got commit up to " + uuid + " for adaptor " +src +
+ " that doesn't appear to be running: " + adaptorsByNumber.size() + " total");
+ }
+ }
+
+ class CheckpointTask extends TimerTask {
+ public void run() {
+ try{
+ if(needNewCheckpoint ) {
+ writeCheckpoint();
+ }
+ } catch(IOException e) {
+ log.warn("failed to write checkpoint", e);
+ }
+ }
+ }
+
+//for use only by control socket.
+ Map<Long, Adaptor> getAdaptorList() {
+ return adaptorsByNumber;
+ }
+ /**
+ * Stop the adaptor with given ID number.
+ * Takes a parameter to indicate whether the adaptor should
+ * force out all remaining data, or just exit abruptly.
+ *
+ * If the adaptor is written correctly, its offset won't change after returning
+ * from shutdown.
+ *
+ * @param number the adaptor to stop
+ * @param gracefully if true, shutdown, if false, hardStop
+ * @return the number of bytes synched at stop. -1 on error
+ */
+ public long stopAdaptor(long number, boolean gracefully) {
+ Adaptor toStop;
+ long offset = -1;
+
+ //at most one thread can get past this critical section with toStop != null
+ //so if multiple callers try to stop the same adaptor, all but one will fail
+ synchronized(adaptorsByNumber) {
+ toStop = adaptorsByNumber.remove(number);
+ }
+ if(toStop == null) {
+ log.warn("trying to stop adaptor " + number + " that isn't running");
+ return offset;
+ }
+ try {
+ if(gracefully ) {
+
+ long bytesSentByAdaptor = toStop.shutdown(); //this can block
+ long unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset;
+ while(unstableBytes > 0 ) {
+ log.info("waiting for adaptor " + number + " to terminate " +
+ unstableBytes + " bytes are still uncommitted");
+ Thread.sleep(2000);
+ unstableBytes = bytesSentByAdaptor -adaptorPositions.get(toStop).offset;
+ }
+ }
+ else
+ toStop.hardStop();
+ Offset off = adaptorPositions.remove(toStop); //next checkpoint will have the remove
+ offset = off == null ? -1 : off.offset;
+ needNewCheckpoint = true;
+
+ } catch(AdaptorException e) {
+ log.error("adaptor failed to stop cleanly", e);
+ } catch(InterruptedException e) {
+ log.error("can't wait for adaptor to finish writing", e);
+ }
+ return offset;
+ }
+
+ protected void readConfig() {
+ Configuration conf = new Configuration();
+ String chukwaHome = System.getenv("CHUKWA_HOME");
+ if (chukwaHome == null){
+ chukwaHome = ".";
+ }
+ if(!chukwaHome.endsWith("/"))
+ chukwaHome = chukwaHome + "/";
+
+ conf.addResource(new Path("conf/chukwa-agent-conf.xml"));
+ CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name", "chukwa_checkpoint_");
+ checkpointDir= new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome+ "/var/"));
+ CHECKPOINT_INTERVAL_MS= conf.getInt("chukwaAgent.checkpoint.interval", 5000);
+ DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled", false);
+ if(DO_CHECKPOINT_RESTORE) {
+ WRITE_CHECKPOINTS = true;
+ log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+ }
+ // String initialAdaptorsStr = conf.get("initial_adaptors_file");
+
+ initialAdaptors = new File(chukwaHome + "conf/initial_adaptors");
+ }
+
+ public void shutdown() {
+ shutdown(false);
+ }
+
+ /**
+ * Triggers agent shutdown.
+ * For now, this method doesn't shut down adaptors explicitly. It probably should.
+ */
+ public void shutdown(boolean exit) {
+ if(checkpointer != null)
+ checkpointer.cancel();
+
+ controlSock.shutdown(); //make sure we don't get new requests
+
+ try {
+ if(WRITE_CHECKPOINTS)
+ writeCheckpoint(); //write a last checkpoint here, before stopping adaptors
+ } catch(IOException e) {
+ }
+
+ synchronized(adaptorsByNumber) { //shut down each adaptor
+ for(Adaptor a: adaptorsByNumber.values()) {
+ try{
+ a.hardStop();
+ }catch(AdaptorException e) {
+ log.warn("failed to cleanly stop " + a,e);
+ }
+ }
+ }
+
+ if(exit)
+ System.exit(0);
+ }
+/**
+ * Returns the last offset at which a given adaptor was checkpointed
+ * @param a the adaptor in question
+ * @return that adaptor's last-checkpointed offset
+ */
+ public long getOffset(Adaptor a) {
+ return adaptorPositions.get(a).offset;
+ }
+ /**
+ * Returns the control socket for this agent.
+ */
+ AgentControlSocketListener getControlSock() {
+ return controlSock;
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.*;
+//import java.util.concurrent.*;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.log4j.Logger;
+
+/**
+ * An event queue that blocks once a fixed upper limit of data is enqueued.
+ *
+ * For now, uses the size of the data field. Should really use estimatedSerializedSize()?
+ *
+ */
+public class MemLimitQueue implements ChunkQueue
+{
+
+ static Logger log = Logger.getLogger(WaitingQueue.class);
+
+ private Queue<Chunk> queue = new LinkedList<Chunk>();
+ private long dataSize = 0;
+ private final long MAX_MEM_USAGE;
+
+ public MemLimitQueue(int limit) {
+ MAX_MEM_USAGE = limit;
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
+ */
+ public void add(Chunk event) throws InterruptedException
+ {
+ assert event != null: "can't enqueue null chunks";
+ synchronized(this) {
+ while(event.getData().length + dataSize > MAX_MEM_USAGE)
+ this.wait();
+
+ dataSize += event.getData().length;
+ queue.add(event);
+ this.notifyAll();
+ }
+
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List, int)
+ */
+ public void collect(List<Chunk> events,int maxCount) throws InterruptedException
+ {
+ synchronized(this) {
+ //we can't just say queue.take() here, since we're holding a lock.
+ while(queue.isEmpty()){
+ this.wait();
+ }
+
+ int i = 0;
+ while(!queue.isEmpty() && (i++ < maxCount)) {
+ Chunk e = this.queue.remove();
+ dataSize -= e.getData().length;
+ events.add(e);
+ }
+ this.notifyAll();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
+ }
+ }
+
+ public int size(){
+ return queue.size();
+ }
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.log4j.Logger;
+
+public class WaitingQueue implements ChunkQueue
+{
+
+ static Logger log = Logger.getLogger(WaitingQueue.class);
+ private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
+
+ public void add(Chunk event)
+ {
+ try
+ {
+ this.queue.put(event);
+ }
+ catch(InterruptedException e)
+ {}//return upwards
+ }
+
+ public void add(List<Chunk> events)
+ {
+ this.queue.addAll(events);
+
+ }
+
+ public void collect(List<Chunk> events,int maxCount)
+ {
+ // Workaround to block on the queue
+ try
+ {
+ events.add(this.queue.take());
+ }
+ catch (InterruptedException e)
+ {}
+ this.queue.drainTo(events,maxCount-1);
+
+ System.out.println("collect [" + Thread.currentThread().getName() + "] [" + events.size() + "]");
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
+ }
+ }
+
+ public int size(){
+ return queue.size();
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import org.mortbay.jetty.*;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.*;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+
+public class CollectorStub {
+
+
+ public static void main(String[] args)
+ {
+
+ try {
+ System.out.println("usage: CollectorStub [portno] [pretend]");
+ System.out.println("note: if no portno defined, defaults to value in chukwa-site.xml");
+
+ ChukwaConfiguration conf = new ChukwaConfiguration();
+ int portNum = conf.getInt("chukwaCollector.http.port", 9999);
+
+ if(args.length != 0)
+ portNum = Integer.parseInt(args[0]);
+ if(args.length > 1) {
+ if(args[1].equals("pretend"))
+ ServletCollector.setWriter(new ConsoleWriter(true));
+ else if(args[1].equals("pretend-quietly"))
+ ServletCollector.setWriter(new ConsoleWriter(false));
+ else
+ System.out.println("WARNING: don't know what to do with command line arg "+ args[1]);
+ }
+
+ SelectChannelConnector jettyConnector = new SelectChannelConnector();
+ jettyConnector.setLowResourcesConnections(20);
+ jettyConnector.setLowResourceMaxIdleTime(1000);
+ jettyConnector.setPort(portNum);
+ Server server = new Server(portNum);
+ server.setConnectors(new Connector[]{ jettyConnector});
+ org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
+ pool.setMaxThreads(30);
+ server.setThreadPool(pool);
+ Context root = new Context(server,"/",Context.SESSIONS);
+ root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+ server.start();
+ server.setStopAtShutdown(false);
+
+ System.out.println("started http collector on port number " + portNum);
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ System.exit(0);
+ }
+
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
+import java.io.*;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.log4j.Logger;
+
+public class ServletCollector extends HttpServlet
+{
+
+ static final boolean FANCY_DIAGNOSTICS = true;
+ static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
+
+ private static final long serialVersionUID = 6286162898591407111L;
+ Logger log = Logger.getRootLogger();//.getLogger(ServletCollector.class);
+
+
+ public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws IOException
+ {
+ writer = w;
+ w.init();
+ }
+
+ public void init(ServletConfig servletConf) throws ServletException
+ {
+
+ log.info("initing servletCollector");
+ if(servletConf == null) {
+ log.fatal("no servlet config");
+ return;
+ }
+
+ try
+ {
+ // read the application->pipeline settings from a config file in the format:
+ // appliation_name: PipelinedWriter1, PipelinedWriter2, Writer
+ // use reflection to set up the pipeline after reading in the list of writers from the config file
+
+ /*
+ String strPipelines = "HadoopLogs:HdfsWriter\nApplication2:SameerWriter:HdfsWriter";
+ String[] pipelines = strPipelines.split("\n");
+ // split into pipes
+ for (String pipe : pipelines){
+ String[] tmp = pipe.split(":");
+ String app = tmp[0];
+ String[] stages = tmp[1].split(",");
+
+ //loop through pipes, creating linked list of stages per pipe, one at a time
+ for (String stage : stages){
+ Class curr = ClassLoader.loadClass(stage);
+ }
+ }
+ */
+ //FIXME: seems weird to initialize a static object here
+ if (writer == null)
+ writer = new SeqFileWriter();
+
+ } catch (IOException e) {
+ throw new ServletException("Problem init-ing servlet", e);
+ }
+ }
+
+ protected void accept(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException
+ {
+ ServletDiagnostics diagnosticPage = new ServletDiagnostics();
+ try {
+
+ final long currentTime = System.currentTimeMillis();
+ log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
+ java.io.InputStream in = req.getInputStream();
+
+ ServletOutputStream l_out = resp.getOutputStream();
+ final DataInputStream di = new DataInputStream(in);
+ final int numEvents = di.readInt();
+ // log.info("saw " + numEvents+ " in request");
+
+ if(FANCY_DIAGNOSTICS)
+ diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
+ for (int i = 0; i < numEvents; i++){
+ // TODO: pass new data to all registered stream handler methods for this chunk's stream
+ // TODO: should really have some dynamic assignment of events to writers
+
+ ChunkImpl logEvent = ChunkImpl.read(di);
+
+ if(FANCY_DIAGNOSTICS)
+ diagnosticPage.sawChunk(logEvent, i);
+
+ // write new data to data sync file
+ if(writer != null) {
+ writer.add(logEvent); //save() blocks until data is written
+ //this is where we ACK this connection
+ l_out.print("ok:");
+ l_out.print(logEvent.getData().length);
+ l_out.print(" bytes ending at offset ");
+ l_out.println(logEvent.getSeqID()-1);
+ }
+ else
+ l_out.println("can't write: no writer");
+ }
+
+ if(FANCY_DIAGNOSTICS)
+ diagnosticPage.doneWithPost();
+ resp.setStatus(200);
+
+ } catch (IOException e) {
+ log.warn("IO error", e);
+ throw new ServletException(e);
+ }
+ }
+
+
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException
+ {
+ accept(req,resp);
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException
+ {
+ PrintStream out = new PrintStream(resp.getOutputStream());
+ resp.setStatus(200);
+ out.println("<html><body><h2>Chukwa servlet running</h2>");
+ if(FANCY_DIAGNOSTICS)
+ ServletDiagnostics.printPage(out);
+ out.println("</body></html>");
+// accept(req,resp);
+ }
+
+ @Override
+ public String getServletInfo()
+ {
+ return "Chukwa Servlet Collector";
+ }
+
+ @Override
+ public void destroy()
+ {
+ synchronized(writer)
+ {
+ writer.close();
+ }
+ super.destroy();
+ }
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
+import java.io.PrintStream;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+/**
+ * One per post
+ */
+public class ServletDiagnostics {
+
+ static Logger log= Logger.getLogger(ServletDiagnostics.class);
+
+ private static class PostStats { //statistics about a chunk
+ public PostStats(String src, int count, long receivedTs)
+ {
+ this.count = count;
+ this.src = src;
+ this.receivedTs = receivedTs;
+ types = new String[count];
+ names = new String[count];
+ lengths = new int[count];
+
+ seenChunkCount = 0;
+ dataSize = 0;
+ }
+ final int count;
+ final String src;
+ final long receivedTs;
+ final String[] types, names;
+ final int[] lengths;
+
+ int seenChunkCount;
+ long dataSize;
+ public void addChunk(ChunkImpl c, int position)
+ {
+ if(position != seenChunkCount)
+ log.warn("servlet collector is passing chunk " + position + " but diagnostics has seen" +
+ seenChunkCount);
+ else if(seenChunkCount >= count){
+ log.warn("too many chunks in post declared as length " +count);
+ } else {
+ types[seenChunkCount] = c.getDataType();
+ lengths[seenChunkCount] = c.getData().length;
+ names[seenChunkCount] = c.getStreamName();
+ dataSize += c.getData().length;
+ ++seenChunkCount;
+ }
+ }
+ }
+
+ static {
+ lastPosts = new LinkedList<PostStats>();
+ }
+
+ static LinkedList<PostStats> lastPosts;
+ PostStats curPost;
+ static int CHUNKS_TO_KEEP = 300;
+
+
+ public void sawPost(String source, int chunks, long receivedTs) {
+ if(curPost != null) {
+ log.warn("should only have one HTTP post per ServletDiagnostics");
+ doneWithPost();
+ }
+ curPost = new PostStats(source, chunks, receivedTs);
+ }
+
+ public void sawChunk(ChunkImpl c, int pos) {
+ curPost.addChunk(c, pos);
+ }
+
+ public static void printPage(PrintStream out) {
+
+ HashMap<String, Long> bytesFromHost = new HashMap<String, Long>();
+ long timeWindowOfSample = Long.MAX_VALUE;
+ long now = System.currentTimeMillis();
+
+
+ out.println("<ul>");
+
+ synchronized(lastPosts) {
+ if(!lastPosts.isEmpty())
+ timeWindowOfSample = now - lastPosts.peek().receivedTs;
+
+ for(PostStats stats: lastPosts) {
+ out.print("<li>");
+
+ out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs);
+ out.println(" which was " + ((now - stats.receivedTs)/ 1000) + " seconds ago");
+ Long oldBytes = bytesFromHost.get(stats.src);
+ long newBytes = stats.dataSize;
+ if(oldBytes != null)
+ newBytes += oldBytes;
+ bytesFromHost.put(stats.src, newBytes);
+ out.println("<ol>");
+ for(int i =0; i < stats.count; ++i)
+ out.println("<li> "+ stats.lengths[i] + " bytes of type " +
+ stats.types[i] + ". Adaptor name ="+ stats.names[i] +" </li>");
+ out.println("</ol></li>");
+ }
+ }
+ out.println("</ul>");
+ out.println("<ul>");
+ for(Map.Entry<String, Long> h: bytesFromHost.entrySet()) {
+ out.print("<li>rate from " + h.getKey() + " was " + (1000 * h.getValue() / timeWindowOfSample));
+ out.println(" bytes/second in last " + timeWindowOfSample/1000 + " seconds.</li>");
+ }
+
+
+ out.println("</ul>");
+ out.println("total of " + bytesFromHost.size() + " unique hosts seen");
+
+ out.println("<p>current time is " + System.currentTimeMillis() + " </p>");
+ }
+
+ public void doneWithPost() {
+ synchronized(lastPosts) {
+ if(lastPosts.size() > CHUNKS_TO_KEEP)
+ lastPosts.remove();
+ lastPosts.add(curPost);
+ }
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.connector;
+
/**
 * A connector owns the long-lived activity that moves data collected by an agent toward a
 * collector, for example by repeatedly invoking a sender. Implementations may work
 * asynchronously (HttpConnector, for instance, starts its own thread in {@link #start()}).
 * <p>
 * The integer constants name numbered field positions.
 * NOTE(review): no use of them is visible here; confirm their meaning against callers.
 */
public interface Connector
{
  int proxyTimestampField = 0;
  int proxyURIField = 1;
  int proxyRetryField = 2;

  int adaptorTimestampField = 3;
  int adaptorURIField = 4;

  int logTimestampField = 5;
  int logSourceField = 6;
  int logApplicationField = 7;
  int logEventField = 8;

  /** Starts the connector. */
  void start();

  /** Asks the connector to stop. */
  void shutdown();
}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.connector.http;
+
/**
 * This class is responsible for setting up a {@link ChukwaHttpSender} with a list of collectors
 * and then repeatedly calling its send function, which encapsulates the work of setting up the
 * connection with the appropriate collector and then collecting and sending the {@link Chunk}s
 * from the global {@link ChunkQueue} that were added by adaptors. We want to separate
 * the long-living (i.e. looping) behavior from the sender because we also want to be able
 * to use the sender for its add and send API in arbitrary applications that want to send
 * chunks without a {@link ChukwaAgent} daemon.
 * <p>
 * On error, tries the list of available collectors, pauses for a minute, and then repeats.
 * </p>
 * <p>Will wait forever for collectors to come up.</p>
 */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+import org.apache.hadoop.chukwa.datacollection.sender.*;
+import org.apache.log4j.Logger;
+
+
+public class HttpConnector implements Connector, Runnable {
+
+ static Logger log = Logger.getLogger(HttpConnector.class);
+
+ static Timer statTimer = null;
+ static volatile int chunkCount = 0;
+ static final int MAX_EVENTS_PER_POST = 1000;
+ static final int MIN_POST_INTERVAL= 4 * 1000;
+ static ChunkQueue chunkQueue;
+
+ ChukwaAgent agent;
+ String argDestination = null;
+
+ private boolean stopMe = false;
+
+ static{
+ statTimer = new Timer();
+ chunkQueue = DataFactory.getInstance().getEventQueue();
+ statTimer.schedule(new TimerTask() {
+ public void run() {
+ int count = chunkCount;
+ chunkCount = 0;
+ log.info("# http chunks ACK'ed since last report: " + count );
+ }
+ }, 100,60*1000);
+ }
+
+ public HttpConnector(ChukwaAgent agent) {
+ this.agent = agent;
+ }
+
+ public HttpConnector(ChukwaAgent agent, String destination) {
+ this.agent = agent;
+ this.argDestination = destination;
+
+ log.info("Setting HTTP Connector URL manually using arg passed to Agent: " + destination);
+ }
+
+ public void start() {
+ (new Thread(this, "HTTP post thread")).start();
+ }
+
+ public void shutdown(){
+ stopMe = true;
+ }
+
+ public void run(){
+ log.info("HttpConnector started at time:" + System.currentTimeMillis());
+
+ Iterator<String> destinations = null;
+
+ // build a list of our destinations from collectors
+ try{
+ destinations = DataFactory.getInstance().getCollectors();
+ } catch (IOException e){
+ log.error("Failed to retreive list of collectors from conf/collectors file", e);
+ }
+
+ ChukwaSender connectorClient = new ChukwaHttpSender();
+ if (argDestination != null) {
+
+ ArrayList<String> tmp = new ArrayList<String>();
+ tmp.add(argDestination);
+ connectorClient.setCollectors(tmp.iterator());
+ log.info("using collector specified at agent runtime: " + argDestination);
+ } else if (destinations != null && destinations.hasNext()) {
+ connectorClient.setCollectors(destinations);
+ log.info("using collectors from collectors file");
+ } else {
+ log.error("No collectors specified, exiting (and taking agent with us).");
+ agent.shutdown(true);//error is unrecoverable, so stop hard.
+ return;
+ }
+
+ try {
+ long lastPost = System.currentTimeMillis();
+ while(!stopMe) {
+ List<Chunk> newQueue = new ArrayList<Chunk>();
+ try {
+ //get all ready chunks from the chunkQueue to be sent
+ chunkQueue.collect(newQueue,MAX_EVENTS_PER_POST); //FIXME: should really do this by size
+
+ } catch(InterruptedException e) {
+ System.out.println("thread interrupted during addChunks(ChunkQueue)");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ int toSend = newQueue.size();
+ List<ChukwaHttpSender.CommitListEntry> results = connectorClient.send(newQueue);
+ log.info("sent " +toSend + " chunks, got back " + results.size() + " acks");
+ //checkpoint the chunks which were committed
+ for(ChukwaHttpSender.CommitListEntry cle : results) {
+ agent.reportCommit(cle.adaptor, cle.uuid);
+ chunkCount++;
+ }
+ long now = System.currentTimeMillis();
+ if( now - lastPost < MIN_POST_INTERVAL )
+ Thread.sleep(now - lastPost); //wait for stuff to accumulate
+ lastPost = now;
+ } //end of try forever loop
+ log.info("received stop() command so exiting run() loop to shutdown connector");
+ } catch(InterruptedException e) {
+ //do nothing, let thread die.
+ }catch(java.io.IOException e) {
+ log.error("connector failed; shutting down agent");
+ agent.shutdown(true);
+ }
+ }
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.controller;
+
+
+import java.net.*;
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+
/**
 * A convenience library for applications to communicate with the {@link ChukwaAgent}. Can be used
 * to register and unregister adaptors with a running agent. Also contains functions applications
 * can use for handling log rotations (see {@link #pauseFile(String, String)} and
 * {@link #resumeFile(String, String)}).
 * <p>
 * Every request opens a new TCP connection to the agent at {@code hostname:portno}, sends one
 * text command ({@code ADD}, {@code SHUTDOWN} or {@code LIST}) and reads the reply. Most
 * operations first re-read the agent's adaptor list so the local view matches the agent.
 */
public class ChukwaAgentController {

  /**
   * Timer task that re-invokes
   * {@link ChukwaAgentController#add(String, String, String, long, long, long)} after a failed
   * attempt to contact the agent.
   */
  public class AddAdaptorTask extends TimerTask {
    String adaptorName;
    String type;
    String params;
    long offset;
    long numRetries;
    long retryInterval;

    AddAdaptorTask(String adaptorName, String type, String params,
        long offset, long numRetries, long retryInterval) {
      this.adaptorName = adaptorName;
      this.type = type;
      this.params = params;
      this.offset = offset;
      this.numRetries = numRetries;
      this.retryInterval = retryInterval;
    }

    @Override
    public void run() {
      add(adaptorName, type, params, offset, numRetries, retryInterval);
    }
  }

  // our default adaptors, provided here for convenience
  public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
  public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";

  /** Adaptor class used by the addFile/pauseFile/resumeFile/removeFile convenience methods. */
  static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
  static int DEFAULT_PORT = 9093;
  static String DEFAULT_HOST = "localhost";
  /** Command-line arguments remaining once -h/-p flags and their values are discounted. */
  static int numArgs = 0;

  /**
   * Local view of one adaptor: the id assigned by the agent (-1 until registered), the adaptor
   * class name, application type, parameter string and last known offset.
   */
  class Adaptor {
    public long id = -1;
    final public String name;
    final public String params;
    final public String appType;
    public long offset;

    Adaptor(String adaptorName, String appType, String params, long offset) {
      this.name = adaptorName;
      this.appType = appType;
      this.params = params;
      this.offset = offset;
    }

    Adaptor(long id, String adaptorName, String appType, String params, long offset) {
      this(adaptorName, appType, params, offset);
      this.id = id;
    }

    /**
     * Registers this adaptor with the agent running at {@code hostname:portno} by sending an
     * {@code ADD} command.
     * @return the id assigned by the agent (last token of an {@code OK} reply), or the previous
     *         id (initially -1) if the reply was missing or not {@code OK}
     * @throws IOException if the agent cannot be contacted
     */
    long register() throws IOException {
      Socket s = new Socket(hostname, portno);
      try {
        PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
        bw.println("ADD " + name + " " + appType + " " + params + " " + offset);
        bw.flush();
        BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
        String resp = br.readLine();
        if (resp != null) {
          String[] fields = resp.split(" ");
          if (fields.length > 0 && fields[0].equals("OK")) {
            try {
              id = Long.parseLong(fields[fields.length - 1]);
            } catch (NumberFormatException e) {
              // OK reply without a numeric id: keep the previous id
            }
          }
        }
      } finally {
        // release the connection even if the exchange failed
        s.close();
      }
      return id;
    }

    /**
     * Asks the agent to stop this adaptor by sending a {@code SHUTDOWN} command. On an
     * {@code OK} reply, {@link #offset} is updated from the reply's last token.
     * @throws IOException if the agent cannot be contacted
     */
    void unregister() throws IOException {
      Socket s = new Socket(hostname, portno);
      try {
        PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
        bw.println("SHUTDOWN " + id);
        bw.flush();

        BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
        String resp = br.readLine();
        if (resp != null && resp.startsWith("OK")) {
          String[] respSplit = resp.split(" ");
          String newOffset = respSplit[respSplit.length - 1];
          try {
            offset = Long.parseLong(newOffset);
          } catch (NumberFormatException nfe) {
            System.err.println("adaptor didn't shutdown gracefully.\n" + nfe);
          }
        }
        // otherwise the agent did not acknowledge; the last known offset is kept
      } finally {
        s.close();
      }
    }

    @Override
    public String toString() {
      String[] namePieces = name.split("\\.");
      String shortName = namePieces[namePieces.length - 1];
      return id + " " + shortName + " " + appType + " " + params + " " + offset;
    }
  }

  Map<Long, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<Long, Adaptor>();
  Map<Long, ChukwaAgentController.Adaptor> pausedAdaptors;
  String hostname;
  int portno;
  // daemon thread, so a pending retry never keeps the JVM (e.g. the command-line tool) alive
  private Timer addFileTimer = new Timer(true);

  public ChukwaAgentController() {
    this(DEFAULT_HOST, DEFAULT_PORT);
  }

  public ChukwaAgentController(String hostname, int portno) {
    this.hostname = hostname;
    this.portno = portno;
    pausedAdaptors = new HashMap<Long, Adaptor>();

    syncWithAgent();
  }

  /**
   * Replaces the local list of running adaptors with the agent's current list.
   * @return true on success; false if the agent could not be contacted, in which case the
   *         local list is cleared
   */
  private boolean syncWithAgent() {
    try {
      runningAdaptors = list();
      return true;
    } catch (IOException e) {
      System.err.println("Error initializing ChukwaClient with list of " +
          "currently registered adaptors, clearing our local list of adaptors");
      // if we can't connect to the LocalAgent, reset/clear our local view of the Adaptors.
      runningAdaptors = new HashMap<Long, ChukwaAgentController.Adaptor>();
      return false;
    }
  }

  /**
   * Registers a new adaptor. Makes no guarantee about success. On failure, we print a message
   * and otherwise ignore it, so that an application doesn't crash if its attempt to register an
   * adaptor fails. If the agent cannot be contacted, the registration is retried in the
   * background up to 20 more times, every 15 seconds (about five minutes). Use the overloaded
   * version to choose a different retry policy.
   * @return the id number of the adaptor generated by the agent, or a non-positive value if the
   *         first attempt did not succeed (a background retry may still succeed later)
   */
  public long add(String adaptorName, String type, String params, long offset) {
    return add(adaptorName, type, params, offset, 20, 15 * 1000); // retry for five minutes, every fifteen seconds
  }

  /**
   * Registers a new adaptor. Makes no guarantee about success. If the server cannot be
   * contacted, the attempt is retried <code>numRetries</code> more times, every
   * <code>retryInterval</code> milliseconds, on a background timer.
   * @return the id number of the adaptor generated by the agent, or a non-positive value if
   *         this attempt did not succeed
   */
  public long add(String adaptorName, String type, String params, long offset, long numRetries, long retryInterval) {
    ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(adaptorName, type, params, offset);
    long adaptorID = -1;
    if (numRetries >= 0) {
      try {
        adaptorID = adaptor.register();

        if (adaptorID > 0) {
          runningAdaptors.put(adaptorID, adaptor);
        } else {
          System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
        }
      } catch (IOException ioe) {
        System.out.println("AgentClient failed to contact the agent (" + hostname + ":" + portno + ")");
        System.out.println("Scheduling a agent connection retry for adaptor add() in another " +
            retryInterval + " milliseconds, " + numRetries + " retries remaining");

        addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params, offset, numRetries - 1, retryInterval), retryInterval);
      }
    } else {
      System.err.println("Giving up on connecting to the local agent");
    }
    return adaptorID;
  }

  /**
   * Re-syncs with the agent and then stops the adaptor with the given id.
   * @return the removed adaptor (with its offset updated from the agent's reply), or null if
   *         the agent does not report an adaptor with that id
   * @throws IOException if the agent cannot be contacted while unregistering
   */
  public synchronized ChukwaAgentController.Adaptor remove(long adaptorID) throws IOException {
    syncWithAgent();
    ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
    if (a == null) {
      return null; // nothing to unregister
    }
    a.unregister();
    return a;
  }

  /** Removes every running adaptor whose class, app type and params exactly match. */
  public void remove(String className, String appType, String filename) throws IOException {
    syncWithAgent();
    // iterate over a snapshot: remove() re-syncs and replaces runningAdaptors
    for (Adaptor a : new ArrayList<Adaptor>(runningAdaptors.values())) {
      if (a.name.equals(className) && a.params.equals(filename) && a.appType.equals(appType)) {
        remove(a.id);
      }
    }
  }

  /** Removes every adaptor the agent reports, logging (but not propagating) failures. */
  public void removeAll() {
    syncWithAgent();
    Long[] keyset = runningAdaptors.keySet().toArray(new Long[] {});

    for (long id : keyset) {
      try {
        if (remove(id) != null) {
          System.out.println("Successfully removed adaptor " + id);
        }
      } catch (IOException ioe) {
        System.err.println("Error removing an adaptor in removeAll()");
        ioe.printStackTrace();
      }
    }
  }

  /**
   * Fetches the agent's current adaptor list with a {@code LIST} command. The reply is read
   * until an empty line or end of stream; lines that cannot be parsed are skipped.
   * @throws IOException if the agent cannot be contacted
   */
  Map<Long, ChukwaAgentController.Adaptor> list() throws IOException {
    Socket s = new Socket(hostname, portno);
    try {
      PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));

      bw.println("LIST");
      bw.flush();
      BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
      Map<Long, Adaptor> listResult = new HashMap<Long, Adaptor>();
      String ln;
      while ((ln = br.readLine()) != null && !ln.equals("")) {
        Adaptor a = parseListLine(ln);
        if (a != null) {
          listResult.put(a.id, a);
        }
      }
      return listResult;
    } finally {
      s.close();
    }
  }

  /**
   * Parses one LIST reply line of the form {@code "<id>) <className> <appType> [params...] <offset>"}.
   * The params may be empty or contain spaces.
   * @return the parsed adaptor, or null if the line is not in that form
   */
  private Adaptor parseListLine(String ln) {
    String[] parts = ln.split("\\s+");
    if (parts.length < 4 || parts[0].length() < 2) {
      return null;
    }
    try {
      long id = Long.parseLong(parts[0].substring(0, parts[0].length() - 1)); // chop off the right-paren
      long offset = Long.parseLong(parts[parts.length - 1]);
      // params are the tokens between appType and the trailing offset (possibly none)
      StringBuilder tmpParams = new StringBuilder();
      for (int i = 3; i < parts.length - 1; i++) {
        if (i > 3) {
          tmpParams.append(' ');
        }
        tmpParams.append(parts[i]);
      }
      return new Adaptor(id, parts[1], parts[2], tmpParams.toString(), offset);
    } catch (NumberFormatException e) {
      System.err.println("Ignoring unparseable adaptor line from agent: " + ln);
      return null;
    }
  }

  /** True if {@code a} is a default file tailer of {@code appType} whose params end with {@code filename}. */
  private static boolean isFileTailer(Adaptor a, String appType, String filename) {
    return a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType);
  }

  //************************************************************************
  // The following functions are convenience functions, defining an easy
  // to use API for application developers to integrate chukwa into their app
  //************************************************************************

  /**
   * Registers a new {@link #DEFAULT_FILE_TAILER} adaptor for the file, starting at offset 0.
   * Checks whether the file is already being watched (according to the local list) and, if so,
   * won't register another adaptor. If you have tailed this file before and rotated or emptied
   * it, use {@link ChukwaAgentController#pauseFile(String, String)} and
   * {@link ChukwaAgentController#resumeFile(String, String)}, which store the adaptor's
   * metadata and reuse it to pick up where it left off.
   * @param appType the datatype associated with the file
   * @param filename the file for the tail adaptor to monitor (converted to an absolute path)
   * @param numRetries number of retries if the agent cannot be contacted
   * @param retryInterval milliseconds between retries
   * @return the id number of the adaptor generated by the agent, or -1 if it is a duplicate
   *         or the first attempt failed
   */
  public long addFile(String appType, String filename, long numRetries, long retryInterval) {
    filename = new File(filename).getAbsolutePath();
    // TODO: Maybe we want to check to see if the file exists here?
    // Probably not, because we might be talking to an agent on a different machine.

    // check to see if this file is being watched already; if so, don't set up another adaptor for it
    boolean isDuplicate = false;
    for (Adaptor a : runningAdaptors.values()) {
      if (isFileTailer(a, appType, filename)) {
        isDuplicate = true;
      }
    }
    if (!isDuplicate) {
      return add(DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L, numRetries, retryInterval);
    } else {
      System.out.println("An adaptor for filename \"" + filename + "\", type \""
          + appType + "\", exists already, addFile() command aborted");
      return -1;
    }
  }

  public long addFile(String appType, String filename) {
    return addFile(appType, filename, 0, 0);
  }

  /**
   * Pauses all active default file-tailing adaptors for this file. Each adaptor is stopped on
   * the agent, but its state (including the final offset reported by the agent) is kept, so
   * that {@link #resumeFile(String, String)} can launch a new adaptor with the same state later.
   * @return the ids of the adaptors that were paused
   * @throws IOException if the agent cannot be contacted while stopping an adaptor
   */
  public Collection<Long> pauseFile(String appType, String filename) throws IOException {
    syncWithAgent();
    List<Long> results = new ArrayList<Long>();
    // snapshot: remove() re-syncs and replaces runningAdaptors
    for (Adaptor a : new ArrayList<Adaptor>(runningAdaptors.values())) {
      if (isFileTailer(a, appType, filename)) {
        // remove() returns the freshly listed Adaptor whose offset was updated by the agent's
        // SHUTDOWN reply; that is the state to resume from, not the earlier snapshot
        Adaptor removed = remove(a.id); // tell the agent to remove/unregister it
        Adaptor paused = (removed != null) ? removed : a;
        pausedAdaptors.put(paused.id, paused); // add it to our list of paused adaptors
        results.add(paused.id);
      }
    }
    return results;
  }

  public boolean isFilePaused(String appType, String filename) {
    for (Adaptor a : pausedAdaptors.values()) {
      if (isFileTailer(a, appType, filename)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Resumes all paused adaptors for this file, starting each new adaptor at the offset
   * recorded when it was paused.
   * @param appType the appType
   * @param filename filename by which to look up paused adaptors tailing this file
   * @return the ids returned by the agent for the new adaptors
   * @throws IOException never thrown by the current implementation; kept for compatibility
   */
  public Collection<Long> resumeFile(String appType, String filename) throws IOException {
    syncWithAgent();
    List<Long> results = new ArrayList<Long>();
    Iterator<Adaptor> it = pausedAdaptors.values().iterator();
    while (it.hasNext()) {
      Adaptor a = it.next();
      if (isFileTailer(a, appType, filename)) {
        long newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " " + filename, a.offset);
        // remove through the iterator; removing from the map directly would fail the next next()
        it.remove();
        a.id = newID;
        results.add(a.id);
      }
    }
    return results;
  }

  /** Stops every running default file-tailing adaptor for this file. */
  public void removeFile(String appType, String filename) throws IOException {
    syncWithAgent();
    // snapshot: remove() re-syncs and replaces runningAdaptors
    for (Adaptor a : new ArrayList<Adaptor>(runningAdaptors.values())) {
      if (isFileTailer(a, appType, filename)) {
        remove(a.id);
      }
    }
  }

  //************************************************************************
  // command line utilities
  //************************************************************************

  public static void main(String[] args) {
    ChukwaAgentController c = getClient(args);
    if (numArgs >= 3 && args[0].equalsIgnoreCase("addfile")) {
      doAddFile(c, args[1], args[2]);
    } else if (numArgs >= 3 && args[0].equalsIgnoreCase("removefile")) {
      doRemoveFile(c, args[1], args[2]);
    } else if (numArgs >= 1 && args[0].equalsIgnoreCase("list")) {
      doList(c);
    } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
      doRemoveAll(c);
    } else {
      System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
      System.err.println(" ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
      System.err.println(" ChukwaClient list [-h hostname] [-p portnumber]");
      System.err.println(" ChukwaClient removeAll [-h hostname] [-p portnumber]");
    }
  }

  /** Builds a controller from the -h/-p flags in {@code args} and sets {@link #numArgs}. */
  private static ChukwaAgentController getClient(String[] args) {
    int portno = DEFAULT_PORT;
    String hostname = DEFAULT_HOST;

    numArgs = args.length;

    for (int i = 0; i < args.length; i++) {
      if (args[i].equals("-h") && args.length > i + 1) {
        hostname = args[i + 1];
        System.out.println("Setting hostname to: " + hostname);
        numArgs -= 2; // subtract for the flag and value
      } else if (args[i].equals("-p") && args.length > i + 1) {
        portno = Integer.parseInt(args[i + 1]);
        System.out.println("Setting portno to: " + portno);
        numArgs -= 2; // subtract for the flag, i.e. -p, and value
      }
    }
    return new ChukwaAgentController(hostname, portno);
  }

  private static long doAddFile(ChukwaAgentController c, String appType, String params) {
    System.out.println("Adding adaptor with filename: " + params);
    long adaptorID = c.addFile(appType, params);
    if (adaptorID != -1) {
      System.out.println("Successfully added adaptor, id is:" + adaptorID);
    } else {
      System.err.println("Agent reported failure to add adaptor, adaptor id returned was:" + adaptorID);
    }
    return adaptorID;
  }

  private static void doRemoveFile(ChukwaAgentController c, String appType, String params) {
    try {
      System.out.println("Removing adaptor with filename: " + params);
      c.removeFile(appType, params);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private static void doList(ChukwaAgentController c) {
    try {
      Iterator<Adaptor> adptrs = c.list().values().iterator();
      while (adptrs.hasNext()) {
        System.out.println(adptrs.next().toString());
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private static void doRemoveAll(ChukwaAgentController c) {
    System.out.println("Removing all adaptors");
    c.removeAll();
  }
}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.protocol;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+/**
+ * Converts between lists of {@link Chunk}s and a byte-level representation.
+ * NOTE(review): the exact encoding is defined by implementations - confirm there.
+ */
+public interface Protocol {
+
+  /**
+   * Encodes the given chunks as a byte array.
+   *
+   * @param chunks chunks to encode
+   * @return the encoded bytes
+   */
+  byte[] toByteArray(List<Chunk> chunks);
+
+  /**
+   * Decodes chunks from a byte array (presumably the output of toByteArray).
+   *
+   * @param bytes encoded chunks
+   * @return the decoded chunks
+   */
+  List<Chunk> parseFrom(byte[] bytes);
+
+  /**
+   * Writes to the given stream.
+   * NOTE(review): takes no chunks, so what gets written must come from
+   * implementation state - confirm the intended contract.
+   *
+   * @param output destination stream
+   */
+  void writeTo(OutputStream output);
+
+  /**
+   * Decodes chunks read from the given stream.
+   *
+   * @param input source stream
+   * @return the decoded chunks
+   */
+  List<Chunk> parseFrom(InputStream input);
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.sender;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpException;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.HttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.log4j.Logger;
+
+/**
+ * Encapsulates all of the http setup and connection details needed for
+ * chunks to be delivered to a collector.
+ * <p>
+ * On error, rolls over to the next collector offered by the collector iterator.
+ * When none is available, it sleeps {@link #WAIT_FOR_COLLECTOR_REBOOT} ms and
+ * retries the current collector. After {@link #SENDER_RETRIES} such waits it
+ * gives up with an IOException.
+ * </p>
+ * <p>
+ * NOTE(review): the HttpClient, the connection manager and the current collector
+ * are static fields, so they are shared by every instance of this class.
+ * </p>
+ */
+public class ChukwaHttpSender implements ChukwaSender{
+  static final int MAX_RETRIES_PER_COLLECTOR = 4; //fast retries, in http client
+  static final int SENDER_RETRIES = 3;
+  static final int WAIT_FOR_COLLECTOR_REBOOT = 20 * 1000;
+  //FIXME: this should really correspond to the timer in RetryListOfCollectors
+
+  static Logger log = Logger.getLogger(ChukwaHttpSender.class);
+  static HttpClient client = null;
+  static MultiThreadedHttpConnectionManager connectionManager = null;
+  static String currCollector = null;
+
+  protected Iterator<String> collectors;
+
+  static {
+    connectionManager = new MultiThreadedHttpConnectionManager();
+    client = new HttpClient(connectionManager);
+    // NOTE(review): no connections exist yet at class-init time, so this call is effectively a no-op.
+    connectionManager.closeIdleConnections(1000);
+  }
+
+  /** Pairs an adaptor with the sequence id of a chunk it produced, used to ack delivered chunks. */
+  public static class CommitListEntry {
+    public Adaptor adaptor;
+    public long uuid;
+
+    public CommitListEntry(Adaptor a, long uuid) {
+      adaptor = a;
+      this.uuid = uuid;
+    }
+  }
+
+  //FIXME: probably we're better off with an EventListRequestEntity
+  /**
+   * POST body made of an int chunk count followed by each serialized chunk's bytes.
+   * It is repeatable, so HttpClient may resend it on retry.
+   */
+  static class BuffersRequestEntity implements RequestEntity {
+    List<DataOutputBuffer> buffers;
+
+    public BuffersRequestEntity(List<DataOutputBuffer> buf) {
+      buffers = buf;
+    }
+
+    public long getContentLength() {
+      long len = 4; // the leading int written by writeRequest: number of buffers
+      for (DataOutputBuffer b : buffers)
+        len += b.getLength();
+      return len;
+    }
+
+    public String getContentType() {
+      return "application/octet-stream";
+    }
+
+    public boolean isRepeatable() {
+      return true;
+    }
+
+    public void writeRequest(OutputStream out) throws IOException {
+      DataOutputStream dos = new DataOutputStream(out);
+      dos.writeInt(buffers.size());
+      for (DataOutputBuffer b : buffers)
+        dos.write(b.getData(), 0, b.getLength());
+    }
+  }
+
+  /**
+   * Creates a sender with an empty rollover list and http://localhost:8080 as the
+   * current collector.
+   * NOTE(review): because currCollector is non-null after this constructor, the null
+   * check in setCollectors(Iterator) does not pick from the supplied list; the first
+   * post still goes to localhost.
+   */
+  public ChukwaHttpSender(){
+    //setup default collector
+    ArrayList<String> tmp = new ArrayList<String>();
+    this.collectors = tmp.iterator();
+    currCollector = "http://localhost:8080";
+    log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext());
+  }
+
+  /**
+   * Set up a single collector for this client to send {@link Chunk}s to.
+   * The given URL becomes the current destination; the rollover iterator is unchanged.
+   * Previously this method was an empty stub.
+   * @param collector the url of the collector
+   */
+  public void setCollectors(String collector){
+    currCollector = collector;
+  }
+
+  /**
+   * Set up a list of collectors for this client to roll over to on failure.
+   * If no current collector is set, the first one from the list becomes current.
+   * @param collectors iterator over collector URLs
+   */
+  public void setCollectors(Iterator<String> collectors){
+    this.collectors = collectors;
+    //setup a new destination from our list of collectors if one hasn't been set up
+    if (currCollector == null){
+      if (collectors.hasNext()){
+        currCollector = collectors.next();
+      }
+      else
+        log.error("No collectors to try in send(), not even trying to do doPost()");
+    }
+  }
+
+  /**
+   * Serializes every chunk in {@code toSend}, clears that list, and POSTs the batch
+   * to the current collector. On failure it rolls over to other collectors.
+   *
+   * @param toSend chunks to deliver; this list is cleared by the call
+   * @return one CommitListEntry per chunk that was serialized and delivered, or an
+   *         empty list if no current collector is set
+   * @throws InterruptedException if interrupted while waiting between retries
+   * @throws IOException if the retries are exhausted without a successful post
+   */
+  public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
+    List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
+    List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
+
+    log.info("collected " + toSend.size() + " chunks");
+
+    //Serialize each chunk into its own DataOutputBuffer and add that buffer to serializedEvents
+    for (Chunk c : toSend) {
+      DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
+      try {
+        c.write(b);
+      } catch (IOException err) {
+        // A partially written buffer would corrupt the framing of every chunk after
+        // it in the POST body, so drop this chunk (and do not ack it).
+        log.error("serialization threw IOException", err);
+        continue;
+      }
+      serializedEvents.add(b);
+      //store a CLE for this chunk which we will use to ack this chunk to the caller of send()
+      //(e.g. the agent will use the list of CLE's for checkpointing)
+      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID()));
+    }
+    toSend.clear();
+
+    //collect all serialized chunks into a single buffer to send
+    RequestEntity postData = new BuffersRequestEntity(serializedEvents);
+
+    int retries = SENDER_RETRIES;
+    while (currCollector != null) {
+      PostMethod method = new PostMethod();
+      try {
+        doPost(method, postData, currCollector);
+        //if no exception was thrown from doPost, ACK that these chunks were sent
+        return commitResults;
+      } catch (Exception e) { // Errors are no longer swallowed and retried
+        log.error("Http post exception", e);
+        log.info("Checking list of collectors to see if another collector has been specified for rollover");
+        if (collectors.hasNext()) {
+          currCollector = collectors.next();
+          log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
+        } else {
+          if (retries > 0) {
+            log.warn("No more collectors to try rolling over to; waiting " + WAIT_FOR_COLLECTOR_REBOOT +
+                " ms (" + retries + " retries left)");
+            Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
+            retries--;
+          } else {
+            log.error("No more collectors to try rolling over to; aborting");
+            throw new IOException("no collectors");
+          }
+        }
+      } finally {
+        // be sure the connection is released back to the connection manager
+        method.releaseConnection();
+      }
+    } //end retry loop
+    // Only reached when there is no current collector; the chunks were already removed from toSend.
+    log.error("No current collector set; " + commitResults.size() + " chunks were not sent");
+    return new ArrayList<CommitListEntry>();
+  }
+
+  /**
+   * Handles the HTTP post and echoes the response body to stdout.
+   * Throws HttpException if the collector answers with a non-200 status.
+   */
+  @SuppressWarnings("deprecation")
+  private void doPost(PostMethod method, RequestEntity data, String dest)
+      throws IOException, HttpException {
+    HttpMethodParams pars = method.getParams();
+    pars.setParameter(HttpMethodParams.RETRY_HANDLER, new HttpMethodRetryHandler() {
+      public boolean retryMethod(HttpMethod m, IOException e, int exec) {
+        // fast in-client retries, but never for a refused connection
+        return !(e instanceof java.net.ConnectException) && (exec < MAX_RETRIES_PER_COLLECTOR);
+      }
+    });
+    method.setParams(pars);
+    method.setPath(dest);
+
+    //send it across the network
+    method.setRequestEntity(data);
+
+    log.info("HTTP post to " + dest+" length = "+ data.getContentLength());
+    // Send POST request
+    client.setTimeout(8000);
+    int statusCode = client.executeMethod(method);
+
+    if (statusCode != HttpStatus.SC_OK) {
+      log.error("HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine());
+      //do something aggressive here
+      throw new HttpException("got back a failure from server");
+    }
+    log.info("got success back from the remote collector; response length "+ method.getResponseContentLength());
+
+    //FIXME: should parse acks here
+    byte[] respBuf = method.getResponseBody();
+    if (respBuf == null) {
+      // getResponseBody() returns null when no body is available. Before, this case
+      // threw an NPE, which made a successful post look like a failure.
+      return;
+    }
+    BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(respBuf)));
+    String line;
+    while ((line = br.readLine()) != null) {
+      System.out.println("response: " + line);
+    }
+  }
+
+  public static void main(String[] argv) throws InterruptedException{
+    //HttpConnectorClient cc = new HttpConnectorClient();
+    //do something smarter than to hide record headaches, like force them to create and add records to a chunk
+    //cc.addChunk("test-source", "test-streamName", "test-application", "test-dataType", new byte[]{1,2,3,4,5});
+  }
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.sender;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
+
+/**
+ * Encapsulates all of the communication overhead needed for chunks to be delivered
+ * to a collector.
+ */
+public interface ChukwaSender {
+
+  /**
+   * Sends a batch of chunks to a collector.
+   *
+   * @param chunksToSend a list of chunks to commit
+   * @return the list of committed chunks
+   * @throws InterruptedException if interrupted while trying to send
+   * @throws java.io.IOException if the chunks could not be delivered
+   */
+  public List<CommitListEntry> send(List<Chunk> chunksToSend) throws InterruptedException, java.io.IOException;
+
+  /**
+   * Supplies the collectors (as URL strings) that this sender may deliver to.
+   *
+   * @param collectors iterator over collector URLs
+   */
+  public void setCollectors(Iterator<String> collectors);
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.sender;
+
+import java.io.*;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * An iterator over the collectors to try, in a randomly shuffled order.
+ * <p>
+ * The list is shuffled once at construction. Iteration cycles through it
+ * repeatedly. A new pass always starts at the first collector, and it may only
+ * begin once more than maxRetryRateMs milliseconds have elapsed since the
+ * previous pass began. Until then {@link #hasNext()} returns false, and
+ * {@link #next()} returns null. The class is therefore time-dependent and
+ * nondeterministic.
+ * </p>
+ */
+public class RetryListOfCollectors implements Iterator<String> {
+
+  int maxRetryRateMs;
+  List<String> collectors;
+  long lastLookAtFirstNode; // time (ms) the first collector was last returned, i.e. start of the last pass
+  int nextCollector=0;
+
+  /**
+   * Reads collector addresses from a file, one per line. Surrounding whitespace is
+   * trimmed, blank lines are skipped, and lines without a protocol ("://") are
+   * assumed to be HTTP. A missing file is reported on stderr and yields an empty list.
+   *
+   * @param collectorFile file listing one collector per line
+   * @param maxRetryRateMs minimum interval in ms between passes over the list
+   * @throws IOException if reading the file fails
+   */
+  public RetryListOfCollectors(File collectorFile, int maxRetryRateMs) throws IOException {
+    this.maxRetryRateMs = maxRetryRateMs;
+    lastLookAtFirstNode = 0;
+    collectors = new ArrayList<String>();
+
+    BufferedReader br = null;
+    try {
+      br = new BufferedReader(new FileReader(collectorFile));
+      String line;
+      while ((line = br.readLine()) != null) {
+        line = line.trim();
+        if (line.length() == 0)
+          continue; // e.g. a trailing newline; would otherwise become the bogus collector "http://"
+        if (!line.contains("://")) //no protocol, assume http
+          collectors.add("http://" + line);
+        else
+          collectors.add(line);
+      }
+    } catch (FileNotFoundException e) {
+      System.err.println("Error in RetryListOfCollectors() opening file conf/connectors file from agent, double check that you have set the CHUKWA_HOME environment variable. Also, ensure file exists and is in classpath");
+    } catch (IOException e) {
+      System.err.println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
+      throw e;
+    } finally {
+      // close even when readLine() fails, so the file handle is not leaked
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException ignored) {
+          // nothing useful to do: all data has already been read or the read failed
+        }
+      }
+    }
+    shuffleList();
+  }
+
+  /**
+   * Builds the list from the given collector URLs, which are copied and then shuffled.
+   *
+   * @param collectors collector URLs; the caller's list is not modified
+   * @param maxRetryRateMs minimum interval in ms between passes over the list
+   */
+  public RetryListOfCollectors(final List<String> collectors, int maxRetryRateMs) {
+    this.maxRetryRateMs = maxRetryRateMs;
+    lastLookAtFirstNode = 0;
+    this.collectors = new ArrayList<String>();
+    this.collectors.addAll(collectors);
+    shuffleList();
+  }
+
+  /** Randomly permutes the collector list in place (both constructors own an ArrayList). */
+  private void shuffleList() {
+    Collections.shuffle(collectors);
+  }
+
+  /**
+   * @return true if the list is non-empty and either a pass is in progress or more
+   *         than maxRetryRateMs ms have elapsed since the last pass began
+   */
+  public boolean hasNext() {
+    return collectors.size() > 0 &&
+        ((nextCollector != 0) ||
+         (System.currentTimeMillis() - lastLookAtFirstNode > maxRetryRateMs));
+  }
+
+  /**
+   * @return the next collector in rotation, or null (rather than throwing
+   *         NoSuchElementException) when {@link #hasNext()} is false
+   */
+  public String next() {
+    if (hasNext()) {
+      int currCollector = nextCollector;
+      nextCollector = (nextCollector + 1) % collectors.size();
+      if (currCollector == 0)
+        lastLookAtFirstNode = System.currentTimeMillis();
+      return collectors.get(currCollector);
+    }
+    else
+      return null;
+  }
+
+  /**
+   * @return a uniformly random collector from the list. The old code cast
+   *         Math.random() to int before multiplying, so it always returned index 0.
+   */
+  public String getRandomCollector(){
+    return collectors.get((int) (Math.random() * collectors.size()));
+  }
+
+  /** Appends a collector to the end of the rotation. */
+  public void add(URL collector){
+    collectors.add(collector.toString());
+  }
+
+  /** Not supported. */
+  public void remove() {
+    //FIXME: maybe just remove a collector from our list and then
+    //FIXME: make sure next doesn't break (i.e. reset nextCollector if necessary)
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   *
+   * @return total number of collectors in list
+   */
+  int total() {
+    return collectors.size();
+  }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.test;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.*;
+import org.apache.hadoop.chukwa.datacollection.agent.*;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+
+import java.util.*;
+
+/**
+ * Output events to stdout.
+ * Intended for debugging use.
+ *
+ */
+public class ConsoleOutConnector extends Thread implements Connector {
+
+ final ChukwaAgent agent;
+ volatile boolean shutdown;
+ final boolean silent;
+
+
+ public ConsoleOutConnector(ChukwaAgent a) {
+ this(a, false);
+ }
+
+ public ConsoleOutConnector(ChukwaAgent a, boolean silent)
+ {
+ agent = a;
+ this.silent = silent;
+ }
+
+ public void run()
+ {
+ try{
+ System.out.println("console connector started");
+ ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
+ if(!silent)
+ System.out.println("-------------------");
+
+ while(!shutdown)
+ {
+ List<Chunk> evts = new ArrayList<Chunk>();
+ eventQueue.collect(evts, 1);
+
+ for(Chunk e: evts)
+ {
+ if(!silent) {
+ System.out.println("Console out connector got event at offset " + e.getSeqID());
+ System.out.println("data type was " + e.getDataType());
+ if(e.getData().length > 1000)
+ System.out.println("data length was " + e.getData().length+ ", not printing");
+ else
+ System.out.println(new String(e.getData()));
+ }
+
+ agent.reportCommit(e.getInitiator(), e.getSeqID());
+
+ if(!silent)
+ System.out.println("-------------------");
+ }
+ }
+ }
+ catch(InterruptedException e)
+ {} //thread is about to exit anyway
+ }
+
+ public void shutdown()
+ {
+ shutdown = true;
+ this.interrupt();
+ }
+
+}
|