beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/7] beam git commit: HadoopInputFormatIO with junits
Date Thu, 06 Apr 2017 14:33:20 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9c284d625 -> 82694fe72


http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/es-services.yaml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/es-services.yaml
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/es-services.yaml
new file mode 100644
index 0000000..38c820e
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/es-services.yaml
@@ -0,0 +1,277 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+# File containing the ES discovery and frontend services plus the master, client and data node deployments.
+
+# Kubernetes headless service for Elasticsearch discovery of nodes.
+apiVersion: v1
+kind: Service
+metadata:
+  name: elasticsearch-discovery
+  labels:
+    component: elasticsearch
+    role: master
+spec:
+  selector:
+    component: elasticsearch
+    role: master
+  ports:
+  - name: transport
+    port: 9300
+    protocol: TCP
+---
+# To create Elasticsearch frontend cluster Kubernetes service.
+# It sets up a load balancer on TCP port 9200 that distributes network traffic to the ES
client nodes.
+apiVersion: v1
+kind: Service
+metadata:
+  name: elasticsearch
+  labels:
+    component: elasticsearch
+    role: client
+spec:
+  type: LoadBalancer
+  selector:
+    component: elasticsearch
+    role: client
+  ports:
+  - name: http
+    port: 9200
+    protocol: TCP
+---
+# The Kubernetes deployment script for Elasticsearch master nodes.
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: es-master
+  labels:
+    component: elasticsearch
+    role: master
+spec:
+  replicas: 3
+  template:
+    metadata:
+      labels:
+        component: elasticsearch
+        role: master
+      annotations:
+        pod.beta.kubernetes.io/init-containers: '[
+          {
+          "name": "sysctl",
+            "image": "busybox",
+            "imagePullPolicy": "IfNotPresent",
+            "command": ["sysctl", "-w", "vm.max_map_count=262144"],
+            "securityContext": {
+              "privileged": true
+            }
+          }
+        ]'
+    spec:
+      containers:
+      - name: es-master
+        securityContext:
+          privileged: false
+          capabilities:
+            add:
+# IPC_LOCK capability is enabled to allow Elasticsearch to lock the heap in memory so it
will not be swapped.
+              - IPC_LOCK
+# SYS_RESOURCE is the Docker capability key used to control and override resource limits.
+# This could be needed to increase base limits (e.g. the file descriptor limit for Elasticsearch).
+              - SYS_RESOURCE
+        image: quay.io/pires/docker-elasticsearch-kubernetes:5.2.2
+        env:
+        - name: NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: NODE_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: "CLUSTER_NAME"
+          value: "myesdb"
+        - name: "NUMBER_OF_MASTERS"
+          value: "2"
+        - name: NODE_MASTER
+          value: "true"
+        - name: NODE_INGEST
+          value: "false"
+        - name: NODE_DATA
+          value: "false"
+        - name: HTTP_ENABLE
+          value: "false"
+        - name: "ES_JAVA_OPTS"
+          value: "-Xms256m -Xmx256m"
+        ports:
+        - containerPort: 9300
+          name: transport
+          protocol: TCP
+        volumeMounts:
+        - name: storage
+          mountPath: /data
+      volumes:
+          - emptyDir:
+              medium: ""
+            name: "storage"
+---
+# Kubernetes deployment script for Elasticsearch client nodes (aka load balancing proxies).
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: es-client
+  labels:
+    component: elasticsearch
+    role: client
+spec:
+  # The number of replicas can be increased based on client HTTP API usage.
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        component: elasticsearch
+        role: client
+      annotations:
+      # Elasticsearch uses a hybrid mmapfs / niofs directory by default to store its indices.
+      # The default operating system limits on mmap counts is likely to be too low, which
may result
+      # in out of memory exceptions. Therefore, the need to increase virtual memory
+      # vm.max_map_count for large amount of data in the pod initialization annotation.
+        pod.beta.kubernetes.io/init-containers: '[
+          {
+          "name": "sysctl",
+            "image": "busybox",
+            "imagePullPolicy": "IfNotPresent",
+            "command": ["sysctl", "-w", "vm.max_map_count=262144"],
+            "securityContext": {
+              "privileged": true
+            }
+          }
+        ]'
+    spec:
+      containers:
+      - name: es-client
+        securityContext:
+          privileged: false
+          capabilities:
+            add:
+# IPC_LOCK capability is enabled to allow Elasticsearch to lock the heap in memory so it
will not be swapped.
+              - IPC_LOCK
+# SYS_RESOURCE is the Docker capability key used to control and override resource limits.
+# This could be needed to increase base limits (e.g. the file descriptor limit for Elasticsearch).
+              - SYS_RESOURCE
+        image: quay.io/pires/docker-elasticsearch-kubernetes:5.2.2
+        env:
+        - name: NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: NODE_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: "CLUSTER_NAME"
+          value: "myesdb"
+        - name: NODE_MASTER
+          value: "false"
+        - name: NODE_DATA
+          value: "false"
+        - name: HTTP_ENABLE
+          value: "true"
+        - name: "ES_JAVA_OPTS"
+          value: "-Xms256m -Xmx256m"
+        ports:
+        - containerPort: 9200
+          name: http
+          protocol: TCP
+        - containerPort: 9300
+          name: transport
+          protocol: TCP
+        volumeMounts:
+        - name: storage
+          mountPath: /data
+      volumes:
+          - emptyDir:
+              medium: ""
+            name: "storage"
+---
+# Kubernetes deployment script for Elasticsearch data nodes which store and index data.
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: es-data
+  labels:
+    component: elasticsearch
+    role: data
+spec:
+  replicas: 2
+  template:
+    metadata:
+      labels:
+        component: elasticsearch
+        role: data
+      annotations:
+        pod.beta.kubernetes.io/init-containers: '[
+          {
+          "name": "sysctl",
+            "image": "busybox",
+            "imagePullPolicy": "IfNotPresent",
+            "command": ["sysctl", "-w", "vm.max_map_count=1048575"],
+            "securityContext": {
+              "privileged": true
+            }
+          }
+        ]'
+    spec:
+      containers:
+      - name: es-data
+        securityContext:
+          privileged: false
+          capabilities:
+            add:
+# IPC_LOCK capability is enabled to allow Elasticsearch to lock the heap in memory so it
will not be swapped.
+              - IPC_LOCK
+# SYS_RESOURCE is the Docker capability key used to control and override resource limits.
+# This could be needed to increase base limits (e.g. the file descriptor limit for Elasticsearch).
+              - SYS_RESOURCE
+        image: quay.io/pires/docker-elasticsearch-kubernetes:5.2.2
+        env:
+        - name: NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: NODE_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: "CLUSTER_NAME"
+          value: "myesdb"
+        - name: NODE_MASTER
+          value: "false"
+        - name: NODE_INGEST
+          value: "false"
+        - name: HTTP_ENABLE
+          value: "false"
+        - name: "ES_JAVA_OPTS"
+          value: "-Xms256m -Xmx256m"
+        ports:
+        - containerPort: 9300
+          name: transport
+          protocol: TCP
+        volumeMounts:
+        - name: storage
+          mountPath: /data
+      volumes:
+          - emptyDir:
+              medium: ""
+            name: "storage"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/start-up.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/start-up.sh
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/start-up.sh
new file mode 100644
index 0000000..4d277c8
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/start-up.sh
@@ -0,0 +1,21 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+#!/bin/sh
+set -e
+
+# Create Elasticsearch services and deployments.
+kubectl create -f es-services.yaml

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/teardown.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/teardown.sh
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/teardown.sh
new file mode 100644
index 0000000..a30793b
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/LargeProductionCluster/teardown.sh
@@ -0,0 +1,20 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+#!/bin/bash
+set -e
+
+# Delete elasticsearch services and deployments.
+kubectl delete -f es-services.yaml
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/elasticsearch-svc-rc.yaml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/elasticsearch-svc-rc.yaml
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/elasticsearch-svc-rc.yaml
new file mode 100644
index 0000000..9a7ac3d
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/elasticsearch-svc-rc.yaml
@@ -0,0 +1,84 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+# To create Elasticsearch frontend cluster Kubernetes service. 
+# It sets up a load balancer on TCP port 9200 that distributes network traffic to the ES
nodes.
+apiVersion: v1
+kind: Service
+metadata:
+  name: elasticsearch
+  labels:
+    component: elasticsearch
+spec:
+  type: LoadBalancer
+  selector:
+    component: elasticsearch
+  ports:
+  - name: http
+    port: 9200
+    protocol: TCP
+  - name: transport
+    port: 9300
+    protocol: TCP
+---
+# The Kubernetes deployment script for Elasticsearch nodes. It will create a 1-node cluster.
+# To scale the cluster as desired, you can add node replicas using the
+# 'kubectl scale --replicas=3 deployment/es' command.
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: es
+  labels:
+    component: elasticsearch
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        component: elasticsearch
+    spec:
+      containers:
+      - name: es
+        securityContext:
+          capabilities:
+            add:
+# IPC_LOCK capability is enabled to allow Elasticsearch to lock the heap in memory so it
will not be swapped.   
+              - IPC_LOCK
+# SYS_RESOURCE capability is set to control and override various resource limits.
+              - SYS_RESOURCE
+        image: quay.io/pires/docker-elasticsearch-kubernetes:5.2.2
+        env:
+        - name: "CLUSTER_NAME"
+          value: "myesdb"
+        - name: "DISCOVERY_SERVICE"
+          value: "elasticsearch"
+        - name: NODE_MASTER
+          value: "true"
+        - name: NODE_DATA
+          value: "true"
+        - name: HTTP_ENABLE
+          value: "true"
+        ports:
+        - containerPort: 9200
+          name: http
+          protocol: TCP
+        - containerPort: 9300
+          name: transport
+          protocol: TCP
+        volumeMounts:
+        - mountPath: /data
+          name: storage
+      volumes:
+      - name: storage
+        emptyDir: {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/start-up.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/start-up.sh
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/start-up.sh
new file mode 100644
index 0000000..e8cf275
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/start-up.sh
@@ -0,0 +1,22 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+#!/bin/sh
+set -e
+
+# Create Elasticsearch services and deployments.
+kubectl create -f elasticsearch-svc-rc.yaml
+

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/teardown.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/teardown.sh
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/teardown.sh
new file mode 100644
index 0000000..079141d
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/SmallITCluster/teardown.sh
@@ -0,0 +1,20 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+#!/bin/bash
+set -e
+
+# Delete elasticsearch services and deployments.
+kubectl delete -f elasticsearch-svc-rc.yaml

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load-setup.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load-setup.sh
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load-setup.sh
new file mode 100644
index 0000000..00991bc
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load-setup.sh
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#!/bin/bash
+set -e
+
+# Install python
+sudo apt-get update
+sudo apt-get install python-pip
+sudo pip install --upgrade pip
+sudo apt-get install python-dev
+sudo pip install tornado numpy
+echo

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load.sh
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load.sh
new file mode 100644
index 0000000..21150fb
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/data-load.sh
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#!/bin/bash
+set -e
+
+# Identify external IP
+external_ip="$(kubectl get svc elasticsearch -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
+echo "Waiting for the Elasticsearch service to come up ........"
+while [ -z "$external_ip" ]
+do
+   sleep 10s
+   external_ip="$(kubectl get svc elasticsearch -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
+   echo "."
+done
+echo "External IP - $external_ip"
+echo
+
+# Run the script
+/usr/bin/python es_test_data.py --count=1000 --format=Txn_ID:int,Item_Code:int,Item_ID:int,User_Name:str,last_updated:ts,Price:int,Title:str,Description:str,Age:int,Item_Name:str,Item_Price:int,Availability:bool,Batch_Num:int,Last_Ordered:tstxt,City:text
--es_url=http://$external_ip:9200 &
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py
new file mode 100644
index 0000000..1658e2c
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/es_test_data.py
@@ -0,0 +1,299 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Script to populate data on Elasticsearch
+# Hashcode for 1000 records is ed36c09b5e24a95fd8d3cc711a043a85320bb47d, 
+# For test with query to select one record from 1000 docs, 
+# hashcode is 83c108ff81e87b6f3807c638e6bb9a9e3d430dc7
+# Hashcode for 50m records (~20 gigs) is aff7390ee25c4c330f0a58dfbfe335421b11e405 
+#!/usr/bin/python
+
+import json
+import time
+import logging
+import random
+import string
+import uuid
+import datetime
+
+import tornado.gen
+import tornado.httpclient
+import tornado.ioloop
+import tornado.options
+
+async_http_client = tornado.httpclient.AsyncHTTPClient()
+id_counter = 0
+upload_data_count = 0
+_dict_data = None
+
+
+
+def delete_index(idx_name):
+    try:
+        url = "%s/%s?refresh=true" % (tornado.options.options.es_url, idx_name)
+        request = tornado.httpclient.HTTPRequest(url, method="DELETE", request_timeout=240,

+                                                 auth_username=tornado.options.options.username,

+                                                 auth_password=tornado.options.options.password)
+        response = tornado.httpclient.HTTPClient().fetch(request)
+        logging.info('Deleting index  "%s" done   %s' % (idx_name, response.body))
+    except tornado.httpclient.HTTPError:
+        pass
+
+
+def create_index(idx_name):
+    schema = {
+        "settings": {
+            "number_of_shards":   tornado.options.options.num_of_shards,
+            "number_of_replicas": tornado.options.options.num_of_replicas
+        },
+        "refresh": True
+    }
+
+    body = json.dumps(schema)
+    url = "%s/%s" % (tornado.options.options.es_url, idx_name)
+    try:
+        logging.info('Trying to create index %s' % (url))
+        request = tornado.httpclient.HTTPRequest(url, method="PUT", body=body, request_timeout=240,
+                                                 auth_username=tornado.options.options.username,

+                                                 auth_password=tornado.options.options.password)
+        response = tornado.httpclient.HTTPClient().fetch(request)
+        logging.info('Creating index "%s" done   %s' % (idx_name, response.body))
+    except tornado.httpclient.HTTPError:
+        logging.info('Looks like the index exists already')
+        pass
+
+
+@tornado.gen.coroutine
+def upload_batch(upload_data_txt):
+    try:
+        request = tornado.httpclient.HTTPRequest(tornado.options.options.es_url + "/_bulk",
+                                                 method="POST", body=upload_data_txt,
+                                                 request_timeout=
+                                                 tornado.options.options.http_upload_timeout,
+                                                 auth_username=tornado.options.options.username,

+                                                 auth_password=tornado.options.options.password)
+        response = yield async_http_client.fetch(request)
+    except Exception as ex:
+        logging.error("upload failed, error: %s" % ex)
+        return
+
+    result = json.loads(response.body.decode('utf-8'))
+    res_txt = "OK" if not result['errors'] else "FAILED"
+    took = int(result['took'])
+    logging.info("Upload: %s - upload took: %5dms, total docs uploaded: %7d" % (res_txt,
took, 
+                                                                                upload_data_count))
+
+
+def get_data_for_format(format,count):
+    split_f = format.split(":")
+    if not split_f:
+        return None, None
+
+    field_name = split_f[0]
+    field_type = split_f[1]
+
+    return_val = ''
+
+    if field_type == "bool":
+        if count%2 == 0:
+           return_val = True
+        else:
+           return_val = False
+
+    elif field_type == "str":
+        return_val = field_name + str(count)
+
+    elif field_type == "int":
+        return_val = count
+    
+    elif field_type == "ipv4":
+        return_val = "{0}.{1}.{2}.{3}".format(1,2,3,count%255)
+
+    elif field_type in ["ts", "tstxt"]:
+        return_val = int(count * 1000) if field_type == "ts" else\
+        			 datetime.datetime.fromtimestamp(count)\
+        			 .strftime("%Y-%m-%dT%H:%M:%S.000-0000")
+
+    elif field_type == "words":
+        return_val = field_name + str(count)
+
+    elif field_type == "dict":
+        mydict = dict(a=field_name + str(count), b=field_name + str(count), c=field_name
+ str(count),
+                      d=field_name + str(count), e=field_name + str(count), f=field_name
+ str(count),
+                      g=field_name + str(count), h=field_name + str(count), i=field_name
+ str(count), 
+                      j=field_name + str(count))
+        return_val = ", ".join("=".join(_) for _ in mydict.items())
+
+    elif field_type == "text":
+        return_val = field_name + str(count)
+
+    return field_name, return_val
+
+
+def generate_count(min, max):
+    if min == max:
+        return max
+    elif min > max:
+        return random.randrange(max, min);
+    else:
+        return random.randrange(min, max);
+
+
+def generate_random_doc(format,count):
+    global id_counter
+
+    res = {}
+
+    for f in format:
+        f_key, f_val = get_data_for_format(f,count)
+        if f_key:
+            res[f_key] = f_val
+
+    if not tornado.options.options.id_type:
+        return res
+
+    if tornado.options.options.id_type == 'int':
+        res['_id'] = id_counter
+        id_counter += 1
+    elif tornado.options.options.id_type == 'uuid4':
+        res['_id'] = str(uuid.uuid4())
+
+    return res
+
+
+def set_index_refresh(val):
+
+    params = {"index": {"refresh_interval": val}}
+    body = json.dumps(params)
+    url = "%s/%s/_settings" % (tornado.options.options.es_url, tornado.options.options.index_name)
+    try:
+        request = tornado.httpclient.HTTPRequest(url, method="PUT", body=body, request_timeout=240,
+                                                 auth_username=tornado.options.options.username,

+                                                 auth_password=tornado.options.options.password)
+        http_client = tornado.httpclient.HTTPClient()
+        http_client.fetch(request)
+        logging.info('Set index refresh to %s' % val)
+    except Exception as ex:
+        logging.exception(ex)
+
+
+@tornado.gen.coroutine
+def generate_test_data():
+    """Generate random docs per the configured format and bulk-upload them.
+
+    All knobs come from tornado.options (index name, doc count, batch size,
+    refresh handling, optional dict/out files).  Docs are accumulated into
+    bulk-API text and flushed via upload_batch() every batch_size docs; the
+    trailing partial batch is flushed after the loop.
+    """
+
+    global upload_data_count
+
+    # Optionally delete any pre-existing index so the run starts clean.
+    if tornado.options.options.force_init_index:
+        delete_index(tornado.options.options.index_name)
+
+    create_index(tornado.options.options.index_name)
+
+    # todo: query what refresh is set to, then restore later
+    # Disabling refresh ("-1") speeds up bulk indexing; restored to "1s" below.
+    if tornado.options.options.set_refresh:
+        set_index_refresh("-1")
+
+    # Optionally mirror every generated doc to a local file.
+    if tornado.options.options.out_file:
+        out_file = open(tornado.options.options.out_file, "w")
+    else:
+        out_file = None
+
+    # Optional word list consumed by the doc generator for string fields.
+    if tornado.options.options.dict_file:
+        global _dict_data
+        with open(tornado.options.options.dict_file, 'r') as f:
+            _dict_data = f.readlines()
+        logging.info("Loaded %d words from the %s" % (len(_dict_data), 
+                                                      tornado.options.options.dict_file))
+
+    # NOTE(review): `format` shadows the builtin; also str.split(',') always
+    # returns a non-empty list, so this error branch looks unreachable - confirm.
+    format = tornado.options.options.format.split(',')
+    if not format:
+        logging.error('invalid format')
+        exit(1)
+
+    ts_start = int(time.time())
+    upload_data_txt = ""
+    # NOTE(review): total_uploaded is never incremented or read - dead local?
+    total_uploaded = 0
+
+    logging.info("Generating %d docs, upload batch size is %d" % (tornado.options.options.count,
+                                                                  tornado.options
+                                                                  .options.batch_size))
+    for num in range(0, tornado.options.options.count):
+
+        item = generate_random_doc(format,num)
+
+        if out_file:
+            out_file.write("%s\n" % json.dumps(item))
+
+        # Bulk-API action line; carry the doc's _id through when one was set.
+        cmd = {'index': {'_index': tornado.options.options.index_name,
+                         '_type': tornado.options.options.index_type}}
+        if '_id' in item:
+            cmd['index']['_id'] = item['_id']
+
+        upload_data_txt += json.dumps(cmd) + "\n"
+        upload_data_txt += json.dumps(item) + "\n"
+        upload_data_count += 1
+
+        # Flush a full batch to Elasticsearch.
+        if upload_data_count % tornado.options.options.batch_size == 0:
+            yield upload_batch(upload_data_txt)
+            upload_data_txt = ""
+
+    # upload remaining items in `upload_data_txt`
+    if upload_data_txt:
+        yield upload_batch(upload_data_txt)
+
+    # Restore a sane refresh interval now that bulk loading is done.
+    if tornado.options.options.set_refresh:
+        set_index_refresh("1s")
+
+    if out_file:
+        out_file.close()
+
+    took_secs = int(time.time() - ts_start)
+
+    logging.info("Done - total docs uploaded: %d, took %d seconds" % 
+    			 (tornado.options.options.count, took_secs))
+
+
+if __name__ == '__main__':
+    tornado.options.define("es_url", type=str, default='http://localhost:9200/', 
+                           help="URL of your Elasticsearch node")
+    tornado.options.define("index_name", type=str, default='test_data', 
+                           help="Name of the index to store your messages")
+    tornado.options.define("index_type", type=str, default='test_type', help="Type")
+    tornado.options.define("batch_size", type=int, default=1000, 
+                           help="Elasticsearch bulk index batch size")
+    tornado.options.define("num_of_shards", type=int, default=2, 
+                           help="Number of shards for ES index")
+    tornado.options.define("http_upload_timeout", type=int, default=3, 
+                           help="Timeout in seconds when uploading data")
+    tornado.options.define("count", type=int, default=100000, help="Number of docs to generate")
+    tornado.options.define("format", type=str, default='name:str,age:int,last_updated:ts',

+                           help="message format")
+    tornado.options.define("num_of_replicas", type=int, default=0, 
+                           help="Number of replicas for ES index")
+    tornado.options.define("force_init_index", type=bool, default=False, 
+                           help="Force deleting and re-initializing the Elasticsearch index")
+    tornado.options.define("set_refresh", type=bool, default=False, 
+                           help="Set refresh rate to -1 before starting the upload")
+    tornado.options.define("out_file", type=str, default=False, 
+                           help="If set, write test data to out_file as well.")
+    tornado.options.define("id_type", type=str, default=None, 
+                           help="Type of 'id' to use for the docs, \
+                           valid settings are int and uuid4, None is default")
+    tornado.options.define("dict_file", type=str, default=None, 
+                           help="Name of dictionary file to use")
+    tornado.options.define("username", type=str, default=None, help="Username for elasticsearch")
+    tornado.options.define("password", type=str, default=None, help="Password for elasticsearch")
+    tornado.options.parse_command_line()
+
+    tornado.ioloop.IOLoop.instance().run_sync(generate_test_data)

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh
b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh
new file mode 100644
index 0000000..8fa912c
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes/elasticsearch/show-health.sh
@@ -0,0 +1,25 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Prints basic cluster-health info for the "elasticsearch" Kubernetes service.
+set -e
+
+# External IP of the LoadBalancer fronting the Elasticsearch service.
+external_ip="$(kubectl get svc elasticsearch -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
+
+echo "Elasticsearch cluster health info"
+echo "---------------------------------"
+curl "$external_ip:9200/_cluster/health"
+echo # empty line since curl doesn't output CRLF

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/pom.xml b/sdks/java/io/hadoop/pom.xml
new file mode 100644
index 0000000..1982c25
--- /dev/null
+++ b/sdks/java/io/hadoop/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <packaging>pom</packaging>
+  <artifactId>beam-sdks-java-io-hadoop-parent</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop</name>
+  <description>Parent for Beam SDK Hadoop IO which reads data from any source which implements Hadoop Input Format.</description>
+
+  <modules>
+    <module>jdk1.8-tests</module>
+    <module>input-format</module>
+  </modules>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <!-- Test-scoped dependencies shared by the child IO modules. -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 73fbba1..27fc614 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -68,6 +68,7 @@
     <module>elasticsearch</module>
     <module>google-cloud-platform</module>
     <module>hadoop-common</module>
+    <module>hadoop</module>
     <module>hbase</module>
     <module>hdfs</module>
     <module>jdbc</module>
@@ -114,5 +115,4 @@
       </properties>
     </profile>
   </profiles>
-
 </project>


Mime
View raw message