"""
client provides the runZero platform Client which must be provided to all objects which interact
with the runZero API.
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Optional
from urllib.parse import urlparse
import requests
from pydantic import BaseModel
from requests.exceptions import ConnectionError as RequestsConnectionError
from requests.exceptions import ConnectTimeout as RequestsConnectTimeout
from requests.exceptions import ContentDecodingError
from requests.exceptions import HTTPError as RequestsHTTPError
from runzero.types import RateLimitInformation
from ._http.auth import OAuthToken, RegisteredAPIClient
from ._http.io import Request, Response
from .errors import AuthError
[docs]class Client:
"""
The authenticated connection to your runZero service destination.
A client must be built and provided to objects which interact
with the runZero API. It is responsible for authentication
and communication, and instantiation should be your first step
in doing just about anything in this SDK.
:param account_key: Optional account key, sometimes known as client key, which
is a high-privilege key which grants the holder the rights of an org_key across
all organizations in the account, as well as additional administrative actions.
Account keys are 30-character hexadecimal strings that with 'CT'. Set this value,
use an org_key, or use Oauth by calling register_api_client(). OAuth should be
preferred.
:param org_key: Optional organization key which grants the holder rights
to operations confined to a specific runZero organization.
Org keys are 30-character hexadecimal strings that with 'OT'. Set this value,
use an account_key, or use Oauth by calling register_api_client(). OAuth should be
preferred.
:param server_url: Optional URL to the server hosting the API. Self-hosted API
server targets must provide the server url in string form,
e.g. 'https://runzero.local:8443'
If not provided, the default hosted infrastructure URL
'https://console.runzero.com' is used.
:type server_url: str
:param validate_certificate: Optional bool to change whether Client checks
the validity of the API server's certificate before proceeding. We recommend
not setting this to false unless you are doing local development or testing.
Ignoring certificate validation errors can result in credential theft or other
bad outcomes.
:type validate_certificate: bool
"""
__default_timeout__ = 180
__default_server_url__ = "https://console.runzero.com"
class _Paths(str, Enum):
"""Enum of resource paths for the runZero APIs"""
TOKEN = "api/v1.0/account/api/token"
class _AuthScope(Enum):
"""Enum of auth scopes for the runZero APIs"""
ACCOUNT = 1
ORG = 2
def __init__(
self,
account_key: Optional[str] = None,
org_key: Optional[str] = None,
server_url: Optional[str] = None,
timeout_seconds: Optional[int] = None,
validate_certificate: Optional[bool] = None,
):
"""Constructor method"""
self.__account_key: Optional[str] = account_key
self.__org_key: Optional[str] = org_key
self._use_token: bool = False
self.__client_id: Optional[str] = None
self.__client_secret: Optional[str] = None
self.__token: Optional[OAuthToken] = None
server_url = server_url or self.__default_server_url__
parsed = urlparse(server_url)
if not all([parsed.scheme, parsed.netloc]):
raise ValueError(f"Url {server_url} is not valid")
if not parsed.scheme == "https":
raise ValueError(f"Url {server_url} must be https")
self.server_url = parsed.geturl().rstrip("/")
if timeout_seconds is not None and timeout_seconds <= 0:
raise ValueError("Timeout must be greater than 0")
self._timeout = timeout_seconds or self.__default_timeout__
if validate_certificate is None:
self._validate_cert = True
else:
self._validate_cert = validate_certificate
self._rate_limit_information: Optional[RateLimitInformation] = None
@property
def oauth_token_is_expired(self) -> bool:
"""Returns true if the oauth token is no longer valid.
Note that the token is automatically refreshed for you when
possible. This value doesn't need to be checked and manually
refreshed in most cases.
:returns: bool: indicating whether the oauth token is expired.
If OAuth is not in-use with the client, value is always false.
"""
if self.__token:
return self.__token.is_expired()
return False
@property
def oauth_active(self) -> bool:
"""
Returns true if the OAuth is in use.
This happens when register_api_client was called successfully
:returns: bool: indicating whether the oauth token is expired.
If there is no oauth used with the client, value is always false.
"""
return self._use_token
[docs] def oauth_login(self, client_id: str, client_secret: str) -> None:
"""
Registers the runZero SDK client using OAuth credentials, and
enables the Client to use OAuth.
To obtain a client ID and client secret, see the API Clients area
of the product. Generation of these values is restricted to account
administrators.
:param client_id: The client ID for the runZero registered API client
:param client_secret: The client secret for the runZero registered API client
:raises: AuthError: Exception for invalid OAuth configurations
"""
self.__client_id = client_id
self.__client_secret = client_secret
self._use_token = True
return self._login()
@property
def url(self) -> str:
"""The url of the server
:returns: str: The URL of the runZero server
"""
return self.server_url
# This is only used if the auth type is a registered api client
def _login(self) -> None:
"""
Attempts to use the client secret and client id to generate an OAuth token.
:raises: AuthError
"""
if not self._use_token or (self.__client_id is None or self.__client_secret is None):
raise AuthError("invalid auth configuration")
try:
resp = requests.post(
f"{self.server_url}/{self._Paths.TOKEN.value}",
data=RegisteredAPIClient(self.__client_id, self.__client_secret).register(),
timeout=self._timeout,
verify=self._validate_cert,
)
resp.raise_for_status()
self.__token = resp.json(object_hook=OAuthToken.parse_obj)
except (
RequestsConnectTimeout,
RequestsConnectionError,
RequestsHTTPError,
ContentDecodingError,
ConnectionRefusedError,
) as exc:
raise AuthError("failed to authenticate") from exc
def _get_auth_token(self, scope: _AuthScope) -> str:
"""
Validates and resolves the bearer token to use depending on the provided API scope.
Will also refresh the OAuth token if it's about to expire.
:param scope: Authentication scope for the requested credential to resolve
:returns: Bearer token for the required API scope
:rtype string
:raises: AuthError
"""
self._validate_scope_permissions(scope)
if self._use_token:
if self.__token is not None:
# this handles refreshing the token if necessary
if self.__token.is_expired():
self._login()
return self.__token.access_token
if scope == self._AuthScope.ACCOUNT:
if self.__account_key is not None:
return self.__account_key
if scope == self._AuthScope.ORG:
if self.__account_key is not None:
return self.__account_key
if self.__org_key is not None:
return self.__org_key
raise AuthError("invalid credential configurations")
def _validate_scope_permissions(self, scope: _AuthScope) -> None:
if self._use_token:
if self.__token is None:
raise AuthError("no valid OAuth token")
return
if scope is self._AuthScope.ACCOUNT:
if self.__account_key is None:
raise AuthError("missing account key")
return
if scope is self._AuthScope.ORG:
if self.__org_key is None:
if self.__account_key is None:
raise AuthError("missing organization or account key")
return
@property
def last_rate_limit_information(self) -> Optional[RateLimitInformation]:
"""
The last rate limit information retrieved from the server.
:returns: Rate limit information when provided.
"""
return self._rate_limit_information
@property
def timeout(self) -> int:
"""
The set request timeout value in seconds
:returns: timeout in seconds
"""
return self._timeout
@property
def validate_cert(self) -> bool:
"""
Boolean indicating whether the https cert must valid before proceeding.
:returns: true if certficate information is validated, false if not
"""
return self._validate_cert
[docs] def execute(
self,
method: str,
endpoint: str,
params: Optional[Any] = None,
data: Optional[BaseModel] = None,
files: Optional[Any] = None,
multipart: Optional[bool] = None,
) -> Response:
"""Executes the request
:param method: The REST verb to use
:param endpoint: The path to execute against
:param params: URL query parameters
:param data: The data to send in form body (POST, PATCH, PUT)
:param files: For multipart form data or file uploads. Format varies.
:param multipart: True if using a multipart form data (combination file[s] and form data)
:returns: The result of the execution as class:.`Response`
:raises: ValidationError, ConnTimeoutError, ConnError, CommunicationError
"""
token: str = ""
try:
token = self._get_auth_token(self._AuthScope.ACCOUNT)
except AuthError:
pass
if not token:
token = self._get_auth_token(self._AuthScope.ORG)
form_data = None
if data:
form_data = data.json()
resp = Request(
url=f"{self.url}/{endpoint}",
token=token,
method=method,
handlers=None,
params=params,
timeout=self.timeout,
validate_certificate=self.validate_cert,
data=form_data,
files=files,
multipart=multipart,
).execute()
self._rate_limit_information = resp.rate_limit_information
return resp