Subject: svn commit: r1210600 [1/16] - in /incubator/accumulo/trunk/contrib/accumulo_sample: ./ ingest/ ingest/src/main/java/aggregator/ ingest/src/main/java/ingest/ ingest/src/main/java/iterator/ ingest/src/main/java/normalizer/ ingest/src/main/java/protobuf/ ...
Date: Mon, 05 Dec 2011 20:05:51 -0000
From: billie@apache.org
To: accumulo-commits@incubator.apache.org
Message-Id: <20111205200555.1D23923888FD@eris.apache.org>

Author: billie
Date: Mon Dec 5 20:05:49 2011
New Revision: 1210600

URL: http://svn.apache.org/viewvc?rev=1210600&view=rev
Log:
ACCUMULO-41 formatted java and pom files

Modified:
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/iterator/TotalAggregatingIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/LcNoDiacriticsNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/NoOpNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/Normalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/normalizer/NumberNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/protobuf/TermWeight.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/protobuf/Uid.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/AggregatingRecordReader.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LfLineReader.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/reader/LongLineRecordReader.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/util/TextUtil.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/GlobalIndexUidAggregatorTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/aggregator/TextIndexAggregatorTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/StandaloneStatusReporter.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/ingest/WikipediaMapperTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/normalizer/testNumberNormalizer.java
    incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/test/java/reader/AggregatingRecordReaderTest.java
    incubator/accumulo/trunk/contrib/accumulo_sample/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/query-war/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/query/pom.xml
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/function/QueryFunctions.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AbstractEvaluatingIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/AndIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/BooleanLogicTreeNode.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/DefaultIteratorEnvironment.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/EvaluatingIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/FieldIndexIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OptimizedQueryIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/OrIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/ReadAheadIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/iterator/UniqFieldNameValueIterator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/jexl/Arithmetic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/AbstractQueryLogic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/ContentLogic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/FieldIndexQueryReWriter.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/JexlOperatorConstants.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/QueryEvaluator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/QueryParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/RangeCalculator.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/TreeBuilder.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/TreeNode.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Document.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Field.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/Results.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/query/IQuery.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/sample/query/Query.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/BaseKeyParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/FieldIndexKeyParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/util/KeyParser.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/StandaloneStatusReporter.java
    incubator/accumulo/trunk/contrib/accumulo_sample/query/src/test/java/logic/TestQueryLogic.java

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/pom.xml Mon Dec 5 20:05:49 2011 @@ -1,20 +1,20 @@ - + 4.0.0

Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/GlobalIndexUidAggregator.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package aggregator; import java.util.HashSet; @@ -27,62 +27,61 @@ import org.apache.accumulo.core.iterator import com.google.protobuf.InvalidProtocolBufferException; /** - * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization - * for the global index and global reverse index, where the list of UIDs for events will be maintained in the - * index for low cardinality terms (Low in this case being less than 20). - * + * Implementation of an Aggregator that aggregates objects of the type Uid.List. This is an optimization for the global index and global reverse index, where + * the list of UIDs for events will be maintained in the index for low cardinality terms (Low in this case being less than 20). + * */ public class GlobalIndexUidAggregator implements Aggregator { - - private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class); - private Uid.List.Builder builder = Uid.List.newBuilder(); - //Using a set instead of a list so that duplicate IDs are filtered out of the list. - private HashSet uids = new HashSet(); - private boolean seenIgnore = false; - public static final int MAX = 20; - private long count = 0; - - @Override - public Value aggregate() { - //Special case logic - //If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true - //However, always maintain the count - if (uids.size() > MAX || seenIgnore) { - builder.setCOUNT(count); - builder.setIGNORE(true); - builder.clearUID(); - } else { - builder.setCOUNT(count); - builder.setIGNORE(false); - builder.addAllUID(uids); - } - return new Value(builder.build().toByteArray()); - } - - @Override - public void collect(Value value) { - if (null == value || value.get().length == 0) - return; - //Collect the values, which are serialized Uid.List objects - try { - Uid.List v = Uid.List.parseFrom(value.get()); - count = count + v.getCOUNT(); - if (v.getIGNORE()) { - seenIgnore = true; - } - //Add the incoming list to this list - uids.addAll(v.getUIDList()); - } catch (InvalidProtocolBufferException e) { - log.error("Value passed to aggregator was not of type Uid.List", e); - } - } - - @Override - public void reset() { - count = 0; - seenIgnore = false; - builder = Uid.List.newBuilder(); - uids.clear(); - } - + + private static final Logger log = Logger.getLogger(GlobalIndexUidAggregator.class); + private Uid.List.Builder builder = Uid.List.newBuilder(); + // Using a set instead of a list so that duplicate IDs are filtered out of the list. 
+ private HashSet uids = new HashSet(); + private boolean seenIgnore = false; + public static final int MAX = 20; + private long count = 0; + + @Override + public Value aggregate() { + // Special case logic + // If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true + // However, always maintain the count + if (uids.size() > MAX || seenIgnore) { + builder.setCOUNT(count); + builder.setIGNORE(true); + builder.clearUID(); + } else { + builder.setCOUNT(count); + builder.setIGNORE(false); + builder.addAllUID(uids); + } + return new Value(builder.build().toByteArray()); + } + + @Override + public void collect(Value value) { + if (null == value || value.get().length == 0) + return; + // Collect the values, which are serialized Uid.List objects + try { + Uid.List v = Uid.List.parseFrom(value.get()); + count = count + v.getCOUNT(); + if (v.getIGNORE()) { + seenIgnore = true; + } + // Add the incoming list to this list + uids.addAll(v.getUIDList()); + } catch (InvalidProtocolBufferException e) { + log.error("Value passed to aggregator was not of type Uid.List", e); + } + } + + @Override + public void reset() { + count = 0; + seenIgnore = false; + builder = Uid.List.newBuilder(); + uids.clear(); + } + } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/aggregator/TextIndexAggregator.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package aggregator; import java.util.ArrayList; @@ -30,65 +30,65 @@ import com.google.protobuf.InvalidProtoc /** * An Aggregator to merge together a list of term offsets and one normalized term frequency - * + * */ public class TextIndexAggregator implements Aggregator { - private static final Logger log = Logger.getLogger(TextIndexAggregator.class); + private static final Logger log = Logger.getLogger(TextIndexAggregator.class); + + private List offsets = new ArrayList(); + private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder(); + private float normalizedTermFrequency = 0f; + + @Override + public Value aggregate() { + // Keep the sorted order we tried to maintain + for (int i = 0; i < offsets.size(); ++i) { + builder.addWordOffset(offsets.get(i)); + } - private List offsets = new ArrayList(); - private TermWeight.Info.Builder builder = TermWeight.Info.newBuilder(); - private float normalizedTermFrequency = 0f; - - @Override - public Value aggregate() { - // Keep the sorted order we tried to maintain - for (int i = 0; i < offsets.size(); ++i) { - builder.addWordOffset(offsets.get(i)); - } - - builder.setNormalizedTermFrequency(normalizedTermFrequency); - - return new Value(builder.build().toByteArray()); + builder.setNormalizedTermFrequency(normalizedTermFrequency); + + return new Value(builder.build().toByteArray()); + } + + @Override + public void collect(Value value) { + // Make sure we don't aggregate something else + if (value == null || value.get().length == 0) { + return; } - - @Override - public void collect(Value value) { - // Make sure we don't aggregate something else - if (value == null || value.get().length == 0) { - return; - } - - TermWeight.Info info; - - try { - info = TermWeight.Info.parseFrom(value.get()); - } catch (InvalidProtocolBufferException e) { - log.error("Value passed to aggregator was not of type TermWeight.Info", e); - return; - } - - // Add each offset into the list maintaining sorted order - for (int offset : info.getWordOffsetList()) { - int pos = Collections.binarySearch(offsets, offset); - - if (pos < 0) { - // Undo the transform on the insertion point - offsets.add((-1 * pos) - 1, offset); - } else { - offsets.add(pos, offset); - } - } - - if (info.getNormalizedTermFrequency() > 0) { - this.normalizedTermFrequency += info.getNormalizedTermFrequency(); - } + + TermWeight.Info info; + + try { + info = TermWeight.Info.parseFrom(value.get()); + } catch (InvalidProtocolBufferException e) { + log.error("Value passed to aggregator was not of type TermWeight.Info", e); + return; } - - @Override - public void reset() { - this.offsets.clear(); - this.normalizedTermFrequency = 0f; - this.builder = TermWeight.Info.newBuilder(); + + // Add each offset into the list maintaining sorted order + for (int offset : info.getWordOffsetList()) { + int pos = Collections.binarySearch(offsets, offset); + + if (pos < 0) { + // Undo the transform on the insertion point + offsets.add((-1 * pos) - 1, offset); + } else { + offsets.add(pos, offset); + } } - + + if (info.getNormalizedTermFrequency() > 0) { + this.normalizedTermFrequency += info.getNormalizedTermFrequency(); + } + } + + @Override + public void reset() { + this.offsets.clear(); + this.normalizedTermFrequency = 0f; + this.builder = TermWeight.Info.newBuilder(); + } + } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java URL: 
http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/ArticleExtractor.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package ingest; import java.io.Reader; @@ -31,138 +31,142 @@ import normalizer.LcNoDiacriticsNormaliz import normalizer.NumberNormalizer; public class ArticleExtractor { - - public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z"); - private static NumberNormalizer nn = new NumberNormalizer(); - private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer(); - - public static class Article { - int id; - String title; - long timestamp; - String comments; - String text; - - private Article(int id, String title, long timestamp, String comments, String text) { - super(); - this.id = id; - this.title = title; - this.timestamp = timestamp; - this.comments = comments; - this.text = text; - } - public int getId() { - return id; - } - public String getTitle() { - return title; - } - public String getComments() { - return comments; - } - public String getText() { - return text; - } - public long getTimestamp() { - return timestamp; - } - - public Map getFieldValues() { - Map fields = new HashMap(); - fields.put("ID", this.id); - fields.put("TITLE", this.title); - fields.put("TIMESTAMP", this.timestamp); - fields.put("COMMENTS", this.comments); - return fields; - } - - public Map getNormalizedFieldValues() { - Map fields = new HashMap(); - fields.put("ID", nn.normalizeFieldValue("ID", this.id)); - fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title)); - fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp)); - fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments)); - return fields; - } - - } - - public ArticleExtractor() { - } - - public Article extract(Reader reader) { - XMLInputFactory xmlif = XMLInputFactory.newInstance(); - xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE); - - XMLStreamReader xmlr = null; - - try { - xmlr = xmlif.createXMLStreamReader(reader); - } catch (XMLStreamException e1) { - throw new RuntimeException(e1); - } - - QName titleName = QName.valueOf("title"); - QName textName = QName.valueOf("text"); - QName revisionName = QName.valueOf("revision"); - QName timestampName = QName.valueOf("timestamp"); - QName commentName = QName.valueOf("comment"); - QName idName = QName.valueOf("id"); - - Map tags = new HashMap(); - for (QName tag : new QName[] { titleName, textName, timestampName, commentName, idName }) { - tags.put(tag, new StringBuilder()); - } - - StringBuilder articleText = tags.get(textName); - StringBuilder titleText = tags.get(titleName); - StringBuilder timestampText = tags.get(timestampName); - StringBuilder commentText = tags.get(commentName); - StringBuilder idText = tags.get(idName); - - StringBuilder current = null; - boolean inRevision = false; - while (true) { - try { - if (!xmlr.hasNext()) - break; - xmlr.next(); - } catch (XMLStreamException e) { - throw new RuntimeException(e); - } - QName currentName = null; - if (xmlr.hasName()) { - currentName = xmlr.getName(); - } - if (xmlr.isStartElement() && tags.containsKey(currentName)) { - if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) { - current = tags.get(currentName); - current.setLength(0); - } - } else if (xmlr.isStartElement() && currentName.equals(revisionName)) { - inRevision = true; - } else if (xmlr.isEndElement() && currentName.equals(revisionName)) { - inRevision = false; - } else if (xmlr.isEndElement() && current != null) { - if (textName.equals(currentName)) { - - String title = titleText.toString(); - String text 
= articleText.toString(); - String comment = commentText.toString(); - int id = Integer.parseInt(idText.toString()); - long timestamp; - try { - timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime(); - return new Article(id, title, timestamp, comment, text); - } catch (ParseException e) { - return null; - } - } - current = null; - } else if (current != null && xmlr.hasText()) { - current.append(xmlr.getText()); - } - } - return null; - } + + public final static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'Z"); + private static NumberNormalizer nn = new NumberNormalizer(); + private static LcNoDiacriticsNormalizer lcdn = new LcNoDiacriticsNormalizer(); + + public static class Article { + int id; + String title; + long timestamp; + String comments; + String text; + + private Article(int id, String title, long timestamp, String comments, String text) { + super(); + this.id = id; + this.title = title; + this.timestamp = timestamp; + this.comments = comments; + this.text = text; + } + + public int getId() { + return id; + } + + public String getTitle() { + return title; + } + + public String getComments() { + return comments; + } + + public String getText() { + return text; + } + + public long getTimestamp() { + return timestamp; + } + + public Map getFieldValues() { + Map fields = new HashMap(); + fields.put("ID", this.id); + fields.put("TITLE", this.title); + fields.put("TIMESTAMP", this.timestamp); + fields.put("COMMENTS", this.comments); + return fields; + } + + public Map getNormalizedFieldValues() { + Map fields = new HashMap(); + fields.put("ID", nn.normalizeFieldValue("ID", this.id)); + fields.put("TITLE", lcdn.normalizeFieldValue("TITLE", this.title)); + fields.put("TIMESTAMP", nn.normalizeFieldValue("TIMESTAMP", this.timestamp)); + fields.put("COMMENTS", lcdn.normalizeFieldValue("COMMENTS", this.comments)); + return fields; + } + + } + + public ArticleExtractor() {} + + public Article extract(Reader reader) { + XMLInputFactory xmlif = XMLInputFactory.newInstance(); + xmlif.setProperty(XMLInputFactory.IS_REPLACING_ENTITY_REFERENCES, Boolean.TRUE); + + XMLStreamReader xmlr = null; + + try { + xmlr = xmlif.createXMLStreamReader(reader); + } catch (XMLStreamException e1) { + throw new RuntimeException(e1); + } + + QName titleName = QName.valueOf("title"); + QName textName = QName.valueOf("text"); + QName revisionName = QName.valueOf("revision"); + QName timestampName = QName.valueOf("timestamp"); + QName commentName = QName.valueOf("comment"); + QName idName = QName.valueOf("id"); + + Map tags = new HashMap(); + for (QName tag : new QName[] {titleName, textName, timestampName, commentName, idName}) { + tags.put(tag, new StringBuilder()); + } + + StringBuilder articleText = tags.get(textName); + StringBuilder titleText = tags.get(titleName); + StringBuilder timestampText = tags.get(timestampName); + StringBuilder commentText = tags.get(commentName); + StringBuilder idText = tags.get(idName); + + StringBuilder current = null; + boolean inRevision = false; + while (true) { + try { + if (!xmlr.hasNext()) + break; + xmlr.next(); + } catch (XMLStreamException e) { + throw new RuntimeException(e); + } + QName currentName = null; + if (xmlr.hasName()) { + currentName = xmlr.getName(); + } + if (xmlr.isStartElement() && tags.containsKey(currentName)) { + if (!inRevision || (!currentName.equals(revisionName) && !currentName.equals(idName))) { + current = tags.get(currentName); + current.setLength(0); + } + } else if (xmlr.isStartElement() && 
currentName.equals(revisionName)) { + inRevision = true; + } else if (xmlr.isEndElement() && currentName.equals(revisionName)) { + inRevision = false; + } else if (xmlr.isEndElement() && current != null) { + if (textName.equals(currentName)) { + + String title = titleText.toString(); + String text = articleText.toString(); + String comment = commentText.toString(); + int id = Integer.parseInt(idText.toString()); + long timestamp; + try { + timestamp = dateFormat.parse(timestampText.append("+0000").toString()).getTime(); + return new Article(id, title, timestamp, comment, text); + } catch (ParseException e) { + return null; + } + } + current = null; + } else if (current != null && xmlr.hasText()) { + current.append(xmlr.getText()); + } + } + return null; + } } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaConfiguration.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package ingest; import java.io.IOException; @@ -31,113 +31,120 @@ import org.apache.accumulo.core.client.C import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; - public class WikipediaConfiguration { - public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name"; - public final static String USER = "wikipedia.accumulo.user"; - public final static String PASSWORD = "wikipedia.accumulo.password"; - public final static String TABLE_NAME = "wikipedia.accumulo.table"; - - public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers"; - - public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename"; - public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename"; - public final static String WORKING_DIRECTORY = "wikipedia.ingest.working"; - - public final static String ANALYZER = "wikipedia.index.analyzer"; - - public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions"; - - public static String getUser(Configuration conf) { return conf.get(USER); }; - - public static byte[] getPassword(Configuration conf) { - String pass = conf.get(PASSWORD); - if (pass == null) { - return null; - } - return pass.getBytes(); - } - - public static String getTableName(Configuration conf) { - String tablename = conf.get(TABLE_NAME); - if (tablename == null) { - throw new RuntimeException("No data table name specified in " + TABLE_NAME); - } - return tablename; - } - - public static String getInstanceName(Configuration conf) { return conf.get(INSTANCE_NAME); } - - public static String getZookeepers(Configuration conf) { - String zookeepers = conf.get(ZOOKEEPERS); - if (zookeepers == null) { - throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS); - } - return zookeepers; - } - - public static Path getNamespacesFile(Configuration conf) { - String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString()); - return new Path(filename); - } - public static Path getLanguagesFile(Configuration conf) { - String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString()); - return new Path(filename); - } - public static Path getWorkingDirectory(Configuration conf) { - String filename = conf.get(WORKING_DIRECTORY); - return new Path(filename); - } - - public static Analyzer getAnalyzer(Configuration conf) throws IOException { - Class analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class); - return ReflectionUtils.newInstance(analyzerClass, conf); - } - - public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException { - return new Connector(getInstance(conf), getUser(conf), getPassword(conf)); - } - - public static Instance getInstance(Configuration conf) { - return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf)); - } - - public static int getNumPartitions(Configuration conf) { - return conf.getInt(NUM_PARTITIONS, 25); - } - - /** - * Helper method to get properties from Hadoop configuration - * @param - * @param conf - * @param propertyName - * @param resultClass - * @throws IllegalArgumentException if property is not defined, null, or empty. Or if resultClass is not handled. 
- * @return value of property - */ - @SuppressWarnings("unchecked") - public static T isNull(Configuration conf, String propertyName, Class resultClass) { - String p = conf.get(propertyName); - if (StringUtils.isEmpty(p)) - throw new IllegalArgumentException(propertyName + " must be specified"); - - if (resultClass.equals(String.class)) - return (T) p; - else if (resultClass.equals(String[].class)) - return (T) conf.getStrings(propertyName); - else if (resultClass.equals(Boolean.class)) - return (T) Boolean.valueOf(p); - else if (resultClass.equals(Long.class)) - return (T) Long.valueOf(p); - else if (resultClass.equals(Integer.class)) - return (T) Integer.valueOf(p); - else if (resultClass.equals(Float.class)) - return (T) Float.valueOf(p); - else if (resultClass.equals(Double.class)) - return (T) Double.valueOf(p); - else - throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled."); - - } - + public final static String INSTANCE_NAME = "wikipedia.accumulo.instance_name"; + public final static String USER = "wikipedia.accumulo.user"; + public final static String PASSWORD = "wikipedia.accumulo.password"; + public final static String TABLE_NAME = "wikipedia.accumulo.table"; + + public final static String ZOOKEEPERS = "wikipedia.accumulo.zookeepers"; + + public final static String NAMESPACES_FILENAME = "wikipedia.namespaces.filename"; + public final static String LANGUAGES_FILENAME = "wikipedia.languages.filename"; + public final static String WORKING_DIRECTORY = "wikipedia.ingest.working"; + + public final static String ANALYZER = "wikipedia.index.analyzer"; + + public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions"; + + public static String getUser(Configuration conf) { + return conf.get(USER); + }; + + public static byte[] getPassword(Configuration conf) { + String pass = conf.get(PASSWORD); + if (pass == null) { + return null; + } + return pass.getBytes(); + } + + public static String getTableName(Configuration conf) { + String tablename = conf.get(TABLE_NAME); + if (tablename == null) { + throw new RuntimeException("No data table name specified in " + TABLE_NAME); + } + return tablename; + } + + public static String getInstanceName(Configuration conf) { + return conf.get(INSTANCE_NAME); + } + + public static String getZookeepers(Configuration conf) { + String zookeepers = conf.get(ZOOKEEPERS); + if (zookeepers == null) { + throw new RuntimeException("No zookeepers specified in " + ZOOKEEPERS); + } + return zookeepers; + } + + public static Path getNamespacesFile(Configuration conf) { + String filename = conf.get(NAMESPACES_FILENAME, new Path(getWorkingDirectory(conf), "namespaces.dat").toString()); + return new Path(filename); + } + + public static Path getLanguagesFile(Configuration conf) { + String filename = conf.get(LANGUAGES_FILENAME, new Path(getWorkingDirectory(conf), "languages.txt").toString()); + return new Path(filename); + } + + public static Path getWorkingDirectory(Configuration conf) { + String filename = conf.get(WORKING_DIRECTORY); + return new Path(filename); + } + + public static Analyzer getAnalyzer(Configuration conf) throws IOException { + Class analyzerClass = conf.getClass(ANALYZER, SimpleAnalyzer.class, Analyzer.class); + return ReflectionUtils.newInstance(analyzerClass, conf); + } + + public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException { + return new Connector(getInstance(conf), getUser(conf), getPassword(conf)); + } + + public static Instance 
getInstance(Configuration conf) { + return new ZooKeeperInstance(getInstanceName(conf), getZookeepers(conf)); + } + + public static int getNumPartitions(Configuration conf) { + return conf.getInt(NUM_PARTITIONS, 25); + } + + /** + * Helper method to get properties from Hadoop configuration + * + * @param + * @param conf + * @param propertyName + * @param resultClass + * @throws IllegalArgumentException + * if property is not defined, null, or empty. Or if resultClass is not handled. + * @return value of property + */ + @SuppressWarnings("unchecked") + public static T isNull(Configuration conf, String propertyName, Class resultClass) { + String p = conf.get(propertyName); + if (StringUtils.isEmpty(p)) + throw new IllegalArgumentException(propertyName + " must be specified"); + + if (resultClass.equals(String.class)) + return (T) p; + else if (resultClass.equals(String[].class)) + return (T) conf.getStrings(propertyName); + else if (resultClass.equals(Boolean.class)) + return (T) Boolean.valueOf(p); + else if (resultClass.equals(Long.class)) + return (T) Long.valueOf(p); + else if (resultClass.equals(Integer.class)) + return (T) Integer.valueOf(p); + else if (resultClass.equals(Float.class)) + return (T) Float.valueOf(p); + else if (resultClass.equals(Double.class)) + return (T) Double.valueOf(p); + else + throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled."); + + } + } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaIngester.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package ingest; import java.io.IOException; @@ -54,158 +54,156 @@ import org.apache.accumulo.core.iterator import org.apache.accumulo.core.iterators.aggregation.conf.AggregatorConfiguration; public class WikipediaIngester extends Configured implements Tool { - - public final static String INGEST_LANGUAGE = "wikipedia.ingest_language"; - public final static String SPLIT_FILE = "wikipedia.split_file"; - public final static String TABLE_NAME = "wikipedia.table"; - - - public static void main(String[] args) throws Exception { - int res = ToolRunner.run(new Configuration(), new WikipediaIngester(), args); - System.exit(res); - } - - private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { - //Create the shard table - String indexTableName = tableName + "Index"; - String reverseIndexTableName = tableName + "ReverseIndex"; - String metadataTableName = tableName + "Metadata"; - - //create the shard table - if (!tops.exists(tableName)) { - // Set a text index aggregator on the given field names. No aggregator is set if the option is not supplied - String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME; - - if (textIndexFamilies.length() > 0) { - System.out.println("Adding content aggregator on the fields: " + textIndexFamilies); - - // Create and set the aggregators in one shot - List aggregators = new ArrayList(); - - for (String family : StringUtils.split(textIndexFamilies, ',')) { - aggregators.add(new AggregatorConfiguration(new Text("fi\0" + family), aggregator.TextIndexAggregator.class.getName())); - } - - tops.create(tableName); - tops.addAggregators(tableName, aggregators); - } else { - tops.create(tableName); - } - - // Set the locality group for the full content column family - tops.setLocalityGroups(tableName, - Collections.singletonMap("WikipediaDocuments", - Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY)))); - - } - - if (!tops.exists(indexTableName)) { - tops.create(indexTableName); - //Add the UID aggregator - for (IteratorScope scope : IteratorScope.values()) { - String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator"); - tops.setProperty(indexTableName, stem, "19,iterator.TotalAggregatingIterator"); - stem += ".opt."; - tops.setProperty(indexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator"); - - } - } - - if (!tops.exists(reverseIndexTableName)) { - tops.create(reverseIndexTableName); - //Add the UID aggregator - for (IteratorScope scope : IteratorScope.values()) { - String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator"); - tops.setProperty(reverseIndexTableName, stem, "19,iterator.TotalAggregatingIterator"); - stem += ".opt."; - tops.setProperty(reverseIndexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator"); - - } - } - - if (!tops.exists(metadataTableName)) { - //Add the NumSummation aggregator for the frequency column - List aggregators = new ArrayList(); - aggregators.add(new AggregatorConfiguration(new Text("f"), NumSummation.class.getName())); - tops.create(metadataTableName); - tops.addAggregators(metadataTableName, aggregators); - } - } - - @Override - public int run(String[] args) throws Exception { - Job job = new Job(getConf(), "Ingest Wikipedia"); - Configuration conf = job.getConfiguration(); - - - String 
tablename = WikipediaConfiguration.getTableName(conf); - - String zookeepers = WikipediaConfiguration.getZookeepers(conf); - String instanceName = WikipediaConfiguration.getInstanceName(conf); - - String user = WikipediaConfiguration.getUser(conf); - byte[] password = WikipediaConfiguration.getPassword(conf); - Connector connector = WikipediaConfiguration.getConnector(conf); - - TableOperations tops = connector.tableOperations(); - - createTables(tops, tablename); - - configureJob(job); - - List inputPaths = new ArrayList(); - SortedSet languages = new TreeSet(); - FileSystem fs = FileSystem.get(conf); - Path parent = new Path(conf.get("wikipedia.input")); - listFiles(parent, fs, inputPaths, languages); - - System.out.println("Input files in " + parent + ":" + inputPaths.size()); - Path[] inputPathsArray = new Path[inputPaths.size()]; - inputPaths.toArray(inputPathsArray); - - System.out.println("Languages:" + languages.size()); - - FileInputFormat.setInputPaths(job, inputPathsArray); - - job.setMapperClass(WikipediaMapper.class); - job.setNumReduceTasks(0); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - job.setOutputFormatClass(AccumuloOutputFormat.class); - AccumuloOutputFormat.setOutputInfo(job, user, password, true, tablename); - AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, zookeepers); - - return job.waitForCompletion(true) ? 0 : 1; - } - - public final static PathFilter partFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().startsWith("part"); - }; - }; - - protected void configureJob(Job job) { - Configuration conf = job.getConfiguration(); - job.setJarByClass(WikipediaIngester.class); - job.setInputFormatClass(WikipediaInputFormat.class); - conf.set(AggregatingRecordReader.START_TOKEN, ""); - conf.set(AggregatingRecordReader.END_TOKEN, ""); - } - - protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); - protected void listFiles(Path path, FileSystem fs, List files, Set languages) throws IOException { - for (FileStatus status : fs.listStatus(path)) { - if (status.isDir()) { - listFiles(status.getPath(), fs, files, languages); - } else { - Path p = status.getPath(); - Matcher matcher = filePattern.matcher(p.getName()); - if (matcher.matches()) { - languages.add(matcher.group(1)); - files.add(p); - } - } - } - } + + public final static String INGEST_LANGUAGE = "wikipedia.ingest_language"; + public final static String SPLIT_FILE = "wikipedia.split_file"; + public final static String TABLE_NAME = "wikipedia.table"; + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new WikipediaIngester(), args); + System.exit(res); + } + + private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + TableExistsException { + // Create the shard table + String indexTableName = tableName + "Index"; + String reverseIndexTableName = tableName + "ReverseIndex"; + String metadataTableName = tableName + "Metadata"; + + // create the shard table + if (!tops.exists(tableName)) { + // Set a text index aggregator on the given field names. 
No aggregator is set if the option is not supplied + String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME; + + if (textIndexFamilies.length() > 0) { + System.out.println("Adding content aggregator on the fields: " + textIndexFamilies); + + // Create and set the aggregators in one shot + List aggregators = new ArrayList(); + + for (String family : StringUtils.split(textIndexFamilies, ',')) { + aggregators.add(new AggregatorConfiguration(new Text("fi\0" + family), aggregator.TextIndexAggregator.class.getName())); + } + + tops.create(tableName); + tops.addAggregators(tableName, aggregators); + } else { + tops.create(tableName); + } + + // Set the locality group for the full content column family + tops.setLocalityGroups(tableName, Collections.singletonMap("WikipediaDocuments", Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY)))); + + } + + if (!tops.exists(indexTableName)) { + tops.create(indexTableName); + // Add the UID aggregator + for (IteratorScope scope : IteratorScope.values()) { + String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator"); + tops.setProperty(indexTableName, stem, "19,iterator.TotalAggregatingIterator"); + stem += ".opt."; + tops.setProperty(indexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator"); + + } + } + + if (!tops.exists(reverseIndexTableName)) { + tops.create(reverseIndexTableName); + // Add the UID aggregator + for (IteratorScope scope : IteratorScope.values()) { + String stem = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name(), "UIDAggregator"); + tops.setProperty(reverseIndexTableName, stem, "19,iterator.TotalAggregatingIterator"); + stem += ".opt."; + tops.setProperty(reverseIndexTableName, stem + "*", "aggregator.GlobalIndexUidAggregator"); + + } + } + + if (!tops.exists(metadataTableName)) { + // Add the NumSummation aggregator for the frequency column + List aggregators = new ArrayList(); + aggregators.add(new AggregatorConfiguration(new Text("f"), NumSummation.class.getName())); + tops.create(metadataTableName); + tops.addAggregators(metadataTableName, aggregators); + } + } + + @Override + public int run(String[] args) throws Exception { + Job job = new Job(getConf(), "Ingest Wikipedia"); + Configuration conf = job.getConfiguration(); + + String tablename = WikipediaConfiguration.getTableName(conf); + + String zookeepers = WikipediaConfiguration.getZookeepers(conf); + String instanceName = WikipediaConfiguration.getInstanceName(conf); + + String user = WikipediaConfiguration.getUser(conf); + byte[] password = WikipediaConfiguration.getPassword(conf); + Connector connector = WikipediaConfiguration.getConnector(conf); + + TableOperations tops = connector.tableOperations(); + + createTables(tops, tablename); + + configureJob(job); + + List inputPaths = new ArrayList(); + SortedSet languages = new TreeSet(); + FileSystem fs = FileSystem.get(conf); + Path parent = new Path(conf.get("wikipedia.input")); + listFiles(parent, fs, inputPaths, languages); + + System.out.println("Input files in " + parent + ":" + inputPaths.size()); + Path[] inputPathsArray = new Path[inputPaths.size()]; + inputPaths.toArray(inputPathsArray); + + System.out.println("Languages:" + languages.size()); + + FileInputFormat.setInputPaths(job, inputPathsArray); + + job.setMapperClass(WikipediaMapper.class); + job.setNumReduceTasks(0); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Mutation.class); + job.setOutputFormatClass(AccumuloOutputFormat.class); + 
AccumuloOutputFormat.setOutputInfo(job, user, password, true, tablename); + AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, zookeepers); + + return job.waitForCompletion(true) ? 0 : 1; + } + + public final static PathFilter partFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith("part"); + }; + }; + + protected void configureJob(Job job) { + Configuration conf = job.getConfiguration(); + job.setJarByClass(WikipediaIngester.class); + job.setInputFormatClass(WikipediaInputFormat.class); + conf.set(AggregatingRecordReader.START_TOKEN, ""); + conf.set(AggregatingRecordReader.END_TOKEN, ""); + } + + protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); + + protected void listFiles(Path path, FileSystem fs, List files, Set languages) throws IOException { + for (FileStatus status : fs.listStatus(path)) { + if (status.isDir()) { + listFiles(status.getPath(), fs, files, languages); + } else { + Path p = status.getPath(); + Matcher matcher = filePattern.matcher(p.getName()); + if (matcher.matches()) { + languages.add(matcher.group(1)); + files.add(p); + } + } + } + } } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaInputFormat.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package ingest; import org.apache.hadoop.fs.Path; @@ -28,15 +28,15 @@ import org.apache.hadoop.mapreduce.lib.i import reader.AggregatingRecordReader; public class WikipediaInputFormat extends TextInputFormat { - - @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { - return new AggregatingRecordReader(); - } - - @Override - protected boolean isSplitable(JobContext context, Path file) { - return false; - } - + + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { + return new AggregatingRecordReader(); + } + + @Override + protected boolean isSplitable(JobContext context, Path file) { + return false; + } + } Modified: incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java?rev=1210600&r1=1210599&r2=1210600&view=diff ============================================================================== --- incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java (original) +++ incubator/accumulo/trunk/contrib/accumulo_sample/ingest/src/main/java/ingest/WikipediaMapper.java Mon Dec 5 20:05:49 2011 @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ /** * */ @@ -65,200 +65,195 @@ import org.apache.accumulo.core.security import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -public class WikipediaMapper extends Mapper { - - private static final Logger log = Logger.getLogger(WikipediaMapper.class); - - public final static Charset UTF8 = Charset.forName("UTF-8"); - public static final String DOCUMENT_COLUMN_FAMILY = "d"; - public static final String METADATA_EVENT_COLUMN_FAMILY = "e"; - public static final String METADATA_INDEX_COLUMN_FAMILY = "i"; - public static final String TOKENS_FIELD_NAME = "TEXT"; - - private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); - private static final Value NULL_VALUE = new Value(new byte[0]); - private static final String cvPrefix = "all|"; - - - private ArticleExtractor extractor; - private String language; - private int numPartitions = 0; - private Set stopwords = null; - private ColumnVisibility cv = null; - - - private Text tablename = null; - private Text indexTableName = null; - private Text reverseIndexTableName = null; - private Text metadataTableName = null; - - - @Override - public void setup(Context context) { - Configuration conf = context.getConfiguration(); - tablename = new Text(WikipediaConfiguration.getTableName(conf)); - indexTableName = new Text(tablename + "Index"); - reverseIndexTableName = new Text(tablename + "ReverseIndex"); - metadataTableName = new Text(tablename + "Metadata"); - - FileSplit split = (FileSplit) context.getInputSplit(); - String fileName = split.getPath().getName(); - Matcher matcher = languagePattern.matcher(fileName); - if (matcher.matches()) { - language = matcher.group(1).replace('_', '-').toLowerCase(); - if (language.equals("arwiki")) - stopwords = ArabicAnalyzer.getDefaultStopSet(); - else if (language.equals("brwiki")) - stopwords = BrazilianAnalyzer.getDefaultStopSet(); - else if (language.startsWith("zh")) - stopwords = CJKAnalyzer.getDefaultStopSet(); - else if (language.equals("dewiki")) - stopwords = GermanAnalyzer.getDefaultStopSet(); - else if (language.equals("elwiki")) - stopwords = GreekAnalyzer.getDefaultStopSet(); - else if (language.equals("fawiki")) - stopwords = PersianAnalyzer.getDefaultStopSet(); - else if (language.equals("frwiki")) - stopwords = FrenchAnalyzer.getDefaultStopSet(); - else if (language.equals("nlwiki")) - stopwords = DutchAnalyzer.getDefaultStopSet(); - else - stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET; - - } else { - throw new RuntimeException("Unknown ingest language! 
" + fileName); - } - extractor = new ArticleExtractor(); - numPartitions = WikipediaConfiguration.getNumPartitions(conf); - cv = new ColumnVisibility(cvPrefix + language); - - } - - /** - * We will partition the documents based on the document id - * @param article - * @param numPartitions - * @return - * @throws IllegalFormatException - */ - public static int getPartitionId(Article article, int numPartitions) throws IllegalFormatException { - return article.getId() % numPartitions; - } - - @Override - protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8)); - String NULL_BYTE = "\u0000"; - String colfPrefix = language+NULL_BYTE; - String indexPrefix = "fi"+NULL_BYTE; - if (article != null) { - Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions))); - - //Create the mutations for the document. - //Row is partition id, colf is language0articleid, colq is fieldName\0fieldValue - Mutation m = new Mutation(partitionId); - for (Entry entry : article.getFieldValues().entrySet()) { - m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp() , NULL_VALUE); - //Create mutations for the metadata table. - Mutation mm = new Mutation(entry.getKey()); - mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE); - context.write(metadataTableName, mm); - } - - //Tokenize the content - Set tokens = getTokens(article); - - //We are going to put the fields to be indexed into a multimap. This allows us to iterate - //over the entire set once. - Multimap indexFields = HashMultimap.create(); - //Add the normalized field values - LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer(); - for (Entry index : article.getNormalizedFieldValues().entrySet()) - indexFields.put(index.getKey(), index.getValue()); - //Add the tokens - for (String token : tokens) - indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token)); - - - for (Entry index : indexFields.entries()) { - //Create mutations for the in partition index - //Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id - m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp() , NULL_VALUE); - - - //Create mutations for the global index - //Create a UID object for the Value - Builder uidBuilder = Uid.List.newBuilder(); - uidBuilder.setIGNORE(false); - uidBuilder.setCOUNT(1); - uidBuilder.addUID(Integer.toString(article.getId())); - Uid.List uidList = uidBuilder.build(); - Value val = new Value(uidList.toByteArray()); - - //Create mutations for the global index - //Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object - Mutation gm = new Mutation(index.getValue()); - gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp() , val); - context.write(indexTableName, gm); - - //Create mutations for the global reverse index - Mutation grm = new Mutation(StringUtils.reverse(index.getValue())); - grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp() , val); - context.write(reverseIndexTableName, grm); - - //Create mutations for the metadata table. 
- Mutation mm = new Mutation(index.getKey()); - mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE); - context.write(metadataTableName, mm); - - - } - //Add the entire text to the document section of the table. - //row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document - m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(), - new Value(Base64.encodeBytes(article.getText().getBytes(), Base64.GZIP).getBytes())); - context.write(tablename, m); - - } else { - context.getCounter("wikipedia", "invalid articles").increment(1); - } - context.progress(); - } - - /** - * Tokenize the wikipedia content - * - * @param article - * @return - * @throws IOException - */ - private Set getTokens(Article article) throws IOException { - Set tokenList = new HashSet(); - WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText())); - TermAttribute term = tok.addAttribute(TermAttribute.class); - StopFilter filter = new StopFilter(false, tok, stopwords, true); - try { - while (filter.incrementToken()) { - String token = term.term(); - if (!StringUtils.isEmpty(token)) - tokenList.add(token); - } - } catch (IOException e) { - log.error("Error tokenizing text", e); - } finally { - try { - tok.end(); - } catch (IOException e) { - log.error("Error calling end()", e); - } finally { - try { - tok.close(); - } catch (IOException e) { - log.error("Error closing tokenizer", e); - } - } - } - return tokenList; - } - +public class WikipediaMapper extends Mapper { + + private static final Logger log = Logger.getLogger(WikipediaMapper.class); + + public final static Charset UTF8 = Charset.forName("UTF-8"); + public static final String DOCUMENT_COLUMN_FAMILY = "d"; + public static final String METADATA_EVENT_COLUMN_FAMILY = "e"; + public static final String METADATA_INDEX_COLUMN_FAMILY = "i"; + public static final String TOKENS_FIELD_NAME = "TEXT"; + + private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); + private static final Value NULL_VALUE = new Value(new byte[0]); + private static final String cvPrefix = "all|"; + + private ArticleExtractor extractor; + private String language; + private int numPartitions = 0; + private Set stopwords = null; + private ColumnVisibility cv = null; + + private Text tablename = null; + private Text indexTableName = null; + private Text reverseIndexTableName = null; + private Text metadataTableName = null; + + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + tablename = new Text(WikipediaConfiguration.getTableName(conf)); + indexTableName = new Text(tablename + "Index"); + reverseIndexTableName = new Text(tablename + "ReverseIndex"); + metadataTableName = new Text(tablename + "Metadata"); + + FileSplit split = (FileSplit) context.getInputSplit(); + String fileName = split.getPath().getName(); + Matcher matcher = languagePattern.matcher(fileName); + if (matcher.matches()) { + language = matcher.group(1).replace('_', '-').toLowerCase(); + if (language.equals("arwiki")) + stopwords = ArabicAnalyzer.getDefaultStopSet(); + else if (language.equals("brwiki")) + stopwords = BrazilianAnalyzer.getDefaultStopSet(); + else if (language.startsWith("zh")) + stopwords = CJKAnalyzer.getDefaultStopSet(); + else if (language.equals("dewiki")) + stopwords = GermanAnalyzer.getDefaultStopSet(); + else if (language.equals("elwiki")) 
+ stopwords = GreekAnalyzer.getDefaultStopSet(); + else if (language.equals("fawiki")) + stopwords = PersianAnalyzer.getDefaultStopSet(); + else if (language.equals("frwiki")) + stopwords = FrenchAnalyzer.getDefaultStopSet(); + else if (language.equals("nlwiki")) + stopwords = DutchAnalyzer.getDefaultStopSet(); + else + stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET; + + } else { + throw new RuntimeException("Unknown ingest language! " + fileName); + } + extractor = new ArticleExtractor(); + numPartitions = WikipediaConfiguration.getNumPartitions(conf); + cv = new ColumnVisibility(cvPrefix + language); + + } + + /** + * We will partition the documents based on the document id + * + * @param article + * @param numPartitions + * @return + * @throws IllegalFormatException + */ + public static int getPartitionId(Article article, int numPartitions) throws IllegalFormatException { + return article.getId() % numPartitions; + } + + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8)); + String NULL_BYTE = "\u0000"; + String colfPrefix = language + NULL_BYTE; + String indexPrefix = "fi" + NULL_BYTE; + if (article != null) { + Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions))); + + // Create the mutations for the document. + // Row is partition id, colf is language0articleid, colq is fieldName\0fieldValue + Mutation m = new Mutation(partitionId); + for (Entry entry : article.getFieldValues().entrySet()) { + m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE); + // Create mutations for the metadata table. + Mutation mm = new Mutation(entry.getKey()); + mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE); + context.write(metadataTableName, mm); + } + + // Tokenize the content + Set tokens = getTokens(article); + + // We are going to put the fields to be indexed into a multimap. This allows us to iterate + // over the entire set once. 
+ Multimap indexFields = HashMultimap.create(); + // Add the normalized field values + LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer(); + for (Entry index : article.getNormalizedFieldValues().entrySet()) + indexFields.put(index.getKey(), index.getValue()); + // Add the tokens + for (String token : tokens) + indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token)); + + for (Entry index : indexFields.entries()) { + // Create mutations for the in partition index + // Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id + m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE); + + // Create mutations for the global index + // Create a UID object for the Value + Builder uidBuilder = Uid.List.newBuilder(); + uidBuilder.setIGNORE(false); + uidBuilder.setCOUNT(1); + uidBuilder.addUID(Integer.toString(article.getId())); + Uid.List uidList = uidBuilder.build(); + Value val = new Value(uidList.toByteArray()); + + // Create mutations for the global index + // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object + Mutation gm = new Mutation(index.getValue()); + gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val); + context.write(indexTableName, gm); + + // Create mutations for the global reverse index + Mutation grm = new Mutation(StringUtils.reverse(index.getValue())); + grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val); + context.write(reverseIndexTableName, grm); + + // Create mutations for the metadata table. + Mutation mm = new Mutation(index.getKey()); + mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE); + context.write(metadataTableName, mm); + + } + // Add the entire text to the document section of the table. + // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document + m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(), + new Value(Base64.encodeBytes(article.getText().getBytes(), Base64.GZIP).getBytes())); + context.write(tablename, m); + + } else { + context.getCounter("wikipedia", "invalid articles").increment(1); + } + context.progress(); + } + + /** + * Tokenize the wikipedia content + * + * @param article + * @return + * @throws IOException + */ + private Set getTokens(Article article) throws IOException { + Set tokenList = new HashSet(); + WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText())); + TermAttribute term = tok.addAttribute(TermAttribute.class); + StopFilter filter = new StopFilter(false, tok, stopwords, true); + try { + while (filter.incrementToken()) { + String token = term.term(); + if (!StringUtils.isEmpty(token)) + tokenList.add(token); + } + } catch (IOException e) { + log.error("Error tokenizing text", e); + } finally { + try { + tok.end(); + } catch (IOException e) { + log.error("Error calling end()", e); + } finally { + try { + tok.close(); + } catch (IOException e) { + log.error("Error closing tokenizer", e); + } + } + } + return tokenList; + } + }
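For reference, a minimal standalone sketch of the key layout that the comments in WikipediaMapper describe (document, in-partition "fi" index, global index, and global reverse index entries). This is not part of the commit and does not touch the Accumulo API; the article id, field name, and field value below are hypothetical, and the partitioning rule mirrors getPartitionId().

    // KeyLayoutSketch.java -- illustrative only; identifiers are made up.
    public class KeyLayoutSketch {
      private static final String NULL_BYTE = "\u0000";

      public static void main(String[] args) {
        String language = "enwiki";
        int articleId = 1234;            // hypothetical article id
        int numPartitions = 10;
        String field = "TITLE";          // hypothetical field name
        String value = "apache accumulo";

        // Same partitioning rule as WikipediaMapper.getPartitionId(): id mod numPartitions
        String partitionId = Integer.toString(articleId % numPartitions);

        // Document entry: row = partition id, colf = language\0articleId, colq = fieldName\0fieldValue
        System.out.println("doc    row=" + partitionId
            + " colf=" + language + NULL_BYTE + articleId
            + " colq=" + field + NULL_BYTE + value);

        // In-partition index: row = partition id, colf = fi\0fieldName, colq = fieldValue\0language\0articleId
        System.out.println("fi     row=" + partitionId
            + " colf=fi" + NULL_BYTE + field
            + " colq=" + value + NULL_BYTE + language + NULL_BYTE + articleId);

        // Global index: row = field value, colf = field name, colq = partitionId\0language
        System.out.println("index  row=" + value
            + " colf=" + field
            + " colq=" + partitionId + NULL_BYTE + language);

        // Global reverse index: row = reversed field value, same colf/colq as the global index
        String reversed = new StringBuilder(value).reverse().toString();
        System.out.println("rindex row=" + reversed
            + " colf=" + field
            + " colq=" + partitionId + NULL_BYTE + language);
      }
    }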