From 6e90f95e1c3e0306ca707b37d9306c5964adcc12 Mon Sep 17 00:00:00 2001 From: illustris Date: Mon, 2 Oct 2023 19:49:47 +0530 Subject: [PATCH] parallelize nic info collection --- src/pvemon/__init__.py | 45 ++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/src/pvemon/__init__.py b/src/pvemon/__init__.py index 60e20f9..e065896 100644 --- a/src/pvemon/__init__.py +++ b/src/pvemon/__init__.py @@ -11,6 +11,9 @@ import pexpect import logging import cProfile +from concurrent.futures import ThreadPoolExecutor +from threading import Lock + DEFAULT_PORT = 9116 DEFAULT_INTERVAL = 10 DEFAULT_PREFIX = "pve" @@ -45,16 +48,19 @@ info_settings = [ flag_to_label_value = lambda args, match: next((args[i+1] for i, x in enumerate(args[:-1]) if x == match), "unknown").split(",")[0] dynamic_gauges = {} - +gauge_lock = Lock() # avoid race condition when checking and creating gauges def create_or_get_gauge(metric_name, labels): - if metric_name not in dynamic_gauges: - dynamic_gauges[metric_name] = Gauge(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels) + with gauge_lock: + if metric_name not in dynamic_gauges: + dynamic_gauges[metric_name] = Gauge(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels) return dynamic_gauges[metric_name] dynamic_infos = {} +info_lock = Lock() # avoid race condition when checking and creating infos def create_or_get_info(info_name, labels): - if (info_name,str(labels)) not in dynamic_infos: - dynamic_infos[(info_name,str(labels))] = Info(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels) + with info_lock: + if (info_name,str(labels)) not in dynamic_infos: + dynamic_infos[(info_name,str(labels))] = Info(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels) return dynamic_infos[(info_name,str(labels))] def get_memory_info(pid): @@ -167,20 +173,25 @@ def collect_kvm_metrics(): for key, value in memory_metrics.items(): gauge_dict["kvm_memory_extended"].labels(id=id, type=key).set(value) - for nic_info in extract_nic_info_from_monitor(id): - queues = nic_info["queues"] - del nic_info["queues"] - nic_labels = {"id": id, "ifname": nic_info["ifname"]} - prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys()) - prom_nic_info.labels(**nic_labels).info({k: v for k, v in nic_info.items() if k not in nic_labels.keys()}) + # upper limit on max_workers for safety + with ThreadPoolExecutor(max_workers=16) as executor: + def map_netstat_proc(id): + for nic_info in extract_nic_info_from_monitor(id): + queues = nic_info["queues"] + del nic_info["queues"] + nic_labels = {"id": id, "ifname": nic_info["ifname"]} + prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys()) + prom_nic_info.labels(**nic_labels).info({k: v for k, v in nic_info.items() if k not in nic_labels.keys()}) - gauge_dict["kvm_nic_queues"].labels(**nic_labels).set(queues) + gauge_dict["kvm_nic_queues"].labels(**nic_labels).set(queues) - interface_stats = read_interface_stats(nic_info["ifname"]) - for filename, value in interface_stats.items(): - metric_name = f"kvm_nic_{filename}" - gauge = create_or_get_gauge(metric_name, nic_labels.keys()) - gauge.labels(**nic_labels).set(value) + interface_stats = read_interface_stats(nic_info["ifname"]) + for filename, value in interface_stats.items(): + metric_name = f"kvm_nic_{filename}" + gauge = create_or_get_gauge(metric_name, nic_labels.keys()) + gauge.labels(**nic_labels).set(value) + + list(executor.map(map_netstat_proc, [ proc[2] for proc in procs ])) def main(): parser = argparse.ArgumentParser(description='PVE metrics exporter for Prometheus')