Merge pull request #2 from matthewhanson/develop
publish 0.1.0
matthewhanson authored Dec 5, 2019
2 parents e076b05 + b131d99 commit b65a4bc
Showing 7 changed files with 225 additions and 204 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.1.0] - 2019-12-05

### Changed
- The s3 and stepfunction modules are now classes, and the constructor accepts a boto3 Session; if none is provided, a default session is created (see the usage sketch below)
- s3.upload now accepts an `http_url` keyword; if set to True, it returns the https URL instead of the s3 URL
- s3.find now returns the complete s3 URL for each found object, not just the key
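
A minimal usage sketch of the new class-based interface (the bucket and file names are hypothetical placeholders):

```python
import boto3
from boto3utils import s3

# pass an explicit boto3 Session, or omit it to use a default client
client = s3(boto3.Session(region_name='us-east-1'))

# upload returns the https URL when http_url=True, otherwise the s3:// URL
url = client.upload('item.json', 's3://my-bucket/data/item.json', http_url=True)

# find yields complete s3:// URLs rather than bare keys
for found in client.find('s3://my-bucket/data/', suffix='.json'):
    print(found)
```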

## [v0.0.3] - 2019-11-22

### Added
@@ -17,5 +24,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
Initial Release

[Unreleased]: https://github.com/matthewhanson/boto3-utils/compare/master...develop
[v0.1.0]: https://github.com/matthewhanson/boto3-utils/compare/0.0.3...0.1.0
[v0.0.3]: https://github.com/matthewhanson/boto3-utils/compare/0.0.2...0.0.3
[v0.0.2]: https://github.com/matthewhanson/boto3-utils/tree/0.0.2
4 changes: 3 additions & 1 deletion boto3utils/__init__.py
@@ -1 +1,3 @@
from .version import __version__
from .version import __version__

from .s3 import s3
323 changes: 163 additions & 160 deletions boto3utils/s3.py
@@ -15,175 +15,178 @@

logger = logging.getLogger(__name__)

# s3 client
s3 = boto3.client('s3')

class s3(object):

def urlparse(url):
""" Split S3 URL into bucket, key, filename """
if url[0:5] != 's3://':
raise Exception('Invalid S3 url %s' % url)
def __init__(self, session=None):
if session is None:
self.s3 = boto3.client('s3')
else:
self.s3 = session.client('s3')

url_obj = url.replace('s3://', '').split('/')
@classmethod
def urlparse(cls, url):
""" Split S3 URL into bucket, key, filename """
if url[0:5] != 's3://':
raise Exception('Invalid S3 url %s' % url)

# remove empty items
url_obj = list(filter(lambda x: x, url_obj))

return {
'bucket': url_obj[0],
'key': '/'.join(url_obj[1:]),
'filename': url_obj[-1] if len(url_obj) > 1 else ''
}


def s3_to_https(url, region=getenv('AWS_REGION', getenv('AWS_DEFAULT_REGION', 'us-east-1'))):
""" Convert an s3 URL to an s3 https URL """
parts = urlparse(url)
return 'https://%s.s3.%s.amazonaws.com/%s' % (parts['bucket'], region, parts['key'])
url_obj = url.replace('s3://', '').split('/')

# remove empty items
url_obj = list(filter(lambda x: x, url_obj))

def exists(url):
""" Check if this URL exists on S3 """
parts = urlparse(url)
try:
s3.head_object(Bucket=parts['bucket'], Key=parts['key'])
return True
except ClientError as exc:
if exc.response['Error']['Code'] != '404':
raise
return False
return {
'bucket': url_obj[0],
'key': '/'.join(url_obj[1:]),
'filename': url_obj[-1] if len(url_obj) > 1 else ''
}
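# Illustrative example (hypothetical URL): s3.urlparse('s3://my-bucket/path/to/item.json')
# -> {'bucket': 'my-bucket', 'key': 'path/to/item.json', 'filename': 'item.json'}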

@classmethod
def s3_to_https(cls, url, region=getenv('AWS_REGION', getenv('AWS_DEFAULT_REGION', 'us-east-1'))):
""" Convert an s3 URL to an s3 https URL """
parts = cls.urlparse(url)
return 'https://%s.s3.%s.amazonaws.com/%s' % (parts['bucket'], region, parts['key'])
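# Illustrative example (hypothetical bucket and region): s3.s3_to_https('s3://my-bucket/item.json', 'us-west-2')
# -> 'https://my-bucket.s3.us-west-2.amazonaws.com/item.json'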

def upload(filename, uri, public=False, extra={}):
""" Upload object to S3 uri (bucket + prefix), keeping same base filename """
logger.debug("Uploading %s to %s" % (filename, uri))
s3_uri = urlparse(uri)
uri_out = 's3://%s' % op.join(s3_uri['bucket'], s3_uri['key'])
if public:
extra['ACL'] = 'public-read'
with open(filename, 'rb') as data:
s3.upload_fileobj(data, s3_uri['bucket'], s3_uri['key'], ExtraArgs=extra)
return uri_out


def download(uri, path=''):
"""
Download object from S3
:param uri: URI of object to download
:param path: Output path
"""
s3_uri = urlparse(uri)
fout = op.join(path, s3_uri['filename'])
logger.debug("Downloading %s as %s" % (uri, fout))
if path != '':
makedirs(path, exist_ok=True)

with open(fout, 'wb') as f:
s3.download_fileobj(
Bucket=s3_uri['bucket'],
Key=s3_uri['key'],
Fileobj=f
)
return fout


def read(url):
""" Read object from s3 """
parts = urlparse(url)
response = s3.get_object(Bucket=parts['bucket'], Key=parts['key'])
body = response['Body'].read()
if op.splitext(parts['key'])[1] == '.gz':
body = GzipFile(None, 'rb', fileobj=BytesIO(body)).read()
return body.decode('utf-8')


def read_json(url):
""" Download object from S3 as JSON """
return json.loads(read(url))


# function derived from https://alexwlchan.net/2018/01/listing-s3-keys-redux/
def find(url, suffix=''):
"""
Generate objects in an S3 bucket.
:param url: The beginning part of the URL to match (bucket + optional prefix)
:param suffix: Only fetch objects whose keys end with this suffix.
"""
parts = urlparse(url)
kwargs = {'Bucket': parts['bucket']}

# If the prefix is a single string (not a tuple of strings), we can
# do the filtering directly in the S3 API.
if isinstance(parts['key'], str):
kwargs['Prefix'] = parts['key']

while True:
# The S3 API response is a large blob of metadata.
# 'Contents' contains information about the listed objects.
resp = s3.list_objects_v2(**kwargs)
try:
contents = resp['Contents']
except KeyError:
return

for obj in contents:
key = obj['Key']
if key.startswith(parts['key']) and key.endswith(suffix):
yield obj['Key']

# The S3 API is paginated, returning up to 1000 keys at a time.
# Pass the continuation token into the next response, until we
# reach the final page (when this field is missing).
def exists(self, url):
""" Check if this URL exists on S3 """
parts = self.urlparse(url)
try:
kwargs['ContinuationToken'] = resp['NextContinuationToken']
except KeyError:
break


def latest_inventory(url, prefix=None, suffix=None, start_date=None, end_date=None, datetime_key='LastModifiedDate'):
""" Return generator function for objects in Bucket with suffix (all files if suffix=None) """
parts = urlparse(url)
# get latest manifest file
today = datetime.now()
manifest_key = None
for dt in [today, today - timedelta(1)]:
_key = op.join(parts['key'], dt.strftime('%Y-%m-%d'))
_url = 's3://%s/%s' % (parts['bucket'], _key)
keys = [k for k in find(_url, suffix='manifest.json')]
if len(keys) == 1:
manifest_key = keys[0]
break
# read through latest manifest looking for matches
if manifest_key:
_url = 's3://%s/%s' % (parts['bucket'], manifest_key)
manifest = read_json(_url)
# get file schema
keys = [str(key).strip() for key in manifest['fileSchema'].split(',')]

logger.info('Getting latest inventory from %s' % url)
counter = 0
for f in manifest.get('files', []):
_url = 's3://%s/%s' % (parts['bucket'], f['key'])
inv = read(_url).split('\n')
for line in inv:
counter += 1
if counter % 100000 == 0:
logger.debug('%s: Scanned %s records' % (datetime.now(), str(counter)))
info = {keys[i]: v for i, v in enumerate(line.replace('"', '').split(','))}
if 'Key' not in info:
continue
# skip to next if last modified date not between start_date and end_date
dt = datetime.strptime(info[datetime_key], "%Y-%m-%dT%H:%M:%S.%fZ").date()
if (start_date is not None and dt < start_date) or (end_date is not None and dt > end_date):
continue
if prefix is not None:
# if path doesn't match provided prefix skip to next record
if info['Key'][:len(prefix)] != prefix:
self.s3.head_object(Bucket=parts['bucket'], Key=parts['key'])
return True
except ClientError as exc:
if exc.response['Error']['Code'] != '404':
raise
return False
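# Illustrative usage (hypothetical key): s3().exists('s3://my-bucket/item.json')
# returns True when the HEAD request succeeds and False when S3 responds with a 404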

def upload(self, filename, url, public=False, extra={}, http_url=False):
""" Upload object to S3 uri (bucket + prefix), keeping same base filename """
logger.debug("Uploading %s to %s" % (filename, url))
parts = self.urlparse(url)
url_out = 's3://%s' % op.join(parts['bucket'], parts['key'])
if public:
extra['ACL'] = 'public-read'
with open(filename, 'rb') as data:
self.s3.upload_fileobj(data, parts['bucket'], parts['key'], ExtraArgs=extra)
if http_url:
region = self.s3.get_bucket_location(Bucket=parts['bucket'])['LocationConstraint']
return self.s3_to_https(url_out, region)
else:
return url_out
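# Illustrative usage (hypothetical names): s3().upload('item.json', 's3://my-bucket/data/item.json', http_url=True)
# uploads the local file and returns the https URL for the bucket's region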

def download(self, uri, path=''):
"""
Download object from S3
:param uri: URI of object to download
:param path: Output path
"""
s3_uri = self.urlparse(uri)
fout = op.join(path, s3_uri['filename'])
logger.debug("Downloading %s as %s" % (uri, fout))
if path != '':
makedirs(path, exist_ok=True)

with open(fout, 'wb') as f:
self.s3.download_fileobj(
Bucket=s3_uri['bucket'],
Key=s3_uri['key'],
Fileobj=f
)
return fout
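# Illustrative usage (hypothetical paths): s3().download('s3://my-bucket/data/item.json', path='/tmp')
# creates /tmp if needed and returns '/tmp/item.json'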

def read(self, url):
""" Read object from s3 """
parts = self.urlparse(url)
response = self.s3.get_object(Bucket=parts['bucket'], Key=parts['key'])
body = response['Body'].read()
if op.splitext(parts['key'])[1] == '.gz':
body = GzipFile(None, 'rb', fileobj=BytesIO(body)).read()
return body.decode('utf-8')

def read_json(self, url):
""" Download object from S3 as JSON """
return json.loads(self.read(url))
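# Illustrative usage (hypothetical key): s3().read_json('s3://my-bucket/catalog.json')
# reads the object (gunzipping it first when the key ends in .gz) and parses it as JSON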


# function derived from https://alexwlchan.net/2018/01/listing-s3-keys-redux/
def find(self, url, suffix=''):
"""
Generate objects in an S3 bucket.
:param url: The beginning part of the URL to match (bucket + optional prefix)
:param suffix: Only fetch objects whose keys end with this suffix.
"""
parts = self.urlparse(url)
kwargs = {'Bucket': parts['bucket']}

# If the prefix is a single string (not a tuple of strings), we can
# do the filtering directly in the S3 API.
if isinstance(parts['key'], str):
kwargs['Prefix'] = parts['key']

while True:
# The S3 API response is a large blob of metadata.
# 'Contents' contains information about the listed objects.
resp = self.s3.list_objects_v2(**kwargs)
try:
contents = resp['Contents']
except KeyError:
return

for obj in contents:
key = obj['Key']
if key.startswith(parts['key']) and key.endswith(suffix):
yield f"s3://{parts['bucket']}/{obj['Key']}"

# The S3 API is paginated, returning up to 1000 keys at a time.
# Pass the continuation token into the next response, until we
# reach the final page (when this field is missing).
try:
kwargs['ContinuationToken'] = resp['NextContinuationToken']
except KeyError:
break
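# Illustrative usage (hypothetical prefix): for url in s3().find('s3://my-bucket/prefix/', suffix='.json')
# yields complete URLs such as 's3://my-bucket/prefix/item.json', following continuation tokens across pages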

def latest_inventory(self, url, prefix=None, suffix=None, start_date=None, end_date=None, datetime_key='LastModifiedDate'):
""" Return generator function for objects in Bucket with suffix (all files if suffix=None) """
parts = self.urlparse(url)
# get latest manifest file
today = datetime.now()
manifest_url = None
for dt in [today, today - timedelta(1)]:
_key = op.join(parts['key'], dt.strftime('%Y-%m-%d'))
_url = 's3://%s/%s' % (parts['bucket'], _key)
manifests = [k for k in self.find(_url, suffix='manifest.json')]
if len(manifests) == 1:
manifest_url = manifests[0]
break
# read through latest manifest looking for matches
if manifest_url:
manifest = self.read_json(manifest_url)
# get file schema
keys = [str(key).strip() for key in manifest['fileSchema'].split(',')]

logger.info('Getting latest inventory from %s' % url)
counter = 0
for f in manifest.get('files', []):
_url = 's3://%s/%s' % (parts['bucket'], f['key'])
inv = self.read(_url).split('\n')
for line in inv:
counter += 1
if counter % 100000 == 0:
logger.debug('%s: Scanned %s records' % (datetime.now(), str(counter)))
info = {keys[i]: v for i, v in enumerate(line.replace('"', '').split(','))}
if 'Key' not in info:
continue
# skip to next if last modified date not between start_date and end_date
dt = datetime.strptime(info[datetime_key], "%Y-%m-%dT%H:%M:%S.%fZ").date()
if (start_date is not None and dt < start_date) or (end_date is not None and dt > end_date):
continue
if suffix is None or info['Key'].endswith(suffix):
if 'Bucket' in keys and 'Key' in keys:
info['url'] = 's3://%s/%s' % (info['Bucket'], info['Key'])
yield info
if prefix is not None:
# if path doesn't match provided prefix skip to next record
if info['Key'][:len(prefix)] != prefix:
continue
if suffix is None or info['Key'].endswith(suffix):
if 'Bucket' in keys and 'Key' in keys:
info['url'] = 's3://%s/%s' % (info['Bucket'], info['Key'])
yield info
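# Illustrative usage (hypothetical inventory prefix): s3().latest_inventory('s3://my-bucket/inventory/', suffix='.tif')
# yields a dict per record in the most recent inventory whose Key ends with '.tif',
# adding a full s3 'url' when the schema includes Bucket and Key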


def get_presigned_url(url, aws_region=None,
@@ -199,7 +202,7 @@ def get_presigned_url(url, aws_region=None,
logger.debug('Not using signed URL for %s' % url)
return url, None

parts = urlparse(url)
parts = s3.urlparse(url)
bucket = parts['bucket']
key = parts['key']
