airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1023) Upload file to S3 using S3 hook fails with "Connection reset by peer"
Date Thu, 01 Nov 2018 20:12:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672119#comment-16672119
] 

ASF GitHub Bot commented on AIRFLOW-1023:
-----------------------------------------

ashb closed pull request #2176: [AIRFLOW-1023] reconnect to S3 bucket location
URL: https://github.com/apache/incubator-airflow/pull/2176
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index caaa575cc4..0f36100854 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -25,6 +25,7 @@
 import warnings
 
 import boto
+from boto.s3 import connect_to_region
 from boto.s3.connection import S3Connection, NoHostProvided
 from boto.sts import STSConnection
 boto.set_stream_logger('boto')
@@ -227,7 +228,12 @@ def get_bucket(self, bucket_name):
         :param bucket_name: the name of the bucket
         :type bucket_name: str
         """
-        return self.connection.get_bucket(bucket_name)
+        bucket = self.connection.get_bucket(bucket_name)
+        bucket_location = bucket.get_location()
+        if bucket_location:
+            connection = connect_to_region(bucket_location)
+            bucket = connection.get_bucket(bucket_name)
+        return bucket
 
     def list_keys(self, bucket_name, prefix='', delimiter=''):
         """
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index bf2d3860f1..8695481a38 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -53,7 +53,7 @@ ldap3
 lxml
 markdown
 mock
-moto
+moto==0.4.31
 mysqlclient
 nose
 nose-exclude
diff --git a/setup.py b/setup.py
index 71c3f499a3..795109f081 100644
--- a/setup.py
+++ b/setup.py
@@ -190,7 +190,7 @@ def check_previous():
     'jira',
     'lxml>=3.3.4',
     'mock',
-    'moto',
+    'moto==0.4.31',
     'nose',
     'nose-ignore-docstring==0.2',
     'nose-timer',
diff --git a/tests/__init__.py b/tests/__init__.py
index 20f8c48d2a..eba35a2f23 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -21,6 +21,7 @@
 from .executors import *
 from .jobs import *
 from .impersonation import *
+from .hooks import *
 from .models import *
 from .operators import *
 from .security import *
diff --git a/tests/core.py b/tests/core.py
index a4757a70a7..db2893f73d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -2277,26 +2277,6 @@ def test_get_ha_client(self, mock_get_connections):
         self.assertIsInstance(client, snakebite.client.HAClient)
 
 
-try:
-    from airflow.hooks.S3_hook import S3Hook
-except ImportError:
-    S3Hook = None
-
-
-@unittest.skipIf(S3Hook is None,
-                 "Skipping test because S3Hook is not installed")
-class S3HookTest(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-        self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
-
-    def test_parse_s3_url(self):
-        parsed = S3Hook.parse_s3_url(self.s3_test_url)
-        self.assertEqual(parsed,
-                         ("test", "this/is/not/a-real-key.txt"),
-                         "Incorrect parsing of the s3 url")
-
-
 HELLO_SERVER_CMD = """
 import socket, sys
 listener = socket.socket()
diff --git a/tests/hooks/S3_hook.py b/tests/hooks/S3_hook.py
new file mode 100644
index 0000000000..8a76eb6ff1
--- /dev/null
+++ b/tests/hooks/S3_hook.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import mock
+import boto
+
+from airflow import configuration
+from airflow import models
+
+try:
+    from airflow.hooks.S3_hook import S3Hook
+except ImportError:
+    S3Hook = None
+
+try:
+    from moto import mock_s3
+except ImportError:
+    mock_s3 = None
+
+
+@unittest.skipIf(S3Hook is None,
+                 "Skipping test because S3Hook is not installed")
+class S3HookTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
+
+    def test_parse_s3_url(self):
+        parsed = S3Hook.parse_s3_url(self.s3_test_url)
+        self.assertEqual(parsed,
+                         ("test", "this/is/not/a-real-key.txt"),
+                         "Incorrect parsing of the s3 url")
+
+
+@unittest.skipIf(S3Hook is None,
+                 "Skipping test because S3Hook is not installed")
+@unittest.skipIf(mock_s3 is None, 'Skipping test because mock_s3 package not present')
+class S3HookMotoTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+    @mock.patch('airflow.hooks.S3_hook.S3Hook.get_connections')
+    @mock_s3
+    def test_s3_hook_load_file(self, mock_get_connections):
+        conn = boto.connect_s3()
+        conn.create_bucket('test_bucket')
+
+        c = models.Connection(conn_id='s3_conn', conn_type='S3')
+        mock_get_connections.return_value = [c]
+
+        s3_hook = S3Hook(s3_conn_id='s3_conn')
+        s3_hook.load_file(filename=__file__, key="test_s3_key.tmp",
+                          bucket_name="test_bucket", replace=True)
+
+        keys = s3_hook.list_keys(bucket_name="test_bucket")
+        self.assertIn("test_s3_key.tmp", keys)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/hooks/__init__.py b/tests/hooks/__init__.py
index 9d7677a99b..cf55457eda 100644
--- a/tests/hooks/__init__.py
+++ b/tests/hooks/__init__.py
@@ -11,3 +11,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from .S3_hook import *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Upload file to S3 using S3 hook fails with "Connection reset by peer"
> ---------------------------------------------------------------------
>
>                 Key: AIRFLOW-1023
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1023
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: hooks
>    Affects Versions: 1.7.1
>            Reporter: Adrian Partl
>            Assignee: Adrian Partl
>            Priority: Major
>
> Using the S3 hook to upload local files to an S3 bucket fails with 
> {noformat}
>   File "/usr/lib/python2.7/site-packages/airflow/hooks/S3_hook.py", line 364, in load_file
>     replace=replace)
>   File "/usr/lib/python2.7/site-packages/boto/s3/key.py", line 1362, in set_contents_from_filename
>     encrypt_key=encrypt_key)
>   File "/usr/lib/python2.7/site-packages/boto/s3/key.py", line 1293, in set_contents_from_file
>     chunked_transfer=chunked_transfer, size=size)
>   File "/usr/lib/python2.7/site-packages/boto/s3/key.py", line 750, in send_file
>     chunked_transfer=chunked_transfer, size=size)
>   File "/usr/lib/python2.7/site-packages/boto/s3/key.py", line 951, in _send_file_internal
>     query_args=query_args
>   File "/usr/lib/python2.7/site-packages/boto/s3/connection.py", line 668, in make_request
>     retry_handler=retry_handler
>   File "/usr/lib/python2.7/site-packages/boto/connection.py", line 1071, in make_request
>     retry_handler=retry_handler)
>   File "/usr/lib/python2.7/site-packages/boto/connection.py", line 1030, in _mexe
>     raise ex
> error: [Errno 104] Connection reset by peer
> {noformat}
> This is a known issue with boto and only affects uploads to S3 buckets outside of the
standard US location (in my case {{eu-west-1}}).
> The issue is reported on boto side as:
> https://github.com/boto/boto/issues/2207
> A workaround is mentioned by user {{anna-buttfield-sirca}}, which basically reconnects
the boto S3 connection to the corresponding location.
> I will provide a PR implementing the workaround, since a resolution of the issue on
the boto side seems unlikely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message