Performance, robustness, and test-coverage improvements

- Consolidate status reads
- Pre-allocate descriptors
- Add read_only disk label
- Remove dead stdin field
- Guard nil pool maps
- Add error-path tests
- Add go vet to flake
This commit is contained in:
illustris
2026-03-12 12:47:50 +05:30
parent d8f4a77657
commit def7d3e086
16 changed files with 644 additions and 267 deletions

View File

@@ -1,6 +1,7 @@
package cache
import (
"sync"
"testing"
"time"
)
@@ -37,3 +38,20 @@ func TestMtimeCache_HitAndMiss(t *testing.T) {
t.Fatalf("expected hit with 'world', got %q ok=%v", v, ok)
}
}
// TestMtimeCache_Concurrent exercises Set and Get from many goroutines
// at once so the -race detector can surface any unsynchronized access.
func TestMtimeCache_Concurrent(t *testing.T) {
	now := time.Now()
	statFn := func(path string) (time.Time, error) { return now, nil }
	c := NewMtimeCache[int]("/test", statFn)

	var group sync.WaitGroup
	for worker := 0; worker < 100; worker++ {
		group.Add(1)
		go func(v int) {
			defer group.Done()
			c.Set(v)
			c.Get()
		}(worker)
	}
	group.Wait()
}

View File

