21.10 サンプルコードで使用している関数
URLの生成
サンプルコードでは、HTTPリクエストに使用するURLを生成する関数をBlockStorageAPIクラスに定義し、メインの処理から呼び出して使用します。操作対象のリソースやメソッドごとに必要なパラメータを指定すると、対応するURLを生成して返します。BlockStorageAPIのサンプルコードについて次に説明します。
このサンプルコード内では、URLで使用するストレージデバイスIDを、パラメータで指定したストレージシステムのモデル名とシリアル番号から生成します。STORAGE_MODEL_DICTパラメータで設定されたモデル名から、ストレージシステムの機種ごとの固定値へ読み替えます。必要に応じて、システムの環境や要件に合わせた値に変更してください。
# coding:utf-8
"""
This class creates a URL for the REST API
to manage block storage
"""
STORAGE_MODEL_DICT = {
"VSP E990": "936000",
"VSP 5500H": "900000",
"VSP G900": "886000",
"VSP G800": "836000",
"VSP G1500":"800000",
"HUS VM": "730000",
"VSP": "7000000"}
class BlockStorageAPI():
# SVP IP address
svp_ip_addr = None
# port number
port = None
# storage URL
base_url = None
# object URL
object_url = None
# service URL
service_url = None
# storage device ID
storage_id = None
URLのうち、各リクエストで共通となる次の形式の部分を生成します。
- objectsドメインの場合:
-
<プロトコル>://<ホスト名>:<ポート番号>/ConfigurationManager/<バージョン>/objects/storages/<ストレージデバイスID>
- servicesドメインの場合:
-
<プロトコル>://<ホスト名>:<ポート番号>/ConfigurationManager/<バージョン>/<ストレージデバイスID>/services
def __init__(self, svp_ip_addr, port, storage_model,
serial_number):
self.svp_ip_addr = svp_ip_addr
self.port = port
self.storage_id = STORAGE_MODEL_DICT[storage_model]\
+ serial_number
self.base_url = "https://" + \
self.svp_ip_addr + ":" + self.port + \
"/ConfigurationManager/v1"
self.object_url = "/objects/storages/" + self.storage_id
self.service_url = "/" + self.storage_id + "/services"
次に、操作対象のリソースと操作に応じて関数を定義します。例えば、ボリューム作成を実行するためのURLは、ldevsに対応するblock_storage_api.ldevsで生成します。
def __init__(self, svp_ip_addr, port, storage_model,
serial_number):
self.svp_ip_addr = svp_ip_addr
self.port = port
self.storage_id = STORAGE_MODEL_DICT[storage_model]\
+ serial_number
self.base_url = "https://" + \
self.svp_ip_addr + ":" + self.port + \
"/ConfigurationManager/v1"
self.object_url = "/objects/storages/" + self.storage_id
self.service_url = "/" + self.storage_id + "/services"
def get_storage_id(self):
return self.storage_id
def ldevs(self):
url = self.base_url + self.object_url + "/ldevs"
return url
def undefined_ldev(self):
url = self.ldevs() + \
"?ldevOption=undefined&count=1"
return url
def ldev(self, object_id):
url = self.ldevs() + "/" + str(object_id)
return url
def views_ldevs(self):
url = self.base_url + "/views/ldevs?$query=ldev.storageDeviceId eq '" + \
self.storage_id + "'"
return url
def views_undefined_ldev(self):
url = self.views_ldevs() + \
"&$query=ldev.isDefined eq false&$count=1"
return url
def host_groups(self):
url = self.base_url + self.object_url + "/host-groups"
return url
def host_wwns(self):
url = self.base_url + self.object_url + "/host-wwns"
return url
def luns(self):
url = self.base_url + self.object_url + "/luns"
return url
def local_copy_pairs(self):
url = self.base_url + self.object_url + "/local-clone-copypairs"
return url
def split_local_copy_pair_template(self, pair_url):
url = pair_url + "/actions/split"
return url
def split_local_copy_pair(self, pair_url):
url = pair_url + "/actions/split/invoke"
return url
def generate_session(self):
url = self.base_url + self.object_url + "/sessions"
return url
def discard_session(self, object_id):
url = self.base_url + self.object_url + "/sessions/" + \
str(object_id)
return url
def lock(self):
url = self.base_url + self.service_url + \
"/resource-group-service/" + \
"actions/lock/invoke"
return url
def unlock(self):
url = self.base_url + self.service_url + \
"/resource-group-service/" + \
"actions/unlock/invoke"
return url
def remote_storage(self):
url = self.base_url + self.object_url + "/remote-storages"
return url
def remote_copy_pairs(self):
url = self.base_url + self.object_url +\
"/remote-mirror-copypairs"
return url
def job(self, object_id):
url = self.base_url + self.object_url + "/jobs/" + \
str(object_id)
return url
def affected_resource(self, affected_resource):
url = "https://" + self.svp_ip_addr + ":" + \
self.port \
+ affected_resource
return url
def api_version(self):
url = "https://" + self.svp_ip_addr + ":" + \
self.port \
+ "/ConfigurationManager/configuration/version"
return url
def local_storages(self):
url = self.base_url + "/objects/storages"
return url
def local_storage(self, object_id):
url = self.local_storages() + "/" + str(object_id)
return url
def file_upload(self):
url = self.base_url + self.object_url + "/actions/file-upload/invoke"
return url
def auditlog_syslog(self):
url = self.base_url + self.object_url + "/auditlog-syslog-servers" + \
"/instance"
return url
def auditlog_syslog_send_test(self):
url = self.auditlog_syslog() + "/actions/send-test/invoke"
return url
def drives(self):
url = self.base_url + self.object_url + "/drives"
return url
def drives_parity_group(self, parity_group_id):
url = self.drives() + "?parityGroupId=" + str(parity_group_id)
return url
def parity_groups(self):
url = self.base_url + self.object_url + "/parity-groups"
return url
def parity_group(self, object_id):
url = self.parity_groups() + "/" + str(object_id)
return url
def encryption_keys(self):
url = self.base_url + self.object_url + "/encryption-keys"
return url
def encryption_key(self, object_id):
url = self.encryption_keys() + "/" + str(object_id)
return url
def encryption_key_file(self):
url = self.encryption_key("file")
return url
def encryption_key_file_backup(self):
url = self.encryption_key_file() + "/actions/backup/invoke"
return url
def encryption_key_file_restore(self):
url = self.encryption_key_file() + "/actions/restore/invoke"
return url
HTTPリクエストの発行と非同期処理の状態確認
REST APIでは、オブジェクトの作成や属性変更の操作はジョブとして登録され、非同期に実行されます。非同期処理の場合、リクエストの実行結果が反映されたリソースは、ジョブが完了したことを確認してから取得する必要があります。サンプルコードでは、invoke_async_command関数を使用することで、リクエストの発行とジョブの完了待ちの処理を合わせて行います。invoke_async_command関数は、メソッドの種類、URL、リクエストボディを指定すると、指定されたメソッドに応じてリクエストを発行し、ジョブの完了を待ってリソースの情報を返します。
invoke_async_command関数は、ボリューム割り当て、ShadowImageペア操作、リモートストレージシステムの情報登録の各サンプルコード内で定義されています。invoke_async_command関数のサンプルコードの内容を次に示します。
あらかじめ、invoke_async_command関数から呼び出して使用する、ジョブの状態を取得する関数を定義します。"""
Check whether the asynchronous command was finished.
@param job_id the job ID to identify
the asynchronous command
@return r the response data
"""
def check_update(job_id):
url = block_storage_api.job(str(job_id))
r = requests.get(url, headers=headers, verify=False)
return r
次に、invoke_async_command関数の定義をします。リクエスト生成時には、リクエストボディの生成をJSON形式で行うよう指定します。認証情報は、セッション生成時に取得したトークンを使用して指定します。
"""
Execute the HTTP request (POST or PUT)
@param method_type HTTP request method (POST or PUT)
@param url URL to execute HTTP method
@param body The information of a resource
@return job_result.json()["affectedResources"][0]
URL of an affected resource
"""
def invoke_async_command(method_type, url, body):
if method_type == "put":
r = requests.put(url, headers=headers,
data=json.dumps(body), verify=False)
elif method_type == "post":
r = requests.post(
url,
headers=headers,
data=json.dumps(body),
verify=False)
if r.status_code != http.client.ACCEPTED:
raise requests.HTTPError(r)
print("Request was accepted. JOB URL : " +
r.json()["self"])
リクエストを発行したあと、ジョブのステータスがCompletedになるまで、ジョブの情報を繰り返し取得します。パラメータで指定した最大リトライ回数を超えてもジョブが完了しない場合は、処理を終了します。ジョブがエラーとなった場合も、エラーコードを取得して処理を終了します。
status = "Initializing"
job_result = None
retry_count = 1
wait_time = FIRST_WAIT_TIME
while status != "Completed":
if retry_count > MAX_RETRY_COUNT:
raise Exception("Timeout Error! "
"Operation was not completed.")
time.sleep(wait_time)
job_result = check_update(r.json()["jobId"])
status = job_result.json()["status"]
double_time = wait_time * 2
if double_time < 120:
wait_time = double_time
else:
wait_time = 120
retry_count += 1
if job_result.json()["state"] == "Failed":
error_obj = job_result.json()["error"]
if "errorCode" in error_obj:
if "SSB1" in error_obj["errorCode"]:
print("Error! SSB code : ",
error_obj["errorCode"]["SSB1"],
", ", error_obj["errorCode"]["SSB2"])
elif "errorCode" in error_obj["errorCode"]:
print("Error! error code : ",
error_obj["errorCode"]["errorCode"])
raise Exception("Job Error!", job_result.text)
ジョブの実行に成功し、ステータスがCompletedになったら、ジョブの実行結果が反映されたリソースのURLを取得します。affectedResourcesに返される値は1件だけのため、1件目を取得します。
print("Async job was succeeded. affected resource : " +
job_result.json()["affectedResources"][0])
return job_result.json()["affectedResources"][0]
非同期処理の状態変化取得
wait_until_jobstatus_is_changed関数はREST APIで非同期に実行されるジョブの状態を取得し、ジョブが指定した実行状態に遷移するのを待って、リソースの情報を返します。wait_until_jobstatus_is_changed関数は、TrueCopyペア操作のサンプルコード内で定義されています。wait_until_jobstatus_is_changed関数のサンプルコードの内容を次に示します。
あらかじめ、wait_until_jobstatus_is_changed関数から呼び出して使用する、ジョブの状態を取得する関数を定義します。
"""
Check whether the asynchronous command was finished.
@param storage_api storage_api
@param job_id the job ID to identify
the asynchronous command
@param headers the array of the http headers
@return r the response data
"""
def check_update(storage_api, job_id, headers):
url = storage_api.job(str(job_id))
r = requests.get(url, headers=headers, verify=False)
return r
次に、wait_until_jobstatus_is_changed関数の定義をします。changed_statusには、遷移したことを検知したいジョブの状態を指定します。is_retry_count_enabledにTrueを指定すると、MAX_RETRY_COUNTパラメータに設定した回数までリトライしたあと、タイムアウトエラーを返します。Falseを指定すると、指定した状態にジョブが遷移するまで待ち続けます。
"""
Wait until the job status is changed
@param storage_api storage_api
@param headers the array of the http headers
@param job_id the job ID to identify
the asynchronous command
@param changed_status job status after waiting
@param is_retry_count_enabled if true, wait
until MAX_RETRY_COUNT. if false, wait forever
until job status is changed.
@return job_result.json()["affectedResources"][0]
URL of an affected resource
"""
def wait_until_jobstatus_is_changed(
storage_api,
headers,
job_id,
changed_status,
is_retry_count_enabled):
status = "Initializing"
retry_count = 1
wait_time = FIRST_WAIT_TIME
while status != changed_status:
if status == "Completed":
print("Status was already changed" +
"to Completed.")
break
if is_retry_count_enabled and \
retry_count > MAX_RETRY_COUNT:
raise Exception("Timeout Error! "
"Operation was not completed.")
time.sleep(wait_time)
job_result = check_update(storage_api,
job_id, headers)
status = job_result.json()["status"]
double_time = wait_time * 2
if double_time < 120:
wait_time = double_time
else:
wait_time = 120
retry_count += 1
if job_result.json()["state"] == "Failed":
error_obj = job_result.json()["error"]
if "errorCode" in error_obj:
if "SSB1" in error_obj["errorCode"]:
print("Error! SSB code : ",
error_obj["errorCode"]["SSB1"],
", ", error_obj["errorCode"]["SSB2"])
elif "errorCode" in error_obj["errorCode"]:
print("Error! error code : ",
error_obj["errorCode"]["errorCode"])
raise Exception("Job Error!", job_result.text)
print("Async job was succeeded. affected resource : " +
job_result.json()["affectedResources"][0])
return job_result.json()["affectedResources"][0]