Nagios Core - Monitoring with JSON RPC

This warning message about the deprecated REST API

WARNING - (W) The deprecated REST API was used to authenticate 23 times in the last 24 hours from the following IP addresses:10.130.100.41.The REST API will be removed in version 26.04. To avoid service disruption, migrate any remaining integrations to the supported JSON-RPC 2.0 over WebSocket API before upgrading. For migration guidance, see the documentation.

prompted me to slightly modify my Python script for Nagios. Enjoy using it! :slight_smile:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import json
import ssl
import asyncio
import websockets  # pip install websockets


class JsonRpcError(Exception):
    """Raised when a JSON-RPC request fails or the reply carries an error."""


class TrueNASJsonRpcClient:
    def __init__(self, hostname, api_key, use_ssl=True, verify_cert=True, debug=False):
        self.hostname = hostname
        self.api_key = api_key
        self.use_ssl = use_ssl
        self.verify_cert = verify_cert
        self.debug = debug
        self._ws = None
        self._id = 0

    def _make_uri(self):
        scheme = "wss" if self.use_ssl else "ws"
        return f"{scheme}://{self.hostname}/api/current"

    async def _connect(self):
        uri = self._make_uri()
        ssl_ctx = None
        if self.use_ssl:
            ssl_ctx = ssl.create_default_context()
            if not self.verify_cert:
                ssl_ctx.check_hostname = False
                ssl_ctx.verify_mode = ssl.CERT_NONE

        if self.debug:
            print(f"Connecting to {uri}", file=sys.stderr)

        self._ws = await websockets.connect(uri, ssl=ssl_ctx)

    async def _send(self, method, params):
        self._id += 1
        req_id = self._id
        msg = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params,
        }
        if self.debug:
            print(f"--> {json.dumps(msg)}", file=sys.stderr)
        await self._ws.send(json.dumps(msg))

        while True:
            raw = await self._ws.recv()
            if self.debug:
                print(f"<-- {raw}", file=sys.stderr)
            resp = json.loads(raw)
            if resp.get("id") != req_id:
                # Notification or unrelated response – ignore
                continue
            if resp.get("error"):
                raise JsonRpcError(resp["error"])
            return resp.get("result")

    async def login(self):
        await self._connect()
        result = await self._send("auth.login_with_api_key", [self.api_key])
        if result is False:
            raise JsonRpcError({"message": "Invalid API key"})

    async def call(self, method, params=None):
        if params is None:
            params = []
        if self._ws is None:
            await self.login()
        return await self._send(method, params)

    async def close(self):
        if self._ws is not None:
            await self._ws.close()
            self._ws = None


async def check_alerts(hostname, api_key,
                       ignore_dismissed=False,
                       use_ssl=True, verify_cert=True,
                       debug=False):
    """Check the TrueNAS alert list and exit with a Nagios status.

    Prints one status line and terminates the process via ``sys.exit``:
    2 if any alert is above WARNING level, 1 if only WARNING alerts exist,
    0 if neither, 3 if any exception occurs while querying the server.

    Args:
        hostname: TrueNAS host to connect to.
        api_key: API key used for the login.
        ignore_dismissed: Skip alerts whose ``dismissed`` field is ``True``.
        use_ssl: Passed through to the client (``wss://`` vs ``ws://``).
        verify_cert: Passed through to the client (TLS verification).
        debug: Passed through to the client (stderr tracing).
    """
    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl,
        verify_cert=verify_cert,
        debug=debug,
    )
    # TrueNAS alert levels run INFO < NOTICE < WARNING < ERROR < CRITICAL <
    # ALERT < EMERGENCY.  Everything above WARNING must surface as a Nagios
    # CRITICAL; matching only "CRITICAL" would report OK during an EMERGENCY.
    critical_levels = ("ERROR", "CRITICAL", "ALERT", "EMERGENCY")
    try:
        await client.login()
        # alert.list does not take any parameters
        alerts = await client.call("alert.list")

        critical_messages = []
        warning_messages = []

        for alert in alerts:
            if ignore_dismissed and alert.get("dismissed") is True:
                continue
            level = alert.get("level")
            # "formatted" may be present but null; keep the output on one line.
            msg = (alert.get("formatted") or "").replace("\n", ". ")
            if level in critical_levels:
                critical_messages.append(f"- (C) {msg} ")
            elif level == "WARNING":
                warning_messages.append(f"- (W) {msg} ")

        # sys.exit raises SystemExit, which "except Exception" below does not
        # catch, so the chosen exit code survives; "finally" still runs.
        if critical_messages:
            print("CRITICAL " + "".join(critical_messages) + "".join(warning_messages))
            sys.exit(2)
        elif warning_messages:
            print("WARNING " + "".join(warning_messages))
            sys.exit(1)
        else:
            print("OK - No problem alerts")
            sys.exit(0)

    except Exception as e:
        print(f"UNKNOWN - check_alerts() - Error when contacting TrueNAS server: {e}")
        sys.exit(3)
    finally:
        await client.close()