@@ -1,6 +1,7 @@
package cache
import (
"sync"
"testing"
"time"
)
@@ -42,6 +43,24 @@ func TestTTLCache_Invalidate(t *testing.T) {
}
}
// TestTTLCache_Concurrent hammers Set/Get/Invalidate on a single key
// from many goroutines so data races show up under -race.
func TestTTLCache_Concurrent(t *testing.T) {
	c := NewTTLCache[string, int](time.Hour, 0)

	var group sync.WaitGroup
	for worker := 0; worker < 100; worker++ {
		group.Add(1)
		go func(v int) {
			defer group.Done()
			const key = "key"
			c.Set(key, v)
			c.Get(key)
			c.Invalidate(key)
			c.Set(key, v+1)
			c.Get(key)
		}(worker)
	}
	group.Wait()
}
func TestTTLCache_JitterRange(t *testing.T) {
c := NewTTLCache[string, int](time.Second, 500*time.Millisecond)
// jitteredTTL should be in [500ms, 1500ms]

View File

@@ -4,6 +4,7 @@ import (
"log/slog"
"os"
"os/exec"
"slices"
"strconv"
"strings"
"sync"
@@ -36,6 +37,20 @@ type PVECollector struct {
storageCache *cache.MtimeCache[[]pveconfig.StorageEntry]
prefix string
// Pre-allocated metric descriptors for fixed-label metrics.
descCPU *prometheus.Desc
descVcores *prometheus.Desc
descMaxmem *prometheus.Desc
descMemPct *prometheus.Desc
descMemExt *prometheus.Desc
descThreads *prometheus.Desc
descCtxSwitches *prometheus.Desc
descNicInfo *prometheus.Desc
descNicQueues *prometheus.Desc
descDiskSize *prometheus.Desc
descStorageSize *prometheus.Desc
descStorageFree *prometheus.Desc
}
type poolData struct {
@@ -93,6 +108,7 @@ func New(cfg config.Config) *PVECollector {
func NewWithDeps(cfg config.Config, proc procfs.ProcReader, sys sysfs.SysReader,
qm qmmonitor.QMMonitor, statFS storage.StatFS, cmd CommandRunner, fr FileReaderIface) *PVECollector {
p := cfg.MetricsPrefix
c := &PVECollector{
cfg: cfg,
proc: proc,
@@ -101,7 +117,20 @@ func NewWithDeps(cfg config.Config, proc procfs.ProcReader, sys sysfs.SysReader,
statFS: statFS,
cmdRunner: cmd,
fileReader: fr,
prefix: cfg.MetricsPrefix,
prefix: p,
descCPU: prometheus.NewDesc(p+"_kvm_cpu", "KVM CPU time", []string{"id", "mode"}, nil),
descVcores: prometheus.NewDesc(p+"_kvm_vcores", "vCores allocated", []string{"id"}, nil),
descMaxmem: prometheus.NewDesc(p+"_kvm_maxmem", "Maximum memory bytes", []string{"id"}, nil),
descMemPct: prometheus.NewDesc(p+"_kvm_memory_percent", "Memory percent of host", []string{"id"}, nil),
descMemExt: prometheus.NewDesc(p+"_kvm_memory_extended", "Extended memory info", []string{"id", "type"}, nil),
descThreads: prometheus.NewDesc(p+"_kvm_threads", "Threads used", []string{"id"}, nil),
descCtxSwitches: prometheus.NewDesc(p+"_kvm_ctx_switches", "Context switches", []string{"id", "type"}, nil),
descNicInfo: prometheus.NewDesc(p+"_kvm_nic", "NIC info", []string{"id", "ifname", "netdev", "queues", "type", "model", "macaddr"}, nil),
descNicQueues: prometheus.NewDesc(p+"_kvm_nic_queues", "NIC queue count", []string{"id", "ifname"}, nil),
descDiskSize: prometheus.NewDesc(p+"_kvm_disk_size", "Disk size bytes", []string{"id", "disk_name"}, nil),
descStorageSize: prometheus.NewDesc(p+"_node_storage_size", "Storage total size", []string{"name", "type"}, nil),
descStorageFree: prometheus.NewDesc(p+"_node_storage_free", "Storage free space", []string{"name", "type"}, nil),
}
c.poolCache = cache.NewMtimeCache[poolData]("/etc/pve/user.cfg", fileMtime)
c.storageCache = cache.NewMtimeCache[[]pveconfig.StorageEntry]("/etc/pve/storage.cfg", fileMtime)
@@ -110,7 +139,7 @@ func NewWithDeps(cfg config.Config, proc procfs.ProcReader, sys sysfs.SysReader,
func (c *PVECollector) Describe(ch chan<- *prometheus.Desc) {
// Dynamic metrics - use empty desc to signal unchecked collector
ch <- prometheus.NewDesc(c.prefix+"_kvm_cpu", "KVM CPU time", nil, nil)
ch <- c.descCPU
}
func (c *PVECollector) Collect(ch chan<- prometheus.Metric) {
@@ -137,11 +166,6 @@ func (c *PVECollector) collectVMs(ch chan<- prometheus.Metric) {
}
// Parallel NIC + disk collection with bounded worker pool
type workItem struct {
proc procfs.QEMUProcess
fn func()
}
sem := make(chan struct{}, maxWorkers)
var wg sync.WaitGroup
@@ -179,47 +203,34 @@ func (c *PVECollector) collectVMMetrics(ch chan<- prometheus.Metric, proc procfs
{"system", cpu.System},
{"iowait", cpu.IOWait},
} {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_cpu", "KVM CPU time", []string{"id", "mode"}, nil),
prometheus.GaugeValue, m.val, id, m.mode,
)
ch <- prometheus.MustNewConstMetric(c.descCPU, prometheus.GaugeValue, m.val, id, m.mode)
}
}
// Vcores
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_vcores", "vCores allocated", []string{"id"}, nil),
prometheus.GaugeValue, float64(proc.Vcores), id,
)
ch <- prometheus.MustNewConstMetric(c.descVcores, prometheus.GaugeValue, float64(proc.Vcores), id)
// MaxMem (kB to bytes)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_maxmem", "Maximum memory bytes", []string{"id"}, nil),
prometheus.GaugeValue, float64(proc.MaxMem*1024), id,
)
ch <- prometheus.MustNewConstMetric(c.descMaxmem, prometheus.GaugeValue, float64(proc.MaxMem*1024), id)
// Memory percent
if memPct, err := c.proc.GetMemoryPercent(proc.PID); err == nil {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_memory_percent", "Memory percent of host", []string{"id"}, nil),
prometheus.GaugeValue, memPct, id,
)
}
// Memory extended
if memExt, err := c.proc.GetMemoryExtended(proc.PID); err == nil {
desc := prometheus.NewDesc(c.prefix+"_kvm_memory_extended", "Extended memory info", []string{"id", "type"}, nil)
for key, val := range memExt {
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(val), id, key)
// Status (threads, memory, context switches) -- single /proc/{pid}/status read
if status, err := c.proc.GetStatus(proc.PID); err == nil {
// Memory percent
if memPct, err := c.proc.GetMemoryPercent(proc.PID, status.VmRSS); err == nil {
ch <- prometheus.MustNewConstMetric(c.descMemPct, prometheus.GaugeValue, memPct, id)
}
}
// Threads
if threads, err := c.proc.GetNumThreads(proc.PID); err == nil {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_threads", "Threads used", []string{"id"}, nil),
prometheus.GaugeValue, float64(threads), id,
)
// Memory extended
for key, val := range status.MemoryExtended {
ch <- prometheus.MustNewConstMetric(c.descMemExt, prometheus.GaugeValue, float64(val), id, key)
}
// Threads
ch <- prometheus.MustNewConstMetric(c.descThreads, prometheus.GaugeValue, float64(status.Threads), id)
// Context switches
ch <- prometheus.MustNewConstMetric(c.descCtxSwitches, prometheus.GaugeValue, float64(status.CtxSwitches.Voluntary), id, "voluntary")
ch <- prometheus.MustNewConstMetric(c.descCtxSwitches, prometheus.GaugeValue, float64(status.CtxSwitches.Involuntary), id, "involuntary")
}
// IO counters
@@ -242,13 +253,6 @@ func (c *PVECollector) collectVMMetrics(ch chan<- prometheus.Metric, proc procfs
}
}
// Context switches
if cs, err := c.proc.GetCtxSwitches(proc.PID); err == nil {
desc := prometheus.NewDesc(c.prefix+"_kvm_ctx_switches", "Context switches", []string{"id", "type"}, nil)
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cs.Voluntary), id, "voluntary")
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cs.Involuntary), id, "involuntary")
}
// VM info metric
poolName := vmPoolMap[id]
poolInfo := pools[poolName]
@@ -275,20 +279,13 @@ func (c *PVECollector) collectNICMetrics(ch chan<- prometheus.Metric, proc procf
nics := qmmonitor.ParseNetworkInfo(raw)
for _, nic := range nics {
// NIC info metric
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_nic", "NIC info", []string{
"id", "ifname", "netdev", "queues", "type", "model", "macaddr",
}, nil),
prometheus.GaugeValue, 1,
ch <- prometheus.MustNewConstMetric(c.descNicInfo, prometheus.GaugeValue, 1,
id, nic.Ifname, nic.Netdev, strconv.Itoa(nic.Queues),
nic.Type, nic.Model, nic.Macaddr,
)
// NIC queues
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_nic_queues", "NIC queue count", []string{"id", "ifname"}, nil),
prometheus.GaugeValue, float64(nic.Queues), id, nic.Ifname,
)
ch <- prometheus.MustNewConstMetric(c.descNicQueues, prometheus.GaugeValue, float64(nic.Queues), id, nic.Ifname)
// NIC stats from sysfs
stats, err := c.sys.ReadInterfaceStats(nic.Ifname)
@@ -345,10 +342,7 @@ func (c *PVECollector) collectDiskMetrics(ch chan<- prometheus.Metric, proc proc
}
if diskSize > 0 {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_kvm_disk_size", "Disk size bytes", []string{"id", "disk_name"}, nil),
prometheus.GaugeValue, float64(diskSize), id, diskName,
)
ch <- prometheus.MustNewConstMetric(c.descDiskSize, prometheus.GaugeValue, float64(diskSize), id, diskName)
}
// Disk info metric - collect all labels
@@ -420,14 +414,8 @@ func (c *PVECollector) collectStorage(ch chan<- prometheus.Metric) {
continue
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_node_storage_size", "Storage total size", []string{"name", "type"}, nil),
prometheus.GaugeValue, float64(size.Total), storageName, storageType,
)
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(c.prefix+"_node_storage_free", "Storage free space", []string{"name", "type"}, nil),
prometheus.GaugeValue, float64(size.Free), storageName, storageType,
)
ch <- prometheus.MustNewConstMetric(c.descStorageSize, prometheus.GaugeValue, float64(size.Total), storageName, storageType)
ch <- prometheus.MustNewConstMetric(c.descStorageFree, prometheus.GaugeValue, float64(size.Free), storageName, storageType)
}
}
@@ -439,7 +427,7 @@ func (c *PVECollector) getPoolInfo() (map[string]string, map[string]pveconfig.Po
content, err := c.fileReader.ReadFile("/etc/pve/user.cfg")
if err != nil {
slog.Error("read user.cfg", "err", err)
return nil, nil
return make(map[string]string), make(map[string]pveconfig.PoolInfo)
}
vmPoolMap, pools := pveconfig.ParsePoolConfig(content)
@@ -468,12 +456,7 @@ func sortedKeys(m map[string]string) []string {
for k := range m {
keys = append(keys, k)
}
// Simple insertion sort for typically small maps
for i := 1; i < len(keys); i++ {
for j := i; j > 0 && keys[j] < keys[j-1]; j-- {
keys[j], keys[j-1] = keys[j-1], keys[j]
}
}
slices.Sort(keys)
return keys
}

View File

@@ -1,6 +1,7 @@
package collector
import (
"fmt"
"strings"
"testing"
@@ -15,17 +16,16 @@ import (
// Mock implementations
type mockProcReader struct {
procs []procfs.QEMUProcess
cpuTimes map[int]procfs.CPUTimes
ioCount map[int]procfs.IOCounters
threads map[int]int
memPct map[int]float64
memExt map[int]procfs.MemoryExtended
ctxSwitch map[int]procfs.CtxSwitches
procs []procfs.QEMUProcess
procsErr error
cpuTimes map[int]procfs.CPUTimes
ioCount map[int]procfs.IOCounters
status map[int]procfs.StatusInfo
memPct map[int]float64
}
func (m *mockProcReader) DiscoverQEMUProcesses() ([]procfs.QEMUProcess, error) {
return m.procs, nil
return m.procs, m.procsErr
}
func (m *mockProcReader) GetCPUTimes(pid int) (procfs.CPUTimes, error) {
return m.cpuTimes[pid], nil
@@ -33,18 +33,12 @@ func (m *mockProcReader) GetCPUTimes(pid int) (procfs.CPUTimes, error) {
func (m *mockProcReader) GetIOCounters(pid int) (procfs.IOCounters, error) {
return m.ioCount[pid], nil
}
func (m *mockProcReader) GetNumThreads(pid int) (int, error) {
return m.threads[pid], nil
func (m *mockProcReader) GetStatus(pid int) (procfs.StatusInfo, error) {
return m.status[pid], nil
}
func (m *mockProcReader) GetMemoryPercent(pid int) (float64, error) {
func (m *mockProcReader) GetMemoryPercent(pid int, rssBytes int64) (float64, error) {
return m.memPct[pid], nil
}
func (m *mockProcReader) GetMemoryExtended(pid int) (procfs.MemoryExtended, error) {
return m.memExt[pid], nil
}
func (m *mockProcReader) GetCtxSwitches(pid int) (procfs.CtxSwitches, error) {
return m.ctxSwitch[pid], nil
}
func (m *mockProcReader) VMConfigExists(vmid string) bool { return true }
type mockSysReader struct {
@@ -160,14 +154,15 @@ func TestCollector_BasicVMMetrics(t *testing.T) {
ioCount: map[int]procfs.IOCounters{
1234: {ReadChars: 1000, WriteChars: 2000, ReadSyscalls: 10, WriteSyscalls: 20, ReadBytes: 500, WriteBytes: 1000},
},
threads: map[int]int{1234: 50},
memPct: map[int]float64{1234: 25.5},
memExt: map[int]procfs.MemoryExtended{
1234: {"vmrss:": 1048576, "vmpeak:": 2097152},
},
ctxSwitch: map[int]procfs.CtxSwitches{
1234: {Voluntary: 100, Involuntary: 10},
status: map[int]procfs.StatusInfo{
1234: {
Threads: 50,
VmRSS: 1048576,
MemoryExtended: procfs.MemoryExtended{"vmrss": 1048576, "vmpeak": 2097152},
CtxSwitches: procfs.CtxSwitches{Voluntary: 100, Involuntary: 10},
},
},
memPct: map[int]float64{1234: 25.5},
}
sys := &mockSysReader{
@@ -302,12 +297,12 @@ func TestCollector_NICMetrics(t *testing.T) {
procs: []procfs.QEMUProcess{
{PID: 1, VMID: "100", Name: "vm", Vcores: 1, MaxMem: 1024},
},
cpuTimes: map[int]procfs.CPUTimes{1: {}},
ioCount: map[int]procfs.IOCounters{1: {}},
threads: map[int]int{1: 1},
memPct: map[int]float64{1: 0},
memExt: map[int]procfs.MemoryExtended{1: {}},
ctxSwitch: map[int]procfs.CtxSwitches{1: {}},
cpuTimes: map[int]procfs.CPUTimes{1: {}},
ioCount: map[int]procfs.IOCounters{1: {}},
status: map[int]procfs.StatusInfo{
1: {Threads: 1, MemoryExtended: procfs.MemoryExtended{}},
},
memPct: map[int]float64{1: 0},
}
sys := &mockSysReader{
@@ -341,3 +336,81 @@ func TestCollector_NICMetrics(t *testing.T) {
t.Errorf("tx_bytes = %v", txBytes)
}
}
// mockFileReaderErr serves canned file contents but fails for one
// designated path, letting tests exercise read-error handling.
type mockFileReaderErr struct {
	files   map[string]string
	errPath string
}

// ReadFile returns the canned content, or an error when path matches
// the configured failure path.
func (m *mockFileReaderErr) ReadFile(path string) (string, error) {
	if path != m.errPath {
		return m.files[path], nil
	}
	return "", fmt.Errorf("read error: %s", path)
}
// TestCollector_PoolReadError verifies that when /etc/pve/user.cfg is
// unreadable, VM info metrics are still emitted with an empty pool label.
func TestCollector_PoolReadError(t *testing.T) {
	cfg := config.Config{
		CollectRunningVMs: true,
		CollectStorage:    false,
		MetricsPrefix:     "pve",
	}
	reader := &mockProcReader{
		procs:    []procfs.QEMUProcess{{PID: 1, VMID: "100", Name: "vm", Vcores: 1, MaxMem: 1024}},
		cpuTimes: map[int]procfs.CPUTimes{1: {}},
		ioCount:  map[int]procfs.IOCounters{1: {}},
		status:   map[int]procfs.StatusInfo{1: {Threads: 1, MemoryExtended: procfs.MemoryExtended{}}},
		memPct:   map[int]float64{1: 0},
	}
	files := &mockFileReaderErr{
		files:   map[string]string{},
		errPath: "/etc/pve/user.cfg",
	}
	qm := &mockQMMonitor{responses: map[string]string{
		"100:info network": "",
		"100:info block":   "",
	}}
	c := NewWithDeps(cfg, reader, &mockSysReader{}, qm, &mockStatFS{}, &mockCmdRunner{}, files)

	metrics := collectMetrics(c)

	// The VM info metric must still appear, just without a pool name.
	infoMetrics := metrics["pve_kvm"]
	if len(infoMetrics) != 1 {
		t.Fatalf("expected 1 kvm info metric, got %d", len(infoMetrics))
	}
	if findMetricWithLabels(infoMetrics, map[string]string{"pool": ""}) == nil {
		t.Error("expected empty pool label when user.cfg unreadable")
	}
}
// TestCollector_ProcessDiscoveryError verifies that a failed QEMU
// process discovery produces no VM metrics at all.
func TestCollector_ProcessDiscoveryError(t *testing.T) {
	cfg := config.Config{
		CollectRunningVMs: true,
		CollectStorage:    false,
		MetricsPrefix:     "pve",
	}
	reader := &mockProcReader{procsErr: fmt.Errorf("permission denied")}
	files := &mockFileReader{files: map[string]string{"/etc/pve/user.cfg": ""}}
	c := NewWithDeps(cfg, reader, &mockSysReader{}, &mockQMMonitor{responses: map[string]string{}},
		&mockStatFS{}, &mockCmdRunner{}, files)

	metrics := collectMetrics(c)

	if got := len(metrics); got != 0 {
		t.Errorf("expected 0 metrics on discovery error, got %d metric names", got)
	}
}

View File

@@ -46,15 +46,21 @@ type CtxSwitches struct {
// MemoryExtended holds memory info from /proc/{pid}/status (values in bytes).
type MemoryExtended map[string]int64
// StatusInfo holds all fields parsed from /proc/{pid}/status in a single read.
type StatusInfo struct {
Threads int
VmRSS int64 // bytes
MemoryExtended MemoryExtended
CtxSwitches CtxSwitches
}
// ProcReader abstracts /proc access for testability.
type ProcReader interface {
DiscoverQEMUProcesses() ([]QEMUProcess, error)
GetCPUTimes(pid int) (CPUTimes, error)
GetIOCounters(pid int) (IOCounters, error)
GetNumThreads(pid int) (int, error)
GetMemoryPercent(pid int) (float64, error)
GetMemoryExtended(pid int) (MemoryExtended, error)
GetCtxSwitches(pid int) (CtxSwitches, error)
GetStatus(pid int) (StatusInfo, error)
GetMemoryPercent(pid int, rssBytes int64) (float64, error)
VMConfigExists(vmid string) bool
}
@@ -143,32 +149,15 @@ func (r *RealProcReader) GetIOCounters(pid int) (IOCounters, error) {
return ParseIO(string(data))
}
func (r *RealProcReader) GetNumThreads(pid int) (int, error) {
func (r *RealProcReader) GetStatus(pid int) (StatusInfo, error) {
data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status"))
if err != nil {
return 0, err
return StatusInfo{}, err
}
return ParseThreads(string(data))
return ParseStatus(string(data))
}
func (r *RealProcReader) GetMemoryPercent(pid int) (float64, error) {
// Read process RSS and total memory to compute percentage
statusData, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status"))
if err != nil {
return 0, err
}
rss := int64(0)
for _, line := range strings.Split(string(statusData), "\n") {
if strings.HasPrefix(line, "VmRSS:") {
parts := strings.Fields(line)
if len(parts) >= 2 {
rss, _ = strconv.ParseInt(parts[1], 10, 64)
rss *= 1024 // kB to bytes
}
break
}
}
func (r *RealProcReader) GetMemoryPercent(pid int, rssBytes int64) (float64, error) {
meminfoData, err := os.ReadFile(filepath.Join(r.ProcPath, "meminfo"))
if err != nil {
return 0, err
@@ -187,23 +176,7 @@ func (r *RealProcReader) GetMemoryPercent(pid int) (float64, error) {
if totalMem == 0 {
return 0, nil
}
return float64(rss) / float64(totalMem) * 100.0, nil
}
func (r *RealProcReader) GetMemoryExtended(pid int) (MemoryExtended, error) {
data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status"))
if err != nil {
return nil, err
}
return ParseMemoryExtended(string(data)), nil
}
func (r *RealProcReader) GetCtxSwitches(pid int) (CtxSwitches, error) {
data, err := os.ReadFile(filepath.Join(r.ProcPath, strconv.Itoa(pid), "status"))
if err != nil {
return CtxSwitches{}, err
}
return ParseCtxSwitches(string(data))
return float64(rssBytes) / float64(totalMem) * 100.0, nil
}
// ParseCmdline splits a null-byte separated /proc/{pid}/cmdline.
@@ -339,56 +312,49 @@ func ParseIO(data string) (IOCounters, error) {
return io, nil
}
// ParseThreads extracts the Threads count from /proc/{pid}/status.
func ParseThreads(data string) (int, error) {
for _, line := range strings.Split(data, "\n") {
if strings.HasPrefix(line, "Threads:") {
parts := strings.Fields(line)
if len(parts) >= 2 {
return strconv.Atoi(parts[1])
}
}
}
return 0, fmt.Errorf("Threads field not found")
}
// ParseStatus parses /proc/{pid}/status in one pass, extracting threads, VmRSS,
// memory extended fields, and context switches.
func ParseStatus(data string) (StatusInfo, error) {
var info StatusInfo
info.MemoryExtended = make(MemoryExtended)
foundThreads := false
// ParseMemoryExtended parses /proc/{pid}/status for Vm*/Rss*/Hugetlb* lines.
// Returns map with lowercase keys (trailing colon preserved) to values in bytes.
func ParseMemoryExtended(data string) MemoryExtended {
m := make(MemoryExtended)
for _, line := range strings.Split(data, "\n") {
if strings.HasPrefix(line, "Vm") || strings.HasPrefix(line, "Rss") || strings.HasPrefix(line, "Hugetlb") {
parts := strings.Fields(line)
if len(parts) >= 2 {
key := strings.ToLower(parts[0]) // keeps trailing colon
val, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
continue
}
if len(parts) >= 3 && parts[2] == "kB" {
val *= 1024
}
m[key] = val
}
}
}
return m
}
// ParseCtxSwitches parses voluntary/involuntary context switches from /proc/{pid}/status.
func ParseCtxSwitches(data string) (CtxSwitches, error) {
var cs CtxSwitches
for _, line := range strings.Split(data, "\n") {
parts := strings.Fields(line)
if len(parts) < 2 {
continue
}
switch parts[0] {
case "voluntary_ctxt_switches:":
cs.Voluntary, _ = strconv.ParseUint(parts[1], 10, 64)
case "nonvoluntary_ctxt_switches:":
cs.Involuntary, _ = strconv.ParseUint(parts[1], 10, 64)
switch {
case parts[0] == "Threads:":
n, err := strconv.Atoi(parts[1])
if err != nil {
return StatusInfo{}, fmt.Errorf("parse Threads: %w", err)
}
info.Threads = n
foundThreads = true
case parts[0] == "voluntary_ctxt_switches:":
info.CtxSwitches.Voluntary, _ = strconv.ParseUint(parts[1], 10, 64)
case parts[0] == "nonvoluntary_ctxt_switches:":
info.CtxSwitches.Involuntary, _ = strconv.ParseUint(parts[1], 10, 64)
case strings.HasPrefix(line, "Vm") || strings.HasPrefix(line, "Rss") || strings.HasPrefix(line, "Hugetlb"):
key := strings.ToLower(strings.TrimSuffix(parts[0], ":"))
val, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
continue
}
if len(parts) >= 3 && parts[2] == "kB" {
val *= 1024
}
info.MemoryExtended[key] = val
if key == "vmrss" {
info.VmRSS = val
}
}
}
return cs, nil
if !foundThreads {
return StatusInfo{}, fmt.Errorf("Threads field not found")
}
return info, nil
}

View File

@@ -147,59 +147,107 @@ cancelled_write_bytes: 0
}
}
func TestParseThreads(t *testing.T) {
func TestParseStatus(t *testing.T) {
data := `Name: qemu-system-x86
Threads: 50
VmPeak: 1234 kB
`
n, err := ParseThreads(data)
if err != nil {
t.Fatal(err)
}
if n != 50 {
t.Errorf("got %d, want 50", n)
}
}
func TestParseMemoryExtended(t *testing.T) {
data := `Name: qemu-system-x86
VmPeak: 1000 kB
VmRSS: 500 kB
VmData: 200 kB
RssAnon: 100 kB
HugetlbPages: 0 kB
`
m := ParseMemoryExtended(data)
if m["vmpeak:"] != 1000*1024 {
t.Errorf("VmPeak = %d", m["vmpeak:"])
}
if m["vmrss:"] != 500*1024 {
t.Errorf("VmRSS = %d", m["vmrss:"])
}
if m["vmdata:"] != 200*1024 {
t.Errorf("VmData = %d", m["vmdata:"])
}
if m["rssanon:"] != 100*1024 {
t.Errorf("RssAnon = %d", m["rssanon:"])
}
if m["hugetlbpages:"] != 0 {
t.Errorf("HugetlbPages = %d", m["hugetlbpages:"])
}
}
func TestParseCtxSwitches(t *testing.T) {
data := `Name: qemu
voluntary_ctxt_switches: 1234
nonvoluntary_ctxt_switches: 56
`
cs, err := ParseCtxSwitches(data)
info, err := ParseStatus(data)
if err != nil {
t.Fatal(err)
}
if cs.Voluntary != 1234 {
t.Errorf("Voluntary = %d", cs.Voluntary)
// Threads
if info.Threads != 50 {
t.Errorf("Threads = %d, want 50", info.Threads)
}
if cs.Involuntary != 56 {
t.Errorf("Involuntary = %d", cs.Involuntary)
// VmRSS
if info.VmRSS != 500*1024 {
t.Errorf("VmRSS = %d, want %d", info.VmRSS, 500*1024)
}
// Memory extended
if info.MemoryExtended["vmpeak"] != 1000*1024 {
t.Errorf("VmPeak = %d", info.MemoryExtended["vmpeak"])
}
if info.MemoryExtended["vmrss"] != 500*1024 {
t.Errorf("VmRSS = %d", info.MemoryExtended["vmrss"])
}
if info.MemoryExtended["vmdata"] != 200*1024 {
t.Errorf("VmData = %d", info.MemoryExtended["vmdata"])
}
if info.MemoryExtended["rssanon"] != 100*1024 {
t.Errorf("RssAnon = %d", info.MemoryExtended["rssanon"])
}
if info.MemoryExtended["hugetlbpages"] != 0 {
t.Errorf("HugetlbPages = %d", info.MemoryExtended["hugetlbpages"])
}
// Context switches
if info.CtxSwitches.Voluntary != 1234 {
t.Errorf("Voluntary = %d", info.CtxSwitches.Voluntary)
}
if info.CtxSwitches.Involuntary != 56 {
t.Errorf("Involuntary = %d", info.CtxSwitches.Involuntary)
}
}
// TestParseStatus_NoThreads verifies that status content lacking a
// Threads line is rejected with an error.
func TestParseStatus_NoThreads(t *testing.T) {
	input := "Name: qemu\nVmRSS: 100 kB\n"
	if _, err := ParseStatus(input); err == nil {
		t.Fatal("expected error for missing Threads")
	}
}
// TestParseStat_Malformed verifies that ParseStat returns an error for
// several shapes of broken /proc/{pid}/stat content.
func TestParseStat_Malformed(t *testing.T) {
	cases := []struct {
		name  string
		input string
	}{
		{"no_closing_paren", "12345 (qemu S 1 12345"},
		{"truncated", "12345 (qemu) S 1 2 3"},
		{"empty", ""},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			if _, err := ParseStat(c.input); err == nil {
				t.Fatal("expected error for malformed stat")
			}
		})
	}
}
// TestParseIO_Empty verifies that empty input parses cleanly into
// all-zero counters.
func TestParseIO_Empty(t *testing.T) {
	counters, err := ParseIO("")
	if err != nil {
		t.Fatal(err)
	}
	if counters.ReadChars != 0 || counters.WriteChars != 0 {
		t.Errorf("expected zero counters, got %+v", counters)
	}
}
// TestParseIO_MalformedLines verifies that unparsable lines are skipped
// while well-formed ones are still counted.
func TestParseIO_MalformedLines(t *testing.T) {
	input := "rchar: notanumber\nbadline\nwchar: 100\n"
	counters, err := ParseIO(input)
	if err != nil {
		t.Fatal(err)
	}
	if got := counters.ReadChars; got != 0 {
		t.Errorf("ReadChars = %d, want 0 (parse failure)", got)
	}
	if got := counters.WriteChars; got != 100 {
		t.Errorf("WriteChars = %d, want 100", got)
	}
}

View File

@@ -2,8 +2,6 @@ package pveconfig
import (
"strings"
"pve_local_exporter/internal/cache"
)
// PoolInfo holds parsed pool hierarchy info.
@@ -14,12 +12,6 @@ type PoolInfo struct {
Level3 string
}
// FileReader abstracts file reading for testability.
type FileReader interface {
ReadFile(path string) (string, error)
Stat(path string) (cache.StatFunc, error)
}
// ParsePoolConfig parses /etc/pve/user.cfg for pool definitions.
// Returns (vm_pool_map, pools).
// vm_pool_map: vmid -> pool_name

View File

@@ -23,37 +23,36 @@ func ParseStorageConfig(data string) []StorageEntry {
var result []StorageEntry
var current *StorageEntry
for _, line := range strings.Split(data, "\n") {
line = strings.TrimSpace(line)
for _, rawLine := range strings.Split(data, "\n") {
trimmed := strings.TrimSpace(rawLine)
if line == "" || strings.HasPrefix(line, "#") {
if trimmed == "" || strings.HasPrefix(trimmed, "#") {
continue
}
if strings.Contains(line, ":") && !strings.HasPrefix(line, "\t") && !strings.HasPrefix(line, " ") {
// Check if this is a section header (type: name)
colonIdx := strings.Index(line, ":")
sectionType := strings.TrimSpace(line[:colonIdx])
sectionName := strings.TrimSpace(line[colonIdx+1:])
// Section headers start at column 0 (no leading whitespace)
isIndented := len(rawLine) > 0 && (rawLine[0] == '\t' || rawLine[0] == ' ')
// Only treat as header if type has no spaces (it's a single word)
if !strings.Contains(sectionType, " ") {
if current != nil {
result = append(result, *current)
}
current = &StorageEntry{
Properties: map[string]string{
"type": SanitizeKey(sectionType),
"name": SanitizeKey(sectionName),
},
}
continue
if !isIndented && strings.Contains(trimmed, ":") {
colonIdx := strings.Index(trimmed, ":")
sectionType := trimmed[:colonIdx]
sectionName := strings.TrimSpace(trimmed[colonIdx+1:])
if current != nil {
result = append(result, *current)
}
current = &StorageEntry{
Properties: map[string]string{
"type": SanitizeKey(sectionType),
"name": SanitizeKey(sectionName),
},
}
continue
}
// Key-value property line
if current != nil {
parts := strings.SplitN(line, " ", 2)
parts := strings.SplitN(trimmed, " ", 2)
key := SanitizeKey(strings.TrimSpace(parts[0]))
if len(parts) > 1 {
current.Properties[key] = strings.TrimSpace(parts[1])

View File

@@ -48,7 +48,14 @@ func ParseBlockInfo(raw string) map[string]DiskInfo {
blockID := match[2]
diskPath := match[3]
diskTypeAndMode := match[4]
diskType := strings.Split(diskTypeAndMode, ", ")[0]
modeParts := strings.Split(diskTypeAndMode, ", ")
diskType := modeParts[0]
readOnly := false
for _, p := range modeParts[1:] {
if p == "read-only" {
readOnly = true
}
}
// Skip EFI disks
if strings.Contains(diskName, "efidisk") {
@@ -72,6 +79,10 @@ func ParseBlockInfo(raw string) map[string]DiskInfo {
Labels: make(map[string]string),
}
if readOnly {
info.Labels["read_only"] = "true"
}
// Detect disk type from path
classifyDisk(&info)

View File

@@ -130,6 +130,51 @@ func TestHandleJSONPath_NoHostDevice(t *testing.T) {
}
}
// TestParseBlockInfo_ReadOnly verifies that a read-only drive is
// labeled read_only="true".
func TestParseBlockInfo_ReadOnly(t *testing.T) {
	raw := "drive-scsi0 (#block100): /dev/zvol/rpool/data/vm-100-disk-0 (raw, read-only)\n"
	got := ParseBlockInfo(raw)["scsi0"]
	if got.Labels["read_only"] != "true" {
		t.Errorf("expected read_only=true, got %q", got.Labels["read_only"])
	}
}
// TestParseBlockInfo_ReadWrite verifies that read-write drives carry no
// read_only label at all.
func TestParseBlockInfo_ReadWrite(t *testing.T) {
	raw := "drive-scsi0 (#block100): /dev/zvol/rpool/data/vm-100-disk-0 (raw, read-write)\n"
	got := ParseBlockInfo(raw)["scsi0"]
	if _, present := got.Labels["read_only"]; present {
		t.Error("read_only label should not be set for read-write disks")
	}
}
// TestParseBlockInfo_MalformedHeader verifies that a header line not
// matching the expected drive format produces no disk entries.
func TestParseBlockInfo_MalformedHeader(t *testing.T) {
	parsed := ParseBlockInfo("drive-scsi0: this is not a valid header\n")
	if n := len(parsed); n != 0 {
		t.Fatalf("expected 0 disks for malformed header, got %d", n)
	}
}
// TestParseBlockInfo_Empty verifies that empty input yields no disks.
func TestParseBlockInfo_Empty(t *testing.T) {
	if parsed := ParseBlockInfo(""); len(parsed) != 0 {
		t.Fatalf("expected 0 disks for empty input, got %d", len(parsed))
	}
}
// TestParseBlockInfo_JSONError verifies that a drive with an invalid
// json: path specification is dropped rather than reported.
func TestParseBlockInfo_JSONError(t *testing.T) {
	raw := "drive-scsi0 (#block100): json:{invalid json} (raw, read-write)\n"
	parsed := ParseBlockInfo(raw)
	if n := len(parsed); n != 0 {
		t.Fatalf("expected 0 disks for invalid JSON path, got %d", n)
	}
}
func TestParseBlockInfo_MultiDisk(t *testing.T) {
raw := `drive-scsi0 (#block100): /dev/zvol/rpool/data/vm-100-disk-0 (raw, read-write)
Attached to: /machine/peripheral/virtioscsi0/virtio-backend

View File

@@ -63,3 +63,27 @@ func TestParseNetworkInfo_Empty(t *testing.T) {
t.Fatalf("expected 0 NICs, got %d", len(nics))
}
}
// TestParseNetworkInfo_MalformedLine verifies that lines without the
// expected "netN: " shape are ignored entirely.
func TestParseNetworkInfo_MalformedLine(t *testing.T) {
	input := "this is garbage\nnotnet0: index=0,type=tap\nno-colon-here\n"
	if nics := ParseNetworkInfo(input); len(nics) != 0 {
		t.Fatalf("expected 0 NICs for malformed input, got %d", len(nics))
	}
}
// TestParseNetworkInfo_MissingFields verifies the defaults applied to a
// NIC line that carries only an index.
func TestParseNetworkInfo_MissingFields(t *testing.T) {
	nics := ParseNetworkInfo("net0: index=0")
	if len(nics) != 1 {
		t.Fatalf("expected 1 NIC, got %d", len(nics))
	}
	nic := nics[0]
	if nic.Queues != 1 {
		t.Errorf("queues = %d, want 1", nic.Queues)
	}
	if nic.Model != "" {
		t.Errorf("model = %q, want empty", nic.Model)
	}
}

View File

@@ -31,7 +31,6 @@ type RealQMMonitor struct {
type deferredProc struct {
cmd *exec.Cmd
stdin io.WriteCloser
timestamp time.Time
}
@@ -113,7 +112,7 @@ func (m *RealQMMonitor) deferCloseProcess(cmd *exec.Cmd, stdin io.WriteCloser) {
stdin.Close()
if m.deferClose {
m.mu.Lock()
m.deferredProcs = append(m.deferredProcs, deferredProc{cmd: cmd, stdin: stdin, timestamp: time.Now()})
m.deferredProcs = append(m.deferredProcs, deferredProc{cmd: cmd, timestamp: time.Now()})
m.mu.Unlock()
slog.Warn("deferred closing qm monitor process", "pid", cmd.Process.Pid)
} else {

View File

@@ -18,11 +18,6 @@ type StatFS interface {
Statfs(path string) (StorageSize, error)
}
// CommandRunner abstracts command execution for testability.
type CommandRunner interface {
Run(name string, args ...string) (string, error)
}
// RealStatFS uses the real syscall.
type RealStatFS struct{}