switch from polling to custom collector

This fixes a couple of issues:
- metrics lingering after stopping VM or removing disks
- collection happens on scrape rather than polling
This commit is contained in:
illustris 2023-10-21 23:11:53 +05:30
parent 922dc6c30f
commit d221c758aa
Signed by: illustris
GPG Key ID: 56C8FC0B899FEFA3

View File

@ -1,4 +1,8 @@
from prometheus_client import start_http_server, Gauge, Info
# from prometheus_client import start_http_server, Gauge, Info, REGISTRY, Metric
from prometheus_client.core import InfoMetricFamily, GaugeMetricFamily, CounterMetricFamily, REGISTRY
from prometheus_client.registry import Collector
from prometheus_client import start_http_server
import psutil
import time
import argparse
@ -39,9 +43,6 @@ gauge_settings = [
('kvm_nic_queues', 'Number of queues in multiqueue config', ['id', 'ifname']),
]
gauge_dict = {}
info_dict = {}
label_flags = [ "-id", "-name", "-cpu" ]
get_label_name = lambda flag: flag[1:]
info_settings = [
@ -50,20 +51,16 @@ info_settings = [
flag_to_label_value = lambda args, match: next((args[i+1] for i, x in enumerate(args[:-1]) if x == match), "unknown").split(",")[0]
dynamic_gauges = {}
gauge_lock = Lock() # avoid race condition when checking and creating gauges
def create_or_get_gauge(metric_name, labels):
def create_or_get_gauge(metric_name, labels, dynamic_gauges, gauge_lock):
with gauge_lock:
if metric_name not in dynamic_gauges:
dynamic_gauges[metric_name] = Gauge(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels)
dynamic_gauges[metric_name] = GaugeMetricFamily(f"{prefix}_{metric_name}", f'{metric_name} for KVM process', labels=labels)
return dynamic_gauges[metric_name]
dynamic_infos = {}
info_lock = Lock() # avoid race condition when checking and creating infos
def create_or_get_info(info_name, labels):
def create_or_get_info(info_name, labels, dynamic_infos, info_lock):
with info_lock:
if (info_name,str(labels)) not in dynamic_infos:
dynamic_infos[(info_name,str(labels))] = Info(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels)
dynamic_infos[(info_name,str(labels))] = InfoMetricFamily(f"{prefix}_{info_name}", f'{info_name} for {str(labels)}', labels=labels)
return dynamic_infos[(info_name,str(labels))]
def get_memory_info(pid):
@ -117,6 +114,19 @@ def read_interface_stats(ifname):
return stats
def collect_kvm_metrics():
gauge_dict = {}
info_dict = {}
for name, description, labels in gauge_settings:
gauge_dict[name] = GaugeMetricFamily(f"{prefix}_{name}", description, labels=labels)
for name, description in info_settings:
info_dict[name] = InfoMetricFamily(f"{prefix}_{name}", description)
dynamic_gauges = {}
gauge_lock = Lock() # avoid race condition when checking and creating gauges
dynamic_infos = {}
info_lock = Lock() # avoid race condition when checking and creating infos
procs = []
for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline', 'cpu_percent', 'memory_percent', 'num_threads']):
try:
@ -136,7 +146,7 @@ def collect_kvm_metrics():
info_label_dict = {get_label_name(l): flag_to_label_value(cmdline,l) for l in label_flags}
info_label_dict['pid']=str(proc.pid)
logging.debug(f"got PID: {proc.pid}")
info_dict["kvm"].info(info_label_dict)
info_dict["kvm"].add_metric([], info_label_dict)
d = {
"kvm_vcores": flag_to_label_value(cmdline,"-smp"),
@ -146,24 +156,23 @@ def collect_kvm_metrics():
}
for k, v in d.items():
gauge_dict[k].labels(id=id).set(v)
gauge_dict[k].add_metric([id], v)
logging.debug(f"gauge_dict[{k}].labels(id={id}).set({v})")
cpu_times = proc.cpu_times()
for mode in ['user', 'system', 'iowait']:
gauge_dict["kvm_cpu"].labels(id=id, mode=mode).set(getattr(cpu_times, mode))
gauge_dict["kvm_cpu"].add_metric([id, mode], getattr(cpu_times,mode))
io = proc.io_counters()
for io_type, attr in itertools.product(['read', 'write'], ['count', 'bytes', 'chars']):
gauge = globals()["gauge_dict"][f'kvm_io_{io_type}_{attr}']
gauge.labels(id=id).set(getattr(io, f"{io_type}_{attr}"))
gauge_dict[f'kvm_io_{io_type}_{attr}'].add_metric([id], getattr(io, f"{io_type}_{attr}"))
for type in [ "voluntary", "involuntary" ]:
gauge_dict["kvm_ctx_switches"].labels(id=id, type=type).set(getattr(proc.num_ctx_switches(),type))
gauge_dict["kvm_ctx_switches"].add_metric([id, type], getattr(proc.num_ctx_switches(),type))
memory_metrics = get_memory_info(proc.pid) # Assuming proc.pid gives you the PID of the process
for key, value in memory_metrics.items():
gauge_dict["kvm_memory_extended"].labels(id=id, type=key).set(value)
gauge_dict["kvm_memory_extended"].add_metric([id, key], value)
# upper limit on max_workers for safety
with ThreadPoolExecutor(max_workers=16) as executor:
@ -172,30 +181,49 @@ def collect_kvm_metrics():
queues = nic_info["queues"]
del nic_info["queues"]
nic_labels = {"id": id, "ifname": nic_info["ifname"]}
prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys())
prom_nic_info.labels(**nic_labels).info({k: v for k, v in nic_info.items() if k not in nic_labels.keys()})
prom_nic_info = create_or_get_info("kvm_nic", nic_labels.keys(), dynamic_infos, info_lock)
gauge_dict["kvm_nic_queues"].labels(**nic_labels).set(queues)
prom_nic_info.add_metric(nic_labels.values(), nic_info)
gauge_dict["kvm_nic_queues"].add_metric(nic_labels.values(), queues)
interface_stats = read_interface_stats(nic_info["ifname"])
for filename, value in interface_stats.items():
metric_name = f"kvm_nic_{filename}"
gauge = create_or_get_gauge(metric_name, nic_labels.keys())
gauge.labels(**nic_labels).set(value)
gauge = create_or_get_gauge(metric_name, nic_labels.keys(), dynamic_gauges, gauge_lock)
gauge.add_metric(nic_labels.values(), value)
def map_disk_proc(id):
for disk_name, disk_info in qmblock.extract_disk_info_from_monitor(id).items():
disk_labels = {"id": id, "disk_name": disk_name}
prom_disk_info = create_or_get_info("kvm_disk", disk_labels.keys())
prom_disk_info.labels(**disk_labels).info({k: v for k, v in disk_info.items() if k not in disk_labels.keys()})
prom_disk_info = create_or_get_info("kvm_disk", disk_labels.keys(), dynamic_infos, info_lock)
prom_disk_info.add_metric(disk_labels.values(), disk_info)
list(executor.map(map_netstat_proc, [ proc[2] for proc in procs ]))
list(executor.map(map_disk_proc, [ proc[2] for proc in procs ]))
for v in info_dict.values():
yield v
for x in dynamic_infos.values():
yield v
for v in gauge_dict.values():
yield v
for v in dynamic_gauges.values():
yield v
class PVECollector(object):
def __init__(self):
return
def collect(self):
if cli_args.collect_running_vms.lower() == 'true':
for x in collect_kvm_metrics():
yield x
def main():
parser = argparse.ArgumentParser(description='PVE metrics exporter for Prometheus')
parser.add_argument('--port', type=int, default=DEFAULT_PORT, help='Port for the exporter to listen on')
parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='Interval between metric collections in seconds')
parser.add_argument('--interval', type=int, default=DEFAULT_INTERVAL, help='THIS OPTION DOES NOTHING')
parser.add_argument('--collect-running-vms', type=str, default='true', help='Enable or disable collecting running VMs metric (true/false)')
parser.add_argument('--metrics-prefix', type=str, default=DEFAULT_PREFIX, help='<prefix>_ will be prepended to each metric name')
parser.add_argument('--loglevel', type=str, default='INFO', help='Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)')
@ -206,6 +234,8 @@ def main():
parser.add_argument('--qm-monitor-defer-close', type=str, default="true", help='defer and retry closing unresponsive qm monitor sessions')
args = parser.parse_args()
global cli_args
cli_args = args
loglevel = getattr(logging, args.loglevel.upper(), None)
if not isinstance(loglevel, int):
@ -219,14 +249,6 @@ def main():
pvecommon.qm_rand = args.qm_rand
pvecommon.qm_monitor_defer_close = args.qm_monitor_defer_close
for name, description, labels in gauge_settings:
gauge_dict[name] = Gauge(f"{prefix}_{name}", description, labels)
for name, description in info_settings:
info_dict[name] = Info(f"{prefix}_{name}", description)
start_http_server(args.port)
if args.profile.lower() == 'true':
profiler = cProfile.Profile()
profiler.enable()
@ -234,11 +256,12 @@ def main():
profiler.disable()
profiler.print_stats(sort='cumulative')
return
else:
REGISTRY.register(PVECollector())
start_http_server(args.port)
while True:
if args.collect_running_vms.lower() == 'true':
collect_kvm_metrics()
time.sleep(args.interval)
time.sleep(100)
if __name__ == "__main__":
main()