# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Manages OpenTelemetry tracing span creation and handling. This is a PREVIEW FEATURE: Coverage and functionality may change."""

import logging
import os

from contextlib import contextmanager
from urllib.parse import urlparse
from google.api_core import exceptions as api_exceptions
from google.api_core import retry as api_retry
from google.cloud.storage import __version__
from google.cloud.storage.retry import ConditionalRetryPolicy


ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"
_DEFAULT_ENABLE_OTEL_TRACES_VALUE = False


def _parse_bool_env(name: str, default: bool = False) -> bool:
    val = os.environ.get(name, None)
    if val is None:
        return default
    return str(val).strip().lower() in {"1", "true", "yes", "on"}


enable_otel_traces = _parse_bool_env(
    ENABLE_OTEL_TRACES_ENV_VAR, _DEFAULT_ENABLE_OTEL_TRACES_VALUE
)
logger = logging.getLogger(__name__)

try:
    from opentelemetry import trace

    HAS_OPENTELEMETRY = True

except ImportError:
    logger.debug(
        "This service is instrumented using OpenTelemetry. "
        "OpenTelemetry or one of its components could not be imported; "
        "please add compatible versions of opentelemetry-api and "
        "opentelemetry-instrumentation packages in order to get Storage "
        "Tracing data."
    )
    HAS_OPENTELEMETRY = False

_default_attributes = {
    "rpc.service": "CloudStorage",
    "rpc.system": "http",
    "user_agent.original": f"gcloud-python/{__version__}",
}

_cloud_trace_adoption_attrs = {
    "gcp.client.service": "storage",
    "gcp.client.version": __version__,
    "gcp.client.repo": "googleapis/python-storage",
}


@contextmanager
def create_trace_span(name, attributes=None, client=None, api_request=None, retry=None):
    """Creates a context manager for a new span and set it as the current span
    in the configured tracer. If no configuration exists yields None."""
    if not HAS_OPENTELEMETRY or not enable_otel_traces:
        yield None
        return

    tracer = trace.get_tracer(__name__)
    final_attributes = _get_final_attributes(attributes, client, api_request, retry)
    # Yield new span.
    with tracer.start_as_current_span(
        name=name, kind=trace.SpanKind.CLIENT, attributes=final_attributes
    ) as span:
        try:
            yield span
        except api_exceptions.GoogleAPICallError as error:
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            span.record_exception(error)
            raise


def _get_final_attributes(attributes=None, client=None, api_request=None, retry=None):
    collected_attr = _default_attributes.copy()
    collected_attr.update(_cloud_trace_adoption_attrs)
    if api_request:
        collected_attr.update(_set_api_request_attr(api_request, client))
    if isinstance(retry, api_retry.Retry):
        collected_attr.update(_set_retry_attr(retry))
    if isinstance(retry, ConditionalRetryPolicy):
        collected_attr.update(
            _set_retry_attr(retry.retry_policy, retry.conditional_predicate)
        )
    if attributes:
        collected_attr.update(attributes)
    final_attributes = {k: v for k, v in collected_attr.items() if v is not None}
    return final_attributes


def _set_api_request_attr(request, client):
    attr = {}
    if request.get("method"):
        attr["http.request.method"] = request.get("method")
    if request.get("path"):
        full_url = client._connection.build_api_url(request.get("path"))
        attr.update(_get_opentelemetry_attributes_from_url(full_url, strip_query=True))
    if "timeout" in request:
        attr["connect_timeout,read_timeout"] = str(request.get("timeout"))
    return attr


def _set_retry_attr(retry, conditional_predicate=None):
    predicate = conditional_predicate if conditional_predicate else retry._predicate
    retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}"
    return {"retry": retry_info}


def _get_opentelemetry_attributes_from_url(url, strip_query=True):
    """Helper to assemble OpenTelemetry span attributes from a URL."""
    u = urlparse(url)
    netloc = u.netloc
    # u.hostname is always lowercase. We parse netloc to preserve casing.
    # netloc format: [userinfo@]host[:port]
    if "@" in netloc:
        netloc = netloc.split("@", 1)[1]
    if ":" in netloc and not netloc.endswith("]"):  # Handle IPv6 literal
        netloc = netloc.split(":", 1)[0]

    attributes = {
        "server.address": netloc,
        "server.port": u.port,
        "url.scheme": u.scheme,
        "url.path": u.path,
    }
    if not strip_query:
        attributes["url.query"] = u.query

    return attributes