async def check_repl(hostname, api_key,
                     use_ssl=True, verify_cert=True,
                     debug=False):
    """Check the state of all replication tasks and exit with a Nagios status.

    A task counts as an error unless its state is ``FINISHED`` or
    ``RUNNING``.  Prints one status line and terminates via ``sys.exit``:
    1 if any task is in error, 0 otherwise, 3 if any exception occurs.
    """
    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl, verify_cert=verify_cert, debug=debug,
    )
    try:
        await client.login()

        tasks = await client.call("replication.query")

        healthy_states = ("FINISHED", "RUNNING")
        examined = []
        failed = []

        for task in tasks:
            name = task.get("name")
            state = task.get("state", {}).get("state")
            examined.append(f" {name}: {state}")
            if state not in healthy_states:
                failed.append(f"{name}: {state} ")

        if failed:
            failure_count = len(failed)
            failure_text = "".join(failed).strip()
            print(
                f"WARNING - There are {failure_count} replication errors "
                f"[{failure_text}]. Go to Storage > Replication Tasks > "
                f"View Replication Tasks in TrueNAS for more details."
            )
            sys.exit(1)

        examined_text = "".join(examined)
        print(
            "OK - No replication errors. "
            f"Replications examined:{examined_text}"
        )
        sys.exit(0)

    except Exception as e:
        print(
            "UNKNOWN - check_repl() - Error when contacting TrueNAS server: "
            + str(e)
        )
        sys.exit(3)
    finally:
        # Runs on every path, including the SystemExit raised above.
        await client.close()


async def check_update(hostname, api_key,
                       use_ssl=True, verify_cert=True,
                       debug=False):
    """Ask TrueNAS whether an update is available and exit with a Nagios status.

    Prints one status line and terminates via ``sys.exit``: 0 when the
    reported status is ``UNAVAILABLE``, 1 for any other status, 3 if any
    exception occurs while querying the server.
    """
    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl, verify_cert=verify_cert, debug=debug,
    )
    try:
        await client.login()

        reply = await client.call("update.check_available")
        status = reply.get("status")

        # Human-readable explanation for each status this check knows about.
        descriptions = {
            "UNAVAILABLE": "no update available",
            "AVAILABLE": "an update is available",
            "REBOOT_REQUIRED": "an update has already been applied",
            "HA_UNAVAILABLE": "HA is non-functional",
        }

        if status == "UNAVAILABLE":
            text = descriptions[status]
            print(f"OK - Update status: {status} ({text})")
            sys.exit(0)

        text = descriptions.get(status, "unknown status")
        print(
            f"WARNING - Update status: {status} ({text}). "
            "An update might be required. Go to the TrueNAS "
            "Dashboard -> System -> Update to check for a newer version."
        )
        sys.exit(1)

    except Exception as e:
        print(
            "UNKNOWN - check_update() - Error when contacting TrueNAS server: "
            + str(e)
        )
        sys.exit(3)
    finally:
        # Runs on every path, including the SystemExit raised above.
        await client.close()


async def check_zpool(hostname, api_key, zpool_name="all",
                      use_ssl=True, verify_cert=True,
                      debug=False):
    """Check pool health and exit with a Nagios status.

    Examines every pool when ``zpool_name`` is "all" (case-insensitive),
    otherwise only the pool with exactly that name.  Prints one status line
    and terminates via ``sys.exit``: 2 if an examined pool is not ``ONLINE``
    or the named pool is missing while other pools exist, 0 otherwise,
    3 if any exception occurs.
    """
    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl, verify_cert=verify_cert, debug=debug,
    )
    try:
        await client.login()

        pools = await client.call("pool.query")

        want_all = zpool_name.lower() == "all"
        every_name = []
        examined = []
        problems = []

        for pool in pools:
            name = pool.get("name")
            status = pool.get("status")
            every_name.append(f"{name} ")

            if not (want_all or name == zpool_name):
                continue
            examined.append(f" {name}")
            if status != "ONLINE":
                problems.append(f"- (C) ZPool {name} is {status} ")

        critical_messages = "".join(problems)
        pool_count = len(every_name)

        # A specific pool was requested, pools exist, but none matched.
        if not examined and pool_count > 0 and not want_all:
            critical_messages = (
                f"- No zpools found matching {zpool_name} out of "
                f"{pool_count} pools ({''.join(every_name)})"
            )

        if critical_messages:
            print("CRITICAL " + critical_messages)
            sys.exit(2)

        examined_text = "".join(examined) or "(None - no zpools found)"
        print(
            "OK - No pool health issues. Zpools examined:"
            + examined_text
        )
        sys.exit(0)

    except Exception as e:
        print(
            "UNKNOWN - check_zpool() - Error when contacting TrueNAS server: "
            + str(e)
        )
        sys.exit(3)
    finally:
        # Runs on every path, including the SystemExit raised above.
        await client.close()


