Compare commits

...

4 Commits

Author      SHA1          Message                                       Date
illustris   14db1fa68c    Bump version to 1.2.0                         2024-09-09 01:02:02 +05:30
illustris   9745364a72    collect storage info, size and free space    2024-09-09 00:53:59 +05:30
illustris   2eb85eed75    collect disk size                             2024-09-09 00:53:41 +05:30
illustris   07c07a6b7d    bump inputs                                   2024-09-09 00:53:41 +05:30
6 changed files with 192 additions and 20 deletions

flake.lock (generated)

@@ -3,11 +3,11 @@
     "debBundler": {
       "flake": false,
       "locked": {
-        "lastModified": 1705083181,
-        "narHash": "sha256-o1zolChrQk7UpMmFLjymjQWuoDIL1XStV56kuOprMDQ=",
+        "lastModified": 1725149456,
+        "narHash": "sha256-rRrSD7itoPm+VIT4bIzSupQ7jw+H4eOjxRiRA89Kxb4=",
         "owner": "illustris",
         "repo": "flake",
-        "rev": "6a9df656834b5111f7ffb0b1f6d97a0d8700de58",
+        "rev": "257a6c986cb9a67c4d6d0e0363507cab7f958b63",
         "type": "github"
       },
       "original": {
@@ -18,11 +18,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1704722960,
-        "narHash": "sha256-mKGJ3sPsT6//s+Knglai5YflJUF2DGj7Ai6Ynopz0kI=",
+        "lastModified": 1725103162,
+        "narHash": "sha256-Ym04C5+qovuQDYL/rKWSR+WESseQBbNAe5DsXNx5trY=",
         "owner": "nixos",
         "repo": "nixpkgs",
-        "rev": "317484b1ead87b9c1b8ac5261a8d2dd748a0492d",
+        "rev": "12228ff1752d7b7624a54e9c1af4b222b3c1073b",
         "type": "github"
       },
       "original": {


@@ -14,7 +14,7 @@ rec {
     packages.x86_64-linux = with nixpkgs.legacyPackages.x86_64-linux; rec {
         pvemon = python3Packages.buildPythonApplication {
             pname = "pvemon";
-            version = "1.1.6";
+            version = "1.2.0";
             src = ./src;
             propagatedBuildInputs = with python3Packages; [
                 pexpect


@@ -19,8 +19,11 @@ from concurrent.futures import ThreadPoolExecutor
 from threading import Lock
 import pvecommon
+import pvestorage
 import qmblock
+import builtins
 DEFAULT_PORT = 9116
 DEFAULT_INTERVAL = 10
 DEFAULT_PREFIX = "pve"
@@ -41,6 +44,8 @@ gauge_settings = [
     ('kvm_io_write_chars', 'Number of bytes written including buffers', ['id']),
     ('kvm_nic_queues', 'Number of queues in multiqueue config', ['id', 'ifname']),
+    ('kvm_disk_size', 'Size of virtual disk', ['id', 'disk_name']),
 ]
 label_flags = [ "-id", "-name", "-cpu" ]
@@ -67,12 +72,14 @@ def parse_mem(cmdline):
     return ret
 def create_or_get_gauge(metric_name, labels, dynamic_gauges, gauge_lock):
+    logging.debug(f"create_or_get_gauge({metric_name=}, labels={str(labels)}")
     with gauge_lock:
         if metric_name not in dynamic_gauges:
             dynamic_gauges[metric_name] = GaugeMetricFamily(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels=labels)
         return dynamic_gauges[metric_name]
 def create_or_get_info(info_name, labels, dynamic_infos, info_lock):
+    logging.debug(f"create_or_get_info({info_name=}, labels={str(labels)}")
     with info_lock:
         if (info_name,str(labels)) not in dynamic_infos:
             dynamic_infos[(info_name,str(labels))] = InfoMetricFamily(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels=labels)
@@ -163,6 +170,8 @@ def collect_kvm_metrics():
         except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
             continue
+    # Try to find ID to pool mapping here
     for proc, cmdline, id in procs:
         # Extract vm labels from cmdline
         info_label_dict = {get_label_name(l): flag_to_label_value(cmdline,l) for l in label_flags}
@@ -221,6 +230,11 @@ def collect_kvm_metrics():
             disk_labels = {"id": id, "disk_name": disk_name}
             prom_disk_info = create_or_get_info("kvm_disk", disk_labels.keys(), dynamic_infos, info_lock)
             prom_disk_info.add_metric(disk_labels.values(), disk_info)
+            disk_size = qmblock.get_disk_size(disk_info["disk_path"], disk_info["disk_type"])
+            if disk_size == None and disk_info["disk_type"] != "qcow2":
+                logging.debug(f"collect_kvm_metrics: failed to get disk size for {disk_info=}")
+            else:
+                gauge_dict["kvm_disk_size"].add_metric([id, disk_name], qmblock.get_disk_size(disk_info["disk_path"], disk_info["disk_type"]))
     list(executor.map(map_netstat_proc, [ proc[2] for proc in procs ]))
     list(executor.map(map_disk_proc, [ proc[2] for proc in procs ]))
@@ -243,12 +257,16 @@ class PVECollector(object):
         if cli_args.collect_running_vms.lower() == 'true':
             for x in collect_kvm_metrics():
                 yield x
+        if cli_args.collect_storage.lower() == 'true':
+            for x in pvestorage.collect_storage_metrics():
+                yield x
 def main():
     parser = argparse.ArgumentParser(description='PVE metrics exporter for Prometheus')
     parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='Port for the exporter to listen on')
     parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='THIS OPTION DOES NOTHING')
     parser.add_argument('--collect-running-vms', type=str, default='true', help='Enable or disable collecting running VMs metric (true/false)')
+    parser.add_argument('--collect-storage', type=str, default='true', help='Enable or disable collecting storage info (true/false)')
     parser.add_argument('--metrics-prefix', type=str, default=DEFAULT_PREFIX, help='<prefix>_ will be prepended to each metric name')
     parser.add_argument('--loglevel', type=str, default='INFO', help='Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)')
     parser.add_argument('--profile', type=str, default='false', help='collect metrics once, and print profiling stats')
@@ -257,23 +275,22 @@ def main():
     parser.add_argument('--qm-rand', type=int, default=60, help='randomize qm monitor cache expiry')
     parser.add_argument('--qm-monitor-defer-close', type=str, default="true", help='defer and retry closing unresponsive qm monitor sessions')
-    args = parser.parse_args()
-    global cli_args
-    cli_args = args
+    # hack to access cli_args across modules
+    builtins.cli_args = parser.parse_args()
-    loglevel = getattr(logging, args.loglevel.upper(), None)
+    loglevel = getattr(logging, cli_args.loglevel.upper(), None)
     if not isinstance(loglevel, int):
-        raise ValueError(f'Invalid log level: {args.loglevel}')
+        raise ValueError(f'Invalid log level: {cli_args.loglevel}')
     logging.basicConfig(level=loglevel,format='%(asctime)s: %(message)s')
     global prefix
-    prefix = args.metrics_prefix
+    prefix = cli_args.metrics_prefix
-    pvecommon.global_qm_timeout = args.qm_terminal_timeout
-    pvecommon.qm_max_ttl = args.qm_max_ttl
-    pvecommon.qm_rand = args.qm_rand
-    pvecommon.qm_monitor_defer_close = args.qm_monitor_defer_close
+    pvecommon.global_qm_timeout = cli_args.qm_terminal_timeout
+    pvecommon.qm_max_ttl = cli_args.qm_max_ttl
+    pvecommon.qm_rand = cli_args.qm_rand
+    pvecommon.qm_monitor_defer_close = cli_args.qm_monitor_defer_close
-    if args.profile.lower() == 'true':
+    if cli_args.profile.lower() == 'true':
         profiler = cProfile.Profile()
         profiler.enable()
         collect_kvm_metrics()
@@ -282,7 +299,7 @@ def main():
         return
     else:
         REGISTRY.register(PVECollector())
-        start_http_server(args.port)
+        start_http_server(cli_args.port)
         while True:
             time.sleep(100)
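
Side note on the cli_args change in main() above: assigning the parsed arguments to builtins is what lets the new pvestorage module read cli_args.metrics_prefix without importing anything from the main script, because names set as attributes on the builtins module are reachable through Python's normal name-lookup fallback in every module. A minimal two-file sketch of the pattern, not part of the diff (module and attribute values are placeholders):

# exporter.py
import builtins
import helper

builtins.cli_args = {"metrics_prefix": "pve"}   # becomes visible in every module
helper.show_prefix()

# helper.py
def show_prefix():
    # cli_args is neither defined nor imported here; Python falls back to the
    # builtins namespace when it resolves the name at call time.
    print(cli_args["metrics_prefix"])           # -> pve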

src/pvestorage/__init__.py (new file)

@@ -0,0 +1,134 @@
import os
import re
import logging
import pprint

from prometheus_client.core import InfoMetricFamily, GaugeMetricFamily, CounterMetricFamily, REGISTRY

gauge_settings = [
    ('node_storage_size', 'Size of the storage pool. This number is inaccurate for ZFS.', ['name', 'type']),
    ('node_storage_free', 'Free space on the storage pool', ['name', 'type'])
]

info_settings = [
    ('node_storage', 'information for each PVE storage'),
]

# Sanitize the key to match Prometheus label requirements
# Replace any character that is not a letter, digit, or underscore with an underscore
sanitize_key = lambda key: re.sub(r"[^a-zA-Z0-9_]", "_", key)

_cached_storage_data = None
_cached_mtime = None

def parse_storage_cfg(file_path='/etc/pve/storage.cfg'):
    logging.debug(f"parse_storage_cfg({file_path=}) called")
    global _cached_storage_data, _cached_mtime
    # Check if file exists
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"The file {file_path} does not exist.")
    # Get the file's modification time
    current_mtime = os.path.getmtime(file_path)
    # If the data is already cached and the file hasn't changed, return the cached data
    if _cached_storage_data is not None and _cached_mtime == current_mtime:
        logging.debug("parse_storage_cfg: returning cached data")
        return _cached_storage_data
    logging.debug("parse_storage_cfg: file modified, dropping cache")
    # Initialize list to store storages
    storage_list = []
    current_storage = None
    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith("#"):
                # Ignore empty lines or comments
                continue
            if ":" in line:
                # If we were processing a previous section, append it to the list
                if current_storage:
                    storage_list.append(current_storage)
                # Start a new storage definition
                section_type, section_name = line.split(":", 1)
                current_storage = {
                    'type': sanitize_key(section_type.strip()),  # Sanitize the section type
                    'name': sanitize_key(section_name.strip()),  # Sanitize the section name
                }
            else:
                # Parse key-value pairs within the current storage
                if current_storage:
                    key, value = line.split(None, 1)
                    sanitized_key = sanitize_key(key.strip())
                    current_storage[sanitized_key] = value.strip()
    # Append the last storage section to the list if any
    if current_storage:
        storage_list.append(current_storage)
    # Update the cache
    _cached_storage_data = storage_list
    _cached_mtime = current_mtime
    return storage_list

def get_storage_size(storage):
    try:
        if storage["type"] in ["dir", "nfs", "cephfs", "zfspool"]:
            if storage["type"] == "zfspool":
                path = storage["mountpoint"]
            else:
                path = storage["path"]
            # Get filesystem statistics
            stats = os.statvfs(path)
            # Calculate total size and free space in bytes
            # TODO: find an alternative way to calculate total_size for ZFS
            total_size = stats.f_frsize * stats.f_blocks
            free_space = stats.f_frsize * stats.f_bavail
            return {
                "total": total_size,
                "free": free_space
            }
        # TODO: handle lvmthin
        # could parse /etc/lvm/backup/<vg-name> to collect this data
        # TODO: handle rbd
    except Exception as e:
        logging.warn(f"get_storage_size: unknown error, {storage=}, error: {e}")
    # Return None if the case is not handled
    return None

def collect_storage_metrics():
    logging.debug("collect_storage_metrics() called")
    gauge_dict = {}
    info_dict = {}
    prefix = cli_args.metrics_prefix
    for name, description, labels in gauge_settings:
        gauge_dict[name] = GaugeMetricFamily(f"{prefix}_{name}", description, labels=labels)
    for name, description in info_settings:
        info_dict[name] = InfoMetricFamily(f"{prefix}_{name}", description)
    storage_pools = parse_storage_cfg()
    for storage in storage_pools:
        info_dict["node_storage"].add_metric([], storage)
        size = get_storage_size(storage)
        if size != None:
            gauge_dict["node_storage_size"].add_metric([storage["name"], storage["type"]], size["total"])
            gauge_dict["node_storage_free"].add_metric([storage["name"], storage["type"]], size["free"])
    for v in info_dict.values():
        yield v
    for v in gauge_dict.values():
        yield v
    logging.debug("collect_storage_metrics() return")


@@ -2,6 +2,7 @@ import pexpect
 import re
 import os
 import json
+import stat
 import pvecommon
@@ -66,6 +67,8 @@ def extract_disk_info_from_monitor(vm_id, retries = 0):
             disks_map[disk_name]["disk_type"] = "rbd"
             rbd_parts = disk_path.split('/')
             disks_map[disk_name]["cluster_id"] = rbd_parts[-3]
+            disks_map[disk_name]["pool"] = rbd_parts[-2]
+            # Keeping for backwards compatibility
             disks_map[disk_name]["pool_name"] = rbd_parts[-2]
             disks_map[disk_name]["vol_name"] = rbd_parts[-1]
             disks_map[disk_name]["device"] = get_device(disk_path)
@@ -94,6 +97,24 @@ def extract_disk_info_from_monitor(vm_id, retries = 0):
             disks_map[disk_name]["detect_zeroes"] = "on"
     return disks_map
+def get_disk_size(disk_path, disk_type):
+    if stat.S_ISBLK(os.stat(disk_path).st_mode):
+        disk_name = os.path.basename(os.path.realpath(disk_path))
+        size_file_path = f"/sys/block/{disk_name}/size"
+        sector_size_file_path = f"/sys/block/{disk_name}/queue/hw_sector_size"
+        with open(size_file_path, 'r') as f:
+            sectors = int(f.read().strip())
+        with open(sector_size_file_path, 'r') as sector_size_file:
+            sector_size = int(sector_size_file.read().strip())
+        size_in_bytes = sectors * sector_size
+    else:
+        size_in_bytes = os.path.getsize(disk_path)
+    return size_in_bytes
 if __name__ == "__main__":
     import json
     import sys
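
For context, the helper added above resolves sizes in two ways: for block devices it multiplies /sys/block/<dev>/size by the device's hw_sector_size from sysfs, and for anything else it falls back to os.path.getsize(). An illustrative call follows; the paths are made up, and the second argument is not consulted by the sizing code shown here (the caller in the main exporter passes the disk_type it already has).

import qmblock

# Block device (e.g. a mapped rbd or a zvol): sized from sysfs.
blk_bytes = qmblock.get_disk_size("/dev/rbd0", "rbd")                                  # hypothetical device

# Regular file (e.g. a raw image on a dir storage): sized with os.path.getsize().
raw_bytes = qmblock.get_disk_size("/var/lib/vz/images/100/vm-100-disk-0.raw", "raw")   # hypothetical path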


@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 setup(
     name='pvemon',
-    version = "1.1.6",
+    version = "1.2.0",
     packages=find_packages(),
     entry_points={
         'console_scripts': [