Switch qm monitor I/O from pipes to PTY using creack/pty

qm monitor requires a TTY to emit its prompt; the pipe-based approach
missed prompt detection on some hosts. Replace it with pty.Start, refactor
readUntilMarker to scan byte-by-byte in a goroutine so the deadline is
enforced even when a read blocks, and add parseQMResponse to strip the
command echo and \r artifacts. Add unit tests for the new reader and parser functions.
This commit is contained in:
illustris
2026-03-12 09:52:26 +00:00
parent 00404095b9
commit e9b60a3af6
5 changed files with 175 additions and 63 deletions

View File

@@ -8,7 +8,7 @@ buildGoModule rec {
pname = "pve-local-exporter"; pname = "pve-local-exporter";
version = "0.1.0"; version = "0.1.0";
src = ./src; src = ./src;
vendorHash = "sha256-f0f8tYmoI6DtuB/K4++gu9b2na/d0ECTaF2zvDijW58="; vendorHash = "sha256-MLB7y7shnOhxW8K2R6+d9E63wGEhlErnv+1MYOJO3Hw=";
ldflags = [ ldflags = [
"-X=main.version=${version}" "-X=main.version=${version}"
]; ];

View File

@@ -3,6 +3,7 @@ module pve_local_exporter
go 1.25.7 go 1.25.7
require ( require (
github.com/creack/pty v1.1.24
github.com/prometheus/client_golang v1.22.0 github.com/prometheus/client_golang v1.22.0
github.com/prometheus/client_model v0.6.1 github.com/prometheus/client_model v0.6.1
) )

View File

@@ -2,6 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.24 h1:bJrF4RRfyJnbTJqzRLHzcGaZK1NeM5kTC9jGgovnR1s=
github.com/creack/pty v1.1.24/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=

View File

@@ -2,31 +2,36 @@ package qmmonitor
import ( import (
"bufio" "bufio"
"errors"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"os"
"os/exec" "os/exec"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/creack/pty"
"pve_local_exporter/internal/cache" "pve_local_exporter/internal/cache"
) )
var errTimeout = errors.New("timeout waiting for qm monitor")
// QMMonitor runs commands against qm monitor and caches results. // QMMonitor runs commands against qm monitor and caches results.
type QMMonitor interface { type QMMonitor interface {
RunCommand(vmid, cmd string) (string, error) RunCommand(vmid, cmd string) (string, error)
InvalidateCache(vmid, cmd string) InvalidateCache(vmid, cmd string)
} }
// RealQMMonitor spawns `qm monitor` via os/exec with pipe-based I/O. // RealQMMonitor spawns `qm monitor` on a PTY via creack/pty.
type RealQMMonitor struct { type RealQMMonitor struct {
timeout time.Duration timeout time.Duration
deferClose bool deferClose bool
cache *cache.TTLCache[string, string] cache *cache.TTLCache[string, string]
mu sync.Mutex mu sync.Mutex
deferredProcs []deferredProc deferredProcs []deferredProc
} }
type deferredProc struct { type deferredProc struct {
@@ -53,6 +58,7 @@ func (m *RealQMMonitor) InvalidateCache(vmid, cmd string) {
func (m *RealQMMonitor) RunCommand(vmid, cmd string) (string, error) { func (m *RealQMMonitor) RunCommand(vmid, cmd string) (string, error) {
key := cacheKey(vmid, cmd) key := cacheKey(vmid, cmd)
if v, ok := m.cache.Get(key); ok { if v, ok := m.cache.Get(key); ok {
slog.Debug("qm cache hit", "vmid", vmid, "cmd", cmd)
return v, nil return v, nil
} }
@@ -66,50 +72,56 @@ func (m *RealQMMonitor) RunCommand(vmid, cmd string) (string, error) {
} }
func (m *RealQMMonitor) execQMMonitor(vmid, cmd string) (string, error) { func (m *RealQMMonitor) execQMMonitor(vmid, cmd string) (string, error) {
slog.Debug("qm monitor exec", "vmid", vmid, "cmd", cmd)
start := time.Now()
qmCmd := exec.Command("qm", "monitor", vmid) qmCmd := exec.Command("qm", "monitor", vmid)
qmCmd.Env = append(os.Environ(), "TERM=dumb")
stdin, err := qmCmd.StdinPipe() ptmx, err := pty.Start(qmCmd)
if err != nil { if err != nil {
return "", fmt.Errorf("stdin pipe: %w", err)
}
stdout, err := qmCmd.StdoutPipe()
if err != nil {
return "", fmt.Errorf("stdout pipe: %w", err)
}
if err := qmCmd.Start(); err != nil {
return "", fmt.Errorf("start qm monitor: %w", err) return "", fmt.Errorf("start qm monitor: %w", err)
} }
reader := bufio.NewReader(stdout) reader := bufio.NewReader(ptmx)
// Wait for initial "qm>" prompt // Wait for initial "qm>" prompt
if err := readUntilPrompt(reader, m.timeout); err != nil { deadline := time.Now().Add(m.timeout)
m.deferCloseProcess(qmCmd, stdin) _, err = readUntilMarker(reader, "qm>", deadline)
if err != nil {
slog.Debug("qm monitor initial prompt failed", "vmid", vmid, "err", err)
m.killOrDefer(qmCmd, ptmx)
return "", fmt.Errorf("initial prompt: %w", err) return "", fmt.Errorf("initial prompt: %w", err)
} }
// Send command // Send command
fmt.Fprintf(stdin, "%s\n", cmd) fmt.Fprintf(ptmx, "%s\n", cmd)
// Read response until next "qm>" prompt // Read response until next "qm>" prompt
response, err := readResponseUntilPrompt(reader, m.timeout) deadline = time.Now().Add(m.timeout)
raw, err := readUntilMarker(reader, "qm>", deadline)
if err != nil { if err != nil {
m.deferCloseProcess(qmCmd, stdin) slog.Debug("qm monitor response failed", "vmid", vmid, "cmd", cmd, "err", err)
m.killOrDefer(qmCmd, ptmx)
return "", fmt.Errorf("read response: %w", err) return "", fmt.Errorf("read response: %w", err)
} }
// Close cleanly response := parseQMResponse(raw)
stdin.Close()
// Close cleanly: closing ptmx sends SIGHUP to child
ptmx.Close()
if err := qmCmd.Wait(); err != nil { if err := qmCmd.Wait(); err != nil {
slog.Debug("qm monitor wait error", "vmid", vmid, "err", err) slog.Debug("qm monitor wait error", "vmid", vmid, "err", err)
} }
slog.Debug("qm monitor done", "vmid", vmid, "cmd", cmd,
"duration", time.Since(start), "responseLen", len(response))
return response, nil return response, nil
} }
func (m *RealQMMonitor) deferCloseProcess(cmd *exec.Cmd, stdin io.WriteCloser) { func (m *RealQMMonitor) killOrDefer(cmd *exec.Cmd, closer io.Closer) {
stdin.Close() closer.Close()
if m.deferClose { if m.deferClose {
m.mu.Lock() m.mu.Lock()
m.deferredProcs = append(m.deferredProcs, deferredProc{cmd: cmd, timestamp: time.Now()}) m.deferredProcs = append(m.deferredProcs, deferredProc{cmd: cmd, timestamp: time.Now()})
@@ -140,50 +152,63 @@ func (m *RealQMMonitor) cleanupDeferred() {
m.deferredProcs = still m.deferredProcs = still
} }
func readUntilPrompt(r *bufio.Reader, timeout time.Duration) error { // readUntilMarker reads from r byte-by-byte until the buffer ends with marker
deadline := time.Now().Add(timeout) // or the deadline expires. Returns everything read before the marker.
for { // Uses a goroutine for reads so the deadline is enforced even when ReadByte blocks.
if time.Now().After(deadline) { func readUntilMarker(r *bufio.Reader, marker string, deadline time.Time) (string, error) {
return fmt.Errorf("timeout waiting for qm> prompt") type result struct {
} data string
line, err := r.ReadString('\n') err error
if err != nil { }
// Check if we got the prompt without newline ch := make(chan result, 1)
if strings.Contains(line, "qm>") {
return nil go func() {
var buf []byte
markerBytes := []byte(marker)
for {
b, err := r.ReadByte()
if err != nil {
ch <- result{"", err}
return
}
buf = append(buf, b)
if len(buf) >= len(markerBytes) &&
string(buf[len(buf)-len(markerBytes):]) == marker {
// Return everything before the marker
ch <- result{string(buf[:len(buf)-len(markerBytes)]), nil}
return
} }
return err
}
if strings.Contains(line, "qm>") {
return nil
} }
}()
remaining := time.Until(deadline)
if remaining <= 0 {
remaining = time.Millisecond
}
select {
case res := <-ch:
return res.data, res.err
case <-time.After(remaining):
return "", errTimeout
} }
} }
func readResponseUntilPrompt(r *bufio.Reader, timeout time.Duration) (string, error) { // parseQMResponse takes the raw output before the "qm>" marker from a command
deadline := time.Now().Add(timeout) // response, skips the command echo (first line), and trims \r characters.
var lines []string func parseQMResponse(raw string) string {
firstLine := true lines := strings.Split(raw, "\n")
for { // Skip the command echo (first line)
if time.Now().After(deadline) { if len(lines) > 0 {
return "", fmt.Errorf("timeout waiting for qm> prompt") lines = lines[1:]
}
line, err := r.ReadString('\n')
if err != nil {
if strings.Contains(line, "qm>") {
break
}
return "", err
}
if strings.Contains(line, "qm>") {
break
}
// Skip the echo of the command (first line)
if firstLine {
firstLine = false
continue
}
lines = append(lines, strings.TrimRight(line, "\r\n"))
} }
return strings.Join(lines, "\n"), nil var out []string
for _, line := range lines {
cleaned := strings.TrimRight(line, "\r")
out = append(out, cleaned)
}
// Trim trailing empty lines
for len(out) > 0 && out[len(out)-1] == "" {
out = out[:len(out)-1]
}
return strings.Join(out, "\n")
} }

View File

@@ -1,6 +1,8 @@
package qmmonitor package qmmonitor
import ( import (
"bufio"
"io"
"testing" "testing"
"time" "time"
@@ -57,3 +59,85 @@ func TestMockQMMonitor_Invalidate(t *testing.T) {
t.Errorf("unexpected: %q", r) t.Errorf("unexpected: %q", r)
} }
} }
// TestReadUntilMarker_Success feeds a banner followed by the prompt through a
// pipe and checks that readUntilMarker hands back only the text that precedes
// the "qm>" marker.
func TestReadUntilMarker_Success(t *testing.T) {
	const want = "banner\n"

	src, sink := io.Pipe()
	defer src.Close()

	// Feed the pipe from its own goroutine: a write to an io.Pipe blocks
	// until the other end has read the data.
	go func() {
		defer sink.Close()
		sink.Write([]byte("banner\nqm> "))
	}()

	out, err := readUntilMarker(bufio.NewReader(src), "qm>", time.Now().Add(5*time.Second))
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %v", err)
	case out != want:
		t.Errorf("got %q, want %q", out, want)
	}
}
// TestReadUntilMarker_Timeout checks that readUntilMarker returns errTimeout,
// and does so promptly, when the source never produces any data.
func TestReadUntilMarker_Timeout(t *testing.T) {
	src, sink := io.Pipe()
	defer src.Close()
	defer sink.Close()

	// Nothing is ever written to sink, so the deadline is the only way out.
	input := bufio.NewReader(src)
	limit := time.Now().Add(50 * time.Millisecond)

	began := time.Now()
	_, err := readUntilMarker(input, "qm>", limit)
	took := time.Since(began)

	if err != errTimeout {
		t.Fatalf("expected errTimeout, got: %v", err)
	}
	// Generous upper bound: the call must return near the 50ms deadline
	// rather than hanging on the blocked read.
	if took > 500*time.Millisecond {
		t.Errorf("timeout took too long: %v", took)
	}
}
// TestReadUntilMarker_EOF closes the writer before the marker is ever sent and
// expects readUntilMarker to fail with a read error rather than a timeout.
func TestReadUntilMarker_EOF(t *testing.T) {
	src, sink := io.Pipe()
	defer src.Close()

	// Send data that does not contain the marker, then close the write end.
	go func() {
		defer sink.Close()
		sink.Write([]byte("partial data"))
	}()

	_, err := readUntilMarker(bufio.NewReader(src), "qm>", time.Now().Add(5*time.Second))
	switch err {
	case nil:
		t.Fatal("expected error on EOF before marker")
	case errTimeout:
		t.Fatal("expected EOF error, not timeout")
	}
}
// TestParseQMResponse checks a typical PTY capture: the echoed command on the
// first line, CRLF-terminated response lines, and a trailing newline. The
// expected result has no echo line, no \r characters and no trailing blank line.
func TestParseQMResponse(t *testing.T) {
	const (
		raw = "info network\r\n" +
			"net0: index=0,type=tap,ifname=tap100i0\r\n" +
			"net1: index=1,type=tap,ifname=tap100i1\r\n"
		want = "net0: index=0,type=tap,ifname=tap100i0\n" +
			"net1: index=1,type=tap,ifname=tap100i1"
	)

	if got := parseQMResponse(raw); got != want {
		t.Errorf("parseQMResponse:\ngot: %q\nwant: %q", got, want)
	}
}
// TestParseQMResponse_Empty covers a command that prints nothing: the raw text
// holds only the echoed command line, so the parsed response must be empty.
func TestParseQMResponse_Empty(t *testing.T) {
	const echoOnly = "info version\r\n"

	if got := parseQMResponse(echoOnly); got != "" {
		t.Errorf("expected empty, got %q", got)
	}
}