Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,81 @@ +package org.apache.hadoop.chukwa.extraction.demux.processor.reducer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; + +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; +import org.apache.hadoop.chukwa.extraction.engine.Record; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +public class MRJobReduceProcessor implements ReduceProcessor +{ + static Logger log = Logger.getLogger(MRJobReduceProcessor.class); + @Override + public String getDataType() + { + return MRJobReduceProcessor.class.getName(); + } + + @Override + public void process(ChukwaRecordKey key, Iterator values, + OutputCollector output, + Reporter reporter) + { + try + { + HashMap data = new HashMap(); + + ChukwaRecord record = null; + String[] fields = null; + while(values.hasNext()) + { + record = values.next(); + fields = record.getFields(); + for(String field: fields) + { + data.put(field, record.getValue(field)); + } + } + + //Extract initial time: SUBMIT_TIME + long initTime = Long.parseLong(data.get("SUBMIT_TIME")); + + // Extract HodId + // maybe use a regex to extract this and load it from configuration + // JOBCONF="/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml" + String jobConf = data.get("JOBCONF"); + int idx = jobConf.indexOf("mapredsystem/"); + idx += 13; + int idx2 = jobConf.indexOf(".", idx); + data.put("HodId", jobConf.substring(idx, idx2)); + + ChukwaRecordKey newKey = new ChukwaRecordKey(); + newKey.setKey(""+initTime); + newKey.setReduceType("MRJob"); + + ChukwaRecord newRecord = new ChukwaRecord(); + newRecord.add(Record.tagsField, record.getValue(Record.tagsField)); + newRecord.setTime(initTime); + newRecord.add(Record.tagsField, record.getValue(Record.tagsField)); + Iterator it = data.keySet().iterator(); + while(it.hasNext()) + { + String field = it.next(); + newRecord.add(field, data.get(field)); + } + + output.collect(newKey, newRecord); + } + catch (IOException e) + { + log.warn("Unable to collect output in JobLogHistoryReduceProcessor [" + key + "]", e); + e.printStackTrace(); + } + + } + +} Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.chukwa.extraction.demux.processor.reducer; + +import java.util.Iterator; + +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +public interface ReduceProcessor +{ + public String getDataType(); + public void process(ChukwaRecordKey key,Iterator values, + OutputCollector output, Reporter reporter); +} Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.chukwa.extraction.demux.processor.reducer; + +import java.util.HashMap; + +import org.apache.log4j.Logger; + + + +public class ReduceProcessorFactory +{ + static Logger log = Logger.getLogger(ReduceProcessorFactory.class); + + // TODO + // add new mapper package at the end. + // We should have a more generic way to do this. + // Ex: read from config + // list of alias + // and + // alias -> processor class + + // ******** WARNING ******** + // If the ReduceProcessor is not there use Identity instead + + + private static HashMap processors = + new HashMap(); // registry + + private ReduceProcessorFactory() + {} + + public static ReduceProcessor getProcessor(String reduceType) + throws UnknownReduceTypeException + { + String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."+reduceType; + if (processors.containsKey(reduceType)) { + return processors.get(reduceType); + } else { + ReduceProcessor processor = null; + try { + processor = (ReduceProcessor)Class.forName(path).getConstructor().newInstance(); + } + catch(ClassNotFoundException e) + { + // ******** WARNING ******** + // If the ReduceProcessor is not there use Identity instead + processor = getProcessor("IdentityReducer"); + register(reduceType,processor); + return processor; + } + catch(Exception e) { + throw new UnknownReduceTypeException("error constructing processor", e); + } + + //TODO using a ThreadSafe/reuse flag to actually decide if we want + // to reuse the same processor again and again + register(reduceType,processor); + return processor; + } + } + + /** Register a specific parser for a {@link ReduceProcessor} + * implementation. */ + public static synchronized void register(String reduceType, + ReduceProcessor processor) + { + log.info("register " + processor.getClass().getName() + " for this recordType :" + reduceType); + if (processors.containsKey(reduceType)) + { + throw new DuplicateReduceProcessorException("Duplicate processor for recordType:" + reduceType); + } + ReduceProcessorFactory.processors.put(reduceType, processor); + } + +} Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.chukwa.extraction.demux.processor.reducer; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +public class SystemMetrics implements ReduceProcessor { + static Logger log = Logger.getLogger(SystemMetrics.class); + @Override + public String getDataType() { + return this.getClass().getName(); + } + + @Override + public void process(ChukwaRecordKey key, + Iterator values, + OutputCollector output, + Reporter reporter) { + try { + + ChukwaRecord record = null; + ChukwaRecord newRecord = new ChukwaRecord(); + + while(values.hasNext()) { + record = values.next(); + newRecord.setTime(record.getTime()); + + if(record.containsField("IFACE")) { + if(record.containsField("rxpck/s")) { + if (record.containsField("rxbyt/s") && record.containsField("txbyt/s")) { + double netBusyPcnt=0, netRxByts=0, netTxByts=0, netSpeed=128000000.00; + netRxByts=Double.parseDouble(record.getValue("rxbyt/s")); + netTxByts=Double.parseDouble(record.getValue("txbyt/s")); + netBusyPcnt=(netRxByts/netSpeed*100)+(netTxByts/netSpeed*100); + record.add(record.getValue("IFACE")+"_busy_pcnt", "" + netBusyPcnt); + record.add("csource", record.getValue("csource")); + } + record.add(record.getValue("IFACE")+".rxbyt/s", record.getValue("rxbyt/s")); + record.add(record.getValue("IFACE")+".rxpck/s", record.getValue("rxpck/s")); + record.add(record.getValue("IFACE")+".txbyt/s", record.getValue("txbyt/s")); + record.add(record.getValue("IFACE")+".txpck/s", record.getValue("txpck/s")); + record.removeValue("rxbyt/s"); + record.removeValue("rxpck/s"); + record.removeValue("txbyt/s"); + record.removeValue("txpck/s"); + } + if(record.containsField("rxerr/s")) { + record.add(record.getValue("IFACE")+".rxerr/s", record.getValue("rxerr/s")); + record.add(record.getValue("IFACE")+".rxdrop/s", record.getValue("rxdrop/s")); + record.add(record.getValue("IFACE")+".txerr/s", record.getValue("txerr/s")); + record.add(record.getValue("IFACE")+".txdrop/s", record.getValue("txdrop/s")); + record.removeValue("rxerr/s"); + record.removeValue("rxdrop/s"); + record.removeValue("txerr/s"); + record.removeValue("txdrop/s"); + } + record.removeValue("IFACE"); + } + + if(record.containsField("Device:")) { + record.add(record.getValue("Device:")+".rkB/s", record.getValue("rkB/s")); + record.add(record.getValue("Device:")+".wkB/s", record.getValue("wkB/s")); + record.add(record.getValue("Device:")+".%util", record.getValue("%util")); + record.removeValue("rkB/s"); + record.removeValue("wkB/s"); + record.removeValue("%util"); + record.removeValue("Device:"); + } + + if (record.containsField("swap_free")) { + float swapUsedPcnt=0, swapUsed=0, swapTotal=0; + swapUsed=Long.parseLong(record.getValue("swap_used")); + swapTotal=Long.parseLong(record.getValue("swap_total")); + swapUsedPcnt=swapUsed/swapTotal*100; + record.add("swap_used_pcnt", "" + swapUsedPcnt); + record.add("csource", record.getValue("csource")); + } + + if (record.containsField("mem_used")) { + double memUsedPcnt=0, memTotal=0, memUsed=0; + memTotal=Double.parseDouble(record.getValue("mem_total")); + memUsed=Double.parseDouble(record.getValue("mem_used")); + memUsedPcnt=memUsed/memTotal*100; + record.add("mem_used_pcnt", "" + memUsedPcnt); + record.add("csource", record.getValue("csource")); + } + + if (record.containsField("mem_buffers")) { + double memBuffersPcnt=0, memTotal=0, memBuffers=0; + memTotal=Double.parseDouble(record.getValue("mem_total")); + memBuffers=Double.parseDouble(record.getValue("mem_buffers")); + memBuffersPcnt=memBuffers/memTotal*100; + record.add("mem_buffers_pcnt", "" + memBuffersPcnt); + record.add("csource", record.getValue("csource")); + } + + // Copy over all fields + String[] fields = record.getFields(); + for(String f: fields){ + newRecord.add(f, record.getValue(f)); + } + } + record.add("capp", "systemMetrics"); + output.collect(key, newRecord); + } catch (IOException e) { + log.warn("Unable to collect output in SystemMetricsReduceProcessor [" + key + "]", e); + e.printStackTrace(); + } + + } +} Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.chukwa.extraction.demux.processor.reducer; + +public class UnknownReduceTypeException extends Exception +{ + + /** + * + */ + private static final long serialVersionUID = 5760553864088487836L; + + public UnknownReduceTypeException() + { + } + + public UnknownReduceTypeException(String message) + { + super(message); + } + + public UnknownReduceTypeException(Throwable cause) + { + super(cause); + } + + public UnknownReduceTypeException(String message, Throwable cause) + { + super(message, cause); + } + +} Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java Fri Dec 5 12:30:14 2008 @@ -67,6 +67,12 @@ return this.mapFields.containsKey(field); } + public void removeValue(String field) { + if(this.mapFields.containsKey(field)) { + this.mapFields.remove(field); + } + } + @Override public String toString() { @@ -76,36 +82,37 @@ Map.Entry entry = null; StringBuilder sb = new StringBuilder(); sb.append(""); } + + } - sb.append(">").append(body); + if (hasBody) + { sb.append(">").append(bodyVal);} + else + { sb.append(">").append(body);} sb.append(""); return sb.toString(); -// //body -// return "" + this.getValue(Record.bodyField) + "" ; } Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,212 @@ +// File generated by hadoop record compiler. Do not edit. +package org.apache.hadoop.chukwa.extraction.engine; + +public class ChukwaRecordKey extends org.apache.hadoop.record.Record { + private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo; + private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter; + private static int[] _rio_rtiFilterFields; + static { + _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("ChukwaRecordKey"); + _rio_recTypeInfo.addField("reduceType", org.apache.hadoop.record.meta.TypeID.StringTypeID); + _rio_recTypeInfo.addField("key", org.apache.hadoop.record.meta.TypeID.StringTypeID); + } + + private String reduceType; + private String key; + public ChukwaRecordKey() { } + public ChukwaRecordKey( + final String reduceType, + final String key) { + this.reduceType = reduceType; + this.key = key; + } + public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() { + return _rio_recTypeInfo; + } + public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) { + if (null == rti) return; + _rio_rtiFilter = rti; + _rio_rtiFilterFields = null; + } + private static void setupRtiFields() + { + if (null == _rio_rtiFilter) return; + // we may already have done this + if (null != _rio_rtiFilterFields) return; + int _rio_i, _rio_j; + _rio_rtiFilterFields = new int [_rio_rtiFilter.getFieldTypeInfos().size()]; + for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) { + _rio_rtiFilterFields[_rio_i] = 0; + } + java.util.Iterator _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator(); + _rio_i=0; + while (_rio_itFilter.hasNext()) { + org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next(); + java.util.Iterator _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator(); + _rio_j=1; + while (_rio_it.hasNext()) { + org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next(); + if (_rio_tInfo.equals(_rio_tInfoFilter)) { + _rio_rtiFilterFields[_rio_i] = _rio_j; + break; + } + _rio_j++; + } + _rio_i++; + } + } + public String getReduceType() { + return reduceType; + } + public void setReduceType(final String reduceType) { + this.reduceType=reduceType; + } + public String getKey() { + return key; + } + public void setKey(final String key) { + this.key=key; + } + public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag) + throws java.io.IOException { + _rio_a.startRecord(this,_rio_tag); + _rio_a.writeString(reduceType,"reduceType"); + _rio_a.writeString(key,"key"); + _rio_a.endRecord(this,_rio_tag); + } + private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag) + throws java.io.IOException { + _rio_a.startRecord(_rio_tag); + reduceType=_rio_a.readString("reduceType"); + key=_rio_a.readString("key"); + _rio_a.endRecord(_rio_tag); + } + public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag) + throws java.io.IOException { + if (null == _rio_rtiFilter) { + deserializeWithoutFilter(_rio_a, _rio_tag); + return; + } + // if we're here, we need to read based on version info + _rio_a.startRecord(_rio_tag); + setupRtiFields(); + for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) { + if (1 == _rio_rtiFilterFields[_rio_i]) { + reduceType=_rio_a.readString("reduceType"); + } + else if (2 == _rio_rtiFilterFields[_rio_i]) { + key=_rio_a.readString("key"); + } + else { + java.util.ArrayList typeInfos = (java.util.ArrayList)(_rio_rtiFilter.getFieldTypeInfos()); + org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID()); + } + } + _rio_a.endRecord(_rio_tag); + } + public int compareTo (final Object _rio_peer_) throws ClassCastException { + if (!(_rio_peer_ instanceof ChukwaRecordKey)) { + throw new ClassCastException("Comparing different types of records."); + } + ChukwaRecordKey _rio_peer = (ChukwaRecordKey) _rio_peer_; + int _rio_ret = 0; + _rio_ret = reduceType.compareTo(_rio_peer.reduceType); + if (_rio_ret != 0) return _rio_ret; + _rio_ret = key.compareTo(_rio_peer.key); + if (_rio_ret != 0) return _rio_ret; + return _rio_ret; + } + public boolean equals(final Object _rio_peer_) { + if (!(_rio_peer_ instanceof ChukwaRecordKey)) { + return false; + } + if (_rio_peer_ == this) { + return true; + } + ChukwaRecordKey _rio_peer = (ChukwaRecordKey) _rio_peer_; + boolean _rio_ret = false; + _rio_ret = reduceType.equals(_rio_peer.reduceType); + if (!_rio_ret) return _rio_ret; + _rio_ret = key.equals(_rio_peer.key); + if (!_rio_ret) return _rio_ret; + return _rio_ret; + } + public Object clone() throws CloneNotSupportedException { + ChukwaRecordKey _rio_other = new ChukwaRecordKey(); + _rio_other.reduceType = this.reduceType; + _rio_other.key = this.key; + return _rio_other; + } + public int hashCode() { + int _rio_result = 17; + int _rio_ret; + _rio_ret = reduceType.hashCode(); + _rio_result = 37*_rio_result + _rio_ret; + _rio_ret = key.hashCode(); + _rio_result = 37*_rio_result + _rio_ret; + return _rio_result; + } + public static String signature() { + return "LChukwaRecordKey(ss)"; + } + public static class Comparator extends org.apache.hadoop.record.RecordComparator { + public Comparator() { + super(ChukwaRecordKey.class); + } + static public int slurpRaw(byte[] b, int s, int l) { + try { + int os = s; + { + int i = org.apache.hadoop.record.Utils.readVInt(b, s); + int z = org.apache.hadoop.record.Utils.getVIntSize(i); + s+=(z+i); l-= (z+i); + } + { + int i = org.apache.hadoop.record.Utils.readVInt(b, s); + int z = org.apache.hadoop.record.Utils.getVIntSize(i); + s+=(z+i); l-= (z+i); + } + return (os - s); + } catch(java.io.IOException e) { + throw new RuntimeException(e); + } + } + static public int compareRaw(byte[] b1, int s1, int l1, + byte[] b2, int s2, int l2) { + try { + int os1 = s1; + { + int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1); + int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2); + int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1); + int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2); + s1+=z1; s2+=z2; l1-=z1; l2-=z2; + int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2); + if (r1 != 0) { return (r1<0)?-1:0; } + s1+=i1; s2+=i2; l1-=i1; l1-=i2; + } + { + int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1); + int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2); + int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1); + int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2); + s1+=z1; s2+=z2; l1-=z1; l2-=z2; + int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2); + if (r1 != 0) { return (r1<0)?-1:0; } + s1+=i1; s2+=i2; l1-=i1; l1-=i2; + } + return (os1 - s1); + } catch(java.io.IOException e) { + throw new RuntimeException(e); + } + } + public int compare(byte[] b1, int s1, int l1, + byte[] b2, int s2, int l2) { + int ret = compareRaw(b1,s1,l1,b2,s2,l2); + return (ret == -1)? -1 : ((ret==0)? 1 : 0);} + } + + static { + org.apache.hadoop.record.RecordComparator.define(ChukwaRecordKey.class, new Comparator()); + } +} Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java Fri Dec 5 12:30:14 2008 @@ -22,10 +22,12 @@ import java.util.TreeMap; + public class ChukwaSearchResult implements SearchResult { private TreeMap> records; - + private Token token = null; + public TreeMap> getRecords() { return records; @@ -35,5 +37,15 @@ { this.records = records; } + + public Token getToken() + { + return token; + } + + public void setToken(Token token) + { + this.token = token; + } } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java Fri Dec 5 12:30:14 2008 @@ -29,7 +29,7 @@ { private DataSourceFactory dataSourceFactory = DataSourceFactory.getInstance(); - public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter) + public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter,Token token) throws DataSourceException { SearchResult result = new ChukwaSearchResult(); @@ -40,7 +40,7 @@ for(int i=0;i> getRecords(); public void setRecords(TreeMap> records); } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java Fri Dec 5 12:30:14 2008 @@ -23,6 +23,6 @@ public interface SearchService { - public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter) + public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter,Token token) throws DataSourceException; } Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,7 @@ +package org.apache.hadoop.chukwa.extraction.engine; + +public class Token +{ + public String key = null; + public boolean hasMore = false; +} Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java Fri Dec 5 12:30:14 2008 @@ -19,6 +19,7 @@ package org.apache.hadoop.chukwa.extraction.engine.datasource; import org.apache.hadoop.chukwa.extraction.engine.SearchResult; +import org.apache.hadoop.chukwa.extraction.engine.Token; @@ -28,7 +29,8 @@ public SearchResult search( SearchResult result,String cluster,String dataSource, long t0,long t1, - String filter) + String filter, + Token token) throws DataSourceException; public boolean isThreadSafe(); Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java Fri Dec 5 12:30:14 2008 @@ -21,7 +21,7 @@ import java.util.HashMap; import org.apache.hadoop.chukwa.extraction.engine.datasource.database.DatabaseDS; -import org.apache.hadoop.chukwa.extraction.engine.datasource.record.RecordDS; +import org.apache.hadoop.chukwa.extraction.engine.datasource.record.ChukwaRecordDataSource; public class DataSourceFactory { @@ -37,10 +37,7 @@ dataSources.put("MRJob", databaseDS); dataSources.put("HodJob", databaseDS); dataSources.put("QueueInfo", databaseDS); - - DataSource recordDS = new RecordDS(); - dataSources.put("NameNode", recordDS); - dataSources.put("ChukwaLocalAgent", recordDS); + } public static DataSourceFactory getInstance() @@ -64,7 +61,7 @@ } else { - DataSource hsdfsDS = new RecordDS(); + DataSource hsdfsDS = new ChukwaRecordDataSource(); dataSources.put(datasourceName, hsdfsDS); return hsdfsDS; //TODO proto only! Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java Fri Dec 5 12:30:14 2008 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -44,15 +45,20 @@ private DsDirectory() { dataConfig = new DataConfig(); - conf = new Configuration(); + conf = new ChukwaConfiguration(); try { fs = FileSystem.get(conf); - } catch (IOException e) + } + catch (IOException e) { e.printStackTrace(); } rootFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder"); + if (!rootFolder.endsWith("/")) + { + rootFolder +="/"; + } } public static DsDirectory getInstance() @@ -94,7 +100,7 @@ public static void main(String[] args) throws DataSourceException { DsDirectory dsd = DsDirectory.getInstance(); - String[] dss = dsd.list("localhost"); + String[] dss = dsd.list("unknown"); for (String d : dss) { System.out.println(d); Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java Fri Dec 5 12:30:14 2008 @@ -34,15 +34,16 @@ import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.Record; import org.apache.hadoop.chukwa.extraction.engine.SearchResult; +import org.apache.hadoop.chukwa.extraction.engine.Token; import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource; import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException; -import org.apache.hadoop.chukwa.hicc.ClusterConfig; +//import org.apache.hadoop.chukwa.hicc.ClusterConfig; public class DatabaseDS implements DataSource { public SearchResult search(SearchResult result, String cluster, - String dataSource, long t0, long t1, String filter) + String dataSource, long t0, long t1, String filter,Token token) throws DataSourceException { SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd kk:mm:ss"); @@ -79,8 +80,8 @@ String dateclause = timeField + " >= '" + startS + "' and " + timeField + " <= '" + endS + "'"; - ClusterConfig cc = new ClusterConfig(); - String jdbc = cc.getURL(cluster); + //ClusterConfig cc = new ClusterConfig(); + String jdbc = ""; //cc.getURL(cluster); Connection conn = DriverManager.getConnection(jdbc); Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,21 @@ +package org.apache.hadoop.chukwa.extraction.engine.datasource.record; + +import java.util.List; + +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; +import org.apache.hadoop.chukwa.extraction.engine.Record; + +public class ChukwaDSInternalResult +{ + List records = null; + String day = null; + int hour = 0; + int rawIndex = 0; + int spill = 1; + long position = -1; + long currentTs = -1; + + String fileName = null; + + ChukwaRecordKey key = null; +} Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,520 @@ +package org.apache.hadoop.chukwa.extraction.engine.datasource.record; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeMap; + +import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult; +import org.apache.hadoop.chukwa.extraction.engine.Record; +import org.apache.hadoop.chukwa.extraction.engine.SearchResult; +import org.apache.hadoop.chukwa.extraction.engine.Token; +import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource; +import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException; +import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.log4j.Logger; + +public class ChukwaRecordDataSource implements DataSource +{ + //TODO need some cleanup after 1st production + // First implementation to get it working with the new directory structure + + static Logger log = Logger.getLogger(ChukwaRecordDataSource.class); + + private static final int dayFolder = 100; + private static final int hourFolder = 200; + private static final int rawFolder = 300; + + static final String[] raws = {"0","5","10","15","20","25","30","35","40","45","50","55"}; + + private static FileSystem fs = null; + private static ChukwaConfiguration conf = null; + + private static String rootDsFolder = null; + private static DataConfig dataConfig = null; + + static + { + dataConfig = new DataConfig(); + rootDsFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder"); + conf = new ChukwaConfiguration(); + try + { + fs = FileSystem.get(conf); + } catch (IOException e) + { + e.printStackTrace(); + } + } + + @Override + public boolean isThreadSafe() + { + return true; + } + + + @Override + public SearchResult search(SearchResult result, String cluster, + String dataSource, long t0, long t1, String filter,Token token) + throws DataSourceException + { + String filePath = rootDsFolder + "/" + cluster + "/"; + + log.debug("filePath [" + filePath + "]"); + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(t0); + SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd"); + int maxCount = 200; + + List records = new ArrayList(); + + ChukwaDSInternalResult res = new ChukwaDSInternalResult(); + + if (token != null) + { + // token.key = day + "|" + hour + "|" + raw + "|" + spill + "|" + res.currentTs + "|"+ res.position + "|"+ res.fileName; + try + { + String[] vars = token.key.split("\\|"); + res.day = vars[0]; + res.hour = Integer.parseInt(vars[1]); + res.rawIndex = Integer.parseInt(vars[2]); + res.spill = Integer.parseInt(vars[3]); + res.currentTs = Long.parseLong(vars[4]); + res.position = Long.parseLong(vars[5]); + res.fileName = vars[5]; + log.info("Token is not null! :" + token.key); + } + catch(Exception e) + { + log.error("Incalid Key: [" + token.key + "] exception: ", e); + } + } + else + { + log.debug("Token is null!" ); + } + + try + { + do + { + log.debug("start Date [" + calendar.getTime() + "]"); + String workingDay = sdf.format(calendar.getTime()); + int workingHour = calendar.get(Calendar.HOUR_OF_DAY); + int startRawIndex = 0; + if (token !=null) + { + workingDay = res.day ; + workingHour = res.hour; + startRawIndex = res.rawIndex; + } + else + { + token = new Token(); + } + + log.debug("workingDay " + workingDay); + log.debug("workingHour " + workingHour); + + if (exist(dayFolder,filePath,dataSource,workingDay,null,null)) + { + // Extract Data for Day + if (containsRotateFlag(dayFolder,filePath,dataSource,workingDay,null)) + { + // read data from day + // SystemMetrics/20080922/SystemMetrics_20080922.1.evt + log.debug("fs.exists(workingDayRotatePath) "); + extractRecords(res,ChukwaRecordDataSource.dayFolder,filePath,dataSource,workingDay, null, -1, + token, records, maxCount, t0, t1, filter); + maxCount = maxCount - records.size(); + if ( (maxCount <= 0) || (res.currentTs > t1)) + { break; } + + } // End process Day File + else // check for hours + { + log.debug("check for hours"); + for (int hour = 0; hour<24;hour ++) + { + if ( workingDay == res.day && hour" + filePath + dataSource + "/"+ workingDay+ "/" + hour); + if (exist(dayFolder,filePath,dataSource,workingDay,""+hour,null)) + { + if (containsRotateFlag(dayFolder,filePath,dataSource,workingDay,""+hour)) + { + // read data from Hour + // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt + extractRecords(res,ChukwaRecordDataSource.hourFolder,filePath,dataSource,workingDay, ""+hour, -1, + token, records, maxCount, t0, t1, filter); + } + else // check for raw + { + log.debug("Working on Raw"); + + for(int rawIndex=startRawIndex;rawIndex<12;rawIndex++) + { + // read data from Raw + // SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt + if (exist(dayFolder,filePath,dataSource,workingDay,""+hour,raws[rawIndex])) + { + extractRecords(res,ChukwaRecordDataSource.rawFolder,filePath,dataSource,workingDay, ""+hour, rawIndex, + token, records, maxCount, t0, t1, filter); + maxCount = maxCount - records.size(); + if ( (maxCount <= 0) || (res.currentTs > t1)) + { break; } + } + else + { + log.debug("<<<<<<<< " + + filePath + dataSource + "/" + workingDay+ "/" + workingHour + "/" + raws[rawIndex] ); + } + res.spill = 1; + } + } + } // End if (fs.exists(new Path(filePath + workingDay+ "/" + hour))) + + maxCount = maxCount - records.size(); + if ( (maxCount <= 0) || (res.currentTs > t1)) + { break; } + + } // End process all Hourly/raw files + } + } + + maxCount = maxCount - records.size(); + if ( (maxCount <= 0) || (res.currentTs > t1)) + { break; } + + // move to the next day + calendar.add(Calendar.DAY_OF_MONTH, +1); + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + + } + while (calendar.getTimeInMillis() < t1); + + } + catch(Exception e) + { + e.printStackTrace(); + throw new DataSourceException(e); + } + + TreeMap> recordsInResult = result.getRecords(); + for (Record record : records) + { + long timestamp = record.getTime(); + if (recordsInResult.containsKey(timestamp)) + { + recordsInResult.get(timestamp).add(record); + } + else + { + List list = new LinkedList(); + list.add(record); + recordsInResult.put(timestamp, list); + } + } + result.setToken(token); + return result; + + } + + public void extractRecords(ChukwaDSInternalResult res,int directoryType,String rootFolder,String dataSource,String day,String hour,int rawIndex, + Token token,List records,int maxRows,long t0,long t1,String filter) throws Exception + { + // for each spill file + // extract records + int spill = res.spill; + + boolean workdone = false; + do + { + String fileName = buildFileName(directoryType,rootFolder,dataSource,spill,day,hour,rawIndex); + log.debug("extractRecords : " + fileName); + + if (fs.exists(new Path(fileName))) + { + readData(res,token,fileName,maxRows,t0,t1,filter); + res.spill = spill; + List localRecords = res.records; + log.debug("localRecords size : " + localRecords.size()); + maxRows = maxRows - localRecords.size(); + if (maxRows <= 0) + { + workdone = true; + } + records.addAll(localRecords); + log.debug("AFTER fileName [" +fileName + "] count=" + localRecords.size() + " maxCount=" + maxRows); + spill ++; + } + else + { + // no more spill + workdone = true; + } + } + while(!workdone); + token.key = day + "|" + hour + "|" + rawIndex + "|" + spill + "|" + res.currentTs + "|"+ res.position + "|" + res.fileName; + } + + + public void readData(ChukwaDSInternalResult res,Token token,String fileName,int maxRows,long t0, long t1,String filter) throws + Exception + { + List records = new LinkedList(); + res.records = records; + SequenceFile.Reader r= null; + if (filter != null) + { filter = filter.toLowerCase();} + + try + { + + if (!fs.exists(new Path(fileName))) + { + log.debug("fileName not there!"); + return; + } + log.debug("Parser Open [" +fileName + "]"); + + long timestamp = 0; + int listSize = 0; + ChukwaRecordKey key = new ChukwaRecordKey(); + ChukwaRecord record = new ChukwaRecord(); + + r= new SequenceFile.Reader(fs, new Path(fileName), conf); + + log.debug("readData Open2 [" +fileName + "]"); + if ( (fileName.equals(res.fileName)) && (res.position != -1)) + { + r.seek(res.position); + } + res.fileName = fileName; + + while(r.next(key, record)) + { + if (record != null) + { + res.position = r.getPosition(); + + timestamp = record.getTime(); + res.currentTs = timestamp; + log.debug("\nSearch for startDate: " + new Date(t0) + " is :" + new Date(timestamp)); + + if (timestamp < t0) + { + //log.debug("Line not in range. Skipping: " +record); + continue; + } + else if (timestamp < t1) + { + log.debug("In Range: " + record.toString()); + boolean valid = false; + + if ( (filter == null || filter.equals("") )) + { + valid = true; + } + else if ( isValid(record,filter)) + { + valid = true; + } + + if (valid) + { + records.add(record); + record = new ChukwaRecord(); + listSize = records.size(); + if (listSize >= maxRows) + { + // maxRow so stop here + //Update token + token.key = key.getKey(); + token.hasMore = true; + break; + } + } + else + { + log.debug("In Range ==================>>>>>>>>> OUT Regex: " + record); + } + } + else + { + log.debug("Line out of range. Stopping now: " +record); + // Update Token + token.key = key.getKey(); + token.hasMore = false; + break; + } + } + } + + } + catch(Exception e) + { + e.printStackTrace(); + } + finally + { + try + { + r.close(); + } + catch(Exception e){} + } + } + + public boolean containsRotateFlag(int directoryType,String rootFolder,String dataSource,String workingDay,String workingHour) throws Exception + { + boolean contains = false; + switch(directoryType) + { + case ChukwaRecordDataSource.dayFolder: + // SystemMetrics/20080922/rotateDone + contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+"/rotateDone")); + break; + + case ChukwaRecordDataSource.hourFolder: + // SystemMetrics/20080922/12/rotateDone + contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour +"/rotateDone")); + break; + + } + return contains; + } + + public boolean exist(int directoryType,String rootFolder,String dataSource,String workingDay,String workingHour,String raw) throws Exception + { + boolean contains = false; + switch(directoryType) + { + case ChukwaRecordDataSource.dayFolder: + // SystemMetrics/20080922/rotateDone + contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay)); + break; + + case ChukwaRecordDataSource.hourFolder: + // SystemMetrics/20080922/12/rotateDone + contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour )); + break; + case ChukwaRecordDataSource.rawFolder: + // SystemMetrics/20080922/12/rotateDone + contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour + "/" + raw)); + break; + + } + return contains; + } + + + protected boolean isValid(ChukwaRecord record, String filter) + { + String[] fields = record.getFields(); + for(String field: fields) + { + if ( record.getValue(field).toLowerCase().indexOf(filter) >= 0) + { + return true; + } + } + return false; + } + + public String buildFileName(int directoryType,String rootFolder,String dataSource,int spill,String day,String hour,int rawIndex) + { + String fileName = null; + // TODO use StringBuilder + // TODO revisit the way we're building fileName + + switch(directoryType) + { + case ChukwaRecordDataSource.dayFolder: + // SystemMetrics/20080922/SystemMetrics_20080922.1.evt + fileName = rootFolder + "/" + dataSource + "/" + day + "/" + + dataSource + "_" + day + "." + spill + ".evt"; + break; + + case ChukwaRecordDataSource.hourFolder: + // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt + fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/" + + dataSource + "_" + day + "_" + hour + "." + spill + ".evt"; + break; + + case ChukwaRecordDataSource.rawFolder: + // SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt + fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/" + raws[rawIndex] + "/" + + dataSource + "_" + day + "_" + hour + "_" + raws[rawIndex] + "." + spill + ".evt"; + break; + } + log.debug("buildFileName :" + fileName); + return fileName; + } + + public static void main(String[] args) throws DataSourceException + { + ChukwaRecordDataSource ds = new ChukwaRecordDataSource(); + SearchResult result = new ChukwaSearchResult(); + result.setRecords( new TreeMap>()); + String cluster = args[0]; + String dataSource = args[1]; + long t0 = Long.parseLong(args[2]); + long t1 = Long.parseLong(args[3]); + String filter = null; + Token token = null; + + if (args.length >= 5 && !args[4].equalsIgnoreCase("null")) + { + filter = args[4]; + } + if (args.length == 6) + { + token = new Token(); + token.key = args[5]; + System.out.println("token :" + token.key); + } + + System.out.println("cluster :" + cluster); + System.out.println("dataSource :" + dataSource); + System.out.println("t0 :" + t0); + System.out.println("t1 :" + t1); + System.out.println("filter :" +filter ); + + + ds.search(result, cluster, dataSource, t0, t1, filter,token); + TreeMap> records = result.getRecords(); + Iterator it = records.keySet().iterator(); + + while(it.hasNext()) + { + long ts = it.next(); + System.out.println("\n\nTimestamp: " + new Date(ts)); + List list = records.get(ts); + for (int i=0;i" + result.getToken().key);} + } +} Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java Fri Dec 5 12:30:14 2008 @@ -19,18 +19,17 @@ package org.apache.hadoop.chukwa.extraction.engine.datasource.record; import java.io.IOException; -import java.util.Calendar; +import java.util.Date; import java.util.LinkedList; import java.util.List; -import java.util.Date; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; +import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.chukwa.extraction.engine.Record; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; public class ChukwaSequenceFileParser { @@ -67,60 +66,59 @@ long offset = 0; // HdfsWriter.HdfsWriterKey key = new HdfsWriter.HdfsWriterKey(); - Text key = new Text(); - - ChukwaRecord evt = new ChukwaRecord(); - while(r.next(key, evt)) + ChukwaRecordKey key = new ChukwaRecordKey(); + ChukwaRecord record = new ChukwaRecord(); + + while(r.next(key, record)) { lineCount ++; - System.out.println("NameNodeParser Line [" +evt.getValue(Record.bodyField) + "]"); + System.out.println("NameNodeParser Line [" +record.getValue(Record.bodyField) + "]"); - if (evt != null) + if (record != null) { - timestamp = evt.getTime(); + timestamp = record.getTime(); if (timestamp < t0) { - System.out.println("Line not in range. Skipping: " +evt.getValue(Record.bodyField)); + System.out.println("Line not in range. Skipping: " +record.getValue(Record.bodyField)); System.out.println("Search for: " + new Date(t0) + " is :" + new Date(timestamp)); continue; } else if ((timestamp < t1) && (offset < maxOffset )) //JB (epochTS < maxDate) { - System.out.println("In Range: " + evt.getValue(Record.bodyField)); + System.out.println("In Range: " + record.getValue(Record.bodyField)); boolean valid = false; if ( (filter == null || filter.equals("") )) { valid = true; } - else if (evt.getValue(Record.rawField).toLowerCase().indexOf(filter) > 0) - { - System.out.println("MATCH " + filter + "===========================>>>>>>>" + evt.getValue(Record.rawField)); - valid = true; - } + else if ( isValid(record,filter)) + { + valid = true; + } if (valid) { - records.add(evt); -evt = new ChukwaRecord(); + records.add(record); + record = new ChukwaRecord(); listSize = records.size(); if (listSize > maxRows) { records.remove(0); - System.out.println("==========>>>>>REMOVING: " + evt.getValue(Record.bodyField)); + System.out.println("==========>>>>>REMOVING: " + record.getValue(Record.bodyField)); } } else { - System.out.println("In Range ==================>>>>>>>>> OUT Regex: " + evt.getValue(Record.bodyField)); + System.out.println("In Range ==================>>>>>>>>> OUT Regex: " + record.getValue(Record.bodyField)); } } else { - System.out.println("Line out of range. Stopping now: " +evt.getValue(Record.bodyField)); + System.out.println("Line out of range. Stopping now: " +record.getValue(Record.bodyField)); break; } } @@ -146,26 +144,17 @@ return records; } - public static void main(String[] args) throws Throwable + + protected static boolean isValid(ChukwaRecord record, String filter) { - Configuration conf = new Configuration(); - - FileSystem fs = FileSystem.get(conf);//FileSystem.get(new URI(fsURL), conf); - Calendar c = Calendar.getInstance(); - c.add(Calendar.MONTH, -2); - - ChukwaSequenceFileParser.readData( "/tmp/t1", "NameNode", - 200, new java.util.Date().getTime(), - c.getTimeInMillis(), Long.MAX_VALUE, null, - args[0], fs, conf); - - SequenceFile.Reader r= new SequenceFile.Reader(fs, new Path(args[0]), conf); - Text key = new Text(); - - ChukwaRecord evt = new ChukwaRecord(); - while(r.next(key, evt)) - { - System.out.println( evt); - } + String[] fields = record.getFields(); + for(String field: fields) + { + if ( record.getValue(field).toLowerCase().indexOf(filter) >= 0) + { + return true; + } + } + return false; } } Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java Fri Dec 5 12:30:14 2008 @@ -19,20 +19,19 @@ package org.apache.hadoop.chukwa.extraction.engine.datasource.record; import java.io.IOException; -import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Calendar; -import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.TreeMap; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult; +import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; import org.apache.hadoop.chukwa.extraction.engine.Record; import org.apache.hadoop.chukwa.extraction.engine.SearchResult; +import org.apache.hadoop.chukwa.extraction.engine.Token; import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource; import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException; import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig; -import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; import org.apache.hadoop.fs.FileSystem; public class RecordDS implements DataSource @@ -64,7 +63,7 @@ String dataSource, long t0, long t1, - String filter) + String filter,Token token) throws DataSourceException { @@ -77,20 +76,20 @@ TreeMap> records = result.getRecords(); int maxCount = 200; - + SimpleDateFormat sdf = new java.text.SimpleDateFormat("_yyyyMMdd_HH_"); do { System.out.println("start Date [" + calendar.getTime() + "]"); - String fileName = new java.text.SimpleDateFormat("_yyyy_MM_dd_HH").format(calendar.getTime()); + String fileName = sdf.format(calendar.getTime()); int minutes = calendar.get(Calendar.MINUTE); int dec = minutes/10; - fileName += "_" + dec ; + fileName += dec ; int m = minutes - (dec*10); if (m < 5) - { fileName += "0.evt";} + { fileName += "0.1.evt";} else - { fileName += "5.evt";} + { fileName += "5.1.evt";} fileName = filePath + "/" + dataSource + fileName; @@ -145,39 +144,6 @@ return result; } - - - public static void main(String[] args) throws DataSourceException - { - long t1 = 0; - long t0 = 0; - System.out.println("Hello"); - Calendar calendar = Calendar.getInstance(); - Date d1; - try - { - d1 = new java.text.SimpleDateFormat ("dd/MM/yyyy HH:mm:ss").parse("05/06/2008 19:31:05"); - calendar.setTime(d1); - t1 = calendar.getTimeInMillis(); - d1 = new java.text.SimpleDateFormat ("dd/MM/yyyy HH:mm:ss").parse("05/06/2008 19:26:05"); - calendar.setTime(d1); - t0 = calendar.getTimeInMillis(); - - } catch (ParseException e) - { - e.printStackTrace(); - throw new RuntimeException(e); - } - - String filter = null; - RecordDS dao = new RecordDS(); - SearchResult result = new ChukwaSearchResult(); - - TreeMap> records = new TreeMap> (); - result.setRecords(records); - - dao.search(result,"output2","NameNode",t0,t1,filter); - } public boolean isThreadSafe() { Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java?rev=723855&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java (added) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java Fri Dec 5 12:30:14 2008 @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.chukwa.hicc; + +import java.io.PrintWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.TreeMap; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.Date; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import javax.servlet.http.HttpServletRequest; +import org.apache.hadoop.chukwa.hicc.ColorPicker; + + +public class Chart { + private String id; + private String title; + private String graphType; + private ArrayList>> dataset; + private ArrayList chartType; + private boolean xLabelOn; + private boolean yLabelOn; + private boolean yRightLabelOn; + private int width; + private int height; + private List xLabelRange; + private HttpServletRequest request = null; + private boolean legend; + private String xLabel=""; + private String yLabel=""; + private String yRightLabel=""; + private int datasetCounter=0; + private double max=0; + private int seriesCounter=0; + private List rightList; + private boolean userDefinedMax = false; + private String[] seriesOrder=null; + + public Chart(HttpServletRequest request) { + if(request.getParameter("boxId")!=null) { + this.id=request.getParameter("boxId"); + } else { + this.id="0"; + } + this.title="Untitled Chart"; + this.graphType="image"; + this.xLabelOn=true; + this.yLabelOn=true; + this.width=400; + this.height=200; + this.request=request; + this.legend=true; + this.max=0; + this.datasetCounter=0; + this.seriesCounter=0; + this.rightList = new ArrayList(); + this.userDefinedMax=false; + this.seriesOrder=null; + } + + public void setYMax(double max) { + this.max=max; + this.userDefinedMax=true; + } + + public void setSize(int width, int height) { + this.width=width; + this.height=height; + } + public void setGraphType(String graphType) { + if(graphType!=null) { + this.graphType = graphType; + } + } + + public void setTitle(String title) { + this.title=title; + } + + public void setId(String id) { + this.id=id; + } + + public void setDataSet(String chartType, TreeMap> data) { + if(this.dataset==null) { + this.dataset = new ArrayList>>(); + this.chartType = new ArrayList(); + } + this.dataset.add(data); + this.chartType.add(chartType); + } + + public void setSeriesOrder(String[] metrics) { + this.seriesOrder = metrics; + } + + public void setXAxisLabels(boolean toggle) { + xLabelOn = toggle; + } + + public void setYAxisLabels(boolean toggle) { + yLabelOn = toggle; + } + + public void setYAxisRightLabels(boolean toggle) { + yRightLabelOn = toggle; + } + + public void setXAxisLabel(String label) { + xLabel = label; + } + + public void setYAxisLabel(String label) { + yLabel = label; + } + + public void setYAxisRightLabel(String label) { + yRightLabel = label; + } + + public void setXLabelsRange(List range) { + xLabelRange = range; + } + + public void setLegend(boolean toggle) { + legend = toggle; + } + public String plot() { + String output=""; + if(dataset==null) { + output = "No Data available."; + return output; + } + String dateFormat="%H:%M"; + if(xLabel.equals("Time")) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long xMin; + try { + xMin = Long.parseLong(xLabelRange.get(0)); + long xMax = Long.parseLong(xLabelRange.get(xLabelRange.size()-1)); + if(xMax-xMin>31536000000L) { + dateFormat="%y"; + } else if(xMax-xMin>2592000000L) { + dateFormat="%y-%m"; + } else if(xMax-xMin>604800000L) { + dateFormat="%m-%d"; + } else if(xMax-xMin>86400000L) { + dateFormat="%m-%d %H:%M"; + } + } catch (NumberFormatException e) { + dateFormat="%y-%m-%d %H:%M"; + } + } + output = "\n" + + "\n"+ + "\n"+ + "\n"+ + "
"+title+"
\n"+ + "
\n"+ + "
\n"+ + "\n"+ + "\n" + + "\n"; + return output; + } +} Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java Fri Dec 5 12:30:14 2008 @@ -23,7 +23,7 @@ public class ClusterConfig { public static HashMap clusterMap = new HashMap(); - private String path=System.getenv("CHUKWA_HOME")+File.separator+"conf"+File.separator; + private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator; static public String getContents(File aFile) { //...checks on aFile are elided StringBuffer contents = new StringBuffer(); Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java?rev=723855&r1=723854&r2=723855&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java (original) +++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java Fri Dec 5 12:30:14 2008 @@ -18,6 +18,8 @@ package org.apache.hadoop.chukwa.hicc; +import java.text.SimpleDateFormat; +import java.util.TreeMap; import java.util.HashMap; import java.util.ArrayList; import java.util.List; @@ -29,19 +31,21 @@ public class DatasetMapper { private String jdbc; private static Log log = LogFactory.getLog(DatasetMapper.class); - private HashMap> dataset; + private TreeMap> dataset; private List labels; public DatasetMapper(String jdbc) { this.jdbc=jdbc; - this.dataset = new HashMap>(); + this.dataset = new TreeMap>(); this.labels = new ArrayList(); } - public void execute(String query, boolean groupBySecondColumn) { + public void execute(String query, boolean groupBySecondColumn, boolean calculateSlope, String formatTime) { + SimpleDateFormat sdf = null; dataset.clear(); try { // The newInstance() call is a work around for some // broken Java implementations - Class.forName("com.mysql.jdbc.Driver").newInstance(); + String jdbcDriver = System.getenv("JDBC_DRIVER"); + Class.forName(jdbcDriver).newInstance(); } catch (Exception ex) { // handle the error } @@ -61,11 +65,18 @@ rs = stmt.getResultSet(); ResultSetMetaData rmeta = rs.getMetaData(); int col=rmeta.getColumnCount(); + double[] previousArray = new double[col+1]; + for(int k=0;k data = null; + java.util.TreeMap data = null; + HashMap previousHash = new HashMap(); HashMap xAxisMap = new HashMap(); while (rs.next()) { - String label = rs.getString(1); + String label = ""; + long time = rs.getTimestamp(1).getTime(); + label = ""+time; if(!xAxisMap.containsKey(label)) { xAxisMap.put(label, i); labels.add(label); @@ -74,15 +85,33 @@ if(groupBySecondColumn) { String item = rs.getString(2); // Get the data from the row using the series column - double current = rs.getDouble(3); - if(current>max) { - max=current; - } data = dataset.get(item); if(data == null) { - data = new java.util.ArrayList(); + data = new java.util.TreeMap(); + } + if(calculateSlope) { + double current = rs.getDouble(3); + double tmp = 0L; + if(data.size()>1) { + tmp = current - previousHash.get(item).doubleValue(); + } else { + tmp = 0; + } + if(tmp<0) { + tmp=0; + } + if(tmp>max) { + max=tmp; + } + previousHash.put(item,current); + data.put(label, tmp); + } else { + double current = rs.getDouble(3); + if(current>max) { + max=current; + } + data.put(label, current); } - data.add(rs.getDouble(3)); dataset.put(item,data); } else { for(int j=2;j<=col;j++) { @@ -94,16 +123,30 @@ } data = dataset.get(item); if(data == null) { - data = new java.util.ArrayList(); + data = new java.util.TreeMap(); + } + if(calculateSlope) { + double tmp = rs.getDouble(j); + if(data.size()>1) { + tmp = tmp - previousArray[j]; + } else { + tmp = 0.0; + } + previousArray[j]=current; + if(tmp<0) { + tmp=0; + } + data.put(label, tmp); + } else { + data.put(label, current); } - data.add(rs.getDouble(j)); dataset.put(item,data); } } } labelsCount=i; } else { - log.error("query is not executed."); + log.error("query is not executed."); } // Now do something with the ResultSet .... } catch (SQLException ex) { @@ -146,7 +189,7 @@ public List getXAxisMap() { return labels; } - public HashMap> getDataset() { + public TreeMap> getDataset() { return dataset; } }