From d8f4a7765722cf527bed044d7ac663fb555aecf5 Mon Sep 17 00:00:00 2001 From: illustris Date: Wed, 11 Mar 2026 15:01:20 +0530 Subject: [PATCH] init --- .gitignore | 1 + default.nix | 17 + flake.lock | 27 ++ flake.nix | 28 ++ src/go.mod | 18 + src/go.sum | 32 ++ src/internal/cache/mtimecache.go | 61 +++ src/internal/cache/mtimecache_test.go | 39 ++ src/internal/cache/ttlcache.go | 62 +++ src/internal/cache/ttlcache_test.go | 54 +++ src/internal/collector/collector.go | 479 +++++++++++++++++++++++ src/internal/collector/collector_test.go | 343 ++++++++++++++++ src/internal/config/config.go | 37 ++ src/internal/procfs/procfs.go | 394 +++++++++++++++++++ src/internal/procfs/procfs_test.go | 205 ++++++++++ src/internal/pveconfig/pool.go | 68 ++++ src/internal/pveconfig/pool_test.go | 61 +++ src/internal/pveconfig/storage.go | 71 ++++ src/internal/pveconfig/storage_test.go | 106 +++++ src/internal/qmmonitor/block.go | 174 ++++++++ src/internal/qmmonitor/block_test.go | 149 +++++++ src/internal/qmmonitor/network.go | 77 ++++ src/internal/qmmonitor/network_test.go | 65 +++ src/internal/qmmonitor/qmmonitor.go | 190 +++++++++ src/internal/qmmonitor/qmmonitor_test.go | 59 +++ src/internal/storage/storage.go | 69 ++++ src/internal/storage/storage_test.go | 55 +++ src/internal/sysfs/sysfs.go | 88 +++++ src/internal/sysfs/sysfs_test.go | 69 ++++ src/main.go | 66 ++++ 30 files changed, 3164 insertions(+) create mode 100644 .gitignore create mode 100644 default.nix create mode 100644 flake.lock create mode 100644 flake.nix create mode 100644 src/go.mod create mode 100644 src/go.sum create mode 100644 src/internal/cache/mtimecache.go create mode 100644 src/internal/cache/mtimecache_test.go create mode 100644 src/internal/cache/ttlcache.go create mode 100644 src/internal/cache/ttlcache_test.go create mode 100644 src/internal/collector/collector.go create mode 100644 src/internal/collector/collector_test.go create mode 100644 src/internal/config/config.go create mode 100644 
src/internal/procfs/procfs.go create mode 100644 src/internal/procfs/procfs_test.go create mode 100644 src/internal/pveconfig/pool.go create mode 100644 src/internal/pveconfig/pool_test.go create mode 100644 src/internal/pveconfig/storage.go create mode 100644 src/internal/pveconfig/storage_test.go create mode 100644 src/internal/qmmonitor/block.go create mode 100644 src/internal/qmmonitor/block_test.go create mode 100644 src/internal/qmmonitor/network.go create mode 100644 src/internal/qmmonitor/network_test.go create mode 100644 src/internal/qmmonitor/qmmonitor.go create mode 100644 src/internal/qmmonitor/qmmonitor_test.go create mode 100644 src/internal/storage/storage.go create mode 100644 src/internal/storage/storage_test.go create mode 100644 src/internal/sysfs/sysfs.go create mode 100644 src/internal/sysfs/sysfs_test.go create mode 100644 src/main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b2be92b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +result diff --git a/default.nix b/default.nix new file mode 100644 index 0000000..5103eac --- /dev/null +++ b/default.nix @@ -0,0 +1,17 @@ +{ + lib, + buildGoModule, + ... 
+}: + +buildGoModule rec { + pname = "pve-local-exporter"; + version = "0.1.0"; + src = ./src; + vendorHash = "sha256-f0f8tYmoI6DtuB/K4++gu9b2na/d0ECTaF2zvDijW58="; + ldflags = [ + "-X=main.version=${version}" + ]; + env.CGO_ENABLED = 0; + meta.mainProgram = "pve_local_exporter"; +} diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..3e38940 --- /dev/null +++ b/flake.lock @@ -0,0 +1,27 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1772963539, + "narHash": "sha256-9jVDGZnvCckTGdYT53d/EfznygLskyLQXYwJLKMPsZs=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "9dcb002ca1690658be4a04645215baea8b95f31d", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..cf6c49e --- /dev/null +++ b/flake.nix @@ -0,0 +1,28 @@ +{ + description = "Proxmox VE local metrics exporter for Prometheus"; + + inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + + outputs = { self, nixpkgs }: let + forAllSystems = nixpkgs.lib.genAttrs [ + "x86_64-linux" + "aarch64-linux" + "riscv64-linux" + ]; + in { + packages = forAllSystems (system: let + pkgs = nixpkgs.legacyPackages.${system}; + in rec { + pve-local-exporter = pkgs.callPackage ./. 
{}; + default = pve-local-exporter; + }); + + devShells = forAllSystems (system: let + pkgs = nixpkgs.legacyPackages.${system}; + in { + default = pkgs.mkShell { + buildInputs = [ pkgs.go pkgs.gopls ]; + }; + }); + }; +} diff --git a/src/go.mod b/src/go.mod new file mode 100644 index 0000000..0d7baed --- /dev/null +++ b/src/go.mod @@ -0,0 +1,18 @@ +module pve_local_exporter + +go 1.25.7 + +require ( + github.com/prometheus/client_golang v1.22.0 + github.com/prometheus/client_model v0.6.1 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/common v0.62.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + golang.org/x/sys v0.30.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect +) diff --git a/src/go.sum b/src/go.sum new file mode 100644 index 0000000..e81255c --- /dev/null +++ b/src/go.sum @@ -0,0 +1,32 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug 
v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/internal/cache/mtimecache.go b/src/internal/cache/mtimecache.go new file mode 100644 index 0000000..c4d79a1 
--- /dev/null +++ b/src/internal/cache/mtimecache.go @@ -0,0 +1,61 @@ +package cache + +import ( + "sync" + "time" +) + +// MtimeCache caches a value and invalidates when a file's mtime changes. +type MtimeCache[V any] struct { + mu sync.Mutex + value V + lastMtime time.Time + valid bool + stat func(path string) (time.Time, error) + path string +} + +// StatFunc returns the mtime of a file. Allows injection for testing. +type StatFunc func(path string) (time.Time, error) + +func NewMtimeCache[V any](path string, stat StatFunc) *MtimeCache[V] { + return &MtimeCache[V]{ + path: path, + stat: stat, + } +} + +// Get returns the cached value if the file mtime hasn't changed. +// Returns (value, true) on cache hit, (zero, false) on miss. +func (c *MtimeCache[V]) Get() (V, bool) { + c.mu.Lock() + defer c.mu.Unlock() + if !c.valid { + var zero V + return zero, false + } + mtime, err := c.stat(c.path) + if err != nil { + var zero V + return zero, false + } + if !mtime.Equal(c.lastMtime) { + var zero V + c.valid = false + return zero, false + } + return c.value, true +} + +// Set stores the value with the current file mtime. 
+func (c *MtimeCache[V]) Set(value V) { + c.mu.Lock() + defer c.mu.Unlock() + mtime, err := c.stat(c.path) + if err != nil { + return + } + c.value = value + c.lastMtime = mtime + c.valid = true +} diff --git a/src/internal/cache/mtimecache_test.go b/src/internal/cache/mtimecache_test.go new file mode 100644 index 0000000..aff574b --- /dev/null +++ b/src/internal/cache/mtimecache_test.go @@ -0,0 +1,39 @@ +package cache + +import ( + "testing" + "time" +) + +func TestMtimeCache_HitAndMiss(t *testing.T) { + mtime := time.Now() + stat := func(path string) (time.Time, error) { return mtime, nil } + + c := NewMtimeCache[string]("/test", stat) + + // Miss before any Set + _, ok := c.Get() + if ok { + t.Fatal("expected miss before Set") + } + + c.Set("hello") + v, ok := c.Get() + if !ok || v != "hello" { + t.Fatalf("expected hit with 'hello', got %q ok=%v", v, ok) + } + + // Simulate file change + mtime = mtime.Add(time.Second) + _, ok = c.Get() + if ok { + t.Fatal("expected miss after mtime change") + } + + // Set again with new mtime + c.Set("world") + v, ok = c.Get() + if !ok || v != "world" { + t.Fatalf("expected hit with 'world', got %q ok=%v", v, ok) + } +} diff --git a/src/internal/cache/ttlcache.go b/src/internal/cache/ttlcache.go new file mode 100644 index 0000000..5fa565a --- /dev/null +++ b/src/internal/cache/ttlcache.go @@ -0,0 +1,62 @@ +package cache + +import ( + "math/rand/v2" + "sync" + "time" +) + +// TTLCache is a generic TTL cache with random jitter to prevent thundering herd. 
+type TTLCache[K comparable, V any] struct { + mu sync.Mutex + entries map[K]ttlEntry[V] + maxTTL time.Duration + rand time.Duration +} + +type ttlEntry[V any] struct { + value V + expiresAt time.Time +} + +func NewTTLCache[K comparable, V any](maxTTL, randRange time.Duration) *TTLCache[K, V] { + return &TTLCache[K, V]{ + entries: make(map[K]ttlEntry[V]), + maxTTL: maxTTL, + rand: randRange, + } +} + +func (c *TTLCache[K, V]) jitteredTTL() time.Duration { + jitter := time.Duration(rand.Float64()*2*float64(c.rand)) - c.rand + return c.maxTTL + jitter +} + +func (c *TTLCache[K, V]) Get(key K) (V, bool) { + c.mu.Lock() + defer c.mu.Unlock() + e, ok := c.entries[key] + if !ok || time.Now().After(e.expiresAt) { + if ok { + delete(c.entries, key) + } + var zero V + return zero, false + } + return e.value, true +} + +func (c *TTLCache[K, V]) Set(key K, value V) { + c.mu.Lock() + defer c.mu.Unlock() + c.entries[key] = ttlEntry[V]{ + value: value, + expiresAt: time.Now().Add(c.jitteredTTL()), + } +} + +func (c *TTLCache[K, V]) Invalidate(key K) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.entries, key) +} diff --git a/src/internal/cache/ttlcache_test.go b/src/internal/cache/ttlcache_test.go new file mode 100644 index 0000000..6bd9c52 --- /dev/null +++ b/src/internal/cache/ttlcache_test.go @@ -0,0 +1,54 @@ +package cache + +import ( + "testing" + "time" +) + +func TestTTLCache_SetGet(t *testing.T) { + c := NewTTLCache[string, int](time.Hour, 0) + c.Set("a", 42) + v, ok := c.Get("a") + if !ok || v != 42 { + t.Fatalf("expected 42, got %d (ok=%v)", v, ok) + } +} + +func TestTTLCache_Miss(t *testing.T) { + c := NewTTLCache[string, int](time.Hour, 0) + _, ok := c.Get("missing") + if ok { + t.Fatal("expected miss") + } +} + +func TestTTLCache_Expiry(t *testing.T) { + c := NewTTLCache[string, int](time.Millisecond, 0) + c.Set("a", 1) + time.Sleep(5 * time.Millisecond) + _, ok := c.Get("a") + if ok { + t.Fatal("expected expired entry") + } +} + +func TestTTLCache_Invalidate(t 
*testing.T) { + c := NewTTLCache[string, int](time.Hour, 0) + c.Set("a", 1) + c.Invalidate("a") + _, ok := c.Get("a") + if ok { + t.Fatal("expected invalidated entry") + } +} + +func TestTTLCache_JitterRange(t *testing.T) { + c := NewTTLCache[string, int](time.Second, 500*time.Millisecond) + // jitteredTTL should be in [500ms, 1500ms] + for range 100 { + d := c.jitteredTTL() + if d < 500*time.Millisecond || d > 1500*time.Millisecond { + t.Fatalf("jitter out of range: %v", d) + } + } +} diff --git a/src/internal/collector/collector.go b/src/internal/collector/collector.go new file mode 100644 index 0000000..48af651 --- /dev/null +++ b/src/internal/collector/collector.go @@ -0,0 +1,479 @@ +package collector + +import ( + "log/slog" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "pve_local_exporter/internal/cache" + "pve_local_exporter/internal/config" + "pve_local_exporter/internal/procfs" + "pve_local_exporter/internal/pveconfig" + "pve_local_exporter/internal/qmmonitor" + "pve_local_exporter/internal/storage" + "pve_local_exporter/internal/sysfs" +) + +const maxWorkers = 16 + +// PVECollector implements prometheus.Collector. +type PVECollector struct { + cfg config.Config + proc procfs.ProcReader + sys sysfs.SysReader + qm qmmonitor.QMMonitor + statFS storage.StatFS + cmdRunner CommandRunner + fileReader FileReaderIface + + poolCache *cache.MtimeCache[poolData] + storageCache *cache.MtimeCache[[]pveconfig.StorageEntry] + + prefix string +} + +type poolData struct { + vmPoolMap map[string]string + pools map[string]pveconfig.PoolInfo +} + +// CommandRunner for executing shell commands. +type CommandRunner interface { + Run(name string, args ...string) (string, error) +} + +// FileReaderIface for reading files. +type FileReaderIface interface { + ReadFile(path string) (string, error) +} + +// RealCommandRunner executes real commands. 
+type RealCommandRunner struct{} + +func (RealCommandRunner) Run(name string, args ...string) (string, error) { + out, err := exec.Command(name, args...).Output() + return string(out), err +} + +// RealFileReader reads real files. +type RealFileReader struct{} + +func (RealFileReader) ReadFile(path string) (string, error) { + data, err := os.ReadFile(path) + return string(data), err +} + +func fileMtime(path string) (time.Time, error) { + info, err := os.Stat(path) + if err != nil { + return time.Time{}, err + } + return info.ModTime(), nil +} + +// New creates a PVECollector with real I/O implementations. +func New(cfg config.Config) *PVECollector { + return NewWithDeps(cfg, + procfs.NewRealProcReader(), + sysfs.NewRealSysReader(), + qmmonitor.NewRealQMMonitor(cfg.QMTerminalTimeout, cfg.QMMaxTTL, cfg.QMRand, cfg.QMMonitorDeferClose), + storage.RealStatFS{}, + RealCommandRunner{}, + RealFileReader{}, + ) +} + +// NewWithDeps creates a PVECollector with injected dependencies (for testing). 
+func NewWithDeps(cfg config.Config, proc procfs.ProcReader, sys sysfs.SysReader, + qm qmmonitor.QMMonitor, statFS storage.StatFS, cmd CommandRunner, fr FileReaderIface) *PVECollector { + + c := &PVECollector{ + cfg: cfg, + proc: proc, + sys: sys, + qm: qm, + statFS: statFS, + cmdRunner: cmd, + fileReader: fr, + prefix: cfg.MetricsPrefix, + } + c.poolCache = cache.NewMtimeCache[poolData]("/etc/pve/user.cfg", fileMtime) + c.storageCache = cache.NewMtimeCache[[]pveconfig.StorageEntry]("/etc/pve/storage.cfg", fileMtime) + return c +} + +func (c *PVECollector) Describe(ch chan<- *prometheus.Desc) { + // Dynamic metrics - use empty desc to signal unchecked collector + ch <- prometheus.NewDesc(c.prefix+"_kvm_cpu", "KVM CPU time", nil, nil) +} + +func (c *PVECollector) Collect(ch chan<- prometheus.Metric) { + if c.cfg.CollectRunningVMs { + c.collectVMs(ch) + } + if c.cfg.CollectStorage { + c.collectStorage(ch) + } +} + +func (c *PVECollector) collectVMs(ch chan<- prometheus.Metric) { + procs, err := c.proc.DiscoverQEMUProcesses() + if err != nil { + slog.Error("discover QEMU processes", "err", err) + return + } + + // Load pool info + vmPoolMap, pools := c.getPoolInfo() + + for _, proc := range procs { + c.collectVMMetrics(ch, proc, vmPoolMap, pools) + } + + // Parallel NIC + disk collection with bounded worker pool + type workItem struct { + proc procfs.QEMUProcess + fn func() + } + + sem := make(chan struct{}, maxWorkers) + var wg sync.WaitGroup + + for _, proc := range procs { + proc := proc + wg.Add(2) + + go func() { + sem <- struct{}{} + defer func() { <-sem; wg.Done() }() + c.collectNICMetrics(ch, proc) + }() + + go func() { + sem <- struct{}{} + defer func() { <-sem; wg.Done() }() + c.collectDiskMetrics(ch, proc) + }() + } + wg.Wait() +} + +func (c *PVECollector) collectVMMetrics(ch chan<- prometheus.Metric, proc procfs.QEMUProcess, + vmPoolMap map[string]string, pools map[string]pveconfig.PoolInfo) { + + id := proc.VMID + + // CPU times + if cpu, err := 
c.proc.GetCPUTimes(proc.PID); err == nil { + for _, m := range []struct { + mode string + val float64 + }{ + {"user", cpu.User}, + {"system", cpu.System}, + {"iowait", cpu.IOWait}, + } { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_cpu", "KVM CPU time", []string{"id", "mode"}, nil), + prometheus.GaugeValue, m.val, id, m.mode, + ) + } + } + + // Vcores + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_vcores", "vCores allocated", []string{"id"}, nil), + prometheus.GaugeValue, float64(proc.Vcores), id, + ) + + // MaxMem (kB to bytes) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_maxmem", "Maximum memory bytes", []string{"id"}, nil), + prometheus.GaugeValue, float64(proc.MaxMem*1024), id, + ) + + // Memory percent + if memPct, err := c.proc.GetMemoryPercent(proc.PID); err == nil { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_memory_percent", "Memory percent of host", []string{"id"}, nil), + prometheus.GaugeValue, memPct, id, + ) + } + + // Memory extended + if memExt, err := c.proc.GetMemoryExtended(proc.PID); err == nil { + desc := prometheus.NewDesc(c.prefix+"_kvm_memory_extended", "Extended memory info", []string{"id", "type"}, nil) + for key, val := range memExt { + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(val), id, key) + } + } + + // Threads + if threads, err := c.proc.GetNumThreads(proc.PID); err == nil { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_threads", "Threads used", []string{"id"}, nil), + prometheus.GaugeValue, float64(threads), id, + ) + } + + // IO counters + if io, err := c.proc.GetIOCounters(proc.PID); err == nil { + for _, m := range []struct { + name string + val uint64 + }{ + {"kvm_io_read_count", io.ReadSyscalls}, + {"kvm_io_read_bytes", io.ReadBytes}, + {"kvm_io_read_chars", io.ReadChars}, + {"kvm_io_write_count", io.WriteSyscalls}, + {"kvm_io_write_bytes", 
io.WriteBytes}, + {"kvm_io_write_chars", io.WriteChars}, + } { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_"+m.name, "", []string{"id"}, nil), + prometheus.GaugeValue, float64(m.val), id, + ) + } + } + + // Context switches + if cs, err := c.proc.GetCtxSwitches(proc.PID); err == nil { + desc := prometheus.NewDesc(c.prefix+"_kvm_ctx_switches", "Context switches", []string{"id", "type"}, nil) + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cs.Voluntary), id, "voluntary") + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cs.Involuntary), id, "involuntary") + } + + // VM info metric + poolName := vmPoolMap[id] + poolInfo := pools[poolName] + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm", "VM info", []string{ + "id", "name", "cpu", "pid", "pool", "pool_levels", "pool1", "pool2", "pool3", + }, nil), + prometheus.GaugeValue, 1, + id, proc.Name, proc.CPU, strconv.Itoa(proc.PID), + poolName, strconv.Itoa(poolInfo.LevelCount), + poolInfo.Level1, poolInfo.Level2, poolInfo.Level3, + ) +} + +func (c *PVECollector) collectNICMetrics(ch chan<- prometheus.Metric, proc procfs.QEMUProcess) { + id := proc.VMID + + raw, err := c.qm.RunCommand(id, "info network") + if err != nil { + slog.Error("qm info network", "vmid", id, "err", err) + return + } + + nics := qmmonitor.ParseNetworkInfo(raw) + for _, nic := range nics { + // NIC info metric + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_nic", "NIC info", []string{ + "id", "ifname", "netdev", "queues", "type", "model", "macaddr", + }, nil), + prometheus.GaugeValue, 1, + id, nic.Ifname, nic.Netdev, strconv.Itoa(nic.Queues), + nic.Type, nic.Model, nic.Macaddr, + ) + + // NIC queues + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_nic_queues", "NIC queue count", []string{"id", "ifname"}, nil), + prometheus.GaugeValue, float64(nic.Queues), id, nic.Ifname, + ) + + // NIC stats from 
sysfs + stats, err := c.sys.ReadInterfaceStats(nic.Ifname) + if err != nil { + slog.Debug("read interface stats", "ifname", nic.Ifname, "err", err) + continue + } + for statName, val := range stats { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_nic_"+statName, "", []string{"id", "ifname"}, nil), + prometheus.GaugeValue, float64(val), id, nic.Ifname, + ) + } + } +} + +func (c *PVECollector) collectDiskMetrics(ch chan<- prometheus.Metric, proc procfs.QEMUProcess) { + id := proc.VMID + + raw, err := c.qm.RunCommand(id, "info block") + if err != nil { + slog.Error("qm info block", "vmid", id, "err", err) + return + } + + disks := qmmonitor.ParseBlockInfo(raw) + for diskName, disk := range disks { + // Try to get device symlink target for zvol/rbd/lvm + if disk.DiskType == "zvol" || disk.DiskType == "rbd" || disk.DiskType == "lvm" { + target, err := sysfs.GetDeviceSymlinkTarget(disk.DiskPath) + if err == nil { + disk.Labels["device"] = target + } else { + slog.Debug("resolve device symlink", "path", disk.DiskPath, "err", err) + // Retry with cache invalidation + c.qm.InvalidateCache(id, "info block") + } + } + + // Disk size + var diskSize int64 + switch disk.DiskType { + case "qcow2": + // File-backed: use file size + if fi, err := os.Stat(disk.DiskPath); err == nil { + diskSize = fi.Size() + } + default: + // Block device + size, err := c.sys.GetBlockDeviceSize(disk.DiskPath) + if err == nil { + diskSize = size + } + } + + if diskSize > 0 { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_disk_size", "Disk size bytes", []string{"id", "disk_name"}, nil), + prometheus.GaugeValue, float64(diskSize), id, diskName, + ) + } + + // Disk info metric - collect all labels + labelNames := []string{"id", "disk_name", "block_id", "disk_path", "disk_type"} + labelValues := []string{id, diskName, disk.BlockID, disk.DiskPath, disk.DiskType} + + // Add variable labels in sorted-ish order + for _, key := range 
sortedKeys(disk.Labels) { + labelNames = append(labelNames, key) + labelValues = append(labelValues, disk.Labels[key]) + } + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_kvm_disk", "Disk info", labelNames, nil), + prometheus.GaugeValue, 1, labelValues..., + ) + } +} + +func (c *PVECollector) collectStorage(ch chan<- prometheus.Metric) { + entries := c.getStorageEntries() + + for _, entry := range entries { + storageType := entry.Properties["type"] + storageName := entry.Properties["name"] + + // Info metric + labelNames := make([]string, 0, len(entry.Properties)) + labelValues := make([]string, 0, len(entry.Properties)) + for _, key := range sortedKeys(entry.Properties) { + labelNames = append(labelNames, key) + labelValues = append(labelValues, entry.Properties[key]) + } + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_node_storage", "Storage info", labelNames, nil), + prometheus.GaugeValue, 1, labelValues..., + ) + + // Size metrics + var size storage.StorageSize + var err error + + switch storageType { + case "dir", "nfs", "cephfs": + path := entry.Properties["path"] + if path == "" { + continue + } + size, err = storage.GetDirStorageSize(c.statFS, path) + case "zfspool": + pool := entry.Properties["pool"] + if pool == "" { + continue + } + // Extract base pool name (before any /) + poolName := strings.Split(pool, "/")[0] + out, runErr := c.cmdRunner.Run("zpool", "list", "-p", poolName) + if runErr != nil { + slog.Error("zpool list", "pool", poolName, "err", runErr) + continue + } + size, err = storage.GetZPoolSize(out) + default: + continue + } + + if err != nil { + slog.Error("storage size", "name", storageName, "err", err) + continue + } + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(c.prefix+"_node_storage_size", "Storage total size", []string{"name", "type"}, nil), + prometheus.GaugeValue, float64(size.Total), storageName, storageType, + ) + ch <- prometheus.MustNewConstMetric( + 
prometheus.NewDesc(c.prefix+"_node_storage_free", "Storage free space", []string{"name", "type"}, nil), + prometheus.GaugeValue, float64(size.Free), storageName, storageType, + ) + } +} + +func (c *PVECollector) getPoolInfo() (map[string]string, map[string]pveconfig.PoolInfo) { + if data, ok := c.poolCache.Get(); ok { + return data.vmPoolMap, data.pools + } + + content, err := c.fileReader.ReadFile("/etc/pve/user.cfg") + if err != nil { + slog.Error("read user.cfg", "err", err) + return nil, nil + } + + vmPoolMap, pools := pveconfig.ParsePoolConfig(content) + c.poolCache.Set(poolData{vmPoolMap: vmPoolMap, pools: pools}) + return vmPoolMap, pools +} + +func (c *PVECollector) getStorageEntries() []pveconfig.StorageEntry { + if data, ok := c.storageCache.Get(); ok { + return data + } + + content, err := c.fileReader.ReadFile("/etc/pve/storage.cfg") + if err != nil { + slog.Error("read storage.cfg", "err", err) + return nil + } + + entries := pveconfig.ParseStorageConfig(content) + c.storageCache.Set(entries) + return entries +} + +func sortedKeys(m map[string]string) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + // Simple insertion sort for typically small maps + for i := 1; i < len(keys); i++ { + for j := i; j > 0 && keys[j] < keys[j-1]; j-- { + keys[j], keys[j-1] = keys[j-1], keys[j] + } + } + return keys +} + diff --git a/src/internal/collector/collector_test.go b/src/internal/collector/collector_test.go new file mode 100644 index 0000000..8886aab --- /dev/null +++ b/src/internal/collector/collector_test.go @@ -0,0 +1,343 @@ +package collector + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + + "pve_local_exporter/internal/config" + "pve_local_exporter/internal/procfs" + "pve_local_exporter/internal/storage" +) + +// Mock implementations + +type mockProcReader struct { + procs []procfs.QEMUProcess + cpuTimes 
map[int]procfs.CPUTimes + ioCount map[int]procfs.IOCounters + threads map[int]int + memPct map[int]float64 + memExt map[int]procfs.MemoryExtended + ctxSwitch map[int]procfs.CtxSwitches +} + +func (m *mockProcReader) DiscoverQEMUProcesses() ([]procfs.QEMUProcess, error) { + return m.procs, nil +} +func (m *mockProcReader) GetCPUTimes(pid int) (procfs.CPUTimes, error) { + return m.cpuTimes[pid], nil +} +func (m *mockProcReader) GetIOCounters(pid int) (procfs.IOCounters, error) { + return m.ioCount[pid], nil +} +func (m *mockProcReader) GetNumThreads(pid int) (int, error) { + return m.threads[pid], nil +} +func (m *mockProcReader) GetMemoryPercent(pid int) (float64, error) { + return m.memPct[pid], nil +} +func (m *mockProcReader) GetMemoryExtended(pid int) (procfs.MemoryExtended, error) { + return m.memExt[pid], nil +} +func (m *mockProcReader) GetCtxSwitches(pid int) (procfs.CtxSwitches, error) { + return m.ctxSwitch[pid], nil +} +func (m *mockProcReader) VMConfigExists(vmid string) bool { return true } + +type mockSysReader struct { + ifStats map[string]map[string]int64 + blockSize map[string]int64 +} + +func (m *mockSysReader) ReadInterfaceStats(ifname string) (map[string]int64, error) { + return m.ifStats[ifname], nil +} +func (m *mockSysReader) GetBlockDeviceSize(devPath string) (int64, error) { + return m.blockSize[devPath], nil +} + +type mockQMMonitor struct { + responses map[string]string +} + +func (m *mockQMMonitor) RunCommand(vmid, cmd string) (string, error) { + return m.responses[vmid+":"+cmd], nil +} +func (m *mockQMMonitor) InvalidateCache(vmid, cmd string) {} + +type mockStatFS struct { + sizes map[string]storage.StorageSize +} + +func (m *mockStatFS) Statfs(path string) (storage.StorageSize, error) { + return m.sizes[path], nil +} + +type mockCmdRunner struct { + outputs map[string]string +} + +func (m *mockCmdRunner) Run(name string, args ...string) (string, error) { + key := name + " " + strings.Join(args, " ") + return m.outputs[key], nil +} + 
+type mockFileReader struct { + files map[string]string +} + +func (m *mockFileReader) ReadFile(path string) (string, error) { + return m.files[path], nil +} + +// collectMetrics collects all metrics from a collector into a map keyed by metric name. +func collectMetrics(c prometheus.Collector) map[string][]*dto.Metric { + ch := make(chan prometheus.Metric, 200) + go func() { + c.Collect(ch) + close(ch) + }() + + result := make(map[string][]*dto.Metric) + for m := range ch { + d := &dto.Metric{} + m.Write(d) + desc := m.Desc() + // Extract fqName from desc string + name := desc.String() + // Format: Desc{fqName: "name", ...} + if idx := strings.Index(name, "fqName: \""); idx >= 0 { + name = name[idx+9:] + if end := strings.Index(name, "\""); end >= 0 { + name = name[:end] + } + } + result[name] = append(result[name], d) + } + return result +} + +func findMetricWithLabels(metrics []*dto.Metric, labels map[string]string) *dto.Metric { + for _, m := range metrics { + match := true + for wantName, wantVal := range labels { + found := false + for _, l := range m.Label { + if l.GetName() == wantName && l.GetValue() == wantVal { + found = true + break + } + } + if !found { + match = false + break + } + } + if match { + return m + } + } + return nil +} + +func TestCollector_BasicVMMetrics(t *testing.T) { + cfg := config.Config{ + CollectRunningVMs: true, + CollectStorage: false, + MetricsPrefix: "pve", + } + + proc := &mockProcReader{ + procs: []procfs.QEMUProcess{ + {PID: 1234, VMID: "100", Name: "testvm", CPU: "host", Vcores: 4, MaxMem: 4194304}, + }, + cpuTimes: map[int]procfs.CPUTimes{ + 1234: {User: 5.0, System: 2.0, IOWait: 0.5}, + }, + ioCount: map[int]procfs.IOCounters{ + 1234: {ReadChars: 1000, WriteChars: 2000, ReadSyscalls: 10, WriteSyscalls: 20, ReadBytes: 500, WriteBytes: 1000}, + }, + threads: map[int]int{1234: 50}, + memPct: map[int]float64{1234: 25.5}, + memExt: map[int]procfs.MemoryExtended{ + 1234: {"vmrss:": 1048576, "vmpeak:": 2097152}, + }, + ctxSwitch: 
map[int]procfs.CtxSwitches{ + 1234: {Voluntary: 100, Involuntary: 10}, + }, + } + + sys := &mockSysReader{ + ifStats: map[string]map[string]int64{}, + blockSize: map[string]int64{}, + } + + qm := &mockQMMonitor{responses: map[string]string{ + "100:info network": "", + "100:info block": "", + }} + + fr := &mockFileReader{files: map[string]string{ + "/etc/pve/user.cfg": "pool:prod:Production:100\n", + }} + + c := NewWithDeps(cfg, proc, sys, qm, &mockStatFS{}, &mockCmdRunner{}, fr) + metrics := collectMetrics(c) + + // Check CPU metrics + cpuMetrics := metrics["pve_kvm_cpu"] + if len(cpuMetrics) != 3 { + t.Fatalf("expected 3 cpu metrics, got %d", len(cpuMetrics)) + } + m := findMetricWithLabels(cpuMetrics, map[string]string{"mode": "user"}) + if m == nil || m.Gauge.GetValue() != 5.0 { + t.Errorf("cpu user = %v", m) + } + m = findMetricWithLabels(cpuMetrics, map[string]string{"mode": "system"}) + if m == nil || m.Gauge.GetValue() != 2.0 { + t.Errorf("cpu system = %v", m) + } + m = findMetricWithLabels(cpuMetrics, map[string]string{"mode": "iowait"}) + if m == nil || m.Gauge.GetValue() != 0.5 { + t.Errorf("cpu iowait = %v", m) + } + + // Check vcores + vcoreMetrics := metrics["pve_kvm_vcores"] + if len(vcoreMetrics) != 1 || vcoreMetrics[0].Gauge.GetValue() != 4 { + t.Errorf("vcores = %v", vcoreMetrics) + } + + // Check threads + threadMetrics := metrics["pve_kvm_threads"] + if len(threadMetrics) != 1 || threadMetrics[0].Gauge.GetValue() != 50 { + t.Errorf("threads = %v", threadMetrics) + } + + // Check memory percent + memPctMetrics := metrics["pve_kvm_memory_percent"] + if len(memPctMetrics) != 1 || memPctMetrics[0].Gauge.GetValue() != 25.5 { + t.Errorf("memory_percent = %v", memPctMetrics) + } + + // Check IO + if m := metrics["pve_kvm_io_read_count"]; len(m) != 1 || m[0].Gauge.GetValue() != 10 { + t.Errorf("io_read_count = %v", m) + } + if m := metrics["pve_kvm_io_write_bytes"]; len(m) != 1 || m[0].Gauge.GetValue() != 1000 { + t.Errorf("io_write_bytes = %v", m) + } + 
+ // Check context switches + csMetrics := metrics["pve_kvm_ctx_switches"] + if len(csMetrics) != 2 { + t.Fatalf("expected 2 ctx_switches metrics, got %d", len(csMetrics)) + } + + // Check VM info metric + infoMetrics := metrics["pve_kvm"] + if len(infoMetrics) != 1 { + t.Fatalf("expected 1 kvm info metric, got %d", len(infoMetrics)) + } + m = findMetricWithLabels(infoMetrics, map[string]string{"id": "100", "name": "testvm", "pool": "prod"}) + if m == nil { + t.Error("kvm info metric not found with expected labels") + } +} + +func TestCollector_StorageMetrics(t *testing.T) { + cfg := config.Config{ + CollectRunningVMs: false, + CollectStorage: true, + MetricsPrefix: "pve", + } + + fr := &mockFileReader{files: map[string]string{ + "/etc/pve/storage.cfg": `dir: local + path /var/lib/vz + content iso,vztmpl,backup +`, + "/etc/pve/user.cfg": "", + }} + + statFS := &mockStatFS{sizes: map[string]storage.StorageSize{ + "/var/lib/vz": {Total: 1000000000, Free: 500000000}, + }} + + c := NewWithDeps(cfg, &mockProcReader{}, &mockSysReader{}, &mockQMMonitor{responses: map[string]string{}}, + statFS, &mockCmdRunner{}, fr) + + metrics := collectMetrics(c) + + // Check storage size + sizeMetrics := metrics["pve_node_storage_size"] + if len(sizeMetrics) != 1 || sizeMetrics[0].Gauge.GetValue() != 1e9 { + t.Errorf("storage_size = %v", sizeMetrics) + } + + // Check storage free + freeMetrics := metrics["pve_node_storage_free"] + if len(freeMetrics) != 1 || freeMetrics[0].Gauge.GetValue() != 5e8 { + t.Errorf("storage_free = %v", freeMetrics) + } + + // Check storage info + infoMetrics := metrics["pve_node_storage"] + if len(infoMetrics) != 1 { + t.Fatalf("expected 1 storage info metric, got %d", len(infoMetrics)) + } +} + +func TestCollector_NICMetrics(t *testing.T) { + cfg := config.Config{ + CollectRunningVMs: true, + CollectStorage: false, + MetricsPrefix: "pve", + } + + proc := &mockProcReader{ + procs: []procfs.QEMUProcess{ + {PID: 1, VMID: "100", Name: "vm", Vcores: 1, MaxMem: 
1024}, + }, + cpuTimes: map[int]procfs.CPUTimes{1: {}}, + ioCount: map[int]procfs.IOCounters{1: {}}, + threads: map[int]int{1: 1}, + memPct: map[int]float64{1: 0}, + memExt: map[int]procfs.MemoryExtended{1: {}}, + ctxSwitch: map[int]procfs.CtxSwitches{1: {}}, + } + + sys := &mockSysReader{ + ifStats: map[string]map[string]int64{ + "tap100i0": {"rx_bytes": 1000, "tx_bytes": 2000}, + }, + } + + qm := &mockQMMonitor{responses: map[string]string{ + "100:info network": "net0: index=0,type=tap,ifname=tap100i0,model=virtio-net-pci,macaddr=AA:BB:CC:DD:EE:FF", + "100:info block": "", + }} + + fr := &mockFileReader{files: map[string]string{"/etc/pve/user.cfg": ""}} + c := NewWithDeps(cfg, proc, sys, qm, &mockStatFS{}, &mockCmdRunner{}, fr) + metrics := collectMetrics(c) + + // NIC info + nicInfo := metrics["pve_kvm_nic"] + if len(nicInfo) != 1 { + t.Fatalf("expected 1 nic info, got %d", len(nicInfo)) + } + + // NIC stats + rxBytes := metrics["pve_kvm_nic_rx_bytes"] + if len(rxBytes) != 1 || rxBytes[0].Gauge.GetValue() != 1000 { + t.Errorf("rx_bytes = %v", rxBytes) + } + txBytes := metrics["pve_kvm_nic_tx_bytes"] + if len(txBytes) != 1 || txBytes[0].Gauge.GetValue() != 2000 { + t.Errorf("tx_bytes = %v", txBytes) + } +} diff --git a/src/internal/config/config.go b/src/internal/config/config.go new file mode 100644 index 0000000..2b79064 --- /dev/null +++ b/src/internal/config/config.go @@ -0,0 +1,37 @@ +package config + +import ( + "flag" + "time" +) + +type Config struct { + Port int + Host string + CollectRunningVMs bool + CollectStorage bool + MetricsPrefix string + LogLevel string + QMTerminalTimeout time.Duration + QMMaxTTL time.Duration + QMRand time.Duration + QMMonitorDeferClose bool + ShowVersion bool +} + +func Parse() Config { + c := Config{} + flag.IntVar(&c.Port, "port", 9116, "HTTP server listen port") + flag.StringVar(&c.Host, "host", "0.0.0.0", "HTTP server bind address") + flag.BoolVar(&c.CollectRunningVMs, "collect-running-vms", true, "collect KVM VM 
metrics") + flag.BoolVar(&c.CollectStorage, "collect-storage", true, "collect storage pool metrics") + flag.StringVar(&c.MetricsPrefix, "metrics-prefix", "pve", "metric name prefix") + flag.StringVar(&c.LogLevel, "loglevel", "INFO", "log level (DEBUG, INFO, WARNING, ERROR)") + flag.DurationVar(&c.QMTerminalTimeout, "qm-terminal-timeout", 10*time.Second, "qm monitor command timeout") + flag.DurationVar(&c.QMMaxTTL, "qm-max-ttl", 600*time.Second, "cache TTL for qm monitor data") + flag.DurationVar(&c.QMRand, "qm-rand", 60*time.Second, "randomness for qm cache expiry") + flag.BoolVar(&c.QMMonitorDeferClose, "qm-monitor-defer-close", true, "defer closing unresponsive qm sessions") + flag.BoolVar(&c.ShowVersion, "version", false, "print the version and exit") + flag.Parse() + return c +} diff --git a/src/internal/procfs/procfs.go b/src/internal/procfs/procfs.go new file mode 100644 index 0000000..bd416f4 --- /dev/null +++ b/src/internal/procfs/procfs.go @@ -0,0 +1,394 @@ +package procfs + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "strings" +) + +const clkTck = 100 // sysconf(_SC_CLK_TCK) on Linux + +// QEMUProcess holds info discovered from /proc for a QEMU VM. +type QEMUProcess struct { + PID int + VMID string + Name string + CPU string + Vcores int + MaxMem int64 // in kB (parsed from cmdline) +} + +// CPUTimes holds parsed CPU times from /proc/{pid}/stat. +type CPUTimes struct { + User float64 + System float64 + IOWait float64 +} + +// IOCounters holds parsed I/O counters from /proc/{pid}/io. +type IOCounters struct { + ReadChars uint64 + WriteChars uint64 + ReadSyscalls uint64 + WriteSyscalls uint64 + ReadBytes uint64 + WriteBytes uint64 +} + +// CtxSwitches holds context switch counts from /proc/{pid}/status. +type CtxSwitches struct { + Voluntary uint64 + Involuntary uint64 +} + +// MemoryExtended holds memory info from /proc/{pid}/status (values in bytes). 
+type MemoryExtended map[string]int64 + +// ProcReader abstracts /proc access for testability. +type ProcReader interface { + DiscoverQEMUProcesses() ([]QEMUProcess, error) + GetCPUTimes(pid int) (CPUTimes, error) + GetIOCounters(pid int) (IOCounters, error) + GetNumThreads(pid int) (int, error) + GetMemoryPercent(pid int) (float64, error) + GetMemoryExtended(pid int) (MemoryExtended, error) + GetCtxSwitches(pid int) (CtxSwitches, error) + VMConfigExists(vmid string) bool +} + +// RealProcReader reads from the actual /proc filesystem. +type RealProcReader struct { + ProcPath string // default "/proc" + PVECfgPath string // default "/etc/pve/qemu-server" +} + +func NewRealProcReader() *RealProcReader { + return &RealProcReader{ + ProcPath: "/proc", + PVECfgPath: "/etc/pve/qemu-server", + } +} + +func (r *RealProcReader) DiscoverQEMUProcesses() ([]QEMUProcess, error) { + entries, err := os.ReadDir(r.ProcPath) + if err != nil { + return nil, err + } + + var procs []QEMUProcess + for _, e := range entries { + if !e.IsDir() { + continue + } + pid, err := strconv.Atoi(e.Name()) + if err != nil { + continue + } + + exe, err := os.Readlink(filepath.Join(r.ProcPath, e.Name(), "exe")) + if err != nil { + continue + } + if exe != "/usr/bin/qemu-system-x86_64" { + continue + } + + cmdlineBytes, err := os.ReadFile(filepath.Join(r.ProcPath, e.Name(), "cmdline")) + if err != nil { + continue + } + cmdline := ParseCmdline(cmdlineBytes) + + vmid := FlagValue(cmdline, "-id") + if vmid == "" { + continue + } + if !r.VMConfigExists(vmid) { + continue + } + + proc := QEMUProcess{ + PID: pid, + VMID: vmid, + Name: FlagValue(cmdline, "-name"), + CPU: FlagValue(cmdline, "-cpu"), + } + proc.Vcores = ParseVcores(cmdline) + proc.MaxMem = ParseMem(cmdline) + procs = append(procs, proc) + } + return procs, nil +} + +func (r *RealProcReader) VMConfigExists(vmid string) bool { + _, err := os.Stat(filepath.Join(r.PVECfgPath, vmid+".conf")) + return err == nil +} + +func (r *RealProcReader) 
GetCPUTimes(pid int) (CPUTimes, error) { + data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "stat")) + if err != nil { + return CPUTimes{}, err + } + return ParseStat(string(data)) +} + +func (r *RealProcReader) GetIOCounters(pid int) (IOCounters, error) { + data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "io")) + if err != nil { + return IOCounters{}, err + } + return ParseIO(string(data)) +} + +func (r *RealProcReader) GetNumThreads(pid int) (int, error) { + data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status")) + if err != nil { + return 0, err + } + return ParseThreads(string(data)) +} + +func (r *RealProcReader) GetMemoryPercent(pid int) (float64, error) { + // Read process RSS and total memory to compute percentage + statusData, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status")) + if err != nil { + return 0, err + } + rss := int64(0) + for _, line := range strings.Split(string(statusData), "\n") { + if strings.HasPrefix(line, "VmRSS:") { + parts := strings.Fields(line) + if len(parts) >= 2 { + rss, _ = strconv.ParseInt(parts[1], 10, 64) + rss *= 1024 // kB to bytes + } + break + } + } + + meminfoData, err := os.ReadFile(filepath.Join(r.ProcPath, "meminfo")) + if err != nil { + return 0, err + } + totalMem := int64(0) + for _, line := range strings.Split(string(meminfoData), "\n") { + if strings.HasPrefix(line, "MemTotal:") { + parts := strings.Fields(line) + if len(parts) >= 2 { + totalMem, _ = strconv.ParseInt(parts[1], 10, 64) + totalMem *= 1024 // kB to bytes + } + break + } + } + if totalMem == 0 { + return 0, nil + } + return float64(rss) / float64(totalMem) * 100.0, nil +} + +func (r *RealProcReader) GetMemoryExtended(pid int) (MemoryExtended, error) { + data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status")) + if err != nil { + return nil, err + } + return ParseMemoryExtended(string(data)), nil +} + +func (r *RealProcReader) 
GetCtxSwitches(pid int) (CtxSwitches, error) { + data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status")) + if err != nil { + return CtxSwitches{}, err + } + return ParseCtxSwitches(string(data)) +} + +// ParseCmdline splits a null-byte separated /proc/{pid}/cmdline. +func ParseCmdline(data []byte) []string { + s := string(data) + if len(s) == 0 { + return nil + } + // Remove trailing null byte if present + s = strings.TrimRight(s, "\x00") + return strings.Split(s, "\x00") +} + +// FlagValue returns the value after a flag in cmdline args. +func FlagValue(cmdline []string, flag string) string { + for i, arg := range cmdline { + if arg == flag && i+1 < len(cmdline) { + return cmdline[i+1] + } + } + return "" +} + +// ParseVcores extracts vCPU count from -smp flag. +// -smp can be just a number or key=value pairs like "4,sockets=1,cores=4,maxcpus=4" +func ParseVcores(cmdline []string) int { + smp := FlagValue(cmdline, "-smp") + if smp == "" { + return 0 + } + // Try simple numeric + parts := strings.Split(smp, ",") + n, err := strconv.Atoi(parts[0]) + if err == nil { + return n + } + // Try key=value format + for _, p := range parts { + kv := strings.SplitN(p, "=", 2) + if len(kv) == 2 && kv[0] == "cpus" { + n, _ = strconv.Atoi(kv[1]) + return n + } + } + return 0 +} + +// ParseMem extracts max memory in kB from cmdline. 
+// Simple: -m 1024 -> 1024*1024 kB +// NUMA: memory-backend-ram...size=NM -> sum * 1024 kB +func ParseMem(cmdline []string) int64 { + mVal := FlagValue(cmdline, "-m") + if mVal == "" { + return 0 + } + // Simple numeric case + if n, err := strconv.ParseInt(mVal, 10, 64); err == nil { + return n * 1024 // MB to kB + } + // NUMA case: search for memory-backend-ram in all args + var total int64 + for _, arg := range cmdline { + if strings.Contains(arg, "memory-backend-ram") { + // Format: ...size=XXXM + for _, part := range strings.Split(arg, ",") { + if strings.HasPrefix(part, "size=") { + sizeStr := strings.TrimPrefix(part, "size=") + if strings.HasSuffix(sizeStr, "M") { + sizeStr = strings.TrimSuffix(sizeStr, "M") + if n, err := strconv.ParseInt(sizeStr, 10, 64); err == nil { + total += n * 1024 // MB to kB + } + } + } + } + } + } + return total +} + +// ParseStat extracts CPU times from /proc/{pid}/stat. +// Fields: (1-indexed) 14=utime, 15=stime, 42=delayacct_blkio_ticks +func ParseStat(data string) (CPUTimes, error) { + // Find the closing paren of comm field to handle spaces in process names + closeIdx := strings.LastIndex(data, ")") + if closeIdx < 0 { + return CPUTimes{}, fmt.Errorf("malformed stat: no closing paren") + } + // Fields after ") " are 1-indexed starting at field 3 + rest := data[closeIdx+2:] + fields := strings.Fields(rest) + // field 14 (utime) is at index 14-3=11, field 15 (stime) at 12, field 42 at 39 + if len(fields) < 40 { + return CPUTimes{}, fmt.Errorf("not enough fields in stat: %d", len(fields)) + } + utime, _ := strconv.ParseUint(fields[11], 10, 64) + stime, _ := strconv.ParseUint(fields[12], 10, 64) + blkio, _ := strconv.ParseUint(fields[39], 10, 64) + return CPUTimes{ + User: float64(utime) / clkTck, + System: float64(stime) / clkTck, + IOWait: float64(blkio) / clkTck, + }, nil +} + +// ParseIO parses /proc/{pid}/io. 
+func ParseIO(data string) (IOCounters, error) { + var io IOCounters + for _, line := range strings.Split(data, "\n") { + parts := strings.SplitN(line, ": ", 2) + if len(parts) != 2 { + continue + } + val, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) + if err != nil { + continue + } + switch parts[0] { + case "rchar": + io.ReadChars = val + case "wchar": + io.WriteChars = val + case "syscr": + io.ReadSyscalls = val + case "syscw": + io.WriteSyscalls = val + case "read_bytes": + io.ReadBytes = val + case "write_bytes": + io.WriteBytes = val + } + } + return io, nil +} + +// ParseThreads extracts the Threads count from /proc/{pid}/status. +func ParseThreads(data string) (int, error) { + for _, line := range strings.Split(data, "\n") { + if strings.HasPrefix(line, "Threads:") { + parts := strings.Fields(line) + if len(parts) >= 2 { + return strconv.Atoi(parts[1]) + } + } + } + return 0, fmt.Errorf("Threads field not found") +} + +// ParseMemoryExtended parses /proc/{pid}/status for Vm*/Rss*/Hugetlb* lines. +// Returns map with lowercase keys (trailing colon preserved) to values in bytes. +func ParseMemoryExtended(data string) MemoryExtended { + m := make(MemoryExtended) + for _, line := range strings.Split(data, "\n") { + if strings.HasPrefix(line, "Vm") || strings.HasPrefix(line, "Rss") || strings.HasPrefix(line, "Hugetlb") { + parts := strings.Fields(line) + if len(parts) >= 2 { + key := strings.ToLower(parts[0]) // keeps trailing colon + val, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + continue + } + if len(parts) >= 3 && parts[2] == "kB" { + val *= 1024 + } + m[key] = val + } + } + } + return m +} + +// ParseCtxSwitches parses voluntary/involuntary context switches from /proc/{pid}/status. 
+func ParseCtxSwitches(data string) (CtxSwitches, error) { + var cs CtxSwitches + for _, line := range strings.Split(data, "\n") { + parts := strings.Fields(line) + if len(parts) < 2 { + continue + } + switch parts[0] { + case "voluntary_ctxt_switches:": + cs.Voluntary, _ = strconv.ParseUint(parts[1], 10, 64) + case "nonvoluntary_ctxt_switches:": + cs.Involuntary, _ = strconv.ParseUint(parts[1], 10, 64) + } + } + return cs, nil +} diff --git a/src/internal/procfs/procfs_test.go b/src/internal/procfs/procfs_test.go new file mode 100644 index 0000000..b57eeb2 --- /dev/null +++ b/src/internal/procfs/procfs_test.go @@ -0,0 +1,205 @@ +package procfs + +import ( + "testing" +) + +func TestParseCmdline(t *testing.T) { + data := []byte("/usr/bin/qemu-system-x86_64\x00-id\x00100\x00-name\x00myvm\x00-cpu\x00host\x00-smp\x004\x00-m\x002048\x00") + args := ParseCmdline(data) + if len(args) != 11 { + t.Fatalf("expected 11 args, got %d: %v", len(args), args) + } + if args[0] != "/usr/bin/qemu-system-x86_64" { + t.Fatalf("unexpected first arg: %s", args[0]) + } +} + +func TestFlagValue(t *testing.T) { + cmdline := []string{"/usr/bin/qemu-system-x86_64", "-id", "100", "-name", "myvm", "-cpu", "host"} + tests := []struct { + flag, want string + }{ + {"-id", "100"}, + {"-name", "myvm"}, + {"-cpu", "host"}, + {"-missing", ""}, + } + for _, tc := range tests { + got := FlagValue(cmdline, tc.flag) + if got != tc.want { + t.Errorf("FlagValue(%q) = %q, want %q", tc.flag, got, tc.want) + } + } +} + +func TestParseVcores(t *testing.T) { + tests := []struct { + name string + cmdline []string + want int + }{ + {"simple", []string{"-smp", "4"}, 4}, + {"with_opts", []string{"-smp", "4,sockets=1,cores=4,maxcpus=4"}, 4}, + {"missing", []string{"-m", "1024"}, 0}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := ParseVcores(tc.cmdline) + if got != tc.want { + t.Errorf("got %d, want %d", got, tc.want) + } + }) + } +} + +func TestParseMem(t *testing.T) { + tests := 
[]struct { + name string + cmdline []string + want int64 + }{ + { + "simple", + []string{"-m", "1024"}, + 1024 * 1024, // kB + }, + { + "numa", + []string{ + "-m", "size=4096M,slots=255,maxmem=524288M", + "-object", "memory-backend-ram,id=ram-node0,size=2048M", + "-object", "memory-backend-ram,id=ram-node1,size=2048M", + }, + 4096 * 1024, // 2048+2048 MB in kB + }, + { + "missing", + []string{"-smp", "4"}, + 0, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := ParseMem(tc.cmdline) + if got != tc.want { + t.Errorf("got %d, want %d", got, tc.want) + } + }) + } +} + +func TestParseStat(t *testing.T) { + // Realistic /proc/{pid}/stat with QEMU process name containing spaces + // Fields after ')': state(3) ppid(4) pgrp(5) session(6) tty_nr(7) tpgid(8) flags(9) + // minflt(10) cminflt(11) majflt(12) cmajflt(13) utime(14) stime(15) + // cutime(16) cstime(17) priority(18) nice(19) num_threads(20) itrealvalue(21) + // starttime(22) vsize(23) rss(24) rsslim(25) startcode(26) endcode(27) startstack(28) + // kstkesp(29) kstkeip(30) signal(31) blocked(32) sigignore(33) sigcatch(34) wchan(35) + // nswap(36) cnswap(37) exit_signal(38) processor(39) rt_priority(40) policy(41) + // delayacct_blkio_ticks(42) + stat := `12345 (qemu-system-x86) S 1 12345 12345 0 -1 4194304 1000 0 0 0 500 200 0 0 20 0 50 0 100 1000000 500 18446744073709551615 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 42` + cpu, err := ParseStat(stat) + if err != nil { + t.Fatal(err) + } + if cpu.User != 5.0 { // 500/100 + t.Errorf("User = %f, want 5.0", cpu.User) + } + if cpu.System != 2.0 { // 200/100 + t.Errorf("System = %f, want 2.0", cpu.System) + } + if cpu.IOWait != 0.42 { // 42/100 + t.Errorf("IOWait = %f, want 0.42", cpu.IOWait) + } +} + +func TestParseIO(t *testing.T) { + data := `rchar: 123456 +wchar: 789012 +syscr: 100 +syscw: 200 +read_bytes: 4096 +write_bytes: 8192 +cancelled_write_bytes: 0 +` + io, err := ParseIO(data) + if err != nil { + t.Fatal(err) + } + if io.ReadChars != 
123456 { + t.Errorf("ReadChars = %d", io.ReadChars) + } + if io.WriteChars != 789012 { + t.Errorf("WriteChars = %d", io.WriteChars) + } + if io.ReadSyscalls != 100 { + t.Errorf("ReadSyscalls = %d", io.ReadSyscalls) + } + if io.WriteSyscalls != 200 { + t.Errorf("WriteSyscalls = %d", io.WriteSyscalls) + } + if io.ReadBytes != 4096 { + t.Errorf("ReadBytes = %d", io.ReadBytes) + } + if io.WriteBytes != 8192 { + t.Errorf("WriteBytes = %d", io.WriteBytes) + } +} + +func TestParseThreads(t *testing.T) { + data := `Name: qemu-system-x86 +Threads: 50 +VmPeak: 1234 kB +` + n, err := ParseThreads(data) + if err != nil { + t.Fatal(err) + } + if n != 50 { + t.Errorf("got %d, want 50", n) + } +} + +func TestParseMemoryExtended(t *testing.T) { + data := `Name: qemu-system-x86 +VmPeak: 1000 kB +VmRSS: 500 kB +VmData: 200 kB +RssAnon: 100 kB +HugetlbPages: 0 kB +` + m := ParseMemoryExtended(data) + if m["vmpeak:"] != 1000*1024 { + t.Errorf("VmPeak = %d", m["vmpeak:"]) + } + if m["vmrss:"] != 500*1024 { + t.Errorf("VmRSS = %d", m["vmrss:"]) + } + if m["vmdata:"] != 200*1024 { + t.Errorf("VmData = %d", m["vmdata:"]) + } + if m["rssanon:"] != 100*1024 { + t.Errorf("RssAnon = %d", m["rssanon:"]) + } + if m["hugetlbpages:"] != 0 { + t.Errorf("HugetlbPages = %d", m["hugetlbpages:"]) + } +} + +func TestParseCtxSwitches(t *testing.T) { + data := `Name: qemu +voluntary_ctxt_switches: 1234 +nonvoluntary_ctxt_switches: 56 +` + cs, err := ParseCtxSwitches(data) + if err != nil { + t.Fatal(err) + } + if cs.Voluntary != 1234 { + t.Errorf("Voluntary = %d", cs.Voluntary) + } + if cs.Involuntary != 56 { + t.Errorf("Involuntary = %d", cs.Involuntary) + } +} diff --git a/src/internal/pveconfig/pool.go b/src/internal/pveconfig/pool.go new file mode 100644 index 0000000..a863dbd --- /dev/null +++ b/src/internal/pveconfig/pool.go @@ -0,0 +1,68 @@ +package pveconfig + +import ( + "strings" + + "pve_local_exporter/internal/cache" +) + +// PoolInfo holds parsed pool hierarchy info. 
+type PoolInfo struct { + LevelCount int + Level1 string + Level2 string + Level3 string +} + +// FileReader abstracts file reading for testability. +type FileReader interface { + ReadFile(path string) (string, error) + Stat(path string) (cache.StatFunc, error) +} + +// ParsePoolConfig parses /etc/pve/user.cfg for pool definitions. +// Returns (vm_pool_map, pools). +// vm_pool_map: vmid -> pool_name +// pools: pool_name -> PoolInfo +func ParsePoolConfig(data string) (map[string]string, map[string]PoolInfo) { + vmPoolMap := make(map[string]string) + pools := make(map[string]PoolInfo) + + for _, line := range strings.Split(data, "\n") { + if !strings.HasPrefix(line, "pool:") { + continue + } + parts := strings.Split(strings.TrimSpace(line), ":") + if len(parts) < 2 { + continue + } + poolName := parts[1] + + poolParts := strings.Split(poolName, "/") + info := PoolInfo{ + LevelCount: len(poolParts), + } + if len(poolParts) > 0 { + info.Level1 = poolParts[0] + } + if len(poolParts) > 1 { + info.Level2 = poolParts[1] + } + if len(poolParts) > 2 { + info.Level3 = poolParts[2] + } + pools[poolName] = info + + // VM list is in parts[3] if it exists + if len(parts) > 3 && parts[3] != "" { + for _, vmid := range strings.Split(parts[3], ",") { + vmid = strings.TrimSpace(vmid) + if vmid != "" { + vmPoolMap[vmid] = poolName + } + } + } + } + + return vmPoolMap, pools +} diff --git a/src/internal/pveconfig/pool_test.go b/src/internal/pveconfig/pool_test.go new file mode 100644 index 0000000..3a4efda --- /dev/null +++ b/src/internal/pveconfig/pool_test.go @@ -0,0 +1,61 @@ +package pveconfig + +import ( + "testing" +) + +func TestParsePoolConfig_Basic(t *testing.T) { + data := `user:root@pam:1:0:::root@pam: +pool:production:Some comment:100,200,300 +pool:staging:Staging env:400 +pool:production/tier1:Tier 1:500,600 +` + vmMap, pools := ParsePoolConfig(data) + + // Check VM mappings + if vmMap["100"] != "production" { + t.Errorf("VM 100 pool = %q, want production", vmMap["100"]) + 
} + if vmMap["200"] != "production" { + t.Errorf("VM 200 pool = %q", vmMap["200"]) + } + if vmMap["400"] != "staging" { + t.Errorf("VM 400 pool = %q", vmMap["400"]) + } + if vmMap["500"] != "production/tier1" { + t.Errorf("VM 500 pool = %q", vmMap["500"]) + } + + // Check pool info + prod := pools["production"] + if prod.LevelCount != 1 || prod.Level1 != "production" { + t.Errorf("production pool = %+v", prod) + } + + tier1 := pools["production/tier1"] + if tier1.LevelCount != 2 || tier1.Level1 != "production" || tier1.Level2 != "tier1" { + t.Errorf("production/tier1 pool = %+v", tier1) + } +} + +func TestParsePoolConfig_NoVMs(t *testing.T) { + data := `pool:empty:No VMs: +` + vmMap, pools := ParsePoolConfig(data) + if len(vmMap) != 0 { + t.Errorf("expected no VM mappings, got %d", len(vmMap)) + } + if _, ok := pools["empty"]; !ok { + t.Error("expected 'empty' pool to exist") + } +} + +func TestParsePoolConfig_ThreeLevels(t *testing.T) { + data := `pool:a/b/c::deep:100 +` + _, pools := ParsePoolConfig(data) + p := pools["a/b/c"] + if p.LevelCount != 3 || p.Level1 != "a" || p.Level2 != "b" || p.Level3 != "c" { + t.Errorf("pool = %+v", p) + } +} diff --git a/src/internal/pveconfig/storage.go b/src/internal/pveconfig/storage.go new file mode 100644 index 0000000..429a8de --- /dev/null +++ b/src/internal/pveconfig/storage.go @@ -0,0 +1,71 @@ +package pveconfig + +import ( + "regexp" + "strings" +) + +// StorageEntry holds a parsed storage definition from storage.cfg. +type StorageEntry struct { + Properties map[string]string +} + +var sanitizeRe = regexp.MustCompile(`[^a-zA-Z0-9_]`) + +// SanitizeKey replaces non-alphanumeric/underscore chars with underscore. +func SanitizeKey(key string) string { + return sanitizeRe.ReplaceAllString(key, "_") +} + +// ParseStorageConfig parses /etc/pve/storage.cfg content. +// Returns a list of storage entries, each with sanitized key-value properties. 
+func ParseStorageConfig(data string) []StorageEntry { + var result []StorageEntry + var current *StorageEntry + + for _, line := range strings.Split(data, "\n") { + line = strings.TrimSpace(line) + + if line == "" || strings.HasPrefix(line, "#") { + continue + } + + if strings.Contains(line, ":") && !strings.HasPrefix(line, "\t") && !strings.HasPrefix(line, " ") { + // Check if this is a section header (type: name) + colonIdx := strings.Index(line, ":") + sectionType := strings.TrimSpace(line[:colonIdx]) + sectionName := strings.TrimSpace(line[colonIdx+1:]) + + // Only treat as header if type has no spaces (it's a single word) + if !strings.Contains(sectionType, " ") { + if current != nil { + result = append(result, *current) + } + current = &StorageEntry{ + Properties: map[string]string{ + "type": SanitizeKey(sectionType), + "name": SanitizeKey(sectionName), + }, + } + continue + } + } + + // Key-value property line + if current != nil { + parts := strings.SplitN(line, " ", 2) + key := SanitizeKey(strings.TrimSpace(parts[0])) + if len(parts) > 1 { + current.Properties[key] = strings.TrimSpace(parts[1]) + } else { + current.Properties[key] = "true" + } + } + } + + if current != nil { + result = append(result, *current) + } + + return result +} diff --git a/src/internal/pveconfig/storage_test.go b/src/internal/pveconfig/storage_test.go new file mode 100644 index 0000000..598f5f6 --- /dev/null +++ b/src/internal/pveconfig/storage_test.go @@ -0,0 +1,106 @@ +package pveconfig + +import ( + "testing" +) + +func TestSanitizeKey(t *testing.T) { + tests := []struct { + in, want string + }{ + {"simple", "simple"}, + {"with-dash", "with_dash"}, + {"with.dot", "with_dot"}, + {"with space", "with_space"}, + {"key123", "key123"}, + } + for _, tc := range tests { + got := SanitizeKey(tc.in) + if got != tc.want { + t.Errorf("SanitizeKey(%q) = %q, want %q", tc.in, got, tc.want) + } + } +} + +func TestParseStorageConfig_Basic(t *testing.T) { + data := `dir: local + path 
/var/lib/vz + content iso,vztmpl,backup + maxfiles 3 + +zfspool: local-zfs + pool rpool/data + content images,rootdir + sparse 1 + +nfs: nas-backup + export /mnt/backup + path /mnt/pve/nas-backup + server 10.0.0.1 + content backup +` + entries := ParseStorageConfig(data) + if len(entries) != 3 { + t.Fatalf("expected 3 entries, got %d", len(entries)) + } + + // Check dir entry + e := entries[0] + if e.Properties["type"] != "dir" { + t.Errorf("type = %q", e.Properties["type"]) + } + if e.Properties["name"] != "local" { + t.Errorf("name = %q", e.Properties["name"]) + } + if e.Properties["path"] != "/var/lib/vz" { + t.Errorf("path = %q", e.Properties["path"]) + } + if e.Properties["content"] != "iso,vztmpl,backup" { + t.Errorf("content = %q", e.Properties["content"]) + } + + // Check zfspool entry + e = entries[1] + if e.Properties["type"] != "zfspool" { + t.Errorf("type = %q", e.Properties["type"]) + } + if e.Properties["name"] != "local_zfs" { + t.Errorf("name = %q, want local_zfs", e.Properties["name"]) + } + if e.Properties["pool"] != "rpool/data" { + t.Errorf("pool = %q", e.Properties["pool"]) + } + + // Check nfs entry + e = entries[2] + if e.Properties["type"] != "nfs" { + t.Errorf("type = %q", e.Properties["type"]) + } + if e.Properties["server"] != "10.0.0.1" { + t.Errorf("server = %q", e.Properties["server"]) + } +} + +func TestParseStorageConfig_Comments(t *testing.T) { + data := `# This is a comment +dir: local + path /var/lib/vz + # inline comment + content iso +` + entries := ParseStorageConfig(data) + if len(entries) != 1 { + t.Fatalf("expected 1 entry, got %d", len(entries)) + } +} + +func TestParseStorageConfig_BooleanValue(t *testing.T) { + data := `zfspool: tank + pool rpool/data + sparse +` + entries := ParseStorageConfig(data) + if entries[0].Properties["sparse"] != "true" { + t.Errorf("sparse = %q, want 'true'", entries[0].Properties["sparse"]) + } +} diff --git a/src/internal/qmmonitor/block.go b/src/internal/qmmonitor/block.go new file mode 
// DiskInfo holds parsed block device info from "info block".
type DiskInfo struct {
	DiskName string
	BlockID  string
	DiskPath string
	DiskType string
	Labels   map[string]string // additional labels: vol_name, pool, device, etc.
}

// blockHeaderRe matches: "disk_name (#blockN): /path/to/disk (type, mode)"
var blockHeaderRe = regexp.MustCompile(`^(\w+) \(#block(\d+)\): (.+) \(([\w, -]+)\)$`)

// lvmRe matches: /dev/{vg_name}/vm-{N}-disk-{N}
var lvmRe = regexp.MustCompile(`^/dev/([^/]+)/(vm-\d+-disk-\d+)$`)

// ParseBlockInfo parses "info block" output from qm monitor.
// Returns map of disk_name -> DiskInfo. Skips efidisk entries.
func ParseBlockInfo(raw string) map[string]DiskInfo {
	result := make(map[string]DiskInfo)

	// Split by "drive-" prefix to get individual disk blocks.
	// NOTE(review): assumes "drive-" never appears inside a disk path — confirm.
	parts := strings.Split(raw, "drive-")
	if len(parts) < 2 {
		return result
	}

	for _, part := range parts[1:] {
		lines := strings.Split(strings.TrimSpace(part), "\n")
		if len(lines) == 0 {
			continue
		}

		match := blockHeaderRe.FindStringSubmatch(strings.TrimSpace(lines[0]))
		if match == nil {
			continue
		}

		diskName := match[1]
		blockID := match[2]
		diskPath := match[3]
		diskTypeAndMode := match[4]
		// Header parenthetical is "type, mode"; keep only the type.
		diskType := strings.Split(diskTypeAndMode, ", ")[0]

		// Skip EFI disks — they are not interesting as guest data disks.
		if strings.Contains(diskName, "efidisk") {
			continue
		}

		// Handle json: paths by resolving the underlying host device.
		if strings.HasPrefix(diskPath, "json:") {
			resolved, err := HandleJSONPath(diskPath)
			if err != nil {
				continue
			}
			diskPath = resolved
		}

		info := DiskInfo{
			DiskName: diskName,
			BlockID:  blockID,
			DiskPath: diskPath,
			DiskType: diskType,
			Labels:   make(map[string]string),
		}

		// Detect disk type from path (zvol/rbd/lvm) and derive extra labels.
		classifyDisk(&info)

		// Parse additional info from remaining lines.
		for _, line := range lines[1:] {
			line = strings.TrimSpace(line)
			if strings.HasPrefix(line, "Attached to:") {
				// e.g. "Attached to: /machine/peripheral/virtio0/virtio-backend"
				val := strings.TrimSpace(strings.TrimPrefix(line, "Attached to:"))
				info.Labels["attached_to"] = val
			} else if strings.HasPrefix(line, "Cache mode:") {
				// Cache mode is a comma-separated list of flags; expose each
				// flag as its own boolean label.
				val := strings.TrimSpace(strings.TrimPrefix(line, "Cache mode:"))
				for _, mode := range strings.Split(val, ", ") {
					mode = strings.TrimSpace(mode)
					if mode != "" {
						key := "cache_mode_" + strings.ReplaceAll(mode, " ", "_")
						info.Labels[key] = "true"
					}
				}
			} else if strings.HasPrefix(line, "Detect zeroes:") {
				// BUGFIX: record the value QEMU actually reports instead of
				// hard-coding "on" — the line may read "off" or "unmap" too.
				val := strings.TrimSpace(strings.TrimPrefix(line, "Detect zeroes:"))
				info.Labels["detect_zeroes"] = val
			}
		}

		result[diskName] = info
	}

	return result
}

// classifyDisk sets DiskType and extra labels based on the disk path.
func classifyDisk(info *DiskInfo) {
	path := info.DiskPath

	if info.DiskType == "qcow2" {
		// Extract volume name: filename without extension.
		parts := strings.Split(path, "/")
		filename := parts[len(parts)-1]
		dotIdx := strings.Index(filename, ".")
		if dotIdx > 0 {
			info.Labels["vol_name"] = filename[:dotIdx]
		}
	}

	if strings.HasPrefix(path, "/dev/zvol/") {
		info.DiskType = "zvol"
		// /dev/zvol/pool_name/vol_name — pool may itself contain slashes
		// (e.g. rpool/data), so everything but the last segment is the pool.
		trimmed := strings.TrimPrefix(path, "/dev/zvol/")
		parts := strings.Split(trimmed, "/")
		if len(parts) >= 2 {
			info.Labels["pool"] = strings.Join(parts[:len(parts)-1], "/")
			info.Labels["vol_name"] = parts[len(parts)-1]
		}
	} else if strings.HasPrefix(path, "/dev/rbd-pve/") {
		info.DiskType = "rbd"
		// /dev/rbd-pve/cluster_id/pool/vol_name
		parts := strings.Split(path, "/")
		if len(parts) >= 5 {
			info.Labels["cluster_id"] = parts[len(parts)-3]
			info.Labels["pool"] = parts[len(parts)-2]
			info.Labels["pool_name"] = parts[len(parts)-2]
			info.Labels["vol_name"] = parts[len(parts)-1]
		}
	} else if m := lvmRe.FindStringSubmatch(path); m != nil {
		info.DiskType = "lvm"
		info.Labels["vg_name"] = m[1]
		info.Labels["vol_name"] = m[2]
	}
}

// HandleJSONPath resolves a "json:{...}" disk path by searching for
// a driver == "host_device" entry and extracting its filename.
func HandleJSONPath(path string) (string, error) {
	jsonStr := strings.TrimPrefix(path, "json:")
	var data map[string]any
	if err := json.Unmarshal([]byte(jsonStr), &data); err != nil {
		return "", fmt.Errorf("parse json path: %w", err)
	}
	if result := searchHostDevice(data); result != "" {
		return result, nil
	}
	return "", fmt.Errorf("no host_device driver found in json path")
}

// searchHostDevice walks a decoded JSON object depth-first looking for a
// node with driver == "host_device" and returns its filename, or "".
func searchHostDevice(data map[string]any) string {
	driver, _ := data["driver"].(string)
	if driver == "host_device" {
		if filename, ok := data["filename"].(string); ok {
			return filename
		}
	}
	for _, v := range data {
		if sub, ok := v.(map[string]any); ok {
			if result := searchHostDevice(sub); result != "" {
				return result
			}
		}
	}
	return ""
}
t.Errorf("cache_mode_writeback missing") + } + if d.Labels["cache_mode_direct"] != "true" { + t.Errorf("cache_mode_direct missing") + } +} + +func TestParseBlockInfo_Zvol(t *testing.T) { + raw := `drive-scsi0 (#block200): /dev/zvol/rpool/data/vm-200-disk-0 (raw, read-write) + Attached to: /machine/peripheral/virtioscsi0/virtio-backend +` + disks := ParseBlockInfo(raw) + d := disks["scsi0"] + if d.DiskType != "zvol" { + t.Errorf("type = %q", d.DiskType) + } + if d.Labels["pool"] != "rpool/data" { + t.Errorf("pool = %q", d.Labels["pool"]) + } + if d.Labels["vol_name"] != "vm-200-disk-0" { + t.Errorf("vol_name = %q", d.Labels["vol_name"]) + } +} + +func TestParseBlockInfo_RBD(t *testing.T) { + raw := `drive-scsi0 (#block300): /dev/rbd-pve/ceph1/pool1/vm-300-disk-0 (raw, read-write) +` + disks := ParseBlockInfo(raw) + d := disks["scsi0"] + if d.DiskType != "rbd" { + t.Errorf("type = %q", d.DiskType) + } + if d.Labels["cluster_id"] != "ceph1" { + t.Errorf("cluster_id = %q", d.Labels["cluster_id"]) + } + if d.Labels["pool"] != "pool1" { + t.Errorf("pool = %q", d.Labels["pool"]) + } + if d.Labels["vol_name"] != "vm-300-disk-0" { + t.Errorf("vol_name = %q", d.Labels["vol_name"]) + } +} + +func TestParseBlockInfo_LVM(t *testing.T) { + raw := `drive-scsi0 (#block400): /dev/myvg/vm-400-disk-0 (raw, read-write) +` + disks := ParseBlockInfo(raw) + d := disks["scsi0"] + if d.DiskType != "lvm" { + t.Errorf("type = %q", d.DiskType) + } + if d.Labels["vg_name"] != "myvg" { + t.Errorf("vg_name = %q", d.Labels["vg_name"]) + } + if d.Labels["vol_name"] != "vm-400-disk-0" { + t.Errorf("vol_name = %q", d.Labels["vol_name"]) + } +} + +func TestParseBlockInfo_SkipsEFI(t *testing.T) { + raw := `drive-efidisk0 (#block500): /dev/zvol/rpool/data/vm-500-disk-1 (raw, read-write) +drive-scsi0 (#block501): /dev/zvol/rpool/data/vm-500-disk-0 (raw, read-write) +` + disks := ParseBlockInfo(raw) + if len(disks) != 1 { + t.Fatalf("expected 1 disk (efidisk skipped), got %d", len(disks)) + } + if _, ok 
// NICInfo holds parsed network interface info from "info network".
type NICInfo struct {
	Netdev  string
	Queues  int
	Type    string
	Model   string
	Macaddr string
	Ifname  string
}

// ParseNetworkInfo parses the output of "info network" from qm monitor.
// Format: "net0: index=0,type=tap,ifname=tap100i0,model=virtio-net-pci,macaddr=AA:BB:CC:DD:EE:FF"
// Multiqueue lines: " \ net0: index=1,type=tap,ifname=tap100i0"
// For multiqueue, the same netdev appears multiple times with increasing
// index; queues = max(index)+1. The result is sorted by netdev name so
// output order is deterministic.
func ParseNetworkInfo(raw string) []NICInfo {
	nicsMap := make(map[string]map[string]string)
	maxIndex := make(map[string]int)

	for _, line := range strings.Split(raw, "\n") {
		line = strings.TrimSpace(line)
		// Strip leading "\ " from continuation lines.
		line = strings.TrimPrefix(line, "\\ ")
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}

		netdev, cfg, found := strings.Cut(line, ": ")
		if !found || !strings.HasPrefix(netdev, "net") {
			continue
		}

		props, ok := nicsMap[netdev]
		if !ok {
			props = make(map[string]string)
			nicsMap[netdev] = props
		}
		for _, pair := range strings.Split(cfg, ",") {
			pair = strings.TrimSpace(pair)
			if pair == "" {
				continue
			}
			key, value, ok := strings.Cut(pair, "=")
			if !ok {
				continue
			}
			// BUGFIX: track the maximum index seen rather than relying on
			// the last line parsed — robust to out-of-order queue lines.
			if key == "index" {
				if idx, err := strconv.Atoi(value); err == nil && idx > maxIndex[netdev] {
					maxIndex[netdev] = idx
				}
			}
			props[key] = value
		}
	}

	result := make([]NICInfo, 0, len(nicsMap))
	for netdev, cfg := range nicsMap {
		result = append(result, NICInfo{
			Netdev:  netdev,
			Queues:  maxIndex[netdev] + 1,
			Type:    cfg["type"],
			Model:   cfg["model"],
			Macaddr: cfg["macaddr"],
			Ifname:  cfg["ifname"],
		})
	}
	// Map iteration order is random; sort for stable metric output.
	sort.Slice(result, func(i, j int) bool { return result[i].Netdev < result[j].Netdev })
	return result
}
+ +func TestParseNetworkInfo_Single(t *testing.T) { + raw := `net0: index=0,type=tap,ifname=tap100i0,script=/var/lib/qemu-server/pve-bridge,downscript=/var/lib/qemu-server/pve-bridgedown,model=virtio-net-pci,macaddr=AA:BB:CC:DD:EE:FF` + + nics := ParseNetworkInfo(raw) + if len(nics) != 1 { + t.Fatalf("expected 1 NIC, got %d", len(nics)) + } + nic := nics[0] + if nic.Netdev != "net0" { + t.Errorf("netdev = %q", nic.Netdev) + } + if nic.Queues != 1 { + t.Errorf("queues = %d", nic.Queues) + } + if nic.Type != "tap" { + t.Errorf("type = %q", nic.Type) + } + if nic.Model != "virtio-net-pci" { + t.Errorf("model = %q", nic.Model) + } + if nic.Macaddr != "AA:BB:CC:DD:EE:FF" { + t.Errorf("macaddr = %q", nic.Macaddr) + } + if nic.Ifname != "tap100i0" { + t.Errorf("ifname = %q", nic.Ifname) + } +} + +func TestParseNetworkInfo_Multiqueue(t *testing.T) { + raw := `net0: index=0,type=tap,ifname=tap100i0,model=virtio-net-pci,macaddr=AA:BB:CC:DD:EE:FF + \ net0: index=1,type=tap,ifname=tap100i0 + \ net0: index=2,type=tap,ifname=tap100i0 + \ net0: index=3,type=tap,ifname=tap100i0 +net1: index=0,type=tap,ifname=tap100i1,model=virtio-net-pci,macaddr=11:22:33:44:55:66` + + nics := ParseNetworkInfo(raw) + if len(nics) != 2 { + t.Fatalf("expected 2 NICs, got %d", len(nics)) + } + + byName := map[string]NICInfo{} + for _, n := range nics { + byName[n.Netdev] = n + } + + if byName["net0"].Queues != 4 { + t.Errorf("net0 queues = %d, want 4", byName["net0"].Queues) + } + if byName["net1"].Queues != 1 { + t.Errorf("net1 queues = %d, want 1", byName["net1"].Queues) + } +} + +func TestParseNetworkInfo_Empty(t *testing.T) { + nics := ParseNetworkInfo("") + if len(nics) != 0 { + t.Fatalf("expected 0 NICs, got %d", len(nics)) + } +} diff --git a/src/internal/qmmonitor/qmmonitor.go b/src/internal/qmmonitor/qmmonitor.go new file mode 100644 index 0000000..892e560 --- /dev/null +++ b/src/internal/qmmonitor/qmmonitor.go @@ -0,0 +1,190 @@ +package qmmonitor + +import ( + "bufio" + "fmt" + "io" + 
"log/slog" + "os/exec" + "strings" + "sync" + "time" + + "pve_local_exporter/internal/cache" +) + +// QMMonitor runs commands against qm monitor and caches results. +type QMMonitor interface { + RunCommand(vmid, cmd string) (string, error) + InvalidateCache(vmid, cmd string) +} + +// RealQMMonitor spawns `qm monitor` via os/exec with pipe-based I/O. +type RealQMMonitor struct { + timeout time.Duration + deferClose bool + cache *cache.TTLCache[string, string] + + mu sync.Mutex + deferredProcs []deferredProc +} + +type deferredProc struct { + cmd *exec.Cmd + stdin io.WriteCloser + timestamp time.Time +} + +func NewRealQMMonitor(timeout, maxTTL, randRange time.Duration, deferClose bool) *RealQMMonitor { + return &RealQMMonitor{ + timeout: timeout, + deferClose: deferClose, + cache: cache.NewTTLCache[string, string](maxTTL, randRange), + } +} + +func cacheKey(vmid, cmd string) string { + return vmid + "\x00" + cmd +} + +func (m *RealQMMonitor) InvalidateCache(vmid, cmd string) { + m.cache.Invalidate(cacheKey(vmid, cmd)) +} + +func (m *RealQMMonitor) RunCommand(vmid, cmd string) (string, error) { + key := cacheKey(vmid, cmd) + if v, ok := m.cache.Get(key); ok { + return v, nil + } + + result, err := m.execQMMonitor(vmid, cmd) + if err != nil { + return "", err + } + m.cache.Set(key, result) + m.cleanupDeferred() + return result, nil +} + +func (m *RealQMMonitor) execQMMonitor(vmid, cmd string) (string, error) { + qmCmd := exec.Command("qm", "monitor", vmid) + + stdin, err := qmCmd.StdinPipe() + if err != nil { + return "", fmt.Errorf("stdin pipe: %w", err) + } + stdout, err := qmCmd.StdoutPipe() + if err != nil { + return "", fmt.Errorf("stdout pipe: %w", err) + } + + if err := qmCmd.Start(); err != nil { + return "", fmt.Errorf("start qm monitor: %w", err) + } + + reader := bufio.NewReader(stdout) + + // Wait for initial "qm>" prompt + if err := readUntilPrompt(reader, m.timeout); err != nil { + m.deferCloseProcess(qmCmd, stdin) + return "", fmt.Errorf("initial prompt: 
%w", err) + } + + // Send command + fmt.Fprintf(stdin, "%s\n", cmd) + + // Read response until next "qm>" prompt + response, err := readResponseUntilPrompt(reader, m.timeout) + if err != nil { + m.deferCloseProcess(qmCmd, stdin) + return "", fmt.Errorf("read response: %w", err) + } + + // Close cleanly + stdin.Close() + if err := qmCmd.Wait(); err != nil { + slog.Debug("qm monitor wait error", "vmid", vmid, "err", err) + } + + return response, nil +} + +func (m *RealQMMonitor) deferCloseProcess(cmd *exec.Cmd, stdin io.WriteCloser) { + stdin.Close() + if m.deferClose { + m.mu.Lock() + m.deferredProcs = append(m.deferredProcs, deferredProc{cmd: cmd, stdin: stdin, timestamp: time.Now()}) + m.mu.Unlock() + slog.Warn("deferred closing qm monitor process", "pid", cmd.Process.Pid) + } else { + cmd.Process.Kill() + cmd.Wait() + } +} + +func (m *RealQMMonitor) cleanupDeferred() { + m.mu.Lock() + defer m.mu.Unlock() + + var still []deferredProc + for _, dp := range m.deferredProcs { + if time.Since(dp.timestamp) > 10*time.Second { + if err := dp.cmd.Process.Kill(); err != nil { + still = append(still, dp) + } else { + dp.cmd.Wait() + } + } else { + still = append(still, dp) + } + } + m.deferredProcs = still +} + +func readUntilPrompt(r *bufio.Reader, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for { + if time.Now().After(deadline) { + return fmt.Errorf("timeout waiting for qm> prompt") + } + line, err := r.ReadString('\n') + if err != nil { + // Check if we got the prompt without newline + if strings.Contains(line, "qm>") { + return nil + } + return err + } + if strings.Contains(line, "qm>") { + return nil + } + } +} + +func readResponseUntilPrompt(r *bufio.Reader, timeout time.Duration) (string, error) { + deadline := time.Now().Add(timeout) + var lines []string + firstLine := true + for { + if time.Now().After(deadline) { + return "", fmt.Errorf("timeout waiting for qm> prompt") + } + line, err := r.ReadString('\n') + if err != nil { + if 
strings.Contains(line, "qm>") { + break + } + return "", err + } + if strings.Contains(line, "qm>") { + break + } + // Skip the echo of the command (first line) + if firstLine { + firstLine = false + continue + } + lines = append(lines, strings.TrimRight(line, "\r\n")) + } + return strings.Join(lines, "\n"), nil +} diff --git a/src/internal/qmmonitor/qmmonitor_test.go b/src/internal/qmmonitor/qmmonitor_test.go new file mode 100644 index 0000000..7fba8c4 --- /dev/null +++ b/src/internal/qmmonitor/qmmonitor_test.go @@ -0,0 +1,59 @@ +package qmmonitor + +import ( + "testing" + "time" + + "pve_local_exporter/internal/cache" +) + +// mockQMMonitor is a test double for QMMonitor. +type mockQMMonitor struct { + responses map[string]string + cache *cache.TTLCache[string, string] +} + +func newMockQMMonitor(responses map[string]string) *mockQMMonitor { + return &mockQMMonitor{ + responses: responses, + cache: cache.NewTTLCache[string, string](time.Hour, 0), + } +} + +func (m *mockQMMonitor) RunCommand(vmid, cmd string) (string, error) { + key := cacheKey(vmid, cmd) + if v, ok := m.cache.Get(key); ok { + return v, nil + } + resp := m.responses[vmid+":"+cmd] + m.cache.Set(key, resp) + return resp, nil +} + +func (m *mockQMMonitor) InvalidateCache(vmid, cmd string) { + m.cache.Invalidate(cacheKey(vmid, cmd)) +} + +func TestMockQMMonitor_CacheHit(t *testing.T) { + mon := newMockQMMonitor(map[string]string{ + "100:info network": "some output", + }) + r1, _ := mon.RunCommand("100", "info network") + r2, _ := mon.RunCommand("100", "info network") + if r1 != r2 { + t.Errorf("cache miss: %q != %q", r1, r2) + } +} + +func TestMockQMMonitor_Invalidate(t *testing.T) { + mon := newMockQMMonitor(map[string]string{ + "100:info network": "some output", + }) + mon.RunCommand("100", "info network") + mon.InvalidateCache("100", "info network") + // After invalidation, it fetches again (same mock response) + r, _ := mon.RunCommand("100", "info network") + if r != "some output" { + 
t.Errorf("unexpected: %q", r) + } +} diff --git a/src/internal/storage/storage.go b/src/internal/storage/storage.go new file mode 100644 index 0000000..99fe0f6 --- /dev/null +++ b/src/internal/storage/storage.go @@ -0,0 +1,69 @@ +package storage + +import ( + "fmt" + "strconv" + "strings" + "syscall" +) + +// StorageSize holds the total and free bytes of a storage pool. +type StorageSize struct { + Total int64 + Free int64 +} + +// StatFS abstracts the statfs syscall for testability. +type StatFS interface { + Statfs(path string) (StorageSize, error) +} + +// CommandRunner abstracts command execution for testability. +type CommandRunner interface { + Run(name string, args ...string) (string, error) +} + +// RealStatFS uses the real syscall. +type RealStatFS struct{} + +func (RealStatFS) Statfs(path string) (StorageSize, error) { + var stat syscall.Statfs_t + if err := syscall.Statfs(path, &stat); err != nil { + return StorageSize{}, fmt.Errorf("statfs %s: %w", path, err) + } + return StorageSize{ + Total: int64(stat.Frsize) * int64(stat.Blocks), + Free: int64(stat.Frsize) * int64(stat.Bavail), + }, nil +} + +// GetDirStorageSize returns size info for dir/nfs/cephfs storage using statfs. +func GetDirStorageSize(fs StatFS, path string) (StorageSize, error) { + return fs.Statfs(path) +} + +// GetZPoolSize parses `zpool list -p {poolName}` output for size and free. +func GetZPoolSize(output string) (StorageSize, error) { + lines := strings.Split(strings.TrimSpace(output), "\n") + if len(lines) < 2 { + return StorageSize{}, fmt.Errorf("unexpected zpool output: %q", output) + } + + // Header: NAME SIZE ALLOC FREE ... + // Data: pool 1234 567 890 ... 
+ fields := strings.Fields(lines[1]) + if len(fields) < 4 { + return StorageSize{}, fmt.Errorf("not enough fields in zpool output: %q", lines[1]) + } + + total, err := strconv.ParseInt(fields[1], 10, 64) + if err != nil { + return StorageSize{}, fmt.Errorf("parse total: %w", err) + } + free, err := strconv.ParseInt(fields[3], 10, 64) + if err != nil { + return StorageSize{}, fmt.Errorf("parse free: %w", err) + } + + return StorageSize{Total: total, Free: free}, nil +} diff --git a/src/internal/storage/storage_test.go b/src/internal/storage/storage_test.go new file mode 100644 index 0000000..78bd236 --- /dev/null +++ b/src/internal/storage/storage_test.go @@ -0,0 +1,55 @@ +package storage + +import ( + "testing" +) + +type mockStatFS struct { + sizes map[string]StorageSize +} + +func (m mockStatFS) Statfs(path string) (StorageSize, error) { + if s, ok := m.sizes[path]; ok { + return s, nil + } + return StorageSize{}, nil +} + +func TestGetDirStorageSize(t *testing.T) { + fs := mockStatFS{sizes: map[string]StorageSize{ + "/var/lib/vz": {Total: 1000000, Free: 500000}, + }} + s, err := GetDirStorageSize(fs, "/var/lib/vz") + if err != nil { + t.Fatal(err) + } + if s.Total != 1000000 { + t.Errorf("total = %d", s.Total) + } + if s.Free != 500000 { + t.Errorf("free = %d", s.Free) + } +} + +func TestGetZPoolSize(t *testing.T) { + output := `NAME SIZE ALLOC FREE CKPOINT EXPANDSZ FRAG CAP DEDUP HEALTH ALTROOT +rpool 1073741824 536870912 536870912 - - 10% 50% 1.00x ONLINE - +` + s, err := GetZPoolSize(output) + if err != nil { + t.Fatal(err) + } + if s.Total != 1073741824 { + t.Errorf("total = %d", s.Total) + } + if s.Free != 536870912 { + t.Errorf("free = %d", s.Free) + } +} + +func TestGetZPoolSize_BadOutput(t *testing.T) { + _, err := GetZPoolSize("bad") + if err == nil { + t.Fatal("expected error") + } +} diff --git a/src/internal/sysfs/sysfs.go b/src/internal/sysfs/sysfs.go new file mode 100644 index 0000000..bb98ec8 --- /dev/null +++ b/src/internal/sysfs/sysfs.go @@ -0,0 
+1,88 @@ +package sysfs + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "strings" +) + +// SysReader abstracts /sys access for testability. +type SysReader interface { + ReadInterfaceStats(ifname string) (map[string]int64, error) + GetBlockDeviceSize(devPath string) (int64, error) +} + +// RealSysReader reads from the actual /sys filesystem. +type RealSysReader struct { + SysPath string // default "/sys" +} + +func NewRealSysReader() *RealSysReader { + return &RealSysReader{SysPath: "/sys"} +} + +// ReadInterfaceStats reads all statistics files from /sys/class/net/{ifname}/statistics/. +func (r *RealSysReader) ReadInterfaceStats(ifname string) (map[string]int64, error) { + dir := filepath.Join(r.SysPath, "class", "net", ifname, "statistics") + entries, err := os.ReadDir(dir) + if err != nil { + return nil, err + } + + stats := make(map[string]int64) + for _, e := range entries { + if e.IsDir() { + continue + } + data, err := os.ReadFile(filepath.Join(dir, e.Name())) + if err != nil { + continue + } + val, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64) + if err != nil { + continue + } + stats[e.Name()] = val + } + return stats, nil +} + +// GetBlockDeviceSize returns the size in bytes of a block device. +// For symlinks (e.g., /dev/zvol/...), resolves to the real device first. +// Reads size from /sys/block/{dev}/size (in 512-byte sectors). 
+func (r *RealSysReader) GetBlockDeviceSize(devPath string) (int64, error) { + // Resolve symlinks + resolved, err := filepath.EvalSymlinks(devPath) + if err != nil { + return 0, fmt.Errorf("resolve symlink %s: %w", devPath, err) + } + + // Extract device name from /dev/XXX + devName := filepath.Base(resolved) + + // Try /sys/block/{devName}/size + sizeFile := filepath.Join(r.SysPath, "block", devName, "size") + data, err := os.ReadFile(sizeFile) + if err != nil { + // For partition devices like dm-0, try without partition suffix + return 0, fmt.Errorf("read size %s: %w", sizeFile, err) + } + + sectors, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64) + if err != nil { + return 0, fmt.Errorf("parse size: %w", err) + } + + return sectors * 512, nil +} + +// GetDeviceSymlinkTarget resolves a device symlink and returns the target path. +func GetDeviceSymlinkTarget(devPath string) (string, error) { + resolved, err := filepath.EvalSymlinks(devPath) + if err != nil { + return "", err + } + return resolved, nil +} diff --git a/src/internal/sysfs/sysfs_test.go b/src/internal/sysfs/sysfs_test.go new file mode 100644 index 0000000..4bdfe2a --- /dev/null +++ b/src/internal/sysfs/sysfs_test.go @@ -0,0 +1,69 @@ +package sysfs + +import ( + "os" + "path/filepath" + "testing" +) + +func TestReadInterfaceStats(t *testing.T) { + // Create temp sysfs-like structure + tmpDir := t.TempDir() + statsDir := filepath.Join(tmpDir, "class", "net", "tap100i0", "statistics") + os.MkdirAll(statsDir, 0755) + + os.WriteFile(filepath.Join(statsDir, "rx_bytes"), []byte("123456\n"), 0644) + os.WriteFile(filepath.Join(statsDir, "tx_bytes"), []byte("789012\n"), 0644) + os.WriteFile(filepath.Join(statsDir, "rx_packets"), []byte("100\n"), 0644) + + reader := &RealSysReader{SysPath: tmpDir} + stats, err := reader.ReadInterfaceStats("tap100i0") + if err != nil { + t.Fatal(err) + } + + if stats["rx_bytes"] != 123456 { + t.Errorf("rx_bytes = %d", stats["rx_bytes"]) + } + if 
stats["tx_bytes"] != 789012 { + t.Errorf("tx_bytes = %d", stats["tx_bytes"]) + } + if stats["rx_packets"] != 100 { + t.Errorf("rx_packets = %d", stats["rx_packets"]) + } +} + +func TestReadInterfaceStats_NotFound(t *testing.T) { + reader := &RealSysReader{SysPath: t.TempDir()} + _, err := reader.ReadInterfaceStats("nonexistent") + if err == nil { + t.Fatal("expected error for nonexistent interface") + } +} + +func TestGetBlockDeviceSize(t *testing.T) { + tmpDir := t.TempDir() + + // Create /sys/block/dm-0/size + blockDir := filepath.Join(tmpDir, "block", "dm-0") + os.MkdirAll(blockDir, 0755) + // 1GB = 2097152 sectors of 512 bytes + os.WriteFile(filepath.Join(blockDir, "size"), []byte("2097152\n"), 0644) + + // Create a "device" symlink that points to dm-0 + devDir := filepath.Join(tmpDir, "dev") + os.MkdirAll(devDir, 0755) + os.Symlink(filepath.Join(devDir, "dm-0"), filepath.Join(devDir, "mydev")) + // Create the actual "device" file so symlink resolves + os.WriteFile(filepath.Join(devDir, "dm-0"), []byte{}, 0644) + + reader := &RealSysReader{SysPath: tmpDir} + size, err := reader.GetBlockDeviceSize(filepath.Join(devDir, "dm-0")) + if err != nil { + t.Fatal(err) + } + expected := int64(2097152 * 512) + if size != expected { + t.Errorf("size = %d, want %d", size, expected) + } +} diff --git a/src/main.go b/src/main.go new file mode 100644 index 0000000..63a754c --- /dev/null +++ b/src/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "fmt" + "log" + "log/slog" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "pve_local_exporter/internal/collector" + "pve_local_exporter/internal/config" +) + +var version string + +func main() { + cfg := config.Parse() + + if cfg.ShowVersion { + fmt.Println(version) + return + } + + level := slog.LevelInfo + switch strings.ToUpper(cfg.LogLevel) { + case "DEBUG": + level = slog.LevelDebug + case "WARNING", 
"WARN": + level = slog.LevelWarn + case "ERROR", "CRITICAL": + level = slog.LevelError + } + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}))) + + reg := prometheus.NewRegistry() + c := collector.New(cfg) + reg.MustRegister(c) + + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`Metrics`)) + }) + + addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) + server := &http.Server{Addr: addr, Handler: mux} + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + slog.Info("shutting down") + server.Close() + }() + + log.Printf("listening on %s", addr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatal(err) + } +}