diff --git a/src/pvemon/__init__.py b/src/pvemon/__init__.py index d23f827..4066a82 100644 --- a/src/pvemon/__init__.py +++ b/src/pvemon/__init__.py @@ -1,4 +1,8 @@ -from prometheus_client import start_http_server, Gauge, Info +# from prometheus_client import start_http_server, Gauge, Info, REGISTRY, Metric +from prometheus_client.core import InfoMetricFamily, GaugeMetricFamily, CounterMetricFamily, REGISTRY +from prometheus_client.registry import Collector +from prometheus_client import start_http_server + import psutil import time import argparse @@ -39,9 +43,6 @@ gauge_settings = [ ('kvm_nic_queues', 'Number of queues in multiqueue config', ['id', 'ifname']), ] -gauge_dict = {} -info_dict = {} - label_flags = [ "-id", "-name", "-cpu" ] get_label_name = lambda flag: flag[1:] info_settings = [ @@ -50,20 +51,16 @@ info_settings = [ flag_to_label_value = lambda args, match: next((args[i+1] for i, x in enumerate(args[:-1]) if x == match), "unknown").split(",")[0] -dynamic_gauges = {} -gauge_lock = Lock() # avoid race condition when checking and creating gauges -def create_or_get_gauge(metric_name, labels): +def create_or_get_gauge(metric_name, labels, dynamic_gauges, gauge_lock): with gauge_lock: if metric_name not in dynamic_gauges: - dynamic_gauges[metric_name] = Gauge(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels) + dynamic_gauges[metric_name] = GaugeMetricFamily(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels=labels) return dynamic_gauges[metric_name] -dynamic_infos = {} -info_lock = Lock() # avoid race condition when checking and creating infos -def create_or_get_info(info_name, labels): +def create_or_get_info(info_name, labels, dynamic_infos, info_lock): with info_lock: if (info_name,str(labels)) not in dynamic_infos: - dynamic_infos[(info_name,str(labels))] = Info(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels) + dynamic_infos[(info_name,str(labels))] = InfoMetricFamily(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels=labels) return dynamic_infos[(info_name,str(labels))] def get_memory_info(pid): @@ -117,6 +114,19 @@ def read_interface_stats(ifname): return stats def collect_kvm_metrics(): + gauge_dict = {} + info_dict = {} + for name, description, labels in gauge_settings: + gauge_dict[name] = GaugeMetricFamily(f"{prefix}_{name}", description, labels=labels) + + for name, description in info_settings: + info_dict[name] = InfoMetricFamily(f"{prefix}_{name}", description) + + dynamic_gauges = {} + gauge_lock = Lock() # avoid race condition when checking and creating gauges + dynamic_infos = {} + info_lock = Lock() # avoid race condition when checking and creating infos + procs = [] for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline', 'cpu_percent', 'memory_percent', 'num_threads']): try: @@ -136,7 +146,7 @@ def collect_kvm_metrics(): info_label_dict = {get_label_name(l): flag_to_label_value(cmdline,l) for l in label_flags} info_label_dict['pid']=str(proc.pid) logging.debug(f"got PID: {proc.pid}") - info_dict["kvm"].info(info_label_dict) + info_dict["kvm"].add_metric([], info_label_dict) d = { "kvm_vcores": flag_to_label_value(cmdline,"-smp"), @@ -146,24 +156,23 @@ def collect_kvm_metrics(): } for k, v in d.items(): - gauge_dict[k].labels(id=id).set(v) + gauge_dict[k].add_metric([id], v) logging.debug(f"gauge_dict[{k}].labels(id={id}).set({v})") cpu_times = proc.cpu_times() for mode in ['user', 'system', 'iowait']: - gauge_dict["kvm_cpu"].labels(id=id, mode=mode).set(getattr(cpu_times, mode)) + gauge_dict["kvm_cpu"].add_metric([id, mode], getattr(cpu_times,mode)) io = proc.io_counters() for io_type, attr in itertools.product(['read', 'write'], ['count', 'bytes', 'chars']): - gauge = globals()["gauge_dict"][f'kvm_io_{io_type}_{attr}'] - gauge.labels(id=id).set(getattr(io, f"{io_type}_{attr}")) + gauge_dict[f'kvm_io_{io_type}_{attr}'].add_metric([id], getattr(io, f"{io_type}_{attr}")) for type in [ "voluntary", "involuntary" ]: - gauge_dict["kvm_ctx_switches"].labels(id=id, type=type).set(getattr(proc.num_ctx_switches(),type)) + gauge_dict["kvm_ctx_switches"].add_metric([id, type], getattr(proc.num_ctx_switches(),type)) memory_metrics = get_memory_info(proc.pid) # Assuming proc.pid gives you the PID of the process for key, value in memory_metrics.items(): - gauge_dict["kvm_memory_extended"].labels(id=id, type=key).set(value) + gauge_dict["kvm_memory_extended"].add_metric([id, key], value) # upper limit on max_workers for safety with ThreadPoolExecutor(max_workers=16) as executor: @@ -172,30 +181,49 @@ def collect_kvm_metrics(): queues = nic_info["queues"] del nic_info["queues"] nic_labels = {"id": id, "ifname": nic_info["ifname"]} - prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys()) - prom_nic_info.labels(**nic_labels).info({k: v for k, v in nic_info.items() if k not in nic_labels.keys()}) + prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys(), dynamic_infos, info_lock) - gauge_dict["kvm_nic_queues"].labels(**nic_labels).set(queues) + prom_nic_info.add_metric(nic_labels.values(), nic_info) + + gauge_dict["kvm_nic_queues"].add_metric(nic_labels.values(), queues) interface_stats = read_interface_stats(nic_info["ifname"]) for filename, value in interface_stats.items(): metric_name = f"kvm_nic_{filename}" - gauge = create_or_get_gauge(metric_name, nic_labels.keys()) - gauge.labels(**nic_labels).set(value) + gauge = create_or_get_gauge(metric_name, nic_labels.keys(), dynamic_gauges, gauge_lock) + gauge.add_metric(nic_labels.values(), value) def map_disk_proc(id): for disk_name, disk_info in qmblock.extract_disk_info_from_monitor(id).items(): disk_labels = {"id": id, "disk_name": disk_name} - prom_disk_info = create_or_get_info("kvm_disk", disk_labels.keys()) - prom_disk_info.labels(**disk_labels).info({k: v for k, v in disk_info.items() if k not in disk_labels.keys()}) + prom_disk_info = create_or_get_info("kvm_disk", disk_labels.keys(), dynamic_infos, info_lock) + prom_disk_info.add_metric(disk_labels.values(), disk_info) list(executor.map(map_netstat_proc, [ proc[2] for proc in procs ])) list(executor.map(map_disk_proc, [ proc[2] for proc in procs ])) + for v in info_dict.values(): + yield v + for x in dynamic_infos.values(): + yield v + for v in gauge_dict.values(): + yield v + for v in dynamic_gauges.values(): + yield v + +class PVECollector(object): + def __init__(self): + return + + def collect(self): + if cli_args.collect_running_vms.lower() == 'true': + for x in collect_kvm_metrics(): + yield x + def main(): parser = argparse.ArgumentParser(description='PVE metrics exporter for Prometheus') parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='Port for the exporter to listen on') - parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='Interval between metric collections in seconds') + parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='THIS OPTION DOES NOTHING') parser.add_argument('--collect-running-vms', type=str, default='true', help='Enable or disable collecting running VMs metric (true/false)') parser.add_argument('--metrics-prefix', type=str, default=DEFAULT_PREFIX, help='_ will be prepended to each metric name') parser.add_argument('--loglevel', type=str, default='INFO', help='Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)') @@ -206,6 +234,8 @@ def main(): parser.add_argument('--qm-monitor-defer-close', type=str, default="true", help='defer and retry closing unresponsive qm monitor sessions') args = parser.parse_args() + global cli_args + cli_args = args loglevel = getattr(logging, args.loglevel.upper(), None) if not isinstance(loglevel, int): @@ -219,14 +249,6 @@ def main(): pvecommon.qm_rand = args.qm_rand pvecommon.qm_monitor_defer_close = args.qm_monitor_defer_close - for name, description, labels in gauge_settings: - gauge_dict[name] = Gauge(f"{prefix}_{name}", description, labels) - - for name, description in info_settings: - info_dict[name] = Info(f"{prefix}_{name}", description) - - start_http_server(args.port) - if args.profile.lower() == 'true': profiler = cProfile.Profile() profiler.enable() @@ -234,11 +256,12 @@ def main(): profiler.disable() profiler.print_stats(sort='cumulative') return + else: + REGISTRY.register(PVECollector()) + start_http_server(args.port) while True: - if args.collect_running_vms.lower() == 'true': - collect_kvm_metrics() - time.sleep(args.interval) + time.sleep(100) if __name__ == "__main__": main()