ciocore.api_client
The api_client module is used to make requests to the Conductor API.
ApiClient
The ApiClient class is a wrapper around the requests library that handles authentication and retries.
Source code in ciocore/api_client.py
class ApiClient:
    """
    Wrapper around the requests library that handles authentication and retries.

    Requests made through this class carry a bearer token and a User-Agent header, and are
    retried through common.DecRetry when they fail with one of CONNECTION_EXCEPTIONS.
    """

    # HTTP verbs that make_request() accepts.
    http_verbs = ["PUT", "POST", "GET", "DELETE", "HEAD", "PATCH"]

    # NOTE(review): not referenced by any method visible in this class; register_client()
    # builds its own, shorter string. Confirm whether other code formats this template.
    USER_AGENT_TEMPLATE = "client {client_name}/{client_version} (ciocore {ciocore_version}; {runtime} {runtime_version}; {platform} {platform_details}; {hostname} {pid}; {python_path})"

    # Upper bound applied to the interpreter path before it is hashed into the User-Agent.
    USER_AGENT_MAX_PATH_LENGTH = 1024

    # User-Agent value shared by all instances; populated by register_client().
    user_agent_header = None

    def __init__(self):
        logger.debug("")

    @staticmethod
    def _redact_headers(headers):
        """
        Return a copy of `headers` that is safe to write to a log.

        The value of any Authorization header (matched case-insensitively) is replaced with a
        placeholder so bearer tokens never end up in debug output. Falsy input (None or an
        empty mapping) is returned unchanged.

        Args:
            headers (dict): The headers about to be logged.

        Returns:
            dict: A new dictionary with the Authorization value masked.
        """
        if not headers:
            return headers
        return {
            name: ("<redacted>" if str(name).lower() == "authorization" else value)
            for name, value in headers.items()
        }

    def _make_request(self, verb, conductor_url, headers, params, data, raise_on_error=True):
        """
        Send a single request with requests.request() and log what was sent.

        Args:
            verb (str): The HTTP verb to use.
            conductor_url (str): The full URL to request.
            headers (dict): Headers to send with the request.
            params (dict): Query parameters to send with the request.
            data: The request body.
            raise_on_error (bool): Whether to call raise_for_status() on the response.

        Returns:
            requests.Response: The response object.
        """
        response = requests.request(
            method=verb, url=conductor_url, headers=headers, params=params, data=data
        )

        logger.debug("verb: %s", verb)
        logger.debug("conductor_url: %s", conductor_url)
        # The headers carry the bearer token, so mask it before logging.
        logger.debug("headers: %s", self._redact_headers(headers))
        logger.debug("params: %s", params)
        logger.debug("data: %s", data)

        # If we get 300s/400s debug out the response. TODO(lws): REMOVE THIS
        if 300 <= response.status_code < 500:
            logger.debug("***** ERROR!! *****")
            logger.debug("Reason: %s", response.reason)
            logger.debug("Text: %s", response.text)

        # trigger an exception to be raised for 4XX or 5XX http responses
        if raise_on_error:
            response.raise_for_status()

        return response

    def make_prepared_request(
        self,
        verb,
        url,
        headers=None,
        params=None,
        json_payload=None,
        data=None,
        stream=False,
        remove_headers_list=None,
        raise_on_error=True,
        tries=5,
    ):
        """
        Make a request to the Conductor API.

        Deprecated:
            Primarily used to remove headers that requests.Request enforces. Requests 2.x adds
            Transfer-Encoding: chunked when given a file-like object that is 0 bytes, causing
            S3 failures (501)
            - https://github.com/psf/requests/issues/4215#issuecomment-319521235

            To get around this bug make_prepared_request can remove the enforced header that
            would occur when using requests.request(...). Requests 3.x resolves this issue;
            when the client is built to use Requests 3.x this function can be removed.

        Args:
            verb (str): The HTTP verb to use.
            url (str): The URL to make the request to.
            headers (dict): A dictionary of headers to send with the request.
            params (dict): A dictionary of query parameters to send with the request.
            json_payload (dict): A JSON payload to send with the request.
            data: A request body to send with the request.
            stream (bool): Whether or not to stream the response.
            remove_headers_list (list): A list of headers to remove from the request.
            raise_on_error (bool): Whether or not to raise an exception if the request fails.
            tries (int): The number of times to retry the request.

        Returns:
            requests.Response: The response object.
        """
        req = requests.Request(
            method=verb,
            url=url,
            headers=headers,
            params=params,
            json=json_payload,
            data=data,
        )
        prepped = req.prepare()

        if remove_headers_list:
            for header in remove_headers_list:
                prepped.headers.pop(header, None)

        # Create a retry wrapper function
        retry_wrapper = common.DecRetry(
            retry_exceptions=CONNECTION_EXCEPTIONS, tries=tries
        )

        # requests sessions are potentially not thread-safe, but a prepared request can only be
        # sent through a Session object. A Session is therefore created per call; it does not
        # benefit from connection pooling reuse. https://github.com/psf/requests/issues/1871
        session = requests.Session()
        keep_session_open = False
        try:
            # wrap the send function with the retry wrapper and call it
            response = retry_wrapper(session.send)(prepped, stream=stream)
            # A streamed body is read by the caller later, so the session must stay open.
            keep_session_open = stream
        finally:
            if not keep_session_open:
                session.close()

        logger.debug("verb: %s", prepped.method)
        logger.debug("url: %s", prepped.url)
        logger.debug("headers: %s", self._redact_headers(prepped.headers))
        logger.debug("params: %s", req.params)
        logger.debug("response: %s", response)

        # trigger an exception to be raised for 4XX or 5XX http responses
        if raise_on_error:
            response.raise_for_status()

        return response

    def make_request(
        self,
        uri_path="/",
        headers=None,
        params=None,
        data=None,
        verb=None,
        conductor_url=None,
        raise_on_error=True,
        tries=5,
        use_api_key=False,
    ):
        """
        Make a request to the Conductor API.

        Args:
            uri_path (str): The path to the resource to request.
            headers (dict): A dictionary of headers to send with the request. The dictionary is
                copied; the caller's object is not modified.
            params (dict): A dictionary of query parameters to send with the request.
            data (dict): A dictionary of data to send with the request.
            verb (str): The HTTP verb to use. Defaults to POST when data is given, else GET.
            conductor_url (str): The Conductor URL. Defaults to the configured url joined with
                uri_path.
            raise_on_error (bool): Whether or not to raise an exception if the request fails.
            tries (int): The number of times to retry the request.
            use_api_key (bool): Whether or not to use the API key for authentication.

        Returns:
            tuple(str, int): The response text and status code.

        Raises:
            Exception: If no bearer token could be obtained.
        """
        cfg = config.get()

        # TODO: set Content Content-Type to json if data arg
        # Work on a copy so the Authorization and User-Agent entries added below do not leak
        # into a dictionary the caller may reuse.
        if headers:
            headers = headers.copy()
        else:
            headers = {"Content-Type": "application/json", "Accept": "application/json"}

        logger.debug("read_conductor_credentials({})".format(use_api_key))
        bearer_token = read_conductor_credentials(use_api_key)
        if not bearer_token:
            raise Exception("Error: Could not get conductor credentials!")

        headers["Authorization"] = "Bearer %s" % bearer_token

        if not ApiClient.user_agent_header:
            self.register_client("ciocore")
        headers["User-Agent"] = ApiClient.user_agent_header

        # Construct URL
        if not conductor_url:
            conductor_url = parse.urljoin(cfg["url"], uri_path)

        if not verb:
            verb = "POST" if data else "GET"

        assert verb in self.http_verbs, "Invalid http verb: %s" % verb

        # Create a retry wrapper function
        retry_wrapper = common.DecRetry(
            retry_exceptions=CONNECTION_EXCEPTIONS, tries=tries
        )

        # wrap the request function with the retry wrapper and call it
        response = retry_wrapper(self._make_request)(
            verb, conductor_url, headers, params, data, raise_on_error=raise_on_error
        )

        return response.text, response.status_code

    @classmethod
    def register_client(cls, client_name, client_version=None):
        """
        Generate the http User Agent header that includes helpful debug info.

        The header is stored on the class as `user_agent_header` and also returned.

        Args:
            client_name (str): The name of the client to report.
            client_version (str): The version of the client. Reported as 'unknown' when falsy.

        Returns:
            str: The generated User-Agent value.
        """
        if not client_version:
            client_version = "unknown"

        python_version = platform.python_version()
        system_info = platform.system()
        release_info = platform.release()

        # Get the MD5 hex digest of the path to the python executable. The path is truncated
        # while it is still text: truncate_middle() joins its pieces with a str marker, which
        # raises TypeError on bytes. sys.executable may be None or empty in some interpreters.
        executable = sys.executable or ""
        python_executable_path = truncate_middle(executable, cls.USER_AGENT_MAX_PATH_LENGTH)
        md5_hash = hashlib.md5(python_executable_path.encode("utf-8")).hexdigest()

        user_agent = (
            f"{client_name}/{client_version} "
            f"(python {python_version}; {system_info} {release_info}; {md5_hash})"
        )
        cls.user_agent_header = user_agent

        return user_agent
make_prepared_request(self, verb, url, headers=None, params=None, json_payload=None, data=None, stream=False, remove_headers_list=None, raise_on_error=True, tries=5)
Make a request to the Conductor API.
Deprecated
Primarily used to remove headers that requests.Request enforces. Requests 2.x adds Transfer-Encoding: chunked when given a file-like object that is 0 bytes, causing S3 failures (501) - https://github.com/psf/requests/issues/4215#issuecomment-319521235
To get around this bug make_prepared_request has functionality to remove the enforced header that would occur when using requests.request(...). Requests 3.x resolves this issue, when client is built to use Requests 3.x this function can be removed.
Returns:
Type | Description |
---|---|
requests.Response |
The response object. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
verb |
str |
The HTTP verb to use. |
required |
url |
str |
The URL to make the request to. |
required |
headers |
dict |
A dictionary of headers to send with the request. |
None |
params |
dict |
A dictionary of query parameters to send with the request. |
None |
json_payload |
dict |
A JSON payload to send with the request. |
None |
stream |
bool |
Whether or not to stream the response. |
False |
remove_headers_list |
list |
A list of headers to remove from the request. |
None |
raise_on_error |
bool |
Whether or not to raise an exception if the request fails. |
True |
tries |
int |
The number of times to retry the request. |
5 |
Source code in ciocore/api_client.py
def make_prepared_request(
    self,
    verb,
    url,
    headers=None,
    params=None,
    json_payload=None,
    data=None,
    stream=False,
    remove_headers_list=None,
    raise_on_error=True,
    tries=5,
):
    """
    Make a request to the Conductor API.

    Deprecated:
        Primarily used to remove headers that requests.Request enforces. Requests 2.x adds
        Transfer-Encoding: chunked when given a file-like object that is 0 bytes, causing
        S3 failures (501)
        - https://github.com/psf/requests/issues/4215#issuecomment-319521235

        To get around this bug make_prepared_request can remove the enforced header that
        would occur when using requests.request(...). Requests 3.x resolves this issue;
        when the client is built to use Requests 3.x this function can be removed.

    Args:
        verb (str): The HTTP verb to use.
        url (str): The URL to make the request to.
        headers (dict): A dictionary of headers to send with the request.
        params (dict): A dictionary of query parameters to send with the request.
        json_payload (dict): A JSON payload to send with the request.
        data: A request body to send with the request.
        stream (bool): Whether or not to stream the response.
        remove_headers_list (list): A list of headers to remove from the request.
        raise_on_error (bool): Whether or not to raise an exception if the request fails.
        tries (int): The number of times to retry the request.

    Returns:
        requests.Response: The response object.
    """
    req = requests.Request(
        method=verb,
        url=url,
        headers=headers,
        params=params,
        json=json_payload,
        data=data,
    )
    prepped = req.prepare()

    if remove_headers_list:
        for header in remove_headers_list:
            prepped.headers.pop(header, None)

    # Create a retry wrapper function
    retry_wrapper = common.DecRetry(
        retry_exceptions=CONNECTION_EXCEPTIONS, tries=tries
    )

    # requests sessions are potentially not thread-safe, but a prepared request can only be
    # sent through a Session object. A Session is therefore created per call; it does not
    # benefit from connection pooling reuse. https://github.com/psf/requests/issues/1871
    session = requests.Session()
    keep_session_open = False
    try:
        # wrap the send function with the retry wrapper and call it
        response = retry_wrapper(session.send)(prepped, stream=stream)
        # A streamed body is read by the caller later, so the session must stay open.
        keep_session_open = stream
    finally:
        if not keep_session_open:
            session.close()

    # Never write credentials to the log: mask the Authorization header value.
    loggable_headers = {
        name: ("<redacted>" if str(name).lower() == "authorization" else value)
        for name, value in prepped.headers.items()
    }

    logger.debug("verb: %s", prepped.method)
    logger.debug("url: %s", prepped.url)
    logger.debug("headers: %s", loggable_headers)
    logger.debug("params: %s", req.params)
    logger.debug("response: %s", response)

    # trigger an exception to be raised for 4XX or 5XX http responses
    if raise_on_error:
        response.raise_for_status()

    return response
make_request(self, uri_path='/', headers=None, params=None, data=None, verb=None, conductor_url=None, raise_on_error=True, tries=5, use_api_key=False)
Make a request to the Conductor API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri_path |
str |
The path to the resource to request. |
'/' |
headers |
dict |
A dictionary of headers to send with the request. |
None |
params |
dict |
A dictionary of query parameters to send with the request. |
None |
data |
dict |
A dictionary of data to send with the request. |
None |
verb |
str |
The HTTP verb to use. |
None |
conductor_url |
str |
The Conductor URL. |
None |
raise_on_error |
bool |
Whether or not to raise an exception if the request fails. |
True |
tries |
int |
The number of times to retry the request. |
5 |
use_api_key |
bool |
Whether or not to use the API key for authentication. |
False |
Returns:
Type | Description |
---|---|
tuple(str, int) |
The response text and status code. |
Source code in ciocore/api_client.py
def make_request(
    self,
    uri_path="/",
    headers=None,
    params=None,
    data=None,
    verb=None,
    conductor_url=None,
    raise_on_error=True,
    tries=5,
    use_api_key=False,
):
    """
    Make a request to the Conductor API.

    Args:
        uri_path (str): The path to the resource to request.
        headers (dict): A dictionary of headers to send with the request. The dictionary is
            copied; the caller's object is not modified.
        params (dict): A dictionary of query parameters to send with the request.
        data (dict): A dictionary of data to send with the request.
        verb (str): The HTTP verb to use. Defaults to POST when data is given, else GET.
        conductor_url (str): The Conductor URL. Defaults to the configured url joined with
            uri_path.
        raise_on_error (bool): Whether or not to raise an exception if the request fails.
        tries (int): The number of times to retry the request.
        use_api_key (bool): Whether or not to use the API key for authentication.

    Returns:
        tuple(str, int): The response text and status code.

    Raises:
        Exception: If no bearer token could be obtained.
    """
    cfg = config.get()

    # TODO: set Content Content-Type to json if data arg
    # Work on a copy so the Authorization and User-Agent entries added below do not leak
    # into a dictionary the caller may reuse.
    if headers:
        headers = headers.copy()
    else:
        headers = {"Content-Type": "application/json", "Accept": "application/json"}

    logger.debug("read_conductor_credentials({})".format(use_api_key))
    bearer_token = read_conductor_credentials(use_api_key)
    if not bearer_token:
        raise Exception("Error: Could not get conductor credentials!")

    headers["Authorization"] = "Bearer %s" % bearer_token

    if not ApiClient.user_agent_header:
        self.register_client("ciocore")
    headers["User-Agent"] = ApiClient.user_agent_header

    # Construct URL
    if not conductor_url:
        conductor_url = parse.urljoin(cfg["url"], uri_path)

    if not verb:
        verb = "POST" if data else "GET"

    assert verb in self.http_verbs, "Invalid http verb: %s" % verb

    # Create a retry wrapper function
    retry_wrapper = common.DecRetry(
        retry_exceptions=CONNECTION_EXCEPTIONS, tries=tries
    )

    # wrap the request function with the retry wrapper and call it
    response = retry_wrapper(self._make_request)(
        verb, conductor_url, headers, params, data, raise_on_error=raise_on_error
    )

    return response.text, response.status_code
register_client(client_name, client_version=None)
classmethod
Generates the http User Agent header that includes helpful debug info.
Source code in ciocore/api_client.py
@classmethod
def register_client(cls, client_name, client_version=None):
    """
    Generate the http User Agent header that includes helpful debug info.

    The header is stored on the class as `user_agent_header` and also returned.

    Args:
        client_name (str): The name of the client to report.
        client_version (str): The version of the client. Reported as 'unknown' when falsy.

    Returns:
        str: The generated User-Agent value.
    """
    if not client_version:
        client_version = "unknown"

    python_version = platform.python_version()
    system_info = platform.system()
    release_info = platform.release()

    # Get the MD5 hex digest of the path to the python executable. The path is truncated
    # while it is still text: truncate_middle() joins its pieces with a str marker, which
    # raises TypeError on bytes. sys.executable may be None or empty in some interpreters.
    executable = sys.executable or ""
    python_executable_path = truncate_middle(executable, cls.USER_AGENT_MAX_PATH_LENGTH)
    md5_hash = hashlib.md5(python_executable_path.encode("utf-8")).hexdigest()

    user_agent = (
        f"{client_name}/{client_version} "
        f"(python {python_version}; {system_info} {release_info}; {md5_hash})"
    )
    cls.user_agent_header = user_agent

    return user_agent
truncate_middle(s, max_length)
Truncate the string s
to max_length
by removing characters from the middle.
Parameters: s (str or bytes): The original string to be truncated. max_length (int): The maximum allowed length of the string after truncation. Returns: The truncated string (same type as s).
Source code in ciocore/api_client.py
def truncate_middle(s, max_length):
    """
    Truncate the string `s` to `max_length` by removing characters from the middle.

    The removed section is replaced by a single '~' marker, which counts towards
    `max_length`. Both text and bytes input are supported; the result has the same
    type as the input.

    :param s: The original string to be truncated.
    :type s: str or bytes
    :param max_length: The maximum allowed length of the string after truncation.
    :type max_length: int
    :return: The truncated string. Empty when `max_length` is zero or negative.
    :rtype: str or bytes
    """
    if len(s) <= max_length:
        # String is already at or below the maximum length, return it as is
        return s

    if max_length <= 0:
        # No room even for the marker; s[:0] is an empty value of the input's type.
        return s[:0]

    # The marker must match the input type, otherwise concatenation raises TypeError.
    marker = b"~" if isinstance(s, (bytes, bytearray)) else "~"

    # Calculate the number of characters to keep from the start and end of the string
    num_keep_front = max_length // 2
    num_keep_end = max_length - num_keep_front - 1  # -1 for the marker

    # s[-0:] would return the whole string, so a zero-length tail is handled explicitly.
    tail = s[len(s) - num_keep_end:] if num_keep_end > 0 else s[:0]

    # Construct the truncated string
    return s[:num_keep_front] + marker + tail
read_conductor_credentials(use_api_key=False)
Read the conductor credentials file.
If the credentials file exists, it will contain a bearer token from either the user or the API key.
If the credentials file doesn't exist, or is expired, or is from a different domain, we try to fetch a new one in the API key scenario or prompt the user to log in.
Args: use_api_key (bool): Whether or not to try to use the API key
Returns: A Bearer token in the event of a success or None
Source code in ciocore/api_client.py
def read_conductor_credentials(use_api_key=False):
    """
    Return a bearer token read from the conductor credentials file.

    The credentials file holds a bearer token obtained either for the user or for the API
    key. When the file is missing, expired, or was issued for a different domain, a new one
    is requested: through the API key when `use_api_key` applies, otherwise by sending the
    user through the login flow.

    Args:
        use_api_key (bool): Whether or not to try to use the API key. Ignored when the config
            has no api_key entry or the entry has no client_id.

    Returns:
        A Bearer token in the event of a success or None
    """

    def _load_creds(path):
        # Parse the JSON credentials file at `path`.
        with open(path, "r", encoding="utf-8") as fp:
            return json.load(fp)

    cfg = config.get()
    logger.debug("Reading conductor credentials...")

    # The API key is only usable when the config holds an api_key with a client_id.
    if use_api_key and not (cfg.get("api_key") and cfg["api_key"].get("client_id")):
        use_api_key = False

    logger.debug("use_api_key = %s", use_api_key)
    creds_file = get_creds_path(use_api_key)
    logger.debug("Creds file is %s", creds_file)
    logger.debug("Auth url is %s", cfg["url"])

    if not os.path.exists(creds_file):
        if not use_api_key:
            auth.run(creds_file, cfg["url"])
        elif not cfg["api_key"]:
            logger.debug("Attempted to use API key, but no api key in in config!")
            return None
        else:
            # Exchange the API key for a bearer token
            logger.debug("Attempting to get API key bearer token")
            get_api_key_bearer_token(creds_file)

    # The attempt above may not have produced a file.
    if not os.path.exists(creds_file):
        return None

    logger.debug("Reading credentials file...")
    file_contents = _load_creds(creds_file)

    expiration = file_contents.get("expiration")
    same_domain = creds_same_domain(file_contents)
    still_valid = same_domain and expiration and expiration >= int(time.time())
    if still_valid:
        return file_contents["access_token"]

    logger.debug("Credentials have expired or are from a different domain")
    if use_api_key:
        logger.debug("Refreshing API key bearer token!")
        get_api_key_bearer_token(creds_file)
    else:
        logger.debug("Sending to auth page...")
        auth.run(creds_file, cfg["url"])

    # Re-read the creds file, since it has been re-upped
    return _load_creds(creds_file)["access_token"]
get_api_key_bearer_token(creds_file=None)
Get a bearer token from the API key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
creds_file |
str |
The path to the credentials file. If not provided, the bearer token will not be written to disk. |
None |
Returns:
Type | Description |
---|---|
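dict or None |
A dictionary containing the bearer token and other information when creds_file is not provided; otherwise None (the dictionary is written to creds_file). None is also returned when the token request does not succeed. |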
Source code in ciocore/api_client.py
def get_api_key_bearer_token(creds_file=None):
    """
    Get a bearer token from the API key.

    The API key's client_id and private_key from the config are exchanged for a token at
    `<url>/api/oauth_jwt`.

    Args:
        creds_file (str): The path to the credentials file. If not provided, the bearer token
            will not be written to disk.

    Returns:
        dict or None: A dictionary containing the bearer token and other information when
        `creds_file` is not provided. None when the credentials were written to `creds_file`,
        or when the server did not answer with status 200.
    """
    cfg = config.get()
    url = "{}/api/oauth_jwt".format(cfg["url"])
    response = requests.get(
        url,
        params={
            "grant_type": "client_credentials",
            "scope": "owner admin user",
            "client_id": cfg["api_key"]["client_id"],
            "client_secret": cfg["api_key"]["private_key"],
        },
    )

    if response.status_code != 200:
        return None

    response_dict = json.loads(response.text)
    credentials_dict = {
        "access_token": response_dict["access_token"],
        "token_type": "Bearer",
        # Stored as an absolute epoch time so readers can compare it with time.time().
        "expiration": int(time.time()) + int(response_dict["expires_in"]),
        "scope": "user admin owner",
    }

    if not creds_file:
        return credentials_dict

    # exist_ok avoids a crash when another process creates the directory between a check and
    # the creation; a bare filename has no directory component to create.
    creds_dir = os.path.dirname(creds_file)
    if creds_dir:
        os.makedirs(creds_dir, exist_ok=True)

    # Written as utf-8 to match how the credentials file is read back.
    with open(creds_file, "w", encoding="utf-8") as fp:
        fp.write(json.dumps(credentials_dict))

    return None
get_creds_path(api_key=False)
Get the path to the credentials file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
api_key |
bool |
Whether or not to use the API key. |
False |
Returns:
Type | Description |
---|---|
str |
The path to the credentials file. |
Source code in ciocore/api_client.py
def get_creds_path(api_key=False):
    """
    Build the path of the credentials file under `~/.config/conductor`.

    Args:
        api_key (bool): When truthy, return the path of the API key credentials file instead
            of the user credentials file.

    Returns:
        str: The path to the credentials file.
    """
    filename = "api_key_credentials" if api_key else "credentials"
    return os.path.join(os.path.expanduser("~"), ".config", "conductor", filename)
get_bearer_token(refresh=False)
Return the bearer token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
refresh |
bool |
Whether or not to refresh the token. |
False |
Source code in ciocore/api_client.py
def get_bearer_token(refresh=False):
    """
    Return the bearer token, preferring API key credentials.

    Args:
        refresh (bool): Whether or not to refresh the token. Currently not used by the
            implementation.

    Returns:
        The value returned by read_conductor_credentials() with the API key enabled.

    TODO: Thread safe multiproc caching, like it used to be pre-python3.7.
    """
    token = read_conductor_credentials(use_api_key=True)
    return token
creds_same_domain(creds)
Check if the creds are for the same domain as the config.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
creds |
dict |
The credentials dictionary. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether or not the creds are for the same domain as the config. |
Source code in ciocore/api_client.py
def creds_same_domain(creds):
    """
    Check if the creds are for the same domain as the config.

    The last path segment of the token's `aud` claim is compared with the last path segment
    of the configured `api_url`.

    Args:
        creds (dict): The credentials dictionary.

    Returns:
        bool: Whether or not the creds are for the same domain as the config. False when the
        credentials have no access token or the token has no `aud` claim.
    """
    cfg = config.get()

    token = creds.get("access_token")
    if not token:
        return False

    # The token is only inspected, not trusted: the signature is deliberately not verified.
    # NOTE(review): `verify=False` is the PyJWT 1.x spelling - confirm the pinned version.
    decoded = jwt.decode(token, verify=False)
    audience_domain = decoded.get("aud")
    if not audience_domain:
        # Return a real bool rather than leaking None or "" to the caller.
        return False

    return audience_domain.rpartition("/")[-1] == cfg["api_url"].rpartition("/")[-1]
account_id_from_jwt(token)
Fetch the accounts id from a jwt token value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
The jwt token. |
required |
Returns:
Type | Description |
---|---|
str |
The account id. |
Source code in ciocore/api_client.py
def account_id_from_jwt(token):
    """
    Read the account id out of a jwt token.

    The token is decoded without verification and its `account` claim is returned.

    Args:
        token (str): The jwt token.

    Returns:
        str: The account id, or None when the token has no `account` claim.
    """
    return jwt.decode(token, verify=False).get("account")
account_name_from_jwt(token)
Fetch the accounts name from a jwt token value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
The jwt token. |
required |
Returns:
Type | Description |
---|---|
str |
The account name. |
Source code in ciocore/api_client.py
def account_name_from_jwt(token):
    """
    Look up the account name that belongs to a jwt token.

    The account id is taken from the token and the account record is then requested from the
    configured `api_url`, authenticating with the same token.

    Args:
        token (str): The jwt token.

    Returns:
        str: The account name, or None when the token carries no account id or the request
        does not return status 200.
    """
    account_id = account_id_from_jwt(token)
    cfg = config.get()

    if not account_id:
        return None

    account_url = "%s/api/v1/accounts/%s" % (cfg["api_url"], account_id)
    auth_headers = {"authorization": "Bearer %s" % token}
    response = requests.get(account_url, headers=auth_headers)
    if response.status_code != 200:
        return None

    return json.loads(response.text)["data"]["name"]
request_instance_types(as_dict=False)
Get the list of available instances types.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
as_dict |
bool |
Whether or not to return the instance types as a dictionary. |
False |
Returns:
Type | Description |
---|---|
list or dict |
The list of instance types, or a dictionary keyed by instance description when as_dict is True. |
Source code in ciocore/api_client.py
def request_instance_types(as_dict=False):
    """
    Fetch the available instance types from Conductor.

    Args:
        as_dict (bool): Whether or not to return the instance types as a dictionary keyed by
            each instance type's description.

    Returns:
        list: The list of instance types, or a dict of them when `as_dict` is true.

    Raises:
        Exception: If the request does not return status 200.
    """
    client = ApiClient()
    body, status = client.make_request(
        "api/v1/instance-types", use_api_key=True, raise_on_error=False
    )

    if status != 200:
        error = "Failed to get instance types"
        error += "\nError %s ...\n%s" % (status, body)
        raise Exception(error)

    instance_types = json.loads(body).get("data", [])
    logger.debug("Found available instance types: %s", instance_types)

    if not as_dict:
        return instance_types

    return {entry["description"]: entry for entry in instance_types}
request_projects(statuses=('active',))
Query Conductor for all client Projects that are in the given status(es).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
statuses |
tuple |
The statuses to filter for. |
('active',) |
Returns:
Type | Description |
---|---|
list |
The list of project names. |
Source code in ciocore/api_client.py
def request_projects(statuses=("active",)):
    """
    Fetch the names of the client's Conductor projects that are in the given status(es).

    Args:
        statuses (tuple): The statuses to filter for. When empty, no filtering is applied.

    Returns:
        list: The list of project names.

    Raises:
        Exception: If the request does not return status 200.
    """
    client = ApiClient()
    logger.debug("statuses: %s", statuses)

    body, status = client.make_request(
        uri_path="api/v1/projects/", verb="GET", raise_on_error=False, use_api_key=True
    )
    logger.debug("response: %s", body)
    logger.debug("response: %s", status)

    if status != 200:
        error = "Failed to get available projects from Conductor"
        error += "\nError %s ...\n%s" % (status, body)
        raise Exception(error)

    # Keep only projects of the proper status
    all_projects = json.loads(body).get("data") or []
    return [
        entry["name"]
        for entry in all_projects
        if not statuses or entry.get("status") in statuses
    ]
request_software_packages()
Query Conductor for all software packages for the currently available sidecar.
Returns:
Type | Description |
---|---|
list |
The list of software packages. |
Source code in ciocore/api_client.py
def request_software_packages():
    """
    Fetch all software packages for the currently available sidecar from Conductor.

    Packages whose product contains "3dsmax" and whose platform is "linux" are left out of
    the result.

    Returns:
        list: The list of software packages.

    Raises:
        Exception: If the request does not return status 200.
    """
    client = ApiClient()
    body, status = client.make_request(
        uri_path="api/v1/ee/packages?all=true,",
        verb="GET",
        raise_on_error=False,
        use_api_key=True,
    )

    if status != 200:
        error = "Failed to get software packages for latest sidecar"
        error += "\nError %s ...\n%s" % (status, body)
        raise Exception(error)

    packages = json.loads(body).get("data", [])
    return [
        package
        for package in packages
        if "3dsmax" not in package["product"] or package["platform"] != "linux"
    ]
request_extra_environment()
Query Conductor for extra environment.
Source code in ciocore/api_client.py
def request_extra_environment():
    """
    Query Conductor for the extra environment of the current account.

    The env-vars configs of all accounts are fetched, and the entry whose `account_id`
    matches the account id in the current bearer token is selected.

    Returns:
        list: The `env` entries of the matching account config, or an empty list when the
        config has no `env` key.

    Raises:
        Exception: If the request does not return status 200, if no credentials or account
            id can be obtained, or if no config exists for the account.
    """
    api = ApiClient()
    uri = "api/v1/integrations/env-vars-configs"
    response, response_code = api.make_request(
        uri_path=uri, verb="GET", raise_on_error=False, use_api_key=True
    )

    if response_code not in [200]:
        msg = "Failed to get extra environment"
        msg += "\nError %s ...\n%s" % (response_code, response)
        raise Exception(msg)

    all_accounts = json.loads(response).get("data", [])

    token = read_conductor_credentials(True)
    if not token:
        raise Exception("Error: Could not get conductor credentials!")

    # Validate before converting to str: str(None) is "None", which is truthy and would
    # make a missing id look valid.
    raw_account_id = account_id_from_jwt(token)
    if raw_account_id is None or raw_account_id == "":
        raise Exception("Error: Could not get account id from jwt!")
    account_id = str(raw_account_id)

    account_env = next(
        (account for account in all_accounts if account["account_id"] == account_id),
        None,
    )
    if not account_env:
        raise Exception("Error: Could not get account environment!")

    return account_env.get("env", [])
get_jobs(first_jid, last_jid=None)
Query Conductor for all jobs between the given job ids.
Returns:
Type | Description |
---|---|
list |
The list of jobs. |
Exceptions:
Type | Description |
---|---|
Exception |
If the request fails. |
Examples:
>>> from ciocore import api_client
>>> jobs = api_client.get_jobs(1959)
>>> len(jobs)
1
>>> jobs[0]["jid"]
'01959'
>>> jobs = api_client.get_jobs(1959, 1961)
>>> len(jobs)
3
Source code in ciocore/api_client.py
def get_jobs(first_jid, last_jid=None):
    """
    Fetch all jobs whose ids lie between the given job ids, inclusive.

    Args:
        first_jid (int or str): The lowest job id to fetch.
        last_jid (int or str): The highest job id to fetch. Defaults to `first_jid`, in which
            case a single job is requested.

    Returns:
        list: The list of jobs.

    Raises:
        Exception: If the request fails.

    Examples:
        >>> from ciocore import api_client
        >>> jobs = api_client.get_jobs(1959)
        >>> len(jobs)
        1
        >>> jobs[0]["jid"]
        '01959'
        >>> jobs = api_client.get_jobs(1959, 1961)
        >>> len(jobs)
        3
    """
    last_jid = first_jid if last_jid is None else last_jid

    # The filter uses strict greater-than / less-than, so widen the range by one on each
    # side. Ids are zero-padded to five digits.
    lower_bound = str(int(first_jid) - 1).zfill(5)
    upper_bound = str(int(last_jid) + 1).zfill(5)

    client = ApiClient()
    body, status = client.make_request(
        uri_path="api/v1/jobs",
        verb="GET",
        raise_on_error=False,
        use_api_key=True,
        params={"filter": f"jid_gt_{lower_bound},jid_lt_{upper_bound}"},
    )

    if status != 200:
        error = f"Failed to get jobs {first_jid}-{last_jid}"
        error += "\nError %s ...\n%s" % (status, body)
        raise Exception(error)

    return json.loads(body).get("data")
get_log(job_id, task_id)
Get the log for the given job and task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job_id |
str |
The job id. |
required |
task_id |
str |
The task id. |
required |
Returns:
Type | Description |
---|---|
str |
The raw response text (JSON) containing the logs. |
Exceptions:
Type | Description |
---|---|
Exception |
If the request fails. |
Examples:
>>> from ciocore import api_client
>>> logs = api_client.get_log(1959, 0)
{
"logs": [
{
"container_id": "j-5669544198668288-5619559933149184-5095331660038144-stde",
"instance_name": "renderer-5669544198668288-170062309438-62994",
"log": [
"Blender 2.93.0 (hash 84da05a8b806 built 2021-06-02 11:29:24)",
...
...
"Saved: '/var/folders/8r/46lmjdmj50x_0swd9klwptzm0000gq/T/blender_bmw/renders/render_0001.png'",
" Time: 00:29.22 (Saving: 00:00.32)",
"",
"",
"Blender quit"
],
"timestamp": "1.700623521101516E9"
}
],
"new_num_lines": [
144
],
"status_description": "",
"task_status": "success"
}
Source code in ciocore/api_client.py
def get_log(job_id, task_id):
    """
    Fetch the log for the given job and task.

    Args:
        job_id (str): The job id. Zero-padded to five digits before the request.
        task_id (str): The task id. Zero-padded to three digits before the request.

    Returns:
        str: The response text as returned by the server; it is not parsed here. The example
        below shows the JSON it contains.

    Raises:
        Exception: If the request fails.

    Examples:
        >>> from ciocore import api_client
        >>> logs = api_client.get_log(1959, 0)
        {
            "logs": [
                {
                    "container_id": "j-5669544198668288-5619559933149184-5095331660038144-stde",
                    "instance_name": "renderer-5669544198668288-170062309438-62994",
                    "log": [
                        "Blender 2.93.0 (hash 84da05a8b806 built 2021-06-02 11:29:24)",
                        ...
                        ...
                        "Saved: '/var/folders/8r/46lmjdmj50x_0swd9klwptzm0000gq/T/blender_bmw/renders/render_0001.png'",
                        " Time: 00:29.22 (Saving: 00:00.32)",
                        "",
                        "",
                        "Blender quit"
                    ],
                    "timestamp": "1.700623521101516E9"
                }
            ],
            "new_num_lines": [
                144
            ],
            "status_description": "",
            "task_status": "success"
        }
    """
    job_id = str(job_id).zfill(5)
    task_id = str(task_id).zfill(3)

    client = ApiClient()
    body, status = client.make_request(
        uri_path=f"get_log_file?job={job_id}&task={task_id}&num_lines[]=0",
        verb="GET",
        raise_on_error=False,
        use_api_key=True,
    )

    if status != 200:
        error = f"Failed to get log for job {job_id} task {task_id}"
        error += "\nError %s ...\n%s" % (status, body)
        raise Exception(error)

    return body
kill_jobs(*job_ids)
Kill the given jobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job_ids |
list |
The list of job ids. |
() |
Returns:
Type | Description |
---|---|
dict |
The response. |
Examples:
>>> from ciocore import api_client
>>> api_client.kill_jobs("03095","03094")
{'body': 'success', 'message': "Jobs [u'03095', u'03094'] have been kill."}
Source code in ciocore/api_client.py
def kill_jobs(*job_ids):
    """
    Ask Conductor to kill the given jobs.

    Args:
        job_ids (list): The job ids, passed as positional arguments. Each id is zero-padded
            to five digits before the request.

    Returns:
        dict: The decoded JSON response.

    Raises:
        Exception: If the request does not return status 200.

    Examples:
        >>> from ciocore import api_client
        >>> api_client.kill_jobs("03095","03094")
        {'body': 'success', 'message': "Jobs [u'03095', u'03094'] have been kill."}
    """
    job_ids = [str(jid).zfill(5) for jid in job_ids]

    request_body = json.dumps({"action": "kill", "jobids": job_ids})
    client = ApiClient()
    body, status = client.make_request(
        uri_path="jobs_multi",
        verb="PUT",
        raise_on_error=False,
        use_api_key=True,
        data=request_body,
    )

    if status != 200:
        error = f"Failed to kill jobs {job_ids}"
        error += "\nError %s ...\n%s" % (status, body)
        raise Exception(error)

    return json.loads(body)
kill_tasks(job_id, *task_ids)
Kill the given tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job_id |
str |
The job id. |
required |
task_ids |
list |
The list of task ids. |
() |
Returns:
Type | Description |
---|---|
dict |
The response. |
Examples:
>>> from ciocore import api_client
>>> api_client.kill_tasks("03096", *range(50,56))
{'body': 'success', 'message': ' 6 Tasks set to "kill"
050
051
052
053
054
055'}
Source code in ciocore/api_client.py
def kill_tasks(job_id, *task_ids):
    """
    Ask Conductor to kill the given tasks of a job.

    Args:
        job_id (str): The job id. Zero-padded to five digits before the request.
        task_ids (list): The task ids, passed as positional arguments. Each id is zero-padded
            to three digits before the request.

    Returns:
        dict: The decoded JSON response.

    Raises:
        Exception: If the request does not return status 200.

    Examples:
        >>> from ciocore import api_client
        >>> api_client.kill_tasks("03096", *range(50,56))
        {'body': 'success', 'message': ' 6 Tasks set to "kill"\n\t050\n\t051\n\t052\n\t053\n\t054\n\t055'}
    """
    job_id = str(job_id).zfill(5)
    task_ids = [str(tid).zfill(3) for tid in task_ids]

    request_body = json.dumps({"action": "kill", "jobid": job_id, "taskids": task_ids})
    client = ApiClient()
    body, status = client.make_request(
        uri_path="tasks_multi",
        verb="PUT",
        raise_on_error=False,
        use_api_key=True,
        data=request_body,
    )

    if status != 200:
        error = f"Failed to kill tasks {task_ids} of job {job_id}"
        error += "\nError %s ...\n%s" % (status, body)
        raise Exception(error)

    return json.loads(body)