Datadog agent checks: How to instrument custom metrics for your application

Ever felt like Datadog’s built-in integrations don’t cover your most crucial metrics?
Let’s go beyond “out-of-the-box” and see how you can instrument custom metrics for your own application using Manticore searchd as a real-world example.

Why create a custom Datadog Agent check?

  • 📈 Track metrics unique to your stack
  • 🦄 Observe services Datadog doesn’t know about
  • 🛠️ Get creative with dashboards and alerts

While we already had plenty of system metrics from the instance Manticore runs on, the Datadog MySQL integration does not work for Manticore, since it runs an outdated protocol version and adds its own abstraction on top. At the same time, whenever we had Manticore outages in the past, the system itself was reporting a healthy state:

  • The manticore systemd service was running fine
  • There were no clear tells from system metrics like CPU, RAM, or disk usage

What we really needed was a check that looks at core metrics like the worker queue, Manticore response time, and replication state. Enter the custom check!

Getting Started: Datadog Custom Agent Checks

To create a custom Agent check, you need:

  1. The Datadog Agent installed (Python 3 compatible).
  2. Your check script in the checks.d directory, and a matching config file in conf.d.
  3. File names must match: e.g. custom_manticore.py and custom_manticore.yaml.

Example configuration (conf.d/custom_manticore.yaml):

init_config:

instances:
  - cluster_name: statista
    binary: /usr/bin/searchd
    args:
      - --status
    tags:
      - service:manticore
      - env:prod

logs:
  - path: /var/log/manticore/searchd.log
    service: manticore
    source: manticore
    type: file
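For each entry under instances, the Agent calls the check's check() method with that entry as a plain dict. As a sketch, the dict literal below is hand-written to mirror the YAML above and shows how the check reads its settings:

```python
# Hypothetical dict mirroring one `instances` entry from the YAML above;
# the Agent builds this for you and passes it to check(self, instance).
instance = {
    "cluster_name": "statista",
    "binary": "/usr/bin/searchd",
    "args": ["--status"],
    "tags": ["service:manticore", "env:prod"],
}

# The check reads its settings with .get(), falling back to defaults:
binary = instance.get("binary", "searchd")
args = instance.get("args", ["--status"])
cmd = [binary] + args
print(cmd)  # the command line the check will execute
```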

Example check script

This custom Agent check runs searchd --status and publishes key metrics and service checks to Datadog. The output of searchd --status is parsed and mapped to the correct data types before the Agent sends it to Datadog.

Note how the values emitted by searchd are mapped to data types that Datadog understands and can work with:

checks.d/custom_manticore.py:
  from typing import Dict, List
  import re
  import shlex
  import subprocess

  from datadog_checks.base import AgentCheck, ConfigurationError

  KEYVAL_RE = re.compile(r"^\s*([A-Za-z0-9_\.-]+)\s*:\s*(.+?)\s*$")

  def parse_status_output(output: str) -> Dict[str, str]:
      data: Dict[str, str] = {}
      for line in output.splitlines():
          m = KEYVAL_RE.match(line)
          if not m:
              continue
          key, val = m.group(1), m.group(2)
          data[key] = val
      return data

  def to_int(val: str) -> int:
      try:
          return int(val)
      except Exception:
          m = re.search(r"(-?\d+)", val)
          return int(m.group(1)) if m else 0

  def to_float(val: str) -> float:
      try:
          return float(val)
      except Exception:
          m = re.search(r"(-?\d+(?:\.\d+)?)", val)
          return float(m.group(1)) if m else 0.0

  class ManticoreSearchdCheck(AgentCheck):
      __NAMESPACE__ = "manticore"

      def check(self, instance):
          binary = instance.get("binary", "searchd")
          args = instance.get("args", ["--status"]) or ["--status"]
          cluster_name = instance.get("cluster_name")
          extra_tags = instance.get("tags", [])

          if not cluster_name:
              raise ConfigurationError("cluster_name is required in the instance config")

          cmd = [binary] + args
          try:
              proc = subprocess.run(
                  cmd,
                  stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE,
                  check=True,
                  text=True,
              )
              out = proc.stdout
          except Exception as e:
              self.service_check(
              "cluster.node_state",
                  self.CRITICAL,
                  message=f"Failed to run {' '.join(shlex.quote(c) for c in cmd)}: {e}",
                  tags=extra_tags + [f"cluster:{cluster_name}"],
              )
              raise

          raw = parse_status_output(out)
          base_tags = list(extra_tags) + [f"cluster:{cluster_name}"]

          self._submit_gauge(raw, "uptime", "uptime", base_tags)
          self._submit_monotonic_count(raw, "connections", "connections", base_tags)
          self._submit_monotonic_count(raw, "maxed_out", "maxed_out", base_tags)
          self._submit_monotonic_count(raw, "queries", "queries", base_tags)
          self._submit_gauge(raw, "workers_total", "workers.total", base_tags)
          self._submit_gauge(raw, "workers_active", "workers.active", base_tags)
          self._submit_gauge(raw, "workers_clients", "workers.clients", base_tags)
          self._submit_gauge(raw, "work_queue_length", "work_queue.length", base_tags)
          self._submit_monotonic_count(raw, "query_wall", "query_wall.total_seconds", base_tags, as_float=True)
          self._submit_gauge(raw, "avg_query_wall", "avg_query_wall.seconds", base_tags, as_float=True)

          cluster_prefix = f"cluster_{cluster_name}_"
          node_state_comment = raw.get(f"{cluster_prefix}node_state")
          sc_tags = list(base_tags)
          if node_state_comment:
              healthy = node_state_comment.lower() in ("synced", "healthy", "ok")
              self.service_check("cluster.node_state", self.OK if healthy else self.CRITICAL, tags=sc_tags)
          else:
              self.service_check("cluster.node_state", self.UNKNOWN, tags=sc_tags)
          self._submit_gauge(raw, f"{cluster_prefix}local_send_queue", "cluster.send_queue", base_tags)
          self._submit_gauge(raw, f"{cluster_prefix}recv_queue", "cluster.recv_queue", base_tags)

      def _metric(self, name: str) -> str:
          return name

      def _submit_gauge(self, raw: Dict[str, str], key: str, metric: str, tags: List[str], as_float: bool = False):
          if key not in raw:
              return None
          val = to_float(raw[key]) if as_float else to_int(raw[key])
          self.gauge(self._metric(metric), float(val), tags=tags)
          return val

      def _submit_monotonic_count(self, raw: Dict[str, str], key: str, metric: str, tags: List[str], as_float: bool = False):
          if key not in raw:
              return None
          val = to_float(raw[key]) if as_float else to_int(raw[key])
          self.monotonic_count(self._metric(metric), float(val), tags=tags)
          return val
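To see the parsing in action, the helpers from the check can be exercised standalone against a hand-written excerpt of key: value lines. The sample values below are made up for illustration; exact keys and output vary by Manticore version.

```python
import re
from typing import Dict

# Same regex and helpers as in the check above.
KEYVAL_RE = re.compile(r"^\s*([A-Za-z0-9_\.-]+)\s*:\s*(.+?)\s*$")

def parse_status_output(output: str) -> Dict[str, str]:
    data: Dict[str, str] = {}
    for line in output.splitlines():
        m = KEYVAL_RE.match(line)
        if m:
            data[m.group(1)] = m.group(2)
    return data

def to_int(val: str) -> int:
    try:
        return int(val)
    except Exception:
        m = re.search(r"(-?\d+)", val)
        return int(m.group(1)) if m else 0

# Made-up sample resembling `searchd --status` output.
sample = """
searchd status
uptime: 86400
queries: 15230
avg_query_wall: 0.012
cluster_statista_node_state: synced
"""

raw = parse_status_output(sample)
print(to_int(raw["uptime"]))              # numeric fields become ints
print(raw["cluster_statista_node_state"]) # non-numeric fields stay strings
```

Lines without a colon (like the "searchd status" banner) simply fail the regex and are skipped, which keeps the parser tolerant of header and footer noise.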

Run the check manually:

# Agent v7+
datadog-agent check custom_manticore

Metrics Collected

You will see the collected metrics in the output, in this case:

Metric | Description
manticore.uptime | Seconds searchd has been running
manticore.connections | Total client connections since startup
manticore.maxed_out | Times the max concurrent connections limit was hit
manticore.queries | Number of queries served since startup
manticore.workers.total | Total worker threads
manticore.workers.active | Active worker threads
manticore.workers.clients | Worker threads serving clients
manticore.work_queue.length | Queries waiting in the internal queue
manticore.query_wall.total_seconds | Total wall-clock time spent on queries
manticore.avg_query_wall.seconds | Average wall-clock time per query
manticore.cluster.node_state | Cluster node state (service check)
manticore.cluster.send_queue | Replication send queue length
manticore.cluster.recv_queue | Replication receive queue length

Then open the Datadog Metrics Explorer and query manticore.queries{service:manticore}.

If the datadog_checks import fails, make sure the script runs inside the Agent's embedded Python runtime; do not run it standalone. If searchd requires sudo or a specific PATH, set binary to the full path in custom_manticore.yaml.

Dashboard notes

Now we can finally build our dashboard and monitors!

Datadog dashboard example

  • Graph manticore.avg_query_wall.seconds as an overlay.
  • Add worker utilization (active/total) and manticore.work_queue.length.
  • Add the replication queues, manticore.cluster.send_queue and manticore.cluster.recv_queue.
  • Add a service check widget for manticore.cluster.node_state.

This alone allows Watchdog to alert us on outliers automatically. We have also added a few monitors, e.g. to be alerted when the query wall time exceeds 200 ms.
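As a sketch, the 200 ms monitor can be expressed as a Datadog metric monitor query roughly like this (threshold and time window are example values, and the query assumes the metric is reported in seconds):

```
avg(last_5m):avg:manticore.avg_query_wall.seconds{service:manticore} > 0.2
```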

In addition, we can now much better understand what is going on with the system and, looking at the metrics over time, draw conclusions in comparison with other parts of our platform.

Agentless Custom Metrics: Send Directly from Your Application

Not running the Datadog Agent, or want to push metrics directly from your app?
You can send custom metrics to Datadog using the Datadog Metrics API.

Example: Sending a custom metric with Python

import requests
import time

api_key = "<YOUR_DATADOG_API_KEY>"
url = "https://api.datadoghq.com/api/v2/series"

headers = {
  "Content-Type": "application/json",
  "DD-API-KEY": api_key,
}

payload = {
  "series": [
    {
      "metric": "myapp.custom.metric",
      "type": 3,  # v2 metric intake type: 0=unspecified, 1=count, 2=rate, 3=gauge
      "points": [{"timestamp": int(time.time()), "value": 42}],
      "tags": ["env:prod", "service:myapp"],
    }
  ]
}

response = requests.post(url, json=payload, headers=headers)
print(response.status_code, response.text)

You can use any language or framework that can make HTTP requests.
See the API docs for more details and examples.
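To keep the submission logic testable without touching the network, the payload construction can be factored into a small helper. The function name and defaults below are our own, not part of any Datadog library:

```python
import time

# Datadog v2 metric intake types: 0=unspecified, 1=count, 2=rate, 3=gauge
GAUGE = 3

def build_series(metric, value, tags=None, ts=None):
    """Build a /api/v2/series payload for a single gauge point.

    Hypothetical helper; returns a dict ready to be POSTed as JSON.
    """
    return {
        "series": [
            {
                "metric": metric,
                "type": GAUGE,
                "points": [
                    {"timestamp": int(ts or time.time()), "value": float(value)}
                ],
                "tags": list(tags or []),
            }
        ]
    }

payload = build_series("myapp.custom.metric", 42, tags=["env:prod"], ts=1700000000)
print(payload["series"][0]["points"])
```

Separating payload construction from the HTTP call makes it easy to unit-test the exact JSON you send, and to batch several series into one request later.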

Pro tips:

  • Use meaningful metric names and tags for easy dashboarding.
  • You can send gauges, counts, rates, and more.
  • Metrics sent this way are subject to custom metrics billing.