检测队列

了解如何手动检测代码以使用 Sentry 的 Queues 模块。

Sentry 提供了一个队列监控仪表板,可以自动检测流行的 Python 队列设置(如 Celery)。

如果您使用的队列不被支持,您仍然可以通过在队列生产者和消费者周围手动检测自定义 Span 和事务来确保拥有关于消息队列的性能数据。

要开始捕获性能指标,请使用 sentry_sdk.start_span() 函数包装您的队列生产者事件。您的 Span op 必须设置为 queue.publish。包括以下 Span 数据以丰富生产者 Span 的队列指标:

数据属性类型描述
messaging.message.idstring消息标识符
messaging.destination.namestring队列或主题名称
messaging.message.body.sizeint消息体的大小(以字节为单位)

您的 queue.publish Span 必须存在于一个事务中,才能被识别为生产者 Span。如果您使用的是 支持的 Web 框架,事务将由集成创建。如果您使用纯 Python,可以使用 sentry_sdk.start_transaction() 启动一个新的事务。

您还必须在消息中包含跟踪头(sentry-tracebaggage),以便您的消费者在消息被拾取后可以继续跟踪。

Copied
from datetime import datetime, timezone

import sentry_sdk
import my_custom_queue

# Initialize Sentry
sentry_sdk.init(...)

connection = my_custom_queue.connect()

# The message you want to send to the queue
queue = "messages"
message = "Hello World!"
message_id = "abc123"

# Create transaction
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
with sentry_sdk.start_transaction(
    op="function",
    name="queue_producer_transaction",
):
    # Create the span
    with sentry_sdk.start_span(
        op="queue.publish",
        name="queue_producer",
    ) as span:
        # Set span data
        span.set_data("messaging.message.id", message_id)
        span.set_data("messaging.destination.name", queue)
        span.set_data("messaging.message.body.size", len(message.encode("utf-8")))

        # Publish the message to the queue (including trace information and current time stamp)
        now = int(datetime.now(timezone.utc).timestamp())
        connection.publish(
            queue=queue,
            body=message,
            timestamp=now,
            headers={
                "sentry-trace": sentry_sdk.get_traceparent(),
                "baggage": sentry_sdk.get_baggage(),
            },
        )

要开始捕获性能指标,请使用 sentry_sdk.start_span() 函数包装您的队列消费者。您的 Span op 必须设置为 queue.process。包括以下 Span 数据以丰富消费者 Span 的队列指标:

数据属性类型描述
messaging.message.idstring消息标识符
messaging.destination.namestring队列或主题名称
messaging.message.body.sizenumber消息体的大小(以字节为单位)
messaging.message.retry.countnumber消息处理尝试的次数
messaging.message.receive.latencynumber消息在队列中等待处理的时间(以毫秒为单位)

您的 queue.process Span 必须存在于一个事务中,才能被识别为消费者 Span。如果您使用的是 支持的 Web 框架,事务将由集成创建。如果您使用纯 Python,可以使用 sentry_sdk.start_transaction() 启动一个新的事务。

使用 sentry_sdk.continue_trace() 将您的消费者 Span 连接到其关联的生产者 Span,并使用 span.set_status() 标记消息的跟踪为成功或失败。

Copied
from datetime import datetime, timezone

import sentry_sdk
import my_custom_queue

# Initialize Sentry
sentry_sdk.init(...)

connection = my_custom_queue.connect()

# Pick up message from queues
queue = "messages"
message = connection.consume(queue=queue)

# Calculate latency (optional, but valuable)
now = datetime.now(timezone.utc)
message_time = datetime.fromtimestamp(message["timestamp"], timezone.utc)
latency = now - message_time

# Continue the trace started in the producer
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
transaction = sentry_sdk.continue_trace(
    message["headers"],
    op="function",
    name="queue_consumer_transaction",
)
with sentry_sdk.start_transaction(transaction):
    # Create the span
    with sentry_sdk.start_span(
        op="queue.process",
        name="queue_consumer",
    ) as span:
        # Set span data
        span.set_data("messaging.message.id", message["message_id"])
        span.set_data("messaging.destination.name", queue)
        span.set_data("messaging.message.body.size", message["body"])
        span.set_data("messaging.message.receive.latency", latency)
        span.set_data("messaging.message.retry.count", 0)

        try:
            # Process the message
            process_message(message)
        except Exception:
            # In case of an error set the status to "internal_error"
            span.set_status("internal_error")