Source code for runzero.api.imports.assets

"""
enables management of runZero custom integrations.

These operations are privileged and require an account token directly or an OAuth key that can generate one.
"""

import gzip
import tempfile
import time
import uuid
from typing import Iterable, List, Optional

from runzero.client import Client
from runzero.types import ImportAsset, ImportTask, NewAssetImport, Task


[docs]class CustomAssets: """Management of Custom Asset Data for your own custom integrations. Custom data integrations are descriptive registered associations between integrations of data and assets imported which are associated with those integrations. The data sent to the server has basic checks performed and is loaded as an import task when it can find the next available worker to do so. Therefore, the result is a `class:runzero.Task` which you can check the status of. See related :class:`runzero.account.CustomIntegrations` to work with the custom asset data integrations registered to the account. :param client: A handle to the :class:`runzero.Client` which manages interactions with the runZero server. """ _ENDPOINT = "api/v1.0/import/org/{oid}/assets" def __init__(self, client: Client): """Constructor method""" self._client = client
[docs] def upload_assets( self, org_id: uuid.UUID, site_id: uuid.UUID, custom_integration_id: uuid.UUID, assets: List[ImportAsset], task_info: Optional[ImportTask] = None, ) -> Task: """ Upload your custom assets to the runZero platform. See the ImportAsset object for a description of the data that can be imported. Assets are merged according to the merge logic in the release of the platform. This involves fields other than the custom_properties dictionary. If the runZero asset ID is known externally, it may be specified on any single ImportAsset object to override all merge rules and force that object's data onto the runZero asset with that ID. :param org_id: Organization ID to import these assets into :param site_id: ID of the Site to import these asstes into :param custom_integration_id: custom integration id for the provided Import Assets :param assets: A collection of ImportAssets to upload :param task_info: Descriptive information associated with the import task to be created. If omitted, a task name is generated for you :returns: Task: The runZero task associated with processing the asset upload :raises: ServerError, ClientError, AuthError """ # create default task_info not supplied if task_info is None: task_info = ImportTask(name=f"Custom Asset Import {time.time_ns():.0f}", description="py-sdk import") else: # set defaults if user sets these to empty if task_info.name == "": task_info.name = f"Custom Asset Import {time.time_ns():.0f}" if task_info.description is None or task_info.description == "": task_info.description = "py-sdk import" if task_info.exclude_unknown is None: task_info.exclude_unknown = False asset_import_req = _create_custom_asset_request( site_id=site_id, custom_integration_id=custom_integration_id, import_task=task_info, assets=assets, ) tags_as_str = "" if asset_import_req.import_task.tags is not None: tags_as_str = ",".join([tag.__root__ for tag in asset_import_req.import_task.tags]) multipart_form_data = ( ("assetData", ("asset_data.jsonl.gz", asset_import_req.asset_data)), ("siteId", (None, str(asset_import_req.site_id))), ("customIntegrationId", (None, str(asset_import_req.custom_integration_id))), ("importTask.name", (None, asset_import_req.import_task.name)), ("importTask.description", (None, asset_import_req.import_task.description)), # this requires casting to a lower-cased string to function properly ("importTask.excludeUnknown", (None, str(asset_import_req.import_task.exclude_unknown).lower())), ("importTask.tags", (None, tags_as_str)), ) res = self._client.execute("POST", self._ENDPOINT.format(oid=org_id), files=multipart_form_data, multipart=True) return Task.parse_obj(res.json_obj)
def _import_assets_into_gzip_jsonl(import_assets: Iterable[ImportAsset]) -> bytes: tmp = tempfile.TemporaryFile(mode="w+b") with gzip.GzipFile(fileobj=tmp, mode="wb") as gzw: for asset_obj in import_assets: gzw.write(asset_obj.json(by_alias=True).encode("utf-8") + "\n".encode("utf-8")) tmp.seek(0) return tmp.read() def _create_custom_asset_request( site_id: uuid.UUID, custom_integration_id: uuid.UUID, assets: Iterable[ImportAsset], import_task: ImportTask ) -> NewAssetImport: return NewAssetImport( site_id=site_id, custom_integration_id=custom_integration_id, import_task=import_task, asset_data=_import_assets_into_gzip_jsonl(assets), )