parallelize nic info collection

This commit is contained in:
illustris 2023-10-02 19:49:47 +05:30
parent eea59a20b3
commit 6e90f95e1c
Signed by: illustris
GPG Key ID: 56C8FC0B899FEFA3

View File

@ -11,6 +11,9 @@ import pexpect
import logging
import cProfile
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
DEFAULT_PORT = 9116
DEFAULT_INTERVAL = 10
DEFAULT_PREFIX = "pve"
@ -45,16 +48,19 @@ info_settings = [
flag_to_label_value = lambda args, match: next((args[i+1] for i, x in enumerate(args[:-1]) if x == match), "unknown").split(",")[0]
dynamic_gauges = {}
gauge_lock = Lock() # avoid race condition when checking and creating gauges
def create_or_get_gauge(metric_name, labels):
if metric_name not in dynamic_gauges:
dynamic_gauges[metric_name] = Gauge(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels)
with gauge_lock:
if metric_name not in dynamic_gauges:
dynamic_gauges[metric_name] = Gauge(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels)
return dynamic_gauges[metric_name]
dynamic_infos = {}
info_lock = Lock() # avoid race condition when checking and creating infos
def create_or_get_info(info_name, labels):
if (info_name,str(labels)) not in dynamic_infos:
dynamic_infos[(info_name,str(labels))] = Info(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels)
with info_lock:
if (info_name,str(labels)) not in dynamic_infos:
dynamic_infos[(info_name,str(labels))] = Info(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels)
return dynamic_infos[(info_name,str(labels))]
def get_memory_info(pid):
@ -167,20 +173,25 @@ def collect_kvm_metrics():
for key, value in memory_metrics.items():
gauge_dict["kvm_memory_extended"].labels(id=id, type=key).set(value)
for nic_info in extract_nic_info_from_monitor(id):
queues = nic_info["queues"]
del nic_info["queues"]
nic_labels = {"id": id, "ifname": nic_info["ifname"]}
prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys())
prom_nic_info.labels(**nic_labels).info({k: v for k, v in nic_info.items() if k not in nic_labels.keys()})
# upper limit on max_workers for safety
with ThreadPoolExecutor(max_workers=16) as executor:
def map_netstat_proc(id):
for nic_info in extract_nic_info_from_monitor(id):
queues = nic_info["queues"]
del nic_info["queues"]
nic_labels = {"id": id, "ifname": nic_info["ifname"]}
prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys())
prom_nic_info.labels(**nic_labels).info({k: v for k, v in nic_info.items() if k not in nic_labels.keys()})
gauge_dict["kvm_nic_queues"].labels(**nic_labels).set(queues)
gauge_dict["kvm_nic_queues"].labels(**nic_labels).set(queues)
interface_stats = read_interface_stats(nic_info["ifname"])
for filename, value in interface_stats.items():
metric_name = f"kvm_nic_{filename}"
gauge = create_or_get_gauge(metric_name, nic_labels.keys())
gauge.labels(**nic_labels).set(value)
interface_stats = read_interface_stats(nic_info["ifname"])
for filename, value in interface_stats.items():
metric_name = f"kvm_nic_{filename}"
gauge = create_or_get_gauge(metric_name, nic_labels.keys())
gauge.labels(**nic_labels).set(value)
list(executor.map(map_netstat_proc, [ proc[2] for proc in procs ]))
def main():
parser = argparse.ArgumentParser(description='PVE metrics exporter for Prometheus')