async def check_zpool_capacity(hostname, api_key,
                               zpool_name="all",
                               warn_percent=80,
                               crit_percent=90,
                               show_perfdata=False,
                               use_ssl=True, verify_cert=True,
                               debug=False):
    """Check pool usage against thresholds and exit with a Nagios status.

    Usage per pool is ``used / (available + used)`` computed from the
    datasets returned by ``pool.dataset.query`` with ``flat=False``.
    Prints one status line and terminates via ``sys.exit``: 2 if any pool
    reaches ``crit_percent`` or the named pool has no datasets, 1 if any
    pool reaches ``warn_percent``, 0 otherwise, 3 if any exception occurs.

    Args:
        hostname: TrueNAS host to connect to.
        api_key: API key used for the login.
        zpool_name: Pool to examine, or "all" (case-insensitive) for every pool.
        warn_percent: Usage percentage at or above which a pool is WARNING.
        crit_percent: Usage percentage at or above which a pool is CRITICAL.
        show_perfdata: Append ``;|`` and per-pool performance data (in MB).
        use_ssl: Passed through to the client (``wss://`` vs ``ws://``).
        verify_cert: Passed through to the client (TLS verification).
        debug: Passed through to the client (stderr tracing).
    """
    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl,
        verify_cert=verify_cert,
        debug=debug,
    )
    bytes_in_megabyte = 1024 * 1024

    try:
        await client.login()

        # Equivalent to the old pool/dataset request with flat=False
        query_args = [
            [],                          # filters
            {"extra": {"flat": False}},  # options
        ]
        datasets = await client.call("pool.dataset.query", query_args)

        looking_for_all_pools = zpool_name.lower() == "all"

        all_dataset_names = []
        examined_dataset_names = []
        # pool name -> [available_bytes, used_bytes]; "available" comes from
        # the first dataset seen for the pool, "used" is summed over all.
        pool_usage = {}

        for dataset in datasets:
            dataset_name = dataset["name"]
            pool_name = dataset["pool"]
            all_dataset_names.append(dataset_name)

            if not (looking_for_all_pools or zpool_name == pool_name):
                continue
            examined_dataset_names.append(dataset_name)

            used_bytes = dataset["used"]["parsed"]
            avail_bytes = dataset["available"]["parsed"]
            if pool_name in pool_usage:
                pool_usage[pool_name][1] += used_bytes
            else:
                pool_usage[pool_name] = [avail_bytes, used_bytes]

        critical_messages = []
        warning_messages = []
        pools_ok = []
        perf_items = []

        if (not examined_dataset_names and all_dataset_names
                and not looking_for_all_pools):
            # Leading space matches the other messages so the line reads
            # "CRITICAL - No datasets ..." rather than "CRITICAL- No ...".
            names = "".join(f"{name} " for name in all_dataset_names)
            critical_messages.append(
                f" - No datasets found matching zpool {zpool_name} out of "
                f"{len(all_dataset_names)} root level datasets "
                f"({names})"
            )

        # Calculate usage percentage per pool
        for pool_name, (avail_bytes, used_bytes) in pool_usage.items():
            total_bytes = avail_bytes + used_bytes
            if total_bytes == 0:
                # Nothing to divide by; such a pool is left out of the report.
                continue
            used_pct = (used_bytes / total_bytes) * 100.0
            usage_str = f"{used_pct:3.1f}"

            if used_pct >= crit_percent:
                critical_messages.append(
                    f" - Pool {pool_name} usage {usage_str}% "
                    f"exceeds critical threshold of {crit_percent}%"
                )
            elif used_pct >= warn_percent:
                warning_messages.append(
                    f" - Pool {pool_name} usage {usage_str}% "
                    f"exceeds warning threshold of {warn_percent}%"
                )
            else:
                pools_ok.append(f"{pool_name} ({usage_str}% used)")

            if show_perfdata:
                used_mb = used_bytes / bytes_in_megabyte
                warn_mb = total_bytes * (warn_percent / 100.0) / bytes_in_megabyte
                crit_mb = total_bytes * (crit_percent / 100.0) / bytes_in_megabyte
                total_mb = total_bytes / bytes_in_megabyte
                # label=value[UOM];warn;crit;min;max
                perf_items.append(
                    f" {pool_name}={used_mb:3.2f}MB;"
                    f"{warn_mb:3.2f};"
                    f"{crit_mb:3.2f};0;"
                    f"{total_mb:3.2f}"
                )

        perfdata = (";|" + "".join(perf_items)) if show_perfdata else ""
        zpools_ok = " - ".join(pools_ok)
        divider = " - " if zpools_ok else ""
        examined_text = "".join(f" {name}" for name in examined_dataset_names)

        # sys.exit raises SystemExit, which "except Exception" below does not
        # catch, so the chosen exit code survives; "finally" still runs.
        if critical_messages:
            print(
                "CRITICAL" + "".join(critical_messages)
                + "".join(warning_messages)
                + divider + zpools_ok + perfdata
            )
            sys.exit(2)
        elif warning_messages:
            print(
                "WARNING" + "".join(warning_messages)
                + divider + zpools_ok + perfdata
            )
            sys.exit(1)
        else:
            if zpools_ok == "":
                zpools_ok = "(No zpool capacity issues)"
            print(
                "OK - No zpool capacity issues. Zpools examined: "
                + zpools_ok + " - Root level datasets examined:"
                + examined_text + perfdata
            )
            sys.exit(0)

    except Exception as e:
        print(
            "UNKNOWN - check_zpool_capacity() - Error when contacting "
            f"TrueNAS server: {e}"
        )
        sys.exit(3)
    finally:
        await client.close()


