diff --git a/default.nix b/default.nix index 5103eac..ec42cf6 100644 --- a/default.nix +++ b/default.nix @@ -8,7 +8,7 @@ buildGoModule rec { pname = "pve-local-exporter"; version = "0.1.0"; src = ./src; - vendorHash = "sha256-f0f8tYmoI6DtuB/K4++gu9b2na/d0ECTaF2zvDijW58="; + vendorHash = "sha256-MLB7y7shnOhxW8K2R6+d9E63wGEhlErnv+1MYOJO3Hw="; ldflags = [ "-X=main.version=${version}" ]; diff --git a/src/go.mod b/src/go.mod index 0d7baed..07bc0e2 100644 --- a/src/go.mod +++ b/src/go.mod @@ -3,6 +3,7 @@ module pve_local_exporter go 1.25.7 require ( + github.com/creack/pty v1.1.24 github.com/prometheus/client_golang v1.22.0 github.com/prometheus/client_model v0.6.1 ) diff --git a/src/go.sum b/src/go.sum index e81255c..9d17222 100644 --- a/src/go.sum +++ b/src/go.sum @@ -2,6 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.24 h1:bJrF4RRfyJnbTJqzRLHzcGaZK1NeM5kTC9jGgovnR1s= +github.com/creack/pty v1.1.24/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= diff --git a/src/internal/qmmonitor/qmmonitor.go b/src/internal/qmmonitor/qmmonitor.go index d4a8db0..a43c94b 100644 --- a/src/internal/qmmonitor/qmmonitor.go +++ b/src/internal/qmmonitor/qmmonitor.go @@ -2,31 +2,36 @@ package qmmonitor import ( "bufio" + "errors" "fmt" "io" "log/slog" + "os" "os/exec" "strings" "sync" "time" + "github.com/creack/pty" "pve_local_exporter/internal/cache" ) +var errTimeout = errors.New("timeout waiting for qm monitor") + // QMMonitor runs commands against qm monitor and caches results. type QMMonitor interface { RunCommand(vmid, cmd string) (string, error) InvalidateCache(vmid, cmd string) } -// RealQMMonitor spawns `qm monitor` via os/exec with pipe-based I/O. +// RealQMMonitor spawns `qm monitor` on a PTY via creack/pty. type RealQMMonitor struct { timeout time.Duration deferClose bool cache *cache.TTLCache[string, string] - mu sync.Mutex - deferredProcs []deferredProc + mu sync.Mutex + deferredProcs []deferredProc } type deferredProc struct { @@ -53,6 +58,7 @@ func (m *RealQMMonitor) InvalidateCache(vmid, cmd string) { func (m *RealQMMonitor) RunCommand(vmid, cmd string) (string, error) { key := cacheKey(vmid, cmd) if v, ok := m.cache.Get(key); ok { + slog.Debug("qm cache hit", "vmid", vmid, "cmd", cmd) return v, nil } @@ -66,50 +72,56 @@ func (m *RealQMMonitor) RunCommand(vmid, cmd string) (string, error) { } func (m *RealQMMonitor) execQMMonitor(vmid, cmd string) (string, error) { + slog.Debug("qm monitor exec", "vmid", vmid, "cmd", cmd) + start := time.Now() + qmCmd := exec.Command("qm", "monitor", vmid) + qmCmd.Env = append(os.Environ(), "TERM=dumb") - stdin, err := qmCmd.StdinPipe() + ptmx, err := pty.Start(qmCmd) if err != nil { - return "", fmt.Errorf("stdin pipe: %w", err) - } - stdout, err := qmCmd.StdoutPipe() - if err != nil { - return "", fmt.Errorf("stdout pipe: %w", err) - } - - if err := qmCmd.Start(); err != nil { return "", fmt.Errorf("start qm monitor: %w", err) } - reader := bufio.NewReader(stdout) + reader := bufio.NewReader(ptmx) // Wait for initial "qm>" prompt - if err := readUntilPrompt(reader, m.timeout); err != nil { - m.deferCloseProcess(qmCmd, stdin) + deadline := time.Now().Add(m.timeout) + _, err = readUntilMarker(reader, "qm>", deadline) + if err != nil { + slog.Debug("qm monitor initial prompt failed", "vmid", vmid, "err", err) + m.killOrDefer(qmCmd, ptmx) return "", fmt.Errorf("initial prompt: %w", err) } // Send command - fmt.Fprintf(stdin, "%s\n", cmd) + fmt.Fprintf(ptmx, "%s\n", cmd) // Read response until next "qm>" prompt - response, err := readResponseUntilPrompt(reader, m.timeout) + deadline = time.Now().Add(m.timeout) + raw, err := readUntilMarker(reader, "qm>", deadline) if err != nil { - m.deferCloseProcess(qmCmd, stdin) + slog.Debug("qm monitor response failed", "vmid", vmid, "cmd", cmd, "err", err) + m.killOrDefer(qmCmd, ptmx) return "", fmt.Errorf("read response: %w", err) } - // Close cleanly - stdin.Close() + response := parseQMResponse(raw) + + // Close cleanly: closing ptmx sends SIGHUP to child + ptmx.Close() if err := qmCmd.Wait(); err != nil { slog.Debug("qm monitor wait error", "vmid", vmid, "err", err) } + slog.Debug("qm monitor done", "vmid", vmid, "cmd", cmd, + "duration", time.Since(start), "responseLen", len(response)) + return response, nil } -func (m *RealQMMonitor) deferCloseProcess(cmd *exec.Cmd, stdin io.WriteCloser) { - stdin.Close() +func (m *RealQMMonitor) killOrDefer(cmd *exec.Cmd, closer io.Closer) { + closer.Close() if m.deferClose { m.mu.Lock() m.deferredProcs = append(m.deferredProcs, deferredProc{cmd: cmd, timestamp: time.Now()}) @@ -140,50 +152,63 @@ func (m *RealQMMonitor) cleanupDeferred() { m.deferredProcs = still } -func readUntilPrompt(r *bufio.Reader, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for { - if time.Now().After(deadline) { - return fmt.Errorf("timeout waiting for qm> prompt") - } - line, err := r.ReadString('\n') - if err != nil { - // Check if we got the prompt without newline - if strings.Contains(line, "qm>") { - return nil +// readUntilMarker reads from r byte-by-byte until the buffer ends with marker +// or the deadline expires. Returns everything read before the marker. +// Uses a goroutine for reads so the deadline is enforced even when ReadByte blocks. +func readUntilMarker(r *bufio.Reader, marker string, deadline time.Time) (string, error) { + type result struct { + data string + err error + } + ch := make(chan result, 1) + + go func() { + var buf []byte + markerBytes := []byte(marker) + for { + b, err := r.ReadByte() + if err != nil { + ch <- result{"", err} + return + } + buf = append(buf, b) + if len(buf) >= len(markerBytes) && + string(buf[len(buf)-len(markerBytes):]) == marker { + // Return everything before the marker + ch <- result{string(buf[:len(buf)-len(markerBytes)]), nil} + return } - return err - } - if strings.Contains(line, "qm>") { - return nil } + }() + + remaining := time.Until(deadline) + if remaining <= 0 { + remaining = time.Millisecond + } + select { + case res := <-ch: + return res.data, res.err + case <-time.After(remaining): + return "", errTimeout } } -func readResponseUntilPrompt(r *bufio.Reader, timeout time.Duration) (string, error) { - deadline := time.Now().Add(timeout) - var lines []string - firstLine := true - for { - if time.Now().After(deadline) { - return "", fmt.Errorf("timeout waiting for qm> prompt") - } - line, err := r.ReadString('\n') - if err != nil { - if strings.Contains(line, "qm>") { - break - } - return "", err - } - if strings.Contains(line, "qm>") { - break - } - // Skip the echo of the command (first line) - if firstLine { - firstLine = false - continue - } - lines = append(lines, strings.TrimRight(line, "\r\n")) +// parseQMResponse takes the raw output before the "qm>" marker from a command +// response, skips the command echo (first line), and trims \r characters. +func parseQMResponse(raw string) string { + lines := strings.Split(raw, "\n") + // Skip the command echo (first line) + if len(lines) > 0 { + lines = lines[1:] } - return strings.Join(lines, "\n"), nil + var out []string + for _, line := range lines { + cleaned := strings.TrimRight(line, "\r") + out = append(out, cleaned) + } + // Trim trailing empty lines + for len(out) > 0 && out[len(out)-1] == "" { + out = out[:len(out)-1] + } + return strings.Join(out, "\n") } diff --git a/src/internal/qmmonitor/qmmonitor_test.go b/src/internal/qmmonitor/qmmonitor_test.go index 7fba8c4..19f92b8 100644 --- a/src/internal/qmmonitor/qmmonitor_test.go +++ b/src/internal/qmmonitor/qmmonitor_test.go @@ -1,6 +1,8 @@ package qmmonitor import ( + "bufio" + "io" "testing" "time" @@ -57,3 +59,85 @@ func TestMockQMMonitor_Invalidate(t *testing.T) { t.Errorf("unexpected: %q", r) } } + +func TestReadUntilMarker_Success(t *testing.T) { + pr, pw := io.Pipe() + defer pr.Close() + + go func() { + pw.Write([]byte("banner\nqm> ")) + pw.Close() + }() + + reader := bufio.NewReader(pr) + deadline := time.Now().Add(5 * time.Second) + got, err := readUntilMarker(reader, "qm>", deadline) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != "banner\n" { + t.Errorf("got %q, want %q", got, "banner\n") + } +} + +func TestReadUntilMarker_Timeout(t *testing.T) { + pr, pw := io.Pipe() + defer pr.Close() + defer pw.Close() + + // Write nothing -- should timeout + reader := bufio.NewReader(pr) + deadline := time.Now().Add(50 * time.Millisecond) + start := time.Now() + _, err := readUntilMarker(reader, "qm>", deadline) + elapsed := time.Since(start) + + if err != errTimeout { + t.Fatalf("expected errTimeout, got: %v", err) + } + if elapsed > 500*time.Millisecond { + t.Errorf("timeout took too long: %v", elapsed) + } +} + +func TestReadUntilMarker_EOF(t *testing.T) { + pr, pw := io.Pipe() + defer pr.Close() + + go func() { + pw.Write([]byte("partial data")) + pw.Close() + }() + + reader := bufio.NewReader(pr) + deadline := time.Now().Add(5 * time.Second) + _, err := readUntilMarker(reader, "qm>", deadline) + if err == nil { + t.Fatal("expected error on EOF before marker") + } + if err == errTimeout { + t.Fatal("expected EOF error, not timeout") + } +} + +func TestParseQMResponse(t *testing.T) { + // Simulate: command echo + response lines with \r + trailing newline + raw := "info network\r\n" + + "net0: index=0,type=tap,ifname=tap100i0\r\n" + + "net1: index=1,type=tap,ifname=tap100i1\r\n" + + got := parseQMResponse(raw) + want := "net0: index=0,type=tap,ifname=tap100i0\n" + + "net1: index=1,type=tap,ifname=tap100i1" + if got != want { + t.Errorf("parseQMResponse:\ngot: %q\nwant: %q", got, want) + } +} + +func TestParseQMResponse_Empty(t *testing.T) { + // Command echo only, no response data + got := parseQMResponse("info version\r\n") + if got != "" { + t.Errorf("expected empty, got %q", got) + } +}