Datadog agent checks: How to instrument custom metrics for your application
Ever felt like Datadog’s built-in integrations don’t cover your most crucial metrics?
Let’s go beyond “out-of-the-box” and see how you can instrument custom metrics for your own application using Manticore searchd as a real-world example.
Table of Contents
- Why create a custom Datadog Agent check?
- The challenge: Manticore Search
- Getting Started: Datadog Custom Agent Checks
- Example configuration
- Example check script
- Metrics Collected
- Dashboard notes
- Agentless Custom Metrics: Send Directly from Your Application
Why create a custom Datadog Agent check?
- 📈 Track metrics unique to your stack
- 🦄 Observe services Datadog doesn’t know about
- 🛠️ Get creative with dashboards and alerts
The challenge: Manticore Search
While we already had plenty of system metrics from the instance Manticore runs on, the Datadog MySQL integration does not work for Manticore, since Manticore speaks an outdated version of the protocol and adds its own abstraction on top. At the same time, whenever we had Manticore outages in the past, the system itself reported a healthy state:
- manticore systemd service running fine
- No concrete tells from system metrics like CPU, RAM, or disk usage
What we really needed was a check that looks at core metrics like the worker queue, Manticore response time, and replication state. In comes the custom check!
Getting Started: Datadog Custom Agent Checks
To create a custom Agent check, you need:
- The Datadog Agent installed (Python 3 compatible).
- Your check script in the `checks.d` directory, and a matching config file in `conf.d`.
- File names must match: e.g. `custom_manticore.py` and `custom_manticore.yaml`.
Example configuration (`conf.d/custom_manticore.yaml`):

```yaml
init_config:

instances:
  - cluster_name: statista
    binary: /usr/bin/searchd
    args:
      - --status
    tags:
      - service:manticore
      - env:prod

logs:
  - path: /var/log/manticore/searchd.log
    service: manticore
    source: manticore
    type: file
```
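For context, `searchd --status` prints simple `key: value` pairs, one per line. The sample below is illustrative (values are made up; key names are the fields the check script reads):

```text
uptime: 86400
connections: 102481
maxed_out: 0
queries: 5042377
workers_total: 32
workers_active: 3
workers_clients: 2
work_queue_length: 0
query_wall: 61234.5
avg_query_wall: 0.012 sec
cluster_statista_node_state: synced
cluster_statista_local_send_queue: 0
cluster_statista_recv_queue: 0
```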
Example check script
This custom Agent check runs searchd --status and publishes key metrics and service checks to Datadog.
The output of `searchd --status` needs to be parsed and mapped to the correct data types before the Agent sends it to Datadog.
Note how values emitted from `searchd` are mapped to data types Datadog understands and can use:
`checks.d/custom_manticore.py`:
```python
from typing import Dict, List
import re
import shlex
import subprocess

from datadog_checks.base import AgentCheck, ConfigurationError

# Matches `key: value` lines from `searchd --status`
KEYVAL_RE = re.compile(r"^\s*([A-Za-z0-9_\.-]+)\s*:\s*(.+?)\s*$")


def parse_status_output(output: str) -> Dict[str, str]:
    data: Dict[str, str] = {}
    for line in output.splitlines():
        m = KEYVAL_RE.match(line)
        if not m:
            continue
        key, val = m.group(1), m.group(2)
        data[key] = val
    return data


def to_int(val: str) -> int:
    try:
        return int(val)
    except Exception:
        # Fall back to the first integer in the string (strips units, suffixes)
        m = re.search(r"(-?\d+)", val)
        return int(m.group(1)) if m else 0


def to_float(val: str) -> float:
    try:
        return float(val)
    except Exception:
        # Fall back to the first number in the string (e.g. "0.012 sec")
        m = re.search(r"(-?\d+(?:\.\d+)?)", val)
        return float(m.group(1)) if m else 0.0


class ManticoreSearchdCheck(AgentCheck):
    # Prefixes every metric and service check name with "manticore."
    __NAMESPACE__ = "manticore"

    def check(self, instance):
        binary = instance.get("binary", "searchd")
        args = instance.get("args", ["--status"]) or ["--status"]
        cluster_name = instance.get("cluster_name")
        extra_tags = instance.get("tags", [])
        if not cluster_name:
            raise ConfigurationError("cluster_name is required in the instance config")

        cmd = [binary] + args
        try:
            proc = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
                text=True,
            )
            out = proc.stdout
        except Exception as e:
            # __NAMESPACE__ already adds the "manticore." prefix
            self.service_check(
                "cluster.node_state",
                self.CRITICAL,
                message=f"Failed to run {' '.join(shlex.quote(c) for c in cmd)}: {e}",
                tags=extra_tags + [f"cluster:{cluster_name}"],
            )
            raise

        raw = parse_status_output(out)
        base_tags = list(extra_tags) + [f"cluster:{cluster_name}"]

        # Point-in-time values -> gauges
        self._submit_gauge(raw, "uptime", "uptime", base_tags)
        self._submit_gauge(raw, "workers_total", "workers.total", base_tags)
        self._submit_gauge(raw, "workers_active", "workers.active", base_tags)
        self._submit_gauge(raw, "workers_clients", "workers.clients", base_tags)
        self._submit_gauge(raw, "work_queue_length", "work_queue.length", base_tags)
        self._submit_gauge(raw, "avg_query_wall", "avg_query_wall.seconds", base_tags, as_float=True)

        # Counters that only grow since startup -> monotonic counts
        self._submit_monotonic_count(raw, "connections", "connections", base_tags)
        self._submit_monotonic_count(raw, "maxed_out", "maxed_out", base_tags)
        self._submit_monotonic_count(raw, "queries", "queries", base_tags)
        self._submit_monotonic_count(raw, "query_wall", "query_wall.total_seconds", base_tags, as_float=True)

        cluster_prefix = f"cluster_{cluster_name}_"
        node_state = raw.get(f"{cluster_prefix}node_state")
        sc_tags = list(base_tags)
        if node_state:
            healthy = node_state.lower() in ("synced", "healthy", "ok")
            self.service_check("cluster.node_state", self.OK if healthy else self.CRITICAL, tags=sc_tags)
        else:
            self.service_check("cluster.node_state", self.UNKNOWN, tags=sc_tags)

        self._submit_gauge(raw, f"{cluster_prefix}local_send_queue", "cluster.send_queue", base_tags)
        self._submit_gauge(raw, f"{cluster_prefix}recv_queue", "cluster.recv_queue", base_tags)

    def _submit_gauge(self, raw: Dict[str, str], key: str, metric: str, tags: List[str], as_float: bool = False):
        if key not in raw:
            return None
        val = to_float(raw[key]) if as_float else to_int(raw[key])
        self.gauge(metric, float(val), tags=tags)
        return val

    def _submit_monotonic_count(self, raw: Dict[str, str], key: str, metric: str, tags: List[str], as_float: bool = False):
        if key not in raw:
            return None
        val = to_float(raw[key]) if as_float else to_int(raw[key])
        # monotonic_count lets the Agent derive the per-interval delta
        self.monotonic_count(metric, float(val), tags=tags)
        return val
```
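To sanity-check the parsing helpers outside the Agent runtime, they can be exercised standalone against a canned status snippet. A minimal sketch, mirroring the regex and fallback logic of the check above (sample values are made up):

```python
import re

# Same key/value pattern the check uses on `searchd --status` output
KEYVAL_RE = re.compile(r"^\s*([A-Za-z0-9_\.-]+)\s*:\s*(.+?)\s*$")

SAMPLE = """\
uptime: 86400
queries: 5042377
avg_query_wall: 0.012 sec
"""

def parse_status_output(output):
    data = {}
    for line in output.splitlines():
        m = KEYVAL_RE.match(line)
        if m:
            data[m.group(1)] = m.group(2)
    return data

def to_float(val):
    # float() handles clean numbers; the regex fallback strips units like "sec"
    try:
        return float(val)
    except Exception:
        m = re.search(r"(-?\d+(?:\.\d+)?)", val)
        return float(m.group(1)) if m else 0.0

raw = parse_status_output(SAMPLE)
print(raw["queries"])                   # "5042377"
print(to_float(raw["avg_query_wall"]))  # 0.012
```

This makes it easy to iterate on the regex before deploying the check to a real Agent.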
Run the check manually:
```shell
# Agent v7+
datadog-agent check custom_manticore
```
Metrics Collected
You will see the collected metrics in the output, in this case:

| Metric | Description |
|---|---|
| `manticore.uptime` | Seconds searchd has been running |
| `manticore.connections` | Total client connections since startup |
| `manticore.maxed_out` | Times max concurrent connections was hit |
| `manticore.queries` | Number of queries served since startup |
| `manticore.workers.total` | Worker threads total |
| `manticore.workers.active` | Worker threads active |
| `manticore.workers.clients` | Worker threads serving clients |
| `manticore.work_queue.length` | Queries waiting in the internal queue |
| `manticore.query_wall.total_seconds` | Total wall-clock time spent on queries |
| `manticore.cluster.node_state` | Cluster node state (service check) |
| `manticore.cluster.send_queue` | Replication send queue length |
| `manticore.cluster.recv_queue` | Replication receive queue length |
Then open the Datadog Metrics Explorer and query `manticore.queries{service:manticore}`.
If the `datadog_checks` import fails, make sure the script runs inside the Agent's embedded Python runtime; do not run it standalone. If `searchd` requires sudo or a specific `PATH`, set `binary` to the full path in `custom_manticore.yaml`.
Dashboard notes
Now we can finally build our dashboard and monitors!
- Graph overlay `manticore.avg_query_wall.seconds`.
- Add worker utilization (active/total) and `manticore.work_queue.length`.
- Add replication lag `manticore.cluster.replication_lag` and send/recv queues.
- Add service check widgets for `manticore.cluster.node_state` and `manticore.cluster.role`.
This alone lets Watchdog automatically alert us on outliers. We have also added a few monitors, e.g. to be alerted when the query wall time exceeds 200 ms.
In addition, we can now understand much better what is going on with the system, and draw conclusions in comparison to other parts of our platform when looking at the metrics over time.
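As a concrete example, a 200 ms query wall time alert can be expressed as a Datadog metric monitor query roughly like this (threshold and evaluation window are our choices; adjust to taste):

```text
avg(last_5m):avg:manticore.avg_query_wall.seconds{service:manticore} > 0.2
```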
Agentless Custom Metrics: Send Directly from Your Application
Not running the Datadog Agent, or want to push metrics directly from your app?
You can send custom metrics to Datadog using the Datadog Metrics API.
Example: Sending a custom metric with Python
```python
import requests
import time

api_key = "<YOUR_DATADOG_API_KEY>"
url = "https://api.datadoghq.com/api/v2/series"

headers = {
    "Content-Type": "application/json",
    "DD-API-KEY": api_key,
}

# In the v2 series API, points are objects and "type" is an integer enum
# (0 = unspecified, 1 = count, 2 = rate, 3 = gauge)
payload = {
    "series": [
        {
            "metric": "myapp.custom.metric",
            "type": 3,  # gauge
            "points": [{"timestamp": int(time.time()), "value": 42}],
            "tags": ["env:prod", "service:myapp"],
        }
    ]
}

response = requests.post(url, json=payload, headers=headers)
print(response.status_code, response.text)
```
You can use any language or framework that can make HTTP requests.
See the API docs for more details and examples.
Pro tips:
- Use meaningful metric names and tags for easy dashboarding.
- You can send gauges, counts, rates, and more.
- Metrics sent this way are subject to custom metrics billing.
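If you submit metrics regularly, it helps to wrap the payload construction in a small helper so batching several series into one request stays tidy. A sketch with a hypothetical `build_gauge` function (not part of any Datadog SDK):

```python
import time

def build_gauge(metric, value, tags):
    """Build one v2 series entry; type 3 means gauge in the v2 API.
    Hypothetical helper, names and structure are our own."""
    return {
        "metric": metric,
        "type": 3,
        "points": [{"timestamp": int(time.time()), "value": float(value)}],
        "tags": list(tags),
    }

# Batch several metrics into a single request body
payload = {
    "series": [
        build_gauge("myapp.queue.depth", 17, ["env:prod", "service:myapp"]),
        build_gauge("myapp.cache.hit_ratio", 0.93, ["env:prod", "service:myapp"]),
    ]
}
print(len(payload["series"]))  # 2
```

One POST per batch instead of one per metric keeps you well clear of API rate limits.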