使用Python还原s3的GLACIER对象的代码
注意:发起还原之前要先检查是不是有正在进行的还原任务,或者有已经还原的临时对象。
以下是发起任务后调用head-object返回的json。
{
"AcceptRanges": "bytes",
"Restore": "ongoing-request=\"true\"",
"LastModified": "Tue, 26 Nov 2019 08:26:24 GMT",
"ContentLength": 735,
"ETag": "\"1e0284fa4441896a4a267777caef067a\"",
"ContentType": "text/plain",
"Metadata": {},
"StorageClass": "GLACIER"
}
"Restore"
表示有正在进行的还原任务,或者存在已经还原的临时对象。值为"ongoing-request=\"true\""时表示正在还原对象;值为"ongoing-request=\"false\", expiry-date=\"Sun, 05 Jan 2020 00:00:00 GMT\""时表示对象已经还原成功,还原出的临时对象保留到 05 Jan 2020 00:00:00 GMT。
测试脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import boto3
from botocore.exceptions import ClientError
import sys
import os
import time
import botocore
# Bucket name and object key read by test_restore_object() and
# test_head_object(). They are empty placeholders here and are assigned
# real values in the __main__ block before those helpers are called.
BUCKET_NAME=""
OBJECT_NAME=""
def restore_object(bucket_name, object_name, days, retrieval_type='Standard'):
    """Submit a restore request for an archived S3 object.

    Args:
        bucket_name: Name of the bucket that holds the object.
        object_name: Key of the object to restore.
        days: Number of days the restored temporary copy stays available.
            It is deleted automatically once they have passed; the day on
            which the restore completes is not counted.
        retrieval_type: Value sent as ``GlacierJobParameters.Tier``
            (this file uses 'Standard' and 'Expedited').

    Returns:
        True if the request was submitted without a ClientError, False
        otherwise (the error is logged).
    """
    client = boto3.client('s3')
    restore_request = {
        'Days': days,
        'GlacierJobParameters': {'Tier': retrieval_type},
    }
    try:
        client.restore_object(
            Bucket=bucket_name,
            Key=object_name,
            RestoreRequest=restore_request,
        )
    except ClientError as err:
        logging.error(err)
        logging.error(f"NoSuchBucket, NoSuchKey, or InvalidObjectState error == the object's, storage class was not GLACIER. {bucket_name} {object_name} ")
        return False
    else:
        return True
def head_object(bucket_name, object_name):
    """Fetch the metadata of an S3 object with a HEAD request.

    Args:
        bucket_name: Name of the bucket that holds the object.
        object_name: Key of the object to inspect.

    Returns:
        The response dict returned by ``s3.head_object`` on success, or
        None if the call raised a ClientError (the error is logged).
    """
    s3 = boto3.client('s3')
    try:
        return s3.head_object(Bucket=bucket_name, Key=object_name)
    except ClientError as e:
        # A HEAD request does not depend on the object's storage class, so
        # report the actual error code instead of a GLACIER-specific hint.
        error_code = e.response.get('Error', {}).get('Code', 'Unknown')
        logging.error(e)
        logging.error(f"head_object failed with error code {error_code}. {bucket_name} {object_name} ")
        return None
def test_restore_object():
    """Exercise restore_object() against BUCKET_NAME / OBJECT_NAME.

    Requests an 'Expedited' (fast) restore that keeps the temporary copy
    for one day and logs the outcome.

    Returns:
        The boolean returned by restore_object().
    """
    bucket, key = BUCKET_NAME, OBJECT_NAME
    # Lifetime of the restored temporary copy in days; it is removed
    # automatically afterwards, and the day of the restore is not counted.
    retention_days = 1
    submitted = restore_object(bucket, key, retention_days, 'Expedited')
    if submitted:
        logging.info(f'Submitted request to restore {key} in {bucket}')
    logging.info(f'restore object {submitted}')
    return submitted
def test_head_object():
    """Exercise head_object() and log the restore state of the object.

    Reads the 'Restore' entry of the head_object() response:
      * absent/empty -> no restore is pending or available, so one is needed;
      * present without 'ongoing-request="false"' -> restore in progress;
      * present with 'ongoing-request="false"' -> restore finished; the
        'expiry-date=...' part is logged as well.

    Returns:
        Whatever head_object() returned (response dict, or None on error).
    """
    test_bucket_name = BUCKET_NAME
    test_object_name = OBJECT_NAME
    metadata = head_object(test_bucket_name, test_object_name)
    if not metadata:
        return metadata

    # This is a metadata lookup, not a restore submission.
    logging.info(f'Fetched metadata of {test_object_name} '
                 f'in {test_bucket_name}')

    restore_status = metadata.get('Restore')
    if not restore_status:
        logging.info('need restore object {} {}'.format(test_bucket_name, test_object_name))
        return metadata

    logging.info('Restore {}'.format(restore_status))
    if 'ongoing-request="false"' not in restore_status:
        logging.info("正在还原...")
    else:
        # partition() avoids slicing from index -1 (last character only)
        # when the header carries no expiry-date; fall back to the full value.
        _, marker, expiry = restore_status.partition('expiry-date=')
        logging.info(marker + expiry if marker else restore_status)
        logging.info("还原成功.")
    return metadata
if __name__ == '__main__':
    # Log at INFO so the progress messages of the test helpers are shown;
    # switch `level` to logging.DEBUG / WARNING / ERROR for more or less output.
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)s: %(asctime)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    # Target object for the test run.
    BUCKET_NAME = "mybucket"
    OBJECT_NAME = "obj/test.png"

    # Submit the restore request first, then inspect the object's restore state.
    test_restore_object()
    test_head_object()