Source code for fmu.sumo.explorer.objects.cube

"""Module containing class for cube object"""

from typing import Dict, Tuple

from sumo.wrapper import SumoClient

from ._child import Child


[docs] class Cube(Child): """Class representig a seismic cube object in Sumo""" def __init__(self, sumo: SumoClient, metadata: Dict, blob=None) -> None: """ Args: sumo (SumoClient): connection to Sumo metadata (dict): cube metadata """ super().__init__(sumo, metadata, blob) def _extract_auth(self, res) -> Tuple[str, str]: try: res = res.json() url = res.get("baseuri") + self.uuid sas = res.get("auth") except Exception: url, sas = res.text.split("?") pass return url, sas @property def auth(self) -> Tuple[str, str]: res = self._sumo.get(f"/objects('{self.uuid}')/blob/authuri") return self._extract_auth(res) @property async def auth_async(self) -> Tuple[str, str]: res = await self._sumo.get_async( f"/objects('{self.uuid}')/blob/authuri" ) return self._extract_auth(res) @property def openvds_handle(self): try: import openvds except ModuleNotFoundError: raise RuntimeError( "Unable to import openvds; probably not installed." ) url, sas = self.auth url = url.replace("https://", "azureSAS://") + "/" sas = "Suffix=?" + sas return openvds.open(url, sas) @property async def openvds_handle_async(self): try: import openvds except ModuleNotFoundError: raise RuntimeError( "Unable to import openvds; probably not installed." ) url, sas = await self.auth_async url = url.replace("https://", "azureSAS://") + "/" sas = "Suffix=?" + sas return openvds.open(url, sas)