if __name__ == "__main__":
    # Command line: <host> <api_key> <mode> [zpool_name] [warn%] [crit%]
    #
    # Per the Nagios plugin guidelines, status text belongs on stdout and a
    # usage/argument problem maps to UNKNOWN (exit 3).  Exiting 1 would be
    # shown as a WARNING, and an uncaught ValueError traceback also exits 1.
    usage = (
        f"Usage: {sys.argv[0]} <host> <api_key> "
        "<mode:alerts|repl|update|zpool|zpool_capacity> "
        "[zpool_name] [warn%] [crit%]"
    )

    if len(sys.argv) < 4:
        print("UNKNOWN - " + usage)
        sys.exit(3)

    host = sys.argv[1]
    # NOTE(review): a key passed on the command line is visible in the
    # process list of the monitoring host.
    key = sys.argv[2]
    mode = sys.argv[3]

    # Default value for zpool / zpool_capacity
    zname = sys.argv[4] if len(sys.argv) > 4 else "all"

    def _percent_arg(index, default):
        """Return argv[index] as an int, or ``default`` when it is absent."""
        if len(sys.argv) <= index:
            return default
        try:
            return int(sys.argv[index])
        except ValueError:
            print(
                f"UNKNOWN - Invalid percentage '{sys.argv[index]}'. " + usage
            )
            sys.exit(3)

    # NOTE(review): verify_cert=False disables TLS certificate and hostname
    # validation for every check; switch it to True once the NAS presents a
    # certificate the monitoring host trusts.
    connection = {"use_ssl": True, "verify_cert": False, "debug": False}

    if mode == "alerts":
        check = check_alerts(host, key, ignore_dismissed=False, **connection)
    elif mode == "repl":
        check = check_repl(host, key, **connection)
    elif mode == "update":
        check = check_update(host, key, **connection)
    elif mode == "zpool":
        check = check_zpool(host, key, zpool_name=zname, **connection)
    elif mode == "zpool_capacity":
        check = check_zpool_capacity(
            host,
            key,
            zpool_name=zname,
            warn_percent=_percent_arg(5, 80),
            crit_percent=_percent_arg(6, 90),
            show_perfdata=True,
            **connection,
        )
    else:
        print("Unknown mode, use alerts|repl|update|zpool|zpool_capacity")
        sys.exit(3)

    # Each check prints its status line and calls sys.exit itself.
    asyncio.run(check)

#############

Nagios Command:

define command {
command_name    ugreen_nas_check_nas
command_line    /usr/bin/python3 $USER1$/check_truenas_extended_play_jsonrpc.py $HOSTADDRESS$ $USER22$ $ARG1$ $ARG2$ $ARG3$ $ARG4$
}

$USER22$=API Key

################
Available modes: alerts | repl | update | zpool [zpool_name] | zpool_capacity [zpool_name warn% crit%] (defaults: all, 80, 90)

define service {
use generic-service
host_name NAS
service_description TrueNAS Replication Check
check_command ugreen_nas_check_nas!repl
notifications_enabled 1
check_interval 5
check_period 24x7
notification_period 24x7
}

###########

Regards,

Stefan

Just one quick remark: I would translate your comments from German to English, so that English-speaking people can understand what you mean by, e.g.,
“# Notification o.ä. – ignorieren”