diff --git a/src/pvemon/__init__.py b/src/pvemon/__init__.py index bb7e6fa..b612df6 100644 --- a/src/pvemon/__init__.py +++ b/src/pvemon/__init__.py @@ -19,8 +19,11 @@ from concurrent.futures import ThreadPoolExecutor from threading import Lock import pvecommon +import pvestorage import qmblock +import builtins + DEFAULT_PORT = 9116 DEFAULT_INTERVAL = 10 DEFAULT_PREFIX = "pve" @@ -69,12 +72,14 @@ def parse_mem(cmdline): return ret def create_or_get_gauge(metric_name, labels, dynamic_gauges, gauge_lock): + logging.debug(f"create_or_get_gauge({metric_name=}, labels={str(labels)})") with gauge_lock: if metric_name not in dynamic_gauges: dynamic_gauges[metric_name] = GaugeMetricFamily(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels=labels) return dynamic_gauges[metric_name] def create_or_get_info(info_name, labels, dynamic_infos, info_lock): + logging.debug(f"create_or_get_info({info_name=}, labels={str(labels)})") with info_lock: if (info_name,str(labels)) not in dynamic_infos: dynamic_infos[(info_name,str(labels))] = InfoMetricFamily(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels=labels) @@ -231,7 +236,6 @@ def collect_kvm_metrics(): else: gauge_dict["kvm_disk_size"].add_metric([id, disk_name], qmblock.get_disk_size(disk_info["disk_path"], disk_info["disk_type"])) - list(executor.map(map_netstat_proc, [ proc[2] for proc in procs ])) list(executor.map(map_disk_proc, [ proc[2] for proc in procs ])) @@ -253,12 +257,16 @@ class PVECollector(object): if cli_args.collect_running_vms.lower() == 'true': for x in collect_kvm_metrics(): yield x + if cli_args.collect_storage.lower() == 'true': + for x in pvestorage.collect_storage_metrics(): + yield x def main(): parser = argparse.ArgumentParser(description='PVE metrics exporter for Prometheus') parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='Port for the exporter to listen on') parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='THIS OPTION DOES 
NOTHING') parser.add_argument('--collect-running-vms', type=str, default='true', help='Enable or disable collecting running VMs metric (true/false)') + parser.add_argument('--collect-storage', type=str, default='true', help='Enable or disable collecting storage info (true/false)') parser.add_argument('--metrics-prefix', type=str, default=DEFAULT_PREFIX, help='_ will be prepended to each metric name') parser.add_argument('--loglevel', type=str, default='INFO', help='Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)') parser.add_argument('--profile', type=str, default='false', help='collect metrics once, and print profiling stats') @@ -267,23 +275,22 @@ def main(): parser.add_argument('--qm-rand', type=int, default=60, help='randomize qm monitor cache expiry') parser.add_argument('--qm-monitor-defer-close', type=str, default="true", help='defer and retry closing unresponsive qm monitor sessions') - args = parser.parse_args() - global cli_args - cli_args = args + # hack to access cli_args across modules + builtins.cli_args = parser.parse_args() - loglevel = getattr(logging, args.loglevel.upper(), None) + loglevel = getattr(logging, cli_args.loglevel.upper(), None) if not isinstance(loglevel, int): - raise ValueError(f'Invalid log level: {args.loglevel}') + raise ValueError(f'Invalid log level: {cli_args.loglevel}') logging.basicConfig(level=loglevel,format='%(asctime)s: %(message)s') global prefix - prefix = args.metrics_prefix - pvecommon.global_qm_timeout = args.qm_terminal_timeout - pvecommon.qm_max_ttl = args.qm_max_ttl - pvecommon.qm_rand = args.qm_rand - pvecommon.qm_monitor_defer_close = args.qm_monitor_defer_close + prefix = cli_args.metrics_prefix + pvecommon.global_qm_timeout = cli_args.qm_terminal_timeout + pvecommon.qm_max_ttl = cli_args.qm_max_ttl + pvecommon.qm_rand = cli_args.qm_rand + pvecommon.qm_monitor_defer_close = cli_args.qm_monitor_defer_close - if args.profile.lower() == 'true': + if cli_args.profile.lower() == 'true': profiler = 
cProfile.Profile() profiler.enable() collect_kvm_metrics() @@ -292,7 +299,7 @@ def main(): return else: REGISTRY.register(PVECollector()) - start_http_server(args.port) + start_http_server(cli_args.port) while True: time.sleep(100) diff --git a/src/pvestorage/__init__.py b/src/pvestorage/__init__.py new file mode 100644 index 0000000..e539707 --- /dev/null +++ b/src/pvestorage/__init__.py @@ -0,0 +1,134 @@ +import os +import re +import logging +import pprint + +from prometheus_client.core import InfoMetricFamily, GaugeMetricFamily, CounterMetricFamily, REGISTRY + +gauge_settings = [ + ('node_storage_size', 'Size of the storage pool. This number is inaccurate for ZFS.', ['name', 'type']), + ('node_storage_free', 'Free space on the storage pool', ['name', 'type']) +] + +info_settings = [ + ('node_storage', 'information for each PVE storage'), +] + +# Sanitize the key to match Prometheus label requirements +# Replace any character that is not a letter, digit, or underscore with an underscore +sanitize_key = lambda key: re.sub(r"[^a-zA-Z0-9_]", "_", key) + +_cached_storage_data = None +_cached_mtime = None + +def parse_storage_cfg(file_path='/etc/pve/storage.cfg'): + logging.debug(f"parse_storage_cfg({file_path=}) called") + global _cached_storage_data, _cached_mtime + + # Check if file exists + if not os.path.exists(file_path): + raise FileNotFoundError(f"The file {file_path} does not exist.") + + # Get the file's modification time + current_mtime = os.path.getmtime(file_path) + + # If the data is already cached and the file hasn't changed, return the cached data + if _cached_storage_data is not None and _cached_mtime == current_mtime: + logging.debug("parse_storage_cfg: returning cached data") + return _cached_storage_data + + logging.debug("parse_storage_cfg: file modified, dropping cache") + + # Initialize list to store storages + storage_list = [] + current_storage = None + + with open(file_path, 'r') as file: + for line in file: + line = line.strip() + + if not 
line or line.startswith("#"): + # Ignore empty lines or comments + continue + + if ":" in line: + # If we were processing a previous section, append it to the list + if current_storage: + storage_list.append(current_storage) + + # Start a new storage definition + section_type, section_name = line.split(":", 1) + current_storage = { + 'type': sanitize_key(section_type.strip()), # Sanitize the section type + 'name': sanitize_key(section_name.strip()), # Sanitize the section name + } + else: + # Parse key-value pairs within the current storage + if current_storage: + key, value = line.split(None, 1) + sanitized_key = sanitize_key(key.strip()) + current_storage[sanitized_key] = value.strip() + + # Append the last storage section to the list if any + if current_storage: + storage_list.append(current_storage) + + # Update the cache + _cached_storage_data = storage_list + _cached_mtime = current_mtime + + return storage_list + +def get_storage_size(storage): + try: + if storage["type"] in ["dir", "nfs", "cephfs", "zfspool"]: + if storage["type"] == "zfspool": + path = storage["mountpoint"] + else: + path = storage["path"] + # Get filesystem statistics + stats = os.statvfs(path) + # Calculate total size and free space in bytes + # TODO: find an alternative way to calculate total_size for ZFS + total_size = stats.f_frsize * stats.f_blocks + free_space = stats.f_frsize * stats.f_bavail + return { + "total": total_size, + "free": free_space + } + + # TODO: handle lvmthin + # could parse /etc/lvm/backup/ to collect this data + # TODO: handle rbd + + except Exception as e: + logging.warning(f"get_storage_size: unknown error, {storage=}, error: {e}") + + # Return None if the case is not handled + return None + +def collect_storage_metrics(): + logging.debug("collect_storage_metrics() called") + gauge_dict = {} + info_dict = {} + prefix = cli_args.metrics_prefix + for name, description, labels in gauge_settings: + gauge_dict[name] = GaugeMetricFamily(f"{prefix}_{name}", description, 
labels=labels) + for name, description in info_settings: + info_dict[name] = InfoMetricFamily(f"{prefix}_{name}", description) + + storage_pools = parse_storage_cfg() + for storage in storage_pools: + info_dict["node_storage"].add_metric([], storage) + size = get_storage_size(storage) + if size is not None: + gauge_dict["node_storage_size"].add_metric([storage["name"], storage["type"]], size["total"]) + gauge_dict["node_storage_free"].add_metric([storage["name"], storage["type"]], size["free"]) + + for v in info_dict.values(): + yield v + for v in gauge_dict.values(): + yield v + + logging.debug("collect_storage_metrics() return") diff --git a/src/qmblock/__init__.py b/src/qmblock/__init__.py index 641b16c..86c664c 100644 --- a/src/qmblock/__init__.py +++ b/src/qmblock/__init__.py @@ -67,6 +67,8 @@ def extract_disk_info_from_monitor(vm_id, retries = 0): disks_map[disk_name]["disk_type"] = "rbd" rbd_parts = disk_path.split('/') disks_map[disk_name]["cluster_id"] = rbd_parts[-3] + disks_map[disk_name]["pool"] = rbd_parts[-2] + # Keeping for backwards compatibility disks_map[disk_name]["pool_name"] = rbd_parts[-2] disks_map[disk_name]["vol_name"] = rbd_parts[-1] disks_map[disk_name]["device"] = get_device(disk_path)