deep.api.resource

Constant values for Resource data.

DEEP_RESOURCE_ATTRIBUTES = 'DEEP_RESOURCE_ATTRIBUTES' module-attribute

The environment variable that holds user-defined resource attributes as a comma-separated list of key=value pairs. Values are URL-decoded when read.

DEEP_SERVICE_NAME = 'DEEP_SERVICE_NAME' module-attribute

The environment variable that sets the service name for Deep.
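
As a rough illustration of the two environment variables above, the sketch below builds a value for DEEP_RESOURCE_ATTRIBUTES and sets DEEP_SERVICE_NAME. The helper `format_resource_attributes` and the example attribute names are hypothetical and not part of deep; the format (comma-separated key=value pairs with URL-encoded values) is inferred from how `DeepResourceDetector.detect()` parses the variable.

```python
import os
from urllib.parse import quote


def format_resource_attributes(attributes: dict) -> str:
    """Hypothetical helper: join pairs as key=value, comma separated.

    Values are URL-encoded so that commas and spaces inside a value
    survive the split on "," performed when the variable is read.
    """
    return ",".join(
        f"{key}={quote(str(value), safe='')}" for key, value in attributes.items()
    )


# Example attribute names and values are made up for illustration.
encoded = format_resource_attributes({"team": "payments", "region": "eu west,1"})

os.environ["DEEP_RESOURCE_ATTRIBUTES"] = encoded
os.environ["DEEP_SERVICE_NAME"] = "checkout"
```

Only values are encoded here, because the detector shown on this page URL-decodes values but not keys.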

PROCESS_EXECUTABLE_NAME = 'process.executable.name' module-attribute

The name of the process executable. On Linux-based systems, this can be set to the Name field in /proc/[pid]/status. On Windows, it can be set to the base name of GetProcessImageFileNameW.

SERVICE_INSTANCE_ID = 'service.instance.id' module-attribute

The string ID of the service instance. Note: MUST be unique for each instance of the same service.namespace,service.name pair (in other words service.namespace,service.name,service.instance.id triplet MUST be globally unique). The ID helps to distinguish instances of the same service that exist at the same time (e.g. instances of a horizontally scaled service). It is preferable for the ID to be persistent and stay the same for the lifetime of the service instance, however it is acceptable that the ID is ephemeral and changes during important lifetime events for the service (e.g. service restarts). If the service has no inherent unique ID that can be used as the value of this attribute it is recommended to generate a random Version 1 or Version 4 RFC 4122 UUID (services aiming for reproducible UUIDs may also use Version 5, see RFC 4122 for more recommendations).
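
A minimal sketch of the two UUID options mentioned above, using only the Python standard library. The name passed to `uuid5` is a made-up example; any stable string that identifies the instance would do.

```python
import uuid

# Random (version 4) ID: a new value on every start, so the ID is ephemeral.
random_instance_id = str(uuid.uuid4())

# Reproducible (version 5) ID: the same namespace and name always give the
# same UUID. "shop/checkout/host-1" is a hypothetical instance name.
stable_instance_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, "shop/checkout/host-1"))
```

Either string could then be supplied as the value of service.instance.id.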

SERVICE_NAME = 'service.name' module-attribute

Logical name of the service. Note: MUST be the same for all instances of horizontally scaled services. If the value was not specified, SDKs MUST fall back to unknown_service: concatenated with process.executable.name, e.g. unknown_service:bash. If process.executable.name is not available, the value MUST be set to unknown_service. Note that Resource.create in this module instead falls back to unknown_service:python when process.executable.name is not set.

SERVICE_NAMESPACE = 'service.namespace' module-attribute

A namespace for service.name. Note: A string value having a meaning that helps to distinguish a group of services, for example the team name that owns a group of services. service.name is expected to be unique within the same namespace. If service.namespace is not specified in the Resource then service.name is expected to be unique for all services that have no explicit namespace defined (so the empty/unspecified namespace is simply one more valid namespace). Zero-length namespace string is assumed equal to unspecified namespace.
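
To make the uniqueness rule concrete, the sketch below builds the (service.namespace, service.name, service.instance.id) triplet from a plain dict and treats a zero-length namespace the same as a missing one. The function `service_identity` is a hypothetical helper for illustration and is not part of deep.

```python
def service_identity(attributes: dict) -> tuple:
    """Hypothetical helper: the triplet that is expected to be globally unique."""
    # An empty string and a missing key both mean "unspecified namespace".
    namespace = attributes.get("service.namespace") or ""
    return (
        namespace,
        attributes.get("service.name"),
        attributes.get("service.instance.id"),
    )


without_namespace = service_identity(
    {"service.name": "checkout", "service.instance.id": "i-1"}
)
empty_namespace = service_identity(
    {"service.namespace": "", "service.name": "checkout", "service.instance.id": "i-1"}
)
```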

SERVICE_VERSION = 'service.version' module-attribute

The version string of the service API or implementation.

TELEMETRY_AUTO_VERSION = 'telemetry.auto.version' module-attribute

The version string of the auto instrumentation agent, if used.

TELEMETRY_SDK_LANGUAGE = 'telemetry.sdk.language' module-attribute

The language of the telemetry SDK.

TELEMETRY_SDK_NAME = 'telemetry.sdk.name' module-attribute

The name of the telemetry SDK.

TELEMETRY_SDK_VERSION = 'telemetry.sdk.version' module-attribute

The version string of the telemetry SDK.

DeepResourceDetector

Bases: ResourceDetector

Detect the resource information for Deep.

Source code in deep/api/resource/__init__.py
class DeepResourceDetector(ResourceDetector):
    """Detect the resource information for Deep."""

    def detect(self) -> "Resource":
        """
        Create a resource from the discovered environment data.

        :return: the created resource
        """
        env_resources_items = os.environ.get(DEEP_RESOURCE_ATTRIBUTES)
        env_resource_map = {}

        if env_resources_items:
            for item in env_resources_items.split(","):
                try:
                    key, value = item.split("=", maxsplit=1)
                except ValueError as exc:
                    logging.warning(
                        "Invalid key value resource attribute pair %s: %s",
                        item,
                        exc,
                    )
                    continue
                value_url_decoded = parse.unquote(value.strip())
                env_resource_map[key.strip()] = value_url_decoded

        service_name = os.environ.get(DEEP_SERVICE_NAME)
        if service_name:
            env_resource_map[SERVICE_NAME] = service_name
        return Resource(env_resource_map)

detect()

Create a resource from the discovered environment data.

:return: the created resource

Source code in deep/api/resource/__init__.py
def detect(self) -> "Resource":
    """
    Create a resource from the discovered environment data.

    :return: the created resource
    """
    env_resources_items = os.environ.get(DEEP_RESOURCE_ATTRIBUTES)
    env_resource_map = {}

    if env_resources_items:
        for item in env_resources_items.split(","):
            try:
                key, value = item.split("=", maxsplit=1)
            except ValueError as exc:
                logging.warning(
                    "Invalid key value resource attribute pair %s: %s",
                    item,
                    exc,
                )
                continue
            value_url_decoded = parse.unquote(value.strip())
            env_resource_map[key.strip()] = value_url_decoded

    service_name = os.environ.get(DEEP_SERVICE_NAME)
    if service_name:
        env_resource_map[SERVICE_NAME] = service_name
    return Resource(env_resource_map)
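
The parsing step in `detect()` can be tried in isolation. The sketch below copies the same split-and-decode logic into a standalone function that returns a plain dict instead of a `Resource`; the function name `parse_resource_attributes` and the sample input are assumptions made for this example.

```python
import logging
from urllib import parse


def parse_resource_attributes(raw: str) -> dict:
    """Standalone copy of the parsing loop, returning a dict for easy inspection."""
    result = {}
    for item in raw.split(","):
        try:
            # Split on the first "=" only, so values may contain "=".
            key, value = item.split("=", maxsplit=1)
        except ValueError as exc:
            # Items without "=" are logged and skipped.
            logging.warning(
                "Invalid key value resource attribute pair %s: %s", item, exc
            )
            continue
        result[key.strip()] = parse.unquote(value.strip())
    return result


parsed = parse_resource_attributes(
    "team=payments, region=eu%20west%2C1,broken,url=a=b"
)
```

Here `broken` has no `=` and is skipped with a warning, while the encoded comma in `region` is restored only after the string has been split.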

Resource

A Resource is an immutable representation of the entity producing telemetry as Attributes.

Source code in deep/api/resource/__init__.py
class Resource:
    """A Resource is an immutable representation of the entity producing telemetry as Attributes."""

    def __init__(
            self, attributes: Attributes, schema_url: typing.Optional[str] = None
    ):
        """
        Create new resource.

        :param attributes: the attributes
        :param schema_url: the schema url
        """
        self._attributes = BoundedAttributes(attributes=attributes)
        if schema_url is None:
            schema_url = ""
        self._schema_url = schema_url

    @staticmethod
    def create(
            attributes: typing.Optional[Attributes] = None,
            schema_url: typing.Optional[str] = None,
    ) -> "Resource":
        """
        Create a new `Resource` from attributes.

        Args:
            attributes: Optional zero or more key-value pairs.
            schema_url: Optional URL pointing to the schema

        Returns:
            The newly-created Resource.
        """
        if not attributes:
            attributes = {}
        resource = _DEFAULT_RESOURCE.merge(
            DeepResourceDetector().detect()
        ).merge(Resource(attributes, schema_url))
        if not resource.attributes.get(SERVICE_NAME, None):
            default_service_name = "unknown_service"
            process_executable_name = resource.attributes.get(
                PROCESS_EXECUTABLE_NAME, None
            )
            if process_executable_name:
                default_service_name += ":" + process_executable_name
            else:
                default_service_name += ":python"
            resource = resource.merge(
                Resource({SERVICE_NAME: default_service_name}, schema_url)
            )
        return resource

    @staticmethod
    def get_empty() -> "Resource":
        """Get an empty resource."""
        return _EMPTY_RESOURCE

    @property
    def attributes(self) -> BoundedAttributes:
        """The underlying attributes for the resource."""
        return self._attributes

    @property
    def schema_url(self) -> str:
        """The schema url for the resource."""
        return self._schema_url

    def merge(self, other: "Resource") -> "Resource":
        """
        Merge another resource into this one.

        Merges this resource and an updating resource into a new `Resource`.

        If a key exists on both the old and updating resource, the value of the
        updating resource will override the old resource value.

        The updating resource's `schema_url` will be used only if the old
        `schema_url` is empty. Attempting to merge two resources with
        different, non-empty values for `schema_url` will result in an error
        and return the old resource.

        Args:
            other: The other resource to be merged.

        Returns:
            The newly-created Resource.
        """
        merged_attributes = self.attributes.copy()
        merged_attributes.update(other.attributes)

        if self.schema_url == "":
            schema_url = other.schema_url
        elif other.schema_url == "":
            schema_url = self.schema_url
        elif self.schema_url == other.schema_url:
            schema_url = other.schema_url
        else:
            logging.error(
                "Failed to merge resources: The two schemas %s and %s are incompatible",
                self.schema_url,
                other.schema_url,
            )
            return self

        return Resource(merged_attributes, schema_url)

    def __eq__(self, other: object) -> bool:
        """Check whether the other object is equal to this one."""
        if not isinstance(other, Resource):
            return False
        return (
                self._attributes == other._attributes
                and self._schema_url == other._schema_url
        )

    def __hash__(self):
        """Create hash value for this object."""
        return hash(
            f"{dumps(self._attributes.copy(), sort_keys=True)}|{self._schema_url}"
        )

    def to_json(self, indent=4) -> str:
        """Convert this object to json."""
        return dumps(
            {
                "attributes": dict(self._attributes),
                "schema_url": self._schema_url,
            },
            indent=indent,
        )

attributes: BoundedAttributes property

The underlying attributes for the resource.

schema_url: str property

The schema url for the resource.

__eq__(other)

Check whether the other object is equal to this one.

Source code in deep/api/resource/__init__.py
def __eq__(self, other: object) -> bool:
    """Check whether the other object is equal to this one."""
    if not isinstance(other, Resource):
        return False
    return (
            self._attributes == other._attributes
            and self._schema_url == other._schema_url
    )

__hash__()

Create hash value for this object.

Source code in deep/api/resource/__init__.py
def __hash__(self):
    """Create hash value for this object."""
    return hash(
        f"{dumps(self._attributes.copy(), sort_keys=True)}|{self._schema_url}"
    )
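
The hash is computed from a JSON dump with `sort_keys=True`, which makes it independent of the order in which attributes were inserted. A small sketch of that idea with plain dicts follows; `attributes_fingerprint` is a hypothetical stand-in for the string built inside `__hash__`.

```python
from json import dumps


def attributes_fingerprint(attributes: dict, schema_url: str = "") -> str:
    """Hypothetical stand-in for the string that __hash__ feeds to hash()."""
    return f"{dumps(attributes, sort_keys=True)}|{schema_url}"


# Same content, different insertion order.
first = attributes_fingerprint({"service.name": "checkout", "team": "payments"})
second = attributes_fingerprint({"team": "payments", "service.name": "checkout"})
```

Because both strings are identical, two resources with equal attributes and schema URL hash to the same value, which keeps `__hash__` consistent with `__eq__`.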

__init__(attributes, schema_url=None)

Create new resource.

:param attributes: the attributes
:param schema_url: the schema url

Source code in deep/api/resource/__init__.py
def __init__(
        self, attributes: Attributes, schema_url: typing.Optional[str] = None
):
    """
    Create new resource.

    :param attributes: the attributes
    :param schema_url: the schema url
    """
    self._attributes = BoundedAttributes(attributes=attributes)
    if schema_url is None:
        schema_url = ""
    self._schema_url = schema_url

create(attributes=None, schema_url=None) staticmethod

Create a new Resource from attributes.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| attributes | Optional[Attributes] | Optional zero or more key-value pairs. | None |
| schema_url | Optional[str] | Optional URL pointing to the schema. | None |

Returns:

| Type | Description |
|---|---|
| Resource | The newly created Resource. |

Source code in deep/api/resource/__init__.py
@staticmethod
def create(
        attributes: typing.Optional[Attributes] = None,
        schema_url: typing.Optional[str] = None,
) -> "Resource":
    """
    Create a new `Resource` from attributes.

    Args:
        attributes: Optional zero or more key-value pairs.
        schema_url: Optional URL pointing to the schema

    Returns:
        The newly-created Resource.
    """
    if not attributes:
        attributes = {}
    resource = _DEFAULT_RESOURCE.merge(
        DeepResourceDetector().detect()
    ).merge(Resource(attributes, schema_url))
    if not resource.attributes.get(SERVICE_NAME, None):
        default_service_name = "unknown_service"
        process_executable_name = resource.attributes.get(
            PROCESS_EXECUTABLE_NAME, None
        )
        if process_executable_name:
            default_service_name += ":" + process_executable_name
        else:
            default_service_name += ":python"
        resource = resource.merge(
            Resource({SERVICE_NAME: default_service_name}, schema_url)
        )
    return resource
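
The order of the merges in `create()` determines which source wins: defaults are overridden by environment-derived attributes, which are overridden by the attributes passed in. The sketch below models that precedence and the service name fallback with plain dicts. The function `build_attributes` and its inputs are assumptions for illustration; the contents of `_DEFAULT_RESOURCE` are not shown on this page, so `defaults` is only a stand-in.

```python
def build_attributes(defaults: dict, from_env: dict, explicit: dict) -> dict:
    """Hypothetical model of the layering in create(), using plain dicts."""
    # Later sources override earlier ones, as with successive merge() calls.
    merged = {**defaults, **from_env, **explicit}

    if not merged.get("service.name"):
        # Same fallback as create(): use the executable name if known,
        # otherwise "python".
        executable = merged.get("process.executable.name")
        merged["service.name"] = "unknown_service:" + (executable or "python")
    return merged


named = build_attributes(
    {"telemetry.sdk.language": "python"},
    {"service.name": "from-env", "team": "payments"},
    {"service.name": "explicit"},
)
unnamed = build_attributes({}, {}, {})
with_executable = build_attributes({}, {"process.executable.name": "gunicorn"}, {})
```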

get_empty() staticmethod

Get an empty resource.

Source code in deep/api/resource/__init__.py
@staticmethod
def get_empty() -> "Resource":
    """Get an empty resource."""
    return _EMPTY_RESOURCE

merge(other)

Merge another resource into this one.

Merges this resource and an updating resource into a new Resource.

If a key exists on both the old and updating resource, the value of the updating resource will override the old resource value.

The updating resource's schema_url will be used only if the old schema_url is empty. Attempting to merge two resources with different, non-empty values for schema_url does not raise an exception: an error is logged and the old resource is returned unchanged.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| other | Resource | The other resource to be merged. | required |

Returns:

| Type | Description |
|---|---|
| Resource | The newly created Resource. |

Source code in deep/api/resource/__init__.py
def merge(self, other: "Resource") -> "Resource":
    """
    Merge another resource into this one.

    Merges this resource and an updating resource into a new `Resource`.

    If a key exists on both the old and updating resource, the value of the
    updating resource will override the old resource value.

    The updating resource's `schema_url` will be used only if the old
    `schema_url` is empty. Attempting to merge two resources with
    different, non-empty values for `schema_url` will result in an error
    and return the old resource.

    Args:
        other: The other resource to be merged.

    Returns:
        The newly-created Resource.
    """
    merged_attributes = self.attributes.copy()
    merged_attributes.update(other.attributes)

    if self.schema_url == "":
        schema_url = other.schema_url
    elif other.schema_url == "":
        schema_url = self.schema_url
    elif self.schema_url == other.schema_url:
        schema_url = other.schema_url
    else:
        logging.error(
            "Failed to merge resources: The two schemas %s and %s are incompatible",
            self.schema_url,
            other.schema_url,
        )
        return self

    return Resource(merged_attributes, schema_url)
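
The merge rules can be summarized with a small standalone model. The sketch below works on plain dicts and strings rather than `Resource` objects; `merge_resources` is a hypothetical function written for this example and only mirrors the branching shown above.

```python
import logging
from typing import Tuple


def merge_resources(
    old_attrs: dict, old_schema: str, new_attrs: dict, new_schema: str
) -> Tuple[dict, str]:
    """Hypothetical dict-based model of the merge rules."""
    if old_schema == "":
        schema = new_schema
    elif new_schema == "" or old_schema == new_schema:
        schema = old_schema
    else:
        # Conflicting schemas: log and keep the old resource as it was.
        logging.error(
            "Failed to merge resources: The two schemas %s and %s are incompatible",
            old_schema,
            new_schema,
        )
        return dict(old_attrs), old_schema

    # On key conflicts the updating (new) value wins.
    return {**old_attrs, **new_attrs}, schema


merged = merge_resources({"a": 1, "b": 2}, "", {"b": 3}, "https://example.com/v1")
conflict = merge_resources(
    {"a": 1}, "https://example.com/v1", {"a": 9}, "https://example.com/v2"
)
```

The schema URLs are placeholders. In the conflicting case none of the updating attributes are applied, which matches the early `return self` in the source.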

to_json(indent=4)

Convert this object to json.

Source code in deep/api/resource/__init__.py
def to_json(self, indent=4) -> str:
    """Convert this object to json."""
    return dumps(
        {
            "attributes": dict(self._attributes),
            "schema_url": self._schema_url,
        },
        indent=indent,
    )
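
For a sense of the output shape, the sketch below reproduces the same `dumps` call on a plain dict. The function `resource_to_json` is a hypothetical stand-in and the attribute value is made up.

```python
from json import dumps, loads


def resource_to_json(attributes: dict, schema_url: str = "", indent=4) -> str:
    """Hypothetical stand-in that serializes the same two fields as to_json()."""
    return dumps(
        {"attributes": dict(attributes), "schema_url": schema_url},
        indent=indent,
    )


text = resource_to_json({"service.name": "checkout"})
# text is:
# {
#     "attributes": {
#         "service.name": "checkout"
#     },
#     "schema_url": ""
# }

round_trip = loads(text)
```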

ResourceDetector

Bases: ABC

Detect the resource information for Deep.

Source code in deep/api/resource/__init__.py
class ResourceDetector(abc.ABC):
    """Detect the resource information for Deep."""

    def __init__(self, raise_on_error=False):
        """
        Create a new detector.

        :param raise_on_error: should raise exception on error
        """
        self.raise_on_error = raise_on_error

    @abc.abstractmethod
    def detect(self) -> "Resource":
        """
        Create a resource.

        :return: the created resource
        """
        raise NotImplementedError()

__init__(raise_on_error=False)

Create a new detector.

:param raise_on_error: should raise exception on error

Source code in deep/api/resource/__init__.py
def __init__(self, raise_on_error=False):
    """
    Create a new detector.

    :param raise_on_error: should raise exception on error
    """
    self.raise_on_error = raise_on_error

detect() abstractmethod

Create a resource.

:return: the created resource

Source code in deep/api/resource/__init__.py
@abc.abstractmethod
def detect(self) -> "Resource":
    """
    Create a resource.

    :return: the created resource
    """
    raise NotImplementedError()
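
A custom detector subclasses `ResourceDetector` and implements `detect()`. To stay self-contained, the sketch below defines a stand-in base class that mirrors the one shown above and returns a plain dict rather than a `Resource`. `HostDetector` and the `host.name` key are illustrative assumptions, not part of deep.

```python
import abc
import socket


class ResourceDetector(abc.ABC):
    """Stand-in mirroring the base class above; detect() returns a dict here."""

    def __init__(self, raise_on_error=False):
        self.raise_on_error = raise_on_error

    @abc.abstractmethod
    def detect(self) -> dict:
        raise NotImplementedError()


class HostDetector(ResourceDetector):
    """Hypothetical detector that reports the local host name."""

    def detect(self) -> dict:
        # "host.name" is an example key chosen for this sketch.
        return {"host.name": socket.gethostname()}


detector = HostDetector()
detected = detector.detect()
```

With the real classes, `detect()` would wrap the attributes in a `Resource` before returning them.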

get_aggregated_resources(detectors, initial_resource=None, timeout=5)

Retrieve resources from detectors in the order that they were passed.

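:param detectors: list of detectors; their results are merged in the order given, so on key conflicts a later detector overrides an earlier one
:param initial_resource: static resource used as the starting point; Resource.create() is used when it is not given. Because detected resources are merged on top of it, detected values override it on key conflicts
:param timeout: number of seconds to wait for each detector to return
:return: the merged resource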

Source code in deep/api/resource/__init__.py
def get_aggregated_resources(
        detectors: typing.List["ResourceDetector"],
        initial_resource: typing.Optional[Resource] = None,
        timeout=5,
) -> "Resource":
    """Retrieve resources from detectors in the order that they were passed.

    :param detectors: List of detectors; results are merged in the order given
    :param initial_resource: Static resource used as the starting point
    :param timeout: Number of seconds to wait for each detector to return
    :return: the merged resource
    """
    detectors_merged_resource = initial_resource or Resource.create()
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(detector.detect) for detector in detectors]
        for detector_ind, future in enumerate(futures):
            detector = detectors[detector_ind]
            try:
                detected_resource = future.result(timeout=timeout)
            # pylint: disable=broad-except
            except Exception as ex:
                detected_resource = _EMPTY_RESOURCE
                if detector.raise_on_error:
                    raise ex
                logging.warning(
                    "Exception %s in detector %s, ignoring", ex, detector
                )
            finally:
                detectors_merged_resource = detectors_merged_resource.merge(
                    detected_resource
                )

    return detectors_merged_resource
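
The aggregation pattern (submit every detector to a thread pool, then merge results in submission order) can be sketched without deep installed. Below, detectors are modeled as plain callables returning dicts, and `aggregate` is a hypothetical stand-in that always ignores failures, as if `raise_on_error` were False. Unlike the real function, it starts from an empty dict rather than `Resource.create()` when no initial value is given.

```python
import concurrent.futures
import logging


def aggregate(detect_functions, initial=None, timeout=5) -> dict:
    """Hypothetical dict-based model of get_aggregated_resources."""
    merged = dict(initial or {})
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # All detectors start immediately; results are collected in order.
        futures = [executor.submit(fn) for fn in detect_functions]
        for fn, future in zip(detect_functions, futures):
            try:
                detected = future.result(timeout=timeout)
            except Exception as ex:  # a failing detector contributes nothing
                logging.warning("Exception %s in detector %s, ignoring", ex, fn)
                detected = {}
            # Later results override earlier ones on key conflicts.
            merged.update(detected)
    return merged


def failing():
    raise RuntimeError("no metadata available")


result = aggregate(
    [lambda: {"region": "eu", "team": "a"}, failing, lambda: {"team": "b"}],
    initial={"service.name": "checkout", "region": "us"},
)
```

In this run the initial `region` is replaced by the first detector's value and `team` by the last detector's value, while the failing detector is skipped.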