This warning about the deprecated REST API:
WARNING - (W) The deprecated REST API was used to authenticate 23 times in the last 24 hours from the following IP addresses: 10.130.100.41. The REST API will be removed in version 26.04. To avoid service disruption, migrate any remaining integrations to the supported JSON-RPC 2.0 over WebSocket API before upgrading. For migration guidance, see the documentation.
prompted me to slightly modify my Python script for Nagios so that it uses the JSON-RPC 2.0 over WebSocket API instead. Enjoy using it!
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import json
import ssl
import asyncio
import websockets # pip install websockets
class JsonRpcError(Exception):
    """Error reported by (or while talking to) the JSON-RPC endpoint.

    The exception is normally constructed with the ``error`` member of a
    JSON-RPC 2.0 response (a dict).  The payload stays available as
    ``exc.error`` and ``exc.args[0]``; ``str(exc)`` renders a short,
    human-readable line instead of the raw dict so that it can be printed
    directly as plugin output.

    Any other constructor arguments behave exactly as with ``Exception``.
    """

    def __init__(self, *args):
        super().__init__(*args)
        # First positional argument, if any: the JSON-RPC error object.
        self.error = args[0] if args else None

    def __str__(self):
        err = self.error
        if not isinstance(err, dict):
            return super().__str__()

        message = err.get("message")
        data = err.get("data")
        # NOTE(review): TrueNAS appears to put the detailed cause under
        # data["reason"]; it is used only when present -- confirm.
        reason = data.get("reason") if isinstance(data, dict) else None

        text = ": ".join(str(part) for part in (message, reason) if part)
        if not text:
            # Nothing readable in the payload: fall back to the raw repr.
            return super().__str__()

        code = err.get("code")
        return text if code is None else f"{text} (code {code})"
class TrueNASJsonRpcClient:
    """Minimal JSON-RPC 2.0 client for the TrueNAS WebSocket endpoint.

    The client connects to ``ws(s)://<hostname>/api/current``, authenticates
    with an API key and performs one request at a time.

    Args:
        hostname: Host (optionally ``host:port``) of the TrueNAS server.
        api_key: API key passed to ``auth.login_with_api_key``.
        use_ssl: Use ``wss://`` when true, ``ws://`` otherwise.
        verify_cert: Verify the server certificate and host name.  Only
            relevant when ``use_ssl`` is true.
        debug: Trace requests and responses on stderr.  Parameters of
            ``auth.*`` calls are redacted so the API key is never logged.
        timeout: Seconds to wait for the reply to a request; ``None`` waits
            indefinitely.
    """

    def __init__(self, hostname, api_key, use_ssl=True, verify_cert=True,
                 debug=False, timeout=30.0):
        self.hostname = hostname
        self.api_key = api_key
        self.use_ssl = use_ssl
        self.verify_cert = verify_cert
        self.debug = debug
        self.timeout = timeout
        self._ws = None   # open WebSocket connection, or None
        self._id = 0      # last JSON-RPC request id handed out

    def _debug(self, text):
        """Write ``text`` to stderr when debugging is enabled."""
        if self.debug:
            print(text, file=sys.stderr)

    def _make_uri(self):
        """Return the WebSocket URI of the API endpoint."""
        scheme = "wss" if self.use_ssl else "ws"
        return f"{scheme}://{self.hostname}/api/current"

    def _make_ssl_context(self):
        """Return the TLS context to connect with, or None for plain ws://."""
        if not self.use_ssl:
            return None
        ctx = ssl.create_default_context()
        if not self.verify_cert:
            # check_hostname has to be switched off before CERT_NONE is set.
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        return ctx

    async def _connect(self):
        """Open the WebSocket connection."""
        uri = self._make_uri()
        self._debug(f"Connecting to {uri}")
        self._ws = await websockets.connect(uri, ssl=self._make_ssl_context())

    async def _read_reply(self, req_id):
        """Read frames until the response carrying ``req_id`` arrives.

        Returns the ``result`` member of that response.

        Raises:
            JsonRpcError: The response carries an ``error`` member.
        """
        while True:
            raw = await self._ws.recv()
            self._debug(f"<-- {raw}")
            resp = json.loads(raw)
            if not isinstance(resp, dict) or resp.get("id") != req_id:
                # Notification or unrelated response - ignore.
                continue
            if resp.get("error"):
                raise JsonRpcError(resp["error"])
            return resp.get("result")

    async def _send(self, method, params):
        """Send one request and return the ``result`` of its response.

        Raises:
            JsonRpcError: The server answered with an error, or no answer
                arrived within ``self.timeout`` seconds.
        """
        self._id += 1
        req_id = self._id
        msg = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params,
        }
        payload = json.dumps(msg)
        if self.debug:
            shown = payload
            if method.startswith("auth."):
                # Never write credentials to the debug trace.
                shown = json.dumps({**msg, "params": ["<redacted>"]})
            self._debug(f"--> {shown}")
        await self._ws.send(payload)
        try:
            return await asyncio.wait_for(self._read_reply(req_id), self.timeout)
        except asyncio.TimeoutError:
            raise JsonRpcError(
                {"message": f"No reply to {method} within {self.timeout} seconds"}
            ) from None

    async def login(self):
        """Connect and authenticate with the API key.

        An already open connection is closed first.  When authentication
        fails the new connection is closed again, so that a later ``call()``
        cannot run on an unauthenticated session.

        Raises:
            JsonRpcError: The server rejected the API key or reported an
                error.
        """
        if self._ws is not None:
            await self.close()
        await self._connect()
        try:
            accepted = await self._send("auth.login_with_api_key", [self.api_key])
        except BaseException:
            await self.close()
            raise
        if not accepted:
            await self.close()
            raise JsonRpcError({"message": "Invalid API key"})

    async def call(self, method, params=None):
        """Call ``method`` with ``params`` (default: no parameters).

        Logs in first when no connection is open.  Returns the ``result``
        member of the response.
        """
        if params is None:
            params = []
        if self._ws is None:
            await self.login()
        return await self._send(method, params)

    async def close(self):
        """Close the connection, if any.  Never raises ``Exception``.

        Callers close the client in ``finally`` blocks; an error raised
        here would replace the exception (or ``SystemExit``) in flight.
        """
        ws, self._ws = self._ws, None
        if ws is None:
            return
        try:
            await ws.close()
        except Exception as exc:
            self._debug(f"Ignoring error while closing connection: {exc}")
async def check_alerts(hostname, api_key,
                       ignore_dismissed=False,
                       use_ssl=True, verify_cert=True,
                       debug=False):
    """Nagios check: report the alerts currently raised on the server.

    Prints one status line and terminates the process via ``sys.exit``:
    2 (CRITICAL) when at least one alert of level ERROR, CRITICAL, ALERT or
    EMERGENCY exists, 1 (WARNING) when only WARNING alerts exist, 0 (OK)
    otherwise, and 3 (UNKNOWN) when the server could not be queried.

    Args:
        hostname: TrueNAS host to connect to.
        api_key: API key used to log in.
        ignore_dismissed: Skip alerts whose ``dismissed`` flag is true.
        use_ssl: Connect with ``wss://`` instead of ``ws://``.
        verify_cert: Verify the server's TLS certificate.
        debug: Let the client trace its traffic on stderr.
    """
    # Every level more severe than WARNING has to surface as CRITICAL;
    # matching only the literal "CRITICAL" would let EMERGENCY alerts
    # pass as OK.
    critical_levels = ("ERROR", "CRITICAL", "ALERT", "EMERGENCY")

    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl,
        verify_cert=verify_cert,
        debug=debug,
    )
    try:
        await client.login()
        # alert.list does not take any parameters
        alerts = await client.call("alert.list")

        critical = []
        warnings = []
        for alert in alerts:
            if ignore_dismissed and alert.get("dismissed") is True:
                continue
            level = alert.get("level")
            # "formatted" may be missing or null; keep the output on one line.
            text = (alert.get("formatted") or "").replace("\n", ". ")
            if level in critical_levels:
                critical.append(f"- (C) {text} ")
            elif level == "WARNING":
                warnings.append(f"- (W) {text} ")

        if critical:
            print("CRITICAL " + "".join(critical) + "".join(warnings))
            sys.exit(2)
        if warnings:
            print("WARNING " + "".join(warnings))
            sys.exit(1)
        print("OK - No problem alerts")
        sys.exit(0)
    except Exception as e:
        print(f"UNKNOWN - check_alerts() - Error when contacting TrueNAS server: {e}")
        sys.exit(3)
    finally:
        # A failure while closing must not replace the SystemExit raised
        # above, otherwise the plugin would exit with the wrong status.
        try:
            await client.close()
        except Exception:
            pass
async def check_repl(hostname, api_key,
                     use_ssl=True, verify_cert=True,
                     debug=False):
    """Nagios check: report replication tasks that are not healthy.

    A task counts as healthy when its state is ``FINISHED`` or ``RUNNING``;
    every other state (including a missing one) is reported.

    Prints one status line and terminates the process via ``sys.exit``:
    1 (WARNING) when at least one task is unhealthy, 0 (OK) otherwise, and
    3 (UNKNOWN) when the server could not be queried.

    Args:
        hostname: TrueNAS host to connect to.
        api_key: API key used to log in.
        use_ssl: Connect with ``wss://`` instead of ``ws://``.
        verify_cert: Verify the server's TLS certificate.
        debug: Let the client trace its traffic on stderr.
    """
    healthy_states = ("FINISHED", "RUNNING")

    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl,
        verify_cert=verify_cert,
        debug=debug,
    )
    try:
        await client.login()
        replications = await client.call("replication.query")

        examined = []
        failed = []
        for repl in replications:
            name = repl.get("name")
            # "state" may be missing or null; treat both as "no state".
            state = (repl.get("state") or {}).get("state")
            examined.append(f" {name}: {state}")
            if state not in healthy_states:
                failed.append(f"{name}: {state}")

        if failed:
            failed_summary = " ".join(failed)
            print(
                f"WARNING - There are {len(failed)} replication errors "
                f"[{failed_summary}]. Go to Storage > Replication Tasks > "
                f"View Replication Tasks in TrueNAS for more details."
            )
            sys.exit(1)
        print(
            "OK - No replication errors. "
            f"Replications examined:{''.join(examined)}"
        )
        sys.exit(0)
    except Exception as e:
        print(
            "UNKNOWN - check_repl() - Error when contacting TrueNAS server: "
            + str(e)
        )
        sys.exit(3)
    finally:
        # A failure while closing must not replace the SystemExit raised
        # above, otherwise the plugin would exit with the wrong status.
        try:
            await client.close()
        except Exception:
            pass
async def check_update(hostname, api_key,
                       use_ssl=True, verify_cert=True,
                       debug=False):
    """Nagios check: report whether a system update is pending.

    Prints one status line and terminates the process via ``sys.exit``:
    0 (OK) when the reported status is ``UNAVAILABLE``, 1 (WARNING) for any
    other status, and 3 (UNKNOWN) when the server could not be queried or
    answered with something that is not an object.

    Args:
        hostname: TrueNAS host to connect to.
        api_key: API key used to log in.
        use_ssl: Connect with ``wss://`` instead of ``ws://``.
        verify_cert: Verify the server's TLS certificate.
        debug: Let the client trace its traffic on stderr.
    """
    status_map = {
        "UNAVAILABLE": "no update available",
        "AVAILABLE": "an update is available",
        "REBOOT_REQUIRED": "an update has already been applied",
        "HA_UNAVAILABLE": "HA is non-functional",
    }

    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl,
        verify_cert=verify_cert,
        debug=debug,
    )
    try:
        await client.login()
        # NOTE(review): newer TrueNAS releases may expose the update state
        # through a different method -- confirm that update.check_available
        # exists on the targeted version.
        result = await client.call("update.check_available")
        if not isinstance(result, dict):
            # Fail with a readable message instead of an AttributeError.
            raise ValueError(
                f"unexpected reply from update.check_available: {result!r}"
            )

        status = result.get("status")
        text = status_map.get(status, "unknown status")
        if status == "UNAVAILABLE":
            print(f"OK - Update status: {status} ({text})")
            sys.exit(0)
        print(
            f"WARNING - Update status: {status} ({text}). "
            "An update might be required. Go to the TrueNAS "
            "Dashboard -> System -> Update to check for a newer version."
        )
        sys.exit(1)
    except Exception as e:
        print(
            "UNKNOWN - check_update() - Error when contacting TrueNAS server: "
            + str(e)
        )
        sys.exit(3)
    finally:
        # A failure while closing must not replace the SystemExit raised
        # above, otherwise the plugin would exit with the wrong status.
        try:
            await client.close()
        except Exception:
            pass
async def check_zpool(hostname, api_key, zpool_name="all",
                      use_ssl=True, verify_cert=True,
                      debug=False):
    """Nagios check: report pools whose status is not ``ONLINE``.

    Prints one status line and terminates the process via ``sys.exit``:
    2 (CRITICAL) when an examined pool is not ``ONLINE`` or when a pool was
    requested by name and does not exist, 0 (OK) otherwise, and 3 (UNKNOWN)
    when the server could not be queried.

    Args:
        hostname: TrueNAS host to connect to.
        api_key: API key used to log in.
        zpool_name: Name of the pool to examine, or ``"all"``
            (case-insensitive) to examine every pool.
        use_ssl: Connect with ``wss://`` instead of ``ws://``.
        verify_cert: Verify the server's TLS certificate.
        debug: Let the client trace its traffic on stderr.
    """
    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl,
        verify_cert=verify_cert,
        debug=debug,
    )
    try:
        await client.login()
        pools = await client.call("pool.query")

        looking_for_all = zpool_name.lower() == "all"
        all_names = []
        examined = []
        critical = []
        for pool in pools:
            name = pool.get("name")
            status = pool.get("status")
            all_names.append(name)
            if not (looking_for_all or name == zpool_name):
                continue
            examined.append(name)
            if status != "ONLINE":
                critical.append(f"- (C) ZPool {name} is {status} ")

        if not examined and not looking_for_all:
            # A pool requested by name that is not there is a failure, also
            # when the server has no pools at all.
            listed = "".join(f"{name} " for name in all_names)
            critical.append(
                f"- No zpools found matching {zpool_name} out of "
                f"{len(all_names)} pools ({listed})"
            )

        if critical:
            print("CRITICAL " + "".join(critical))
            sys.exit(2)

        zpools_examined = "".join(f" {name}" for name in examined)
        if zpools_examined == "":
            zpools_examined = "(None - no zpools found)"
        print(
            "OK - No pool health issues. Zpools examined:"
            + zpools_examined
        )
        sys.exit(0)
    except Exception as e:
        print(
            "UNKNOWN - check_zpool() - Error when contacting TrueNAS server: "
            + str(e)
        )
        sys.exit(3)
    finally:
        # A failure while closing must not replace the SystemExit raised
        # above, otherwise the plugin would exit with the wrong status.
        try:
            await client.close()
        except Exception:
            pass
async def check_zpool_capacity(hostname, api_key,
                               zpool_name="all",
                               warn_percent=80,
                               crit_percent=90,
                               show_perfdata=False,
                               use_ssl=True, verify_cert=True,
                               debug=False):
    """Nagios check: compare pool usage against warning/critical thresholds.

    Usage is derived from the datasets returned by ``pool.dataset.query``
    (non-flat): per pool, ``used`` is summed over the returned datasets and
    ``available`` is taken from the first one; the usage percentage is
    ``used / (used + available)``.  Pools whose total is zero are skipped.

    Prints one status line and terminates the process via ``sys.exit``:
    2 (CRITICAL) when a pool reaches ``crit_percent`` or when a pool was
    requested by name and no dataset belongs to it, 1 (WARNING) when a pool
    reaches ``warn_percent``, 0 (OK) otherwise, and 3 (UNKNOWN) when the
    server could not be queried.

    Args:
        hostname: TrueNAS host to connect to.
        api_key: API key used to log in.
        zpool_name: Name of the pool to examine, or ``"all"``
            (case-insensitive) to examine every pool.
        warn_percent: Usage percentage from which a pool is a WARNING.
        crit_percent: Usage percentage from which a pool is CRITICAL.
        show_perfdata: Append performance data (used / warn / crit / min /
            max, in MB) to the status line.
        use_ssl: Connect with ``wss://`` instead of ``ws://``.
        verify_cert: Verify the server's TLS certificate.
        debug: Let the client trace its traffic on stderr.
    """
    bytes_in_megabyte = 1024 * 1024

    client = TrueNASJsonRpcClient(
        hostname, api_key,
        use_ssl=use_ssl,
        verify_cert=verify_cert,
        debug=debug,
    )
    try:
        await client.login()
        # Equivalent to the old pool/dataset request with flat=False:
        # [filters, options]
        query_args = [[], {"extra": {"flat": False}}]
        datasets = await client.call("pool.dataset.query", query_args)

        looking_for_all_pools = zpool_name.lower() == "all"
        all_dataset_names = []
        examined_datasets = []
        usage_by_pool = {}   # pool name -> {"available": bytes, "used": bytes}

        # Aggregate capacity per pool
        for dataset in datasets:
            dataset_name = dataset["name"]
            pool_name = dataset["pool"]
            all_dataset_names.append(dataset_name)
            if not (looking_for_all_pools or zpool_name == pool_name):
                continue
            examined_datasets.append(dataset_name)
            used_bytes = dataset["used"]["parsed"]
            avail_bytes = dataset["available"]["parsed"]
            if pool_name in usage_by_pool:
                usage_by_pool[pool_name]["used"] += used_bytes
            else:
                usage_by_pool[pool_name] = {
                    "available": avail_bytes,
                    "used": used_bytes,
                }

        critical = []
        warnings = []
        pools_ok = []
        perfdata = ";|" if show_perfdata else ""

        if not examined_datasets and not looking_for_all_pools:
            # A pool requested by name without any dataset is a failure,
            # also when the server returned no datasets at all.  The
            # leading space keeps "CRITICAL" separated from the text.
            listed = "".join(f"{name} " for name in all_dataset_names)
            critical.append(
                f" - No datasets found matching zpool {zpool_name} out of "
                f"{len(all_dataset_names)} root level datasets "
                f"({listed})"
            )

        # Calculate usage percentage per pool
        for pool_name, usage in usage_by_pool.items():
            used_total = usage["used"]
            total_bytes = usage["available"] + used_total
            if total_bytes == 0:
                continue
            used_pct = (used_total / total_bytes) * 100.0
            usage_str = f"{used_pct:3.1f}"
            if used_pct >= crit_percent:
                critical.append(
                    f" - Pool {pool_name} usage {usage_str}% "
                    f"exceeds critical threshold of {crit_percent}%"
                )
            elif used_pct >= warn_percent:
                warnings.append(
                    f" - Pool {pool_name} usage {usage_str}% "
                    f"exceeds warning threshold of {warn_percent}%"
                )
            else:
                pools_ok.append(f"{pool_name} ({usage_str}% used)")
            if show_perfdata:
                used_mb = used_total / bytes_in_megabyte
                warn_bytes = total_bytes * (warn_percent / 100.0)
                crit_bytes = total_bytes * (crit_percent / 100.0)
                total_mb = total_bytes / bytes_in_megabyte
                perfdata += (
                    f" {pool_name}={used_mb:3.2f}MB;"
                    f"{warn_bytes / bytes_in_megabyte:3.2f};"
                    f"{crit_bytes / bytes_in_megabyte:3.2f};0;"
                    f"{total_mb:3.2f}"
                )

        zpools_ok = " - ".join(pools_ok)
        divider = " - " if zpools_ok else ""
        critical_messages = "".join(critical)
        warning_messages = "".join(warnings)

        if critical:
            print(
                "CRITICAL" + critical_messages + warning_messages +
                divider + zpools_ok + perfdata
            )
            sys.exit(2)
        if warnings:
            print(
                "WARNING" + warning_messages +
                divider + zpools_ok + perfdata
            )
            sys.exit(1)

        if zpools_ok == "":
            zpools_ok = "(No zpool capacity issues)"
        root_level_datasets_examined = "".join(
            f" {name}" for name in examined_datasets
        )
        print(
            "OK - No zpool capacity issues. Zpools examined: "
            + zpools_ok + " - Root level datasets examined:" +
            root_level_datasets_examined + perfdata
        )
        sys.exit(0)
    except Exception as e:
        print(
            "UNKNOWN - check_zpool_capacity() - Error when contacting "
            f"TrueNAS server: {e}"
        )
        sys.exit(3)
    finally:
        # A failure while closing must not replace the SystemExit raised
        # above, otherwise the plugin would exit with the wrong status.
        try:
            await client.close()
        except Exception:
            pass
def main(argv=None):
    """Command-line entry point of the plugin.

    Usage: ``<script> <host> <api_key> <mode> [zpool_name] [warn%] [crit%]``
    where ``mode`` is one of alerts, repl, update, zpool, zpool_capacity.
    ``zpool_name`` defaults to ``all``; ``warn%`` / ``crit%`` default to
    80 / 90 and are only read in ``zpool_capacity`` mode.

    Every check prints its status line and ends the process itself via
    ``sys.exit``.  The return value is only used for problems detected
    before a check starts and is always 3 (UNKNOWN).

    Args:
        argv: Argument vector including the program name; defaults to
            ``sys.argv``.
    """
    args = sys.argv if argv is None else list(argv)
    if len(args) < 4:
        # Nagios only displays stdout and maps exit status 1 to WARNING, so
        # a usage error goes to stdout and is reported as UNKNOWN (3).
        prog = args[0] if args else "check_truenas"
        print(
            f"Usage: {prog} <host> <api_key> "
            "<mode:alerts|repl|update|zpool|zpool_capacity> "
            "[zpool_name] [warn%] [crit%]"
        )
        return 3

    host, key, mode = args[1], args[2], args[3]
    # Default values for zpool / zpool_capacity
    zname = args[4] if len(args) > 4 else "all"
    warn_pct = 80
    crit_pct = 90
    if mode == "zpool_capacity":
        try:
            if len(args) > 5:
                warn_pct = int(args[5])
            if len(args) > 6:
                crit_pct = int(args[6])
        except ValueError:
            print(
                "UNKNOWN - warn% and crit% must be integers, got: "
                + " ".join(args[5:7])
            )
            return 3

    # NOTE(security): certificate verification is switched off, so the API
    # key is sent over a connection whose peer is not authenticated.  Set
    # verify_cert to True once the server presents a trusted certificate.
    connection = {"use_ssl": True, "verify_cert": False, "debug": False}

    # Each entry creates its coroutine lazily, so only the selected check
    # is ever instantiated.
    checks = {
        "alerts": lambda: check_alerts(
            host, key, ignore_dismissed=False, **connection
        ),
        "repl": lambda: check_repl(host, key, **connection),
        "update": lambda: check_update(host, key, **connection),
        "zpool": lambda: check_zpool(
            host, key, zpool_name=zname, **connection
        ),
        "zpool_capacity": lambda: check_zpool_capacity(
            host, key,
            zpool_name=zname,
            warn_percent=warn_pct,
            crit_percent=crit_pct,
            show_perfdata=True,
            **connection,
        ),
    }
    start_check = checks.get(mode)
    if start_check is None:
        print("Unknown mode, use alerts|repl|update|zpool|zpool_capacity")
        return 3

    asyncio.run(start_check())
    # Only reached if a check returned instead of exiting: status unknown.
    return 3


if __name__ == "__main__":
    sys.exit(main())
#############
Nagios Command:
define command {
command_name ugreen_nas_check_nas
command_line /usr/bin/python3 $USER1$/check_truenas_extended_play_jsonrpc.py $HOSTADDRESS$ $USER22$ $ARG1$ $ARG2$ $ARG3$ $ARG4$
}
$USER22$ = the TrueNAS API key (a $USERn$ macro, defined in the Nagios resource file)
################
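Available modes ($ARG1$): alerts | repl | update | zpool | zpool_capacity. The zpool and zpool_capacity modes accept an optional pool name as $ARG2$ (default: all); zpool_capacity also accepts warning and critical percentages as $ARG3$ and $ARG4$ (defaults: 80 and 90).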
define service {
use generic-service
host_name NAS
service_description TrueNAS Replication Check
check_command ugreen_nas_check_nas!repl
notifications_enabled 1
check_interval 5
check_period 24x7
notification_period 24x7
}
###########
Regards,
Stefan