Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 772F118D01 for ; Thu, 11 Feb 2016 12:49:42 +0000 (UTC) Received: (qmail 62949 invoked by uid 500); 11 Feb 2016 12:49:42 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 62882 invoked by uid 500); 11 Feb 2016 12:49:42 -0000 Mailing-List: contact commits-help@kylin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.apache.org Delivered-To: mailing list commits@kylin.apache.org Received: (qmail 61675 invoked by uid 99); 11 Feb 2016 12:49:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2016 12:49:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5CBC1E0E4C; Thu, 11 Feb 2016 12:49:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liyang@apache.org To: commits@kylin.apache.org Date: Thu, 11 Feb 2016 12:50:24 -0000 Message-Id: <0f4b05a74d3a4517931a02c3fddb0c44@git.apache.org> In-Reply-To: <23cda63970dd4963a0cafaebf033045d@git.apache.org> References: <23cda63970dd4963a0cafaebf033045d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [45/51] [partial] kylin git commit: KYLIN-1416 keep only website in document branch http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java deleted file mode 100644 index 646cd80..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; - -import com.google.common.collect.Lists; - -public class FileResourceStore extends ResourceStore { - - File root; - - public FileResourceStore(KylinConfig kylinConfig) { - super(kylinConfig); - root = new File(kylinConfig.getMetadataUrl()).getAbsoluteFile(); - if (root.exists() == false) - throw new IllegalArgumentException("File not exist by '" + kylinConfig.getMetadataUrl() + "': " + root.getAbsolutePath()); - } - - @Override - protected ArrayList listResourcesImpl(String resPath) throws IOException { - String[] names = file(resPath).list(); - if (names == null) // not a directory - return null; - - ArrayList r = new ArrayList(names.length); - String prefix = resPath.endsWith("/") ? resPath : resPath + "/"; - for (String n : names) { - r.add(prefix + n); - } - return r; - } - - @Override - protected boolean existsImpl(String resPath) throws IOException { - File f = file(resPath); - return f.exists() && f.isFile(); // directory is not considered a - // resource - } - - @Override - protected List getAllResources(String rangeStart, String rangeEnd) throws IOException { - List result = Lists.newArrayList(); - try { - String commonPrefix = StringUtils.getCommonPrefix(rangeEnd, rangeStart); - commonPrefix = commonPrefix.substring(0, commonPrefix.lastIndexOf("/") + 1); - final ArrayList resources = listResourcesImpl(commonPrefix); - for (String resource : resources) { - if (resource.compareTo(rangeStart) >= 0 && resource.compareTo(rangeEnd) <= 0) { - if (existsImpl(resource)) { - result.add(getResourceImpl(resource)); - } - } - } - return result; - } catch (IOException ex) { - for (RawResource rawResource : result) { - IOUtils.closeQuietly(rawResource.inputStream); - } - throw ex; - } catch (Exception ex) { - throw new UnsupportedOperationException(ex); - } - } - - @Override - protected RawResource getResourceImpl(String resPath) throws IOException { - File f = file(resPath); - if (f.exists() && f.isFile()) - return new RawResource(new FileInputStream(f), f.lastModified()); - else - return null; - } - - @Override - protected long getResourceTimestampImpl(String resPath) throws IOException { - File f = file(resPath); - if (f.exists() && f.isFile()) - return f.lastModified(); - else - return 0; - } - - @Override - protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { - File f = file(resPath); - f.getParentFile().mkdirs(); - FileOutputStream out = new FileOutputStream(f); - try { - IOUtils.copy(content, out); - } finally { - IOUtils.closeQuietly(out); - } - - f.setLastModified(ts); - } - - @Override - protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - File f = file(resPath); - if ((f.exists() && f.lastModified() != oldTS) || (f.exists() == false && oldTS != 0)) - throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + f.lastModified()); - - putResourceImpl(resPath, new ByteArrayInputStream(content), newTS); - - // some FS lose precision on given time stamp - return f.lastModified(); - } - - @Override - protected void deleteResourceImpl(String resPath) throws IOException { - File f = file(resPath); - f.delete(); - } - - @Override - protected String getReadableResourcePathImpl(String resPath) { - return file(resPath).toString(); - } - - private File file(String resPath) { - if (resPath.equals("/")) - return root; - else - return new File(root, resPath); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java deleted file mode 100644 index 5b8fe54..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.kylin.common.util.HadoopUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - * - */ -public class HBaseConnection { - - private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); - - private static final Map ConnPool = new ConcurrentHashMap(); - - static { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - for (HConnection conn : ConnPool.values()) { - try { - conn.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - }); - } - - public static void clearCache() { - ConnPool.clear(); - } - - public static HConnection get(String url) { - - HConnection connection = ConnPool.get(url); - try { - // I don't use DCL since recreate a connection is not a big issue. - if (connection == null) { - // find configuration - Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); - connection = HConnectionManager.createConnection(conf); - ConnPool.put(url, connection); - } - } catch (Throwable t) { - throw new StorageException("Error when open connection " + url, t); - } - - return connection; - } - - public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException { - createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); - } - - public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); - - try { - boolean tableExist = false; - try { - hbase.getTableDescriptor(TableName.valueOf(tableName)); - tableExist = true; - } catch (TableNotFoundException e) { - } - - if (tableExist) { - logger.debug("HTable '" + tableName + "' already exists"); - return; - } - - logger.debug("Creating HTable '" + tableName + "'"); - - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - - if (null != families && families.length > 0) { - for (String family : families) { - HColumnDescriptor fd = new HColumnDescriptor(family); - fd.setInMemory(true); // metadata tables are best in memory - desc.addFamily(fd); - } - } - hbase.createTable(desc); - - logger.debug("HTable '" + tableName + "' created"); - } finally { - hbase.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java deleted file mode 100644 index d1ff27a..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.HadoopUtil; - -import com.google.common.collect.Lists; - -public class HBaseResourceStore extends ResourceStore { - - private static final String DEFAULT_TABLE_NAME = "kylin_metadata"; - private static final String FAMILY = "f"; - private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY); - private static final String COLUMN = "c"; - private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN); - private static final String COLUMN_TS = "t"; - private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS); - - private static final Map TABLE_SUFFIX_MAP = new LinkedHashMap(); - - static { - TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube"); - TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict"); - TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex"); - TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job"); - TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output"); - TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj"); - TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot"); - TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE - } - - final String tableNameBase; - final String hbaseUrl; - - // final Map tableNameMap; // path prefix ==> HBase table name - - private HConnection getConnection() throws IOException { - return HBaseConnection.get(hbaseUrl); - } - - public HBaseResourceStore(KylinConfig kylinConfig) throws IOException { - super(kylinConfig); - - String metadataUrl = kylinConfig.getMetadataUrl(); - // split TABLE@HBASE_URL - int cut = metadataUrl.indexOf('@'); - tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); - hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - if (!(StringUtils.isEmpty(hbaseUrl) || "hbase".equals(hbaseUrl))) - throw new IllegalArgumentException("to use hbase storage, pls set 'kylin.metadata.url=kylin_metadata@hbase' in kylin.properties"); - - createHTableIfNeeded(getAllInOneTableName()); - - } - - private void createHTableIfNeeded(String tableName) throws IOException { - HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY); - } - - private String getAllInOneTableName() { - return tableNameBase; - } - - @Override - protected ArrayList listResourcesImpl(String resPath) throws IOException { - assert resPath.startsWith("/"); - String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/"; - byte[] startRow = Bytes.toBytes(lookForPrefix); - byte[] endRow = Bytes.toBytes(lookForPrefix); - endRow[endRow.length - 1]++; - - ArrayList result = new ArrayList(); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - Scan scan = new Scan(startRow, endRow); - scan.setFilter(new KeyOnlyFilter()); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - String path = Bytes.toString(r.getRow()); - assert path.startsWith(lookForPrefix); - int cut = path.indexOf('/', lookForPrefix.length()); - String child = cut < 0 ? path : path.substring(0, cut); - if (result.contains(child) == false) - result.add(child); - } - } finally { - IOUtils.closeQuietly(table); - } - // return null to indicate not a folder - return result.isEmpty() ? null : result; - } - - @Override - protected boolean existsImpl(String resPath) throws IOException { - Result r = getByScan(resPath, false, false); - return r != null; - } - - @Override - protected List getAllResources(String rangeStart, String rangeEnd) throws IOException { - byte[] startRow = Bytes.toBytes(rangeStart); - byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); - - Scan scan = new Scan(startRow, endRow); - scan.addColumn(B_FAMILY, B_COLUMN_TS); - scan.addColumn(B_FAMILY, B_COLUMN); - tuneScanParameters(scan); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - List result = Lists.newArrayList(); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); - } - } catch (IOException e) { - for (RawResource rawResource : result) { - IOUtils.closeQuietly(rawResource.inputStream); - } - throw e; - } finally { - IOUtils.closeQuietly(table); - } - return result; - } - - private void tuneScanParameters(Scan scan) { - // divide by 10 as some resource like dictionary or snapshot can be very large - scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10); - scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize()); - scan.setCacheBlocks(true); - } - - private InputStream getInputStream(String resPath, Result r) throws IOException { - if (r == null) { - return null; - } - byte[] value = r.getValue(B_FAMILY, B_COLUMN); - if (value.length == 0) { - Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - return fileSystem.open(redirectPath); - } else { - return new ByteArrayInputStream(value); - } - } - - private long getTimestamp(Result r) { - if (r == null || r.getValue(B_FAMILY, B_COLUMN_TS) == null) { - return 0; - } else { - return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS)); - } - } - - @Override - protected RawResource getResourceImpl(String resPath) throws IOException { - Result r = getByScan(resPath, true, true); - if (r == null) - return null; - else - return new RawResource(getInputStream(resPath, r), getTimestamp(r)); - } - - @Override - protected long getResourceTimestampImpl(String resPath) throws IOException { - return getTimestamp(getByScan(resPath, false, true)); - } - - @Override - protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - IOUtils.copy(content, bout); - bout.close(); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - byte[] row = Bytes.toBytes(resPath); - Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); - - table.put(put); - table.flushCommits(); - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - byte[] row = Bytes.toBytes(resPath); - byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); - Put put = buildPut(resPath, newTS, row, content, table); - - boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); - if (!ok) { - long real = getResourceTimestampImpl(resPath); - throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS); - } - - table.flushCommits(); - - return newTS; - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - Delete del = new Delete(Bytes.toBytes(resPath)); - table.delete(del); - table.flushCommits(); - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected String getReadableResourcePathImpl(String resPath) { - return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl(); - } - - private Result getByScan(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { - byte[] startRow = Bytes.toBytes(path); - byte[] endRow = plusZero(startRow); - - Scan scan = new Scan(startRow, endRow); - if (!fetchContent && !fetchTimestamp) { - scan.setFilter(new KeyOnlyFilter()); - } else { - if (fetchContent) - scan.addColumn(B_FAMILY, B_COLUMN); - if (fetchTimestamp) - scan.addColumn(B_FAMILY, B_COLUMN_TS); - } - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - ResultScanner scanner = table.getScanner(scan); - Result result = null; - for (Result r : scanner) { - result = r; - } - return result == null || result.isEmpty() ? null : result; - } finally { - IOUtils.closeQuietly(table); - } - } - - private byte[] plusZero(byte[] startRow) { - byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1); - endRow[endRow.length - 1] = 0; - return endRow; - } - - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { - Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - if (fileSystem.exists(redirectPath)) { - fileSystem.delete(redirectPath, true); - } - - FSDataOutputStream out = fileSystem.create(redirectPath); - - try { - out.write(largeColumn); - } finally { - IOUtils.closeQuietly(out); - } - - return redirectPath; - } - - public Path bigCellHDFSPath(String resPath) { - String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory(); - Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath); - return redirectPath; - } - - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { - int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); - if (content.length > kvSizeLimit) { - writeLargeCellToHdfs(resPath, content, table); - content = BytesUtil.EMPTY_BYTE_ARRAY; - } - - Put put = new Put(row); - put.add(B_FAMILY, B_COLUMN, content); - put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); - - return put; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/JsonSerializer.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/JsonSerializer.java b/common/src/main/java/org/apache/kylin/common/persistence/JsonSerializer.java deleted file mode 100644 index a23b933..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/JsonSerializer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.kylin.common.util.JsonUtil; - -/** - * @author yangli9 - */ -public class JsonSerializer implements Serializer { - - Class clz; - - public JsonSerializer(Class clz) { - this.clz = clz; - } - - @Override - public T deserialize(DataInputStream in) throws IOException { - return JsonUtil.readValue(in, clz); - } - - @Override - public void serialize(T obj, DataOutputStream out) throws IOException { - JsonUtil.writeValueIndent(out, obj); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java b/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java deleted file mode 100644 index 4f52553..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/RawResource.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.InputStream; - -/** - */ -public class RawResource { - - public final InputStream inputStream; - public final long timestamp; - - public RawResource(InputStream inputStream, long timestamp) { - this.inputStream = inputStream; - this.timestamp = timestamp; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java deleted file mode 100644 index 505c72a..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.io.IOUtils; -import org.apache.kylin.common.KylinConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -abstract public class ResourceStore { - - private static final Logger logger = LoggerFactory.getLogger(ResourceStore.class); - - public static final String CUBE_RESOURCE_ROOT = "/cube"; - public static final String II_RESOURCE_ROOT = "/invertedindex"; - public static final String CUBE_DESC_RESOURCE_ROOT = "/cube_desc"; - public static final String II_DESC_RESOURCE_ROOT = "/invertedindex_desc"; - public static final String DATA_MODEL_DESC_RESOURCE_ROOT = "/model_desc"; - public static final String DICT_RESOURCE_ROOT = "/dict"; - public static final String JOB_PATH_ROOT = "/job"; - public static final String JOB_OUTPUT_PATH_ROOT = "/job_output"; - public static final String PROJECT_RESOURCE_ROOT = "/project"; - public static final String SNAPSHOT_RESOURCE_ROOT = "/table_snapshot"; - public static final String TABLE_EXD_RESOURCE_ROOT = "/table_exd"; - public static final String TABLE_RESOURCE_ROOT = "/table"; - public static final String HYBRID_RESOURCE_ROOT = "/hybrid"; - public static final String EXECUTE_PATH_ROOT = "/execute"; - public static final String EXECUTE_OUTPUT_ROOT = "/execute_output"; - - - private static ConcurrentHashMap CACHE = new ConcurrentHashMap(); - - public static final ArrayList> knownImpl = new ArrayList>(); - - static { - knownImpl.add(FileResourceStore.class); - knownImpl.add(HBaseResourceStore.class); - } - - public static ResourceStore getStore(KylinConfig kylinConfig) { - ResourceStore r = CACHE.get(kylinConfig); - List es = new ArrayList(); - if (r == null) { - logger.info("Using metadata url " + kylinConfig.getMetadataUrl() + " for resource store"); - for (Class cls : knownImpl) { - - try { - r = cls.getConstructor(KylinConfig.class).newInstance(kylinConfig); - } catch (Exception e) { - es.add(e); - } catch (NoClassDefFoundError er) { - // may throw NoClassDefFoundError - es.add(er); - } - if (r != null) { - break; - } - } - if (r == null) { - for (Throwable exceptionOrError : es) { - logger.error("Create new store instance failed ", exceptionOrError); - } - throw new IllegalArgumentException("Failed to find metadata store by url: " + kylinConfig.getMetadataUrl()); - } - - CACHE.put(kylinConfig, r); - } - return r; - } - - // ============================================================================ - - KylinConfig kylinConfig; - - ResourceStore(KylinConfig kylinConfig) { - this.kylinConfig = kylinConfig; - } - - /** - * return a list of child resources & folders under given path, return null - * if given path is not a folder - */ - final public ArrayList listResources(String resPath) throws IOException { - resPath = norm(resPath); - return listResourcesImpl(resPath); - } - - abstract protected ArrayList listResourcesImpl(String resPath) throws IOException; - - /** - * return true if a resource exists, return false in case of folder or - * non-exist - */ - final public boolean exists(String resPath) throws IOException { - return existsImpl(norm(resPath)); - } - - abstract protected boolean existsImpl(String resPath) throws IOException; - - /** - * read a resource, return null in case of not found - */ - final public T getResource(String resPath, Class clz, Serializer serializer) throws IOException { - resPath = norm(resPath); - RawResource res = getResourceImpl(resPath); - if (res == null) - return null; - - DataInputStream din = new DataInputStream(res.inputStream); - try { - T r = serializer.deserialize(din); - r.setLastModified(res.timestamp); - return r; - } finally { - IOUtils.closeQuietly(din); - IOUtils.closeQuietly(res.inputStream); - } - } - - final public RawResource getResource(String resPath) throws IOException { - return getResourceImpl(norm(resPath)); - } - - final public long getResourceTimestamp(String resPath) throws IOException { - return getResourceTimestampImpl(norm(resPath)); - } - - final public List getAllResources(String rangeStart, String rangeEnd, Class clazz, Serializer serializer) throws IOException { - final List allResources = getAllResources(rangeStart, rangeEnd); - if (allResources.isEmpty()) { - return Collections.emptyList(); - } - List result = Lists.newArrayList(); - try { - for (RawResource rawResource : allResources) { - final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream)); - element.setLastModified(rawResource.timestamp); - result.add(element); - } - return result; - } finally { - for (RawResource rawResource : allResources) { - IOUtils.closeQuietly(rawResource.inputStream); - } - } - } - - abstract protected List getAllResources(String rangeStart, String rangeEnd) throws IOException; - - /** returns null if not exists */ - abstract protected RawResource getResourceImpl(String resPath) throws IOException; - - /** returns 0 if not exists */ - abstract protected long getResourceTimestampImpl(String resPath) throws IOException; - - /** - * overwrite a resource without write conflict check - */ - final public void putResource(String resPath, InputStream content, long ts) throws IOException { - resPath = norm(resPath); - logger.debug("Saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); - putResourceImpl(resPath, content, ts); - } - - abstract protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException; - - /** - * check & set, overwrite a resource - */ - final public long putResource(String resPath, T obj, Serializer serializer) throws IOException { - resPath = norm(resPath); - logger.debug("Saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); - - long oldTS = obj.getLastModified(); - long newTS = System.currentTimeMillis(); - obj.setLastModified(newTS); - - try { - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(buf); - serializer.serialize(obj, dout); - dout.close(); - buf.close(); - - newTS = checkAndPutResourceImpl(resPath, buf.toByteArray(), oldTS, newTS); - obj.setLastModified(newTS); // update again the confirmed TS - return newTS; - } catch (IOException e) { - obj.setLastModified(oldTS); // roll back TS when write fail - throw e; - } catch (RuntimeException e) { - obj.setLastModified(oldTS); // roll back TS when write fail - throw e; - } - } - - /** - * checks old timestamp when overwriting existing - */ - abstract protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException; - - /** - * delete a resource, does nothing on a folder - */ - final public void deleteResource(String resPath) throws IOException { - logger.debug("Deleting resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); - deleteResourceImpl(norm(resPath)); - } - - abstract protected void deleteResourceImpl(String resPath) throws IOException; - - /** - * get a readable string of a resource path - */ - final public String getReadableResourcePath(String resPath) { - return getReadableResourcePathImpl(norm(resPath)); - } - - abstract protected String getReadableResourcePathImpl(String resPath); - - private String norm(String resPath) { - resPath = resPath.trim(); - while (resPath.startsWith("//")) - resPath = resPath.substring(1); - while (resPath.endsWith("/")) - resPath = resPath.substring(0, resPath.length() - 1); - if (resPath.startsWith("/") == false) - resPath = "/" + resPath; - return resPath; - } - - // ============================================================================ - - public static interface Visitor { - void visit(String path) throws IOException; - } - - public void scanRecursively(String path, Visitor visitor) throws IOException { - ArrayList children = listResources(path); - if (children != null) { - for (String child : children) - scanRecursively(child, visitor); - return; - } - - if (exists(path)) - visitor.visit(path); - } - - public List collectResourceRecursively(String root, final String suffix) throws IOException { - final ArrayList collector = Lists.newArrayList(); - scanRecursively(root, new Visitor() { - @Override - public void visit(String path) { - if (path.endsWith(suffix)) - collector.add(path); - } - }); - return collector; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java deleted file mode 100644 index 0ebed3d..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.StringUtil; - -public class ResourceTool { - - private static String[] excludes = null; - - public static void main(String[] args) throws IOException { - args = StringUtil.filterSystemArgs(args); - - if (args.length == 0) { - System.out.println("Usage: MetadataTool reset"); - System.out.println("Usage: MetadataTool list RESOURCE_PATH"); - System.out.println("Usage: MetadataTool download LOCAL_DIR"); - System.out.println("Usage: MetadataTool upload LOCAL_DIR"); - return; - } - - String exclude = System.getProperty("exclude"); - if (exclude != null) { - excludes = exclude.split("\\s*,\\s*"); - } - - String cmd = args[0]; - switch (cmd) { - case "reset": - reset(args.length == 1 ? KylinConfig.getInstanceFromEnv() : KylinConfig.createInstanceFromUri(args[1])); - break; - case "list": - list(KylinConfig.getInstanceFromEnv(), args[1]); - break; - case "download": - copy(KylinConfig.getInstanceFromEnv(), KylinConfig.createInstanceFromUri(args[1])); - break; - case "upload": - copy(KylinConfig.createInstanceFromUri(args[1]), KylinConfig.getInstanceFromEnv()); - break; - case "remove": - remove(KylinConfig.getInstanceFromEnv(), args[1]); - break; - default: - System.out.println("Unknown cmd: " + cmd); - } - } - - public static void list(KylinConfig config, String path) throws IOException { - ResourceStore store = ResourceStore.getStore(config); - ArrayList result = store.listResources(path); - System.out.println("" + result); - } - - public static void copy(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException { - - ResourceStore src = ResourceStore.getStore(srcConfig); - ResourceStore dst = ResourceStore.getStore(dstConfig); - copyR(src, dst, "/"); - } - - private static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException { - ArrayList children = src.listResources(path); - - // case of resource (not a folder) - if (children == null) { - if (matchExclude(path) == false) { - RawResource res = src.getResource(path); - if (res != null) { - dst.putResource(path, res.inputStream, res.timestamp); - res.inputStream.close(); - } else { - System.out.println("Resource not exist for " + path); - } - } - } - // case of folder - else { - for (String child : children) - copyR(src, dst, child); - } - } - - private static boolean matchExclude(String path) { - if (excludes == null) - return false; - for (String exclude : excludes) { - if (path.startsWith(exclude)) - return true; - } - return false; - } - - public static void reset(KylinConfig config) throws IOException { - ResourceStore store = ResourceStore.getStore(config); - resetR(store, "/"); - } - - private static void resetR(ResourceStore store, String path) throws IOException { - ArrayList children = store.listResources(path); - if (children == null) { // path is a resource (not a folder) - if (matchExclude(path) == false) { - store.deleteResource(path); - } - } else { - for (String child : children) - resetR(store, child); - } - } - - private static void remove(KylinConfig config, String path) throws IOException { - ResourceStore store = ResourceStore.getStore(config); - resetR(store, path); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java b/common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java deleted file mode 100644 index c6f3f55..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.UUID; - -import org.apache.commons.lang.time.FastDateFormat; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * Marks the root entity of JSON persistence. Unit of read, write, cache, and - * refresh. - * - * - CubeInstance - CubeDesc - SourceTable - JobMeta - Dictionary (not JSON but - * also top level persistence) - * - * @author yangli9 - */ -@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -abstract public class RootPersistentEntity implements AclEntity { - - static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss z"; - static FastDateFormat format = FastDateFormat.getInstance(DATE_PATTERN); - static DateFormat df = new SimpleDateFormat(DATE_PATTERN); - - public static String formatTime(long millis) { - return format.format(millis); - } - - public static long parseTime(String timeString) { - if (timeString == null) - return 0; - try { - Date dt = df.parse(timeString); - return dt.getTime(); - } catch (ParseException e) { - } - return 0l; - } - - // ============================================================================ - - @JsonProperty("uuid") - protected String uuid; - - @JsonProperty("last_modified") - protected long lastModified; - - public String getUuid() { - return uuid; - } - - public void setUuid(String uuid) { - this.uuid = uuid; - } - - public String getId() { - return uuid; - } - - public long getLastModified() { - return lastModified; - } - - public void setLastModified(long lastModified) { - this.lastModified = lastModified; - } - - public void updateRandomUuid() { - setUuid(UUID.randomUUID().toString()); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (lastModified ^ (lastModified >>> 32)); - result = prime * result + ((uuid == null) ? 0 : uuid.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - RootPersistentEntity other = (RootPersistentEntity) obj; - if (lastModified != other.lastModified) - return false; - if (uuid == null) { - if (other.uuid != null) - return false; - } else if (!uuid.equals(other.uuid)) - return false; - return true; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/Serializer.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/Serializer.java b/common/src/main/java/org/apache/kylin/common/persistence/Serializer.java deleted file mode 100644 index 6861f6a..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/Serializer.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * @author yangli9 - * - */ -public interface Serializer { - - public void serialize(T obj, DataOutputStream out) throws IOException; - - public T deserialize(DataInputStream in) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/persistence/StorageException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/StorageException.java b/common/src/main/java/org/apache/kylin/common/persistence/StorageException.java deleted file mode 100644 index 8e2e183..0000000 --- a/common/src/main/java/org/apache/kylin/common/persistence/StorageException.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -/** - * - * @author xjiang - * - */ -public class StorageException extends RuntimeException { - - private static final long serialVersionUID = -3748712888242406257L; - - public StorageException(String msg, Throwable t) { - super(msg, t); - } - - public StorageException(String msg) { - super(msg); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java deleted file mode 100644 index 679ef14..0000000 --- a/common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.restclient; - -/** - * @author xjiang - * - */ -public abstract class AbstractRestCache { - - protected final Broadcaster.TYPE syncType; - - protected AbstractRestCache(Broadcaster.TYPE syncType) { - this.syncType = syncType; - } - - protected final void syncRemote(K key, Broadcaster.EVENT syncAction) { - Broadcaster.getInstance().queue(syncType.getType(), syncAction.getType(), key.toString()); - } - - public abstract void put(K key, V value); - - public abstract void putLocal(K key, V value); - - public abstract void remove(K key); - - public abstract void removeLocal(K key); - - public abstract void clear(); - - public abstract int size(); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java deleted file mode 100644 index 7978d20..0000000 --- a/common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.restclient; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; - -/** - * Broadcast kylin event out - * - * @author jianliu - * - */ -public class Broadcaster { - - private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class); - - private BlockingDeque broadcastEvents = new LinkedBlockingDeque<>(); - - private AtomicLong counter = new AtomicLong(); - - static class BroadcasterHolder { - static final Broadcaster INSTANCE = new Broadcaster(); - } - - private Broadcaster() { - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - final String[] nodes = KylinConfig.getInstanceFromEnv().getRestServers(); - if (nodes == null || nodes.length < 1) {//TODO if the node count is greater than 1, it means it is a cluster - logger.warn("There is no available rest server; check the 'kylin.rest.servers' config"); - return; - } - logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); - final List restClients = Lists.newArrayList(); - for (String node : nodes) { - restClients.add(new RestClient(node)); - } - final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size()); - while (true) { - try { - final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); - logger.info("new broadcast event:" + broadcastEvent); - for (final RestClient restClient : restClients) { - wipingCachePool.execute(new Runnable() { - @Override - public void run() { - try { - restClient.wipeCache(broadcastEvent.getType(), broadcastEvent.getAction(), broadcastEvent.getName()); - } catch (IOException e) { - logger.warn("Thread failed during wipe cache at " + broadcastEvent); - } - } - }); - } - } catch (Exception e) { - logger.error("error running wiping", e); - } - } - } - }); - } - - public static Broadcaster getInstance() { - return BroadcasterHolder.INSTANCE; - } - - /** - * Broadcast the cubedesc event out - * - * @param action - * event action - */ - public void queue(String type, String action, String key) { - try { - counter.incrementAndGet(); - broadcastEvents.putFirst(new BroadcastEvent(type, action, key)); - } catch (Exception e) { - counter.decrementAndGet(); - logger.error("error putting BroadcastEvent", e); - } - } - - public long getCounterAndClear() { - return counter.getAndSet(0); - } - - public static enum EVENT { - CREATE("create"), UPDATE("update"), DROP("drop"); - private String text; - - private EVENT(String text) { - this.text = text; - } - - public String getType() { - return text; - } - - public static EVENT getEvent(String event) { - for (EVENT one : values()) { - if (one.getType().equalsIgnoreCase(event)) { - return one; - } - } - - return null; - } - } - - public static enum TYPE { - ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid"); - private String text; - - private TYPE(String text) { - this.text = text; - } - - public String getType() { - return text; - } - - /** - * @param type - * @return - */ - public static TYPE getType(String type) { - for (TYPE one : values()) { - if (one.getType().equalsIgnoreCase(type)) { - return one; - } - } - - return null; - } - } - - public static class BroadcastEvent { - private String type; - private String action; - private String name; - - public BroadcastEvent(String type, String action, String name) { - super(); - this.type = type; - this.action = action; - this.name = name; - } - - public String getType() { - return type; - } - - public String getAction() { - return action; - } - - public String getName() { - return name; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((action == null) ? 0 : action.hashCode()); - result = prime * result + ((name == null) ? 0 : name.hashCode()); - result = prime * result + ((type == null) ? 0 : type.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (this == obj) { - return true; - } - if (getClass() != obj.getClass()) { - return false; - } - BroadcastEvent other = (BroadcastEvent) obj; - if (!StringUtils.equals(action, other.action)) { - return false; - } - if (!StringUtils.equals(name, other.name)) { - return false; - } - if (!StringUtils.equals(type, other.type)) { - return false; - } - return true; - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("type", type).add("name", name).add("action", action).toString(); - } - - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java deleted file mode 100644 index c29e7b1..0000000 --- a/common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.restclient; - -import java.util.concurrent.ConcurrentSkipListMap; - -/** - * Created by qianzhou on 1/15/15. - */ -public class CaseInsensitiveStringCache extends SingleValueCache { - - public CaseInsensitiveStringCache(Broadcaster.TYPE syncType) { - super(syncType, new ConcurrentSkipListMap(String.CASE_INSENSITIVE_ORDER)); - } - - @Override - public void put(String key, V value) { - super.put(key, value); - } - - @Override - public void putLocal(String key, V value) { - super.putLocal(key, value); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/restclient/MultiValueCache.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/restclient/MultiValueCache.java b/common/src/main/java/org/apache/kylin/common/restclient/MultiValueCache.java deleted file mode 100644 index ca7acd2..0000000 --- a/common/src/main/java/org/apache/kylin/common/restclient/MultiValueCache.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.restclient; - -import java.util.Set; - -import com.google.common.collect.HashMultimap; - -/** - * @author xjiang - * - */ -public class MultiValueCache extends AbstractRestCache { - - private final HashMultimap innerCache; - - public MultiValueCache(Broadcaster.TYPE syncType) { - super(syncType); - innerCache = HashMultimap.create(); - } - - public void put(K key, V value) { - Broadcaster.EVENT eventType = innerCache.containsKey(key) ? Broadcaster.EVENT.UPDATE : Broadcaster.EVENT.CREATE; - synchronized (this) { - innerCache.put(key, value); - } - syncRemote(key, eventType); - } - - public void putLocal(K key, V value) { - synchronized (this) { - innerCache.put(key, value); - } - } - - public void remove(K key) { - if (innerCache.containsKey(key)) { - innerCache.removeAll(key); - syncRemote(key, Broadcaster.EVENT.DROP); - } - } - - public void removeLocal(K key) { - if (innerCache.containsKey(key)) { - innerCache.removeAll(key); - } - } - - public void clear() { - innerCache.clear(); - } - - public int size() { - return innerCache.size(); - } - - public Set get(K key) { - return innerCache.get(key); - } - - public Set keySet() { - return innerCache.keySet(); - } - - public boolean containsKey(Object key) { - return innerCache.containsKey(key); - } - - public boolean containsEntry(Object key, Object value) { - return innerCache.containsEntry(key, value); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/restclient/RestClient.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/common/src/main/java/org/apache/kylin/common/restclient/RestClient.java deleted file mode 100644 index 58b1713..0000000 --- a/common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.restclient; - -import java.io.IOException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.httpclient.Credentials; -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpException; -import org.apache.commons.httpclient.HttpMethod; -import org.apache.commons.httpclient.UsernamePasswordCredentials; -import org.apache.commons.httpclient.auth.AuthScope; -import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.commons.httpclient.methods.PutMethod; -import org.apache.kylin.common.util.Bytes; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; - -/** - * @author yangli9 - */ -public class RestClient { - - String host; - int port; - String baseUrl; - String userName; - String password; - HttpClient client; - - private static Pattern fullRestPattern = Pattern.compile("(?:([^:]+)[:]([^@]+)[@])?([^:]+)(?:[:](\\d+))?"); - - public static boolean matchFullRestPattern(String uri) { - Matcher m = fullRestPattern.matcher(uri); - return m.matches(); - } - - /** - * @param uri - * "user:pwd@host:port" - */ - public RestClient(String uri) { - Matcher m = fullRestPattern.matcher(uri); - if (!m.matches()) - throw new IllegalArgumentException("URI: " + uri + " -- does not match pattern " + fullRestPattern); - - String user = m.group(1); - String pwd = m.group(2); - String host = m.group(3); - String portStr = m.group(4); - int port = Integer.parseInt(portStr == null ? "7070" : portStr); - - init(host, port, user, pwd); - } - - private void init(String host, int port, String userName, String password) { - this.host = host; - this.port = port; - this.userName = userName; - this.password = password; - this.baseUrl = "http://" + host + ":" + port + "/kylin/api"; - - client = new HttpClient(); - - if (userName != null && password != null) { - client.getParams().setAuthenticationPreemptive(true); - Credentials creds = new UsernamePasswordCredentials(userName, password); - client.getState().setCredentials(new AuthScope(host, port, AuthScope.ANY_REALM), creds); - } - } - - public void wipeCache(String type, String action, String name) throws IOException { - String url = baseUrl + "/cache/" + type + "/" + name + "/" + action; - HttpMethod request = new PutMethod(url); - - try { - int code = client.executeMethod(request); - String msg = Bytes.toString(request.getResponseBody()); - - if (code != 200) - throw new IOException("Invalid response " + code + " with cache wipe url " + url + "\n" + msg); - - } catch (HttpException ex) { - throw new IOException(ex); - } finally { - request.releaseConnection(); - } - } - - public String getKylinProperties() throws IOException { - String url = baseUrl + "/admin/config"; - HttpMethod request = new GetMethod(url); - try { - int code = client.executeMethod(request); - String msg = Bytes.toString(request.getResponseBody()); - JSONObject obj = new JSONObject(msg); - msg = obj.getString("config"); - - if (code != 200) - throw new IOException("Invalid response " + code + " with cache wipe url " + url + "\n" + msg); - - return msg; - - } catch (JSONException e) { - throw new IOException("Error when parsing json response from REST"); - } finally { - request.releaseConnection(); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java deleted file mode 100644 index 55af75e..0000000 --- a/common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.restclient; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * @author xjiang - * - */ -public abstract class SingleValueCache extends AbstractRestCache { - - private final ConcurrentMap innerCache; - - public SingleValueCache(Broadcaster.TYPE syncType) { - this(syncType, new ConcurrentHashMap()); - } - - public SingleValueCache(Broadcaster.TYPE syncType, ConcurrentMap innerCache) { - super(syncType); - this.innerCache = innerCache; - } - - public void put(K key, V value) { - final V result = innerCache.put(key, value); - if (result == null) { - syncRemote(key, Broadcaster.EVENT.CREATE); - } else { - syncRemote(key, Broadcaster.EVENT.UPDATE); - } - } - - public void putLocal(K key, V value) { - innerCache.put(key, value); - } - - public void remove(K key) { - if (innerCache.containsKey(key)) { - innerCache.remove(key); - syncRemote(key, Broadcaster.EVENT.DROP); - } - } - - public void removeLocal(K key) { - innerCache.remove(key); - } - - public void clear() { - innerCache.clear(); - } - - public int size() { - return innerCache.size(); - } - - public V get(K key) { - return innerCache.get(key); - } - - public Collection values() { - return innerCache.values(); - } - - public boolean containsKey(String key) { - return innerCache.containsKey(key); - } - - public Map getMap() { - return Collections.unmodifiableMap(innerCache); - } - - public Set keySet() { - return innerCache.keySet(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/util/Array.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/Array.java b/common/src/main/java/org/apache/kylin/common/util/Array.java deleted file mode 100644 index 26cd597..0000000 --- a/common/src/main/java/org/apache/kylin/common/util/Array.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.util; - -import java.util.Arrays; - -/* - * An array with correct equals(), hashCode(), compareTo() and toString() - */ -public class Array implements Comparable> { - public T[] data; - - public Array(T[] data) { - this.data = data; - } - - public String toString() { - return Arrays.toString(data); - } - - @Override - public boolean equals(Object o) { - if (o != null && o instanceof Array) { - return Arrays.equals(this.data, ((Array) o).data); - } - return false; - } - - @Override - public int hashCode() { - return Arrays.hashCode(data); - } - - @Override - public int compareTo(Array other) { - return compare(this.data, other.data, null); - } - - @SuppressWarnings("unchecked") - public static int compare(T[] a, T[] b, boolean[] ascending) { - int r = 0; - int n = Math.min(a.length, b.length); - boolean asc = true; - - for (int i = 0; i < n; i++) { - r = ((Comparable) a[i]).compareTo(b[i]); - if (r != 0) { - asc = (ascending != null && ascending.length > i) ? ascending[i] : true; - return asc ? r : -r; - } - } - return a.length - b.length; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/common/src/main/java/org/apache/kylin/common/util/ByteArray.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/common/src/main/java/org/apache/kylin/common/util/ByteArray.java deleted file mode 100644 index 92e0da2..0000000 --- a/common/src/main/java/org/apache/kylin/common/util/ByteArray.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.util; - -import java.util.Arrays; - -/** - * @author yangli9 - */ -public class ByteArray implements Comparable { - - public byte[] data; - - public ByteArray(byte[] data) { - this.data = data; - } - - @Override - public int hashCode() { - return Bytes.hashCode(data); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - ByteArray other = (ByteArray) obj; - if (!Arrays.equals(data, other.data)) - return false; - return true; - } - - @Override - public int compareTo(ByteArray o) { - return Bytes.compareTo(this.data, o.data); - } -}