检测队列
了解如何手动检测代码以使用 Sentry 的 Queues 模块。
Sentry 提供了一个队列监控仪表板,可以自动检测流行的 Python 队列设置(如 Celery)。
如果您使用的队列不被支持,您仍然可以通过在队列生产者和消费者周围手动检测自定义 Span 和事务来确保拥有关于消息队列的性能数据。
要开始捕获性能指标,请使用 sentry_sdk.start_span()
函数包装您的队列生产者事件。您的 Span op
必须设置为 queue.publish
。包括以下 Span 数据以丰富生产者 Span 的队列指标:
数据属性 | 类型 | 描述 |
---|---|---|
messaging.message.id | string | 消息标识符 |
messaging.destination.name | string | 队列或主题名称 |
messaging.message.body.size | int | 消息体的大小(以字节为单位) |
您的 queue.publish
Span 必须存在于一个事务中,才能被识别为生产者 Span。如果您使用的是 支持的 Web 框架,事务将由集成创建。如果您使用纯 Python,可以使用 sentry_sdk.start_transaction()
启动一个新的事务。
您还必须在消息中包含跟踪头(sentry-trace
和 baggage
),以便您的消费者在消息被拾取后可以继续跟踪。
from datetime import datetime, timezone
import sentry_sdk
import my_custom_queue
# Initialize Sentry
sentry_sdk.init(...)
connection = my_custom_queue.connect()
# The message you want to send to the queue
queue = "messages"
message = "Hello World!"
message_id = "abc123"
# Create transaction
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
with sentry_sdk.start_transaction(
op="function",
name="queue_producer_transaction",
):
# Create the span
with sentry_sdk.start_span(
op="queue.publish",
name="queue_producer",
) as span:
# Set span data
span.set_data("messaging.message.id", message_id)
span.set_data("messaging.destination.name", queue)
span.set_data("messaging.message.body.size", len(message.encode("utf-8")))
# Publish the message to the queue (including trace information and current time stamp)
now = int(datetime.now(timezone.utc).timestamp())
connection.publish(
queue=queue,
body=message,
timestamp=now,
headers={
"sentry-trace": sentry_sdk.get_traceparent(),
"baggage": sentry_sdk.get_baggage(),
},
)
要开始捕获性能指标,请使用 sentry_sdk.start_span()
函数包装您的队列消费者。您的 Span op
必须设置为 queue.process
。包括以下 Span 数据以丰富消费者 Span 的队列指标:
数据属性 | 类型 | 描述 |
---|---|---|
messaging.message.id | string | 消息标识符 |
messaging.destination.name | string | 队列或主题名称 |
messaging.message.body.size | number | 消息体的大小(以字节为单位) |
messaging.message.retry.count | number | 消息处理尝试的次数 |
messaging.message.receive.latency | number | 消息在队列中等待处理的时间(以毫秒为单位) |
您的 queue.process
Span 必须存在于一个事务中,才能被识别为消费者 Span。如果您使用的是 支持的 Web 框架,事务将由集成创建。如果您使用纯 Python,可以使用 sentry_sdk.start_transaction()
启动一个新的事务。
使用 sentry_sdk.continue_trace()
将您的消费者 Span 连接到其关联的生产者 Span,并使用 span.set_status()
标记消息的跟踪为成功或失败。
from datetime import datetime, timezone
import sentry_sdk
import my_custom_queue
# Initialize Sentry
sentry_sdk.init(...)
connection = my_custom_queue.connect()
# Pick up message from queues
queue = "messages"
message = connection.consume(queue=queue)
# Calculate latency (optional, but valuable)
now = datetime.now(timezone.utc)
message_time = datetime.fromtimestamp(message["timestamp"], timezone.utc)
latency = now - message_time
# Continue the trace started in the producer
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
transaction = sentry_sdk.continue_trace(
message["headers"],
op="function",
name="queue_consumer_transaction",
)
with sentry_sdk.start_transaction(transaction):
# Create the span
with sentry_sdk.start_span(
op="queue.process",
name="queue_consumer",
) as span:
# Set span data
span.set_data("messaging.message.id", message["message_id"])
span.set_data("messaging.destination.name", queue)
span.set_data("messaging.message.body.size", message["body"])
span.set_data("messaging.message.receive.latency", latency)
span.set_data("messaging.message.retry.count", 0)
try:
# Process the message
process_message(message)
except Exception:
# In case of an error set the status to "internal_error"
span.set_status("internal_error")