zlh-agent/internal/alloy/alloy.go
2026-04-18 13:53:33 +00:00

361 lines
9.1 KiB
Go

package alloy
import (
"bytes"
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"time"
"zlh-agent/internal/state"
)
// Paths, label values and retry settings used when managing the local Alloy agent.
const (
// ConfigPath is the Alloy configuration file whose external_labels block is rewritten.
ConfigPath = "/etc/alloy/config.alloy"
// tmpConfigPath receives the rendered config before it is renamed over ConfigPath.
tmpConfigPath = "/etc/alloy/config.alloy.tmp"
// metricsPort is used in the "instance" label and is the loopback TCP port probed after a restart.
metricsPort = "12345"
// unixJob is the value of the "job" external label.
unixJob = "integrations/unix"
// collector is the value of the "collector" external label.
collector = "alloy"
// alloyServiceName is the systemd unit restarted and checked via systemctl.
alloyServiceName = "alloy"
// updateAttempts bounds how many render/restart passes EnsureConfig makes.
updateAttempts = 3
// retryDelay is the pause between failed EnsureConfig passes.
retryDelay = time.Second
)
// Result reports the outcome of EnsureConfig.
type Result struct {
// Applied is true when the config file was replaced and/or an Alloy restart
// was attempted (it is also true when that restart failed).
Applied bool
// Labels holds the external labels that were computed; it is nil when the
// container type is not managed or label computation failed.
Labels map[string]string
}
// Indirections over filesystem, network, process and timing operations used by
// this package. They default to the real implementations; presumably they are
// package variables so tests can swap in fakes — TODO confirm.
var (
// writeFile writes the rendered temp config.
writeFile = os.WriteFile
// readFile reads the current Alloy config.
readFile = os.ReadFile
// removeFile deletes the temp config.
removeFile = os.Remove
// renameFile moves the temp config over ConfigPath.
renameFile = os.Rename
// localIPFunc supplies the IP used in the "instance" label when none is configured.
localIPFunc = localIPv4
// runCommand runs systemctl.
runCommand = runSystemCommand
// dialTimeout probes Alloy's local metrics port.
dialTimeout = net.DialTimeout
// sleepFunc pauses between retries.
sleepFunc = time.Sleep
)
// EnsureConfig makes the external_labels block of the Alloy config match the
// labels computed for cfg, restarting Alloy when the file changes. It makes up
// to updateAttempts passes, sleeping retryDelay between failed passes, and
// returns as soon as one pass succeeds.
//
// Once any pass has replaced the config file or attempted a restart
// (Result.Applied), every later pass is told to restart Alloy even if the file
// on disk is already current, so a failed restart is retried rather than being
// masked by an "unchanged" render. On failure the last pass's Result is
// returned, with Applied set if any pass applied the config.
func EnsureConfig(cfg state.Config) (Result, error) {
	var (
		last    Result
		lastErr error
		applied bool
	)
	for attempt := 1; attempt <= updateAttempts; attempt++ {
		result, err := ensureConfigOnce(cfg, attempt, applied)
		// Sticky: a later pass that fails before rendering (e.g. a read
		// error returns Applied=false) must not forget that an earlier pass
		// already replaced the file and still owes a successful restart.
		applied = applied || result.Applied
		if err == nil {
			return result, nil
		}
		last = result
		last.Applied = applied
		lastErr = err
		log.Printf("[alloy] vmid=%d action=update status=failed attempt=%d/%d err=%v", cfg.VMID, attempt, updateAttempts, err)
		if attempt < updateAttempts {
			sleepFunc(retryDelay)
		}
	}
	return last, lastErr
}
// ensureConfigOnce performs a single render-and-apply pass for EnsureConfig.
//
// It computes the labels for cfg and splices them into the external_labels
// block of ConfigPath. When the rendered text differs from the file on disk,
// it writes the result to tmpConfigPath, renames it over ConfigPath and
// restarts Alloy. When the file is already current it restarts Alloy only if
// forceRestart is set (an earlier pass applied the config but did not finish
// successfully). Unmanaged container types are skipped with a zero Result.
//
// attempt is used only for logging. Result.Applied is true whenever the config
// was replaced or a restart was attempted, even if that restart failed.
func ensureConfigOnce(cfg state.Config, attempt int, forceRestart bool) (Result, error) {
	log.Printf("[alloy] vmid=%d action=render status=attempt type=%s", cfg.VMID, cfg.ContainerType)
	if !managedContainerType(cfg.ContainerType) {
		log.Printf("[alloy] vmid=%d action=render status=skipped type=%s", cfg.VMID, cfg.ContainerType)
		return Result{}, nil
	}
	labels, err := Labels(cfg)
	if err != nil {
		log.Printf("[alloy] vmid=%d action=render status=failed err=%v", cfg.VMID, err)
		return Result{}, err
	}
	log.Printf("[alloy] vmid=%d action=render status=labels labels=%s", cfg.VMID, formatLabels(labels))

	existing, err := readFile(ConfigPath)
	if err != nil {
		return Result{Labels: labels}, fmt.Errorf("read alloy config: %w", err)
	}
	rendered, err := ReplaceExternalLabelsBlock(string(existing), labels)
	if err != nil {
		return Result{Labels: labels}, err
	}

	if bytes.Equal(existing, []byte(rendered)) {
		// Nothing to write. Still clear out a temp file that an interrupted
		// earlier run may have left behind.
		if removeErr := removeFile(tmpConfigPath); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) {
			return Result{Labels: labels}, fmt.Errorf("remove unchanged alloy temp config: %w", removeErr)
		}
		if !forceRestart {
			log.Printf("[alloy] vmid=%d action=render status=unchanged", cfg.VMID)
			return Result{Applied: false, Labels: labels}, nil
		}
		log.Printf("[alloy] vmid=%d action=render status=unchanged restart_retry=true", cfg.VMID)
	} else {
		// Write to a sibling temp file and rename it into place so the config
		// is swapped in one step instead of being rewritten in place.
		if err := writeFile(tmpConfigPath, []byte(rendered), 0o644); err != nil {
			return Result{Labels: labels}, fmt.Errorf("write alloy temp config: %w", err)
		}
		if err := renameFile(tmpConfigPath, ConfigPath); err != nil {
			_ = removeFile(tmpConfigPath) // best effort; the rename failure is what gets reported
			return Result{Labels: labels}, fmt.Errorf("replace alloy config: %w", err)
		}
		log.Printf("[alloy] vmid=%d action=render status=changed path=%s", cfg.VMID, ConfigPath)
	}

	if err := restartAndValidate(cfg.VMID, labels, attempt); err != nil {
		return Result{Applied: true, Labels: labels}, err
	}
	return Result{Applied: true, Labels: labels}, nil
}
// restartAndValidate restarts the Alloy systemd unit and then checks it with
// validateAlloy. The attempt, any failure and the final success are logged;
// failures and success include the labels being applied.
func restartAndValidate(vmid int, labels map[string]string, attempt int) error {
	log.Printf("[alloy] vmid=%d action=restart status=attempt attempt=%d/%d service=%s", vmid, attempt, updateAttempts, alloyServiceName)

	logFailure := func(cause error) {
		log.Printf("[alloy] vmid=%d action=restart status=failed err=%v labels=%s", vmid, cause, formatLabels(labels))
	}

	if restartErr := runCommand("systemctl", "restart", alloyServiceName); restartErr != nil {
		logFailure(restartErr)
		return fmt.Errorf("restart alloy: %w", restartErr)
	}
	if healthErr := validateAlloy(); healthErr != nil {
		logFailure(healthErr)
		return healthErr
	}

	log.Printf("[alloy] vmid=%d action=restart status=success labels=%s", vmid, formatLabels(labels))
	return nil
}
// RenderExternalLabelsBlock renders an Alloy `external_labels = { ... }`
// assignment for labels. Only the keys job, instance, collector, role and
// vmid are emitted, always in that order; absent keys are skipped and any
// other keys are ignored. Values are escaped with escapeAlloyString.
func RenderExternalLabelsBlock(labels map[string]string) string {
	var out strings.Builder
	out.WriteString(" external_labels = {\n")
	for _, name := range [...]string{"job", "instance", "collector", "role", "vmid"} {
		if value, present := labels[name]; present {
			fmt.Fprintf(&out, " %-9s = \"%s\",\n", name, escapeAlloyString(value))
		}
	}
	out.WriteString(" }\n")
	return out.String()
}
// ReplaceExternalLabelsBlock returns config with the byte range located by
// externalLabelsRange replaced by RenderExternalLabelsBlock(labels). Any error
// from locating the block is returned unchanged together with "".
func ReplaceExternalLabelsBlock(config string, labels map[string]string) (string, error) {
	from, to, err := externalLabelsRange(config)
	if err != nil {
		return "", err
	}
	var out strings.Builder
	out.WriteString(config[:from])
	out.WriteString(RenderExternalLabelsBlock(labels))
	out.WriteString(config[to:])
	return out.String(), nil
}
// Labels computes the Alloy external labels for a managed ("game" or "dev")
// container.
//
// The IP comes from cfg.ContainerIP (trimmed). If that is blank, localIPFunc
// is used instead, and the result must parse as an IP address. An error is
// returned for an unsupported container type, a non-positive VMID, a failed
// IP lookup or an unparsable IP.
func Labels(cfg state.Config) (map[string]string, error) {
	kind := strings.ToLower(strings.TrimSpace(cfg.ContainerType))
	if !managedContainerType(kind) {
		return nil, fmt.Errorf("unsupported alloy container type: %q", cfg.ContainerType)
	}
	if cfg.VMID <= 0 {
		return nil, errors.New("missing required container metadata: vmid")
	}

	addr := strings.TrimSpace(cfg.ContainerIP)
	if addr == "" {
		detected, err := localIPFunc()
		if err != nil {
			return nil, err
		}
		addr = detected
	}
	if net.ParseIP(addr) == nil {
		return nil, fmt.Errorf("invalid container_ip metadata: %q", addr)
	}

	var role string
	switch kind {
	case "dev":
		role = "dev-container"
	default:
		role = "game-container"
	}

	return map[string]string{
		"job":       unixJob,
		"instance":  net.JoinHostPort(addr, metricsPort),
		"collector": collector,
		"role":      role,
		"vmid":      strconv.Itoa(cfg.VMID),
	}, nil
}
// managedContainerType reports whether containerType, ignoring case and
// surrounding whitespace, is one whose Alloy config this package manages
// ("game" or "dev").
func managedContainerType(containerType string) bool {
	normalized := strings.ToLower(strings.TrimSpace(containerType))
	return normalized == "game" || normalized == "dev"
}
// formatLabels renders labels for log output as space-separated key="value"
// pairs, sorted by key, with values quoted by strconv.Quote. An empty or nil
// map yields "".
func formatLabels(labels map[string]string) string {
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)

	var out strings.Builder
	for i, name := range names {
		if i > 0 {
			out.WriteByte(' ')
		}
		out.WriteString(name)
		out.WriteByte('=')
		out.WriteString(strconv.Quote(labels[name]))
	}
	return out.String()
}
// externalLabelsRange locates the single `external_labels = { ... }`
// assignment in config and returns the byte range [start, end) that
// ReplaceExternalLabelsBlock overwrites.
//
// Only whitespace may separate the marker, the `=` and the opening `{`.
// Anything else, for example the word appearing in a comment, is rejected so
// the returned range can never span unrelated config. If only whitespace
// precedes the marker on its line, start is moved back to the start of that
// line. If a newline follows the closing brace, end is extended past it, so
// the replacement swaps whole lines.
func externalLabelsRange(config string) (int, int, error) {
	const marker = "external_labels"
	start := strings.Index(config, marker)
	if start < 0 {
		return 0, 0, fmt.Errorf("alloy config missing external_labels block")
	}
	if strings.Count(config, marker) > 1 {
		return 0, 0, fmt.Errorf("alloy config contains multiple external_labels blocks")
	}

	// skipSpace returns the first index at or after i that is not ASCII whitespace.
	skipSpace := func(i int) int {
		for i < len(config) {
			switch config[i] {
			case ' ', '\t', '\r', '\n':
				i++
			default:
				return i
			}
		}
		return i
	}

	eq := skipSpace(start + len(marker))
	if eq >= len(config) || config[eq] != '=' {
		return 0, 0, fmt.Errorf("alloy external_labels block missing assignment")
	}
	open := skipSpace(eq + 1)
	if open >= len(config) || config[open] != '{' {
		return 0, 0, fmt.Errorf("alloy external_labels block missing opening brace")
	}
	end, err := matchingBraceEnd(config, open)
	if err != nil {
		return 0, 0, err
	}

	// LastIndexByte returns -1 when there is no earlier newline, so +1 yields 0.
	lineStart := strings.LastIndexByte(config[:start], '\n') + 1
	if strings.TrimSpace(config[lineStart:start]) == "" {
		start = lineStart
	}
	if end < len(config) && config[end] == '\n' {
		end++
	}
	return start, end, nil
}

// matchingBraceEnd returns the index just past the '}' that closes the '{' at
// config[open]. Braces are ignored inside double-quoted strings (backslash
// escapes are honoured), inside // line comments and inside /* */ block
// comments.
func matchingBraceEnd(config string, open int) (int, error) {
	depth := 0
	inString := false
	escaped := false
	for i := open; i < len(config); i++ {
		ch := config[i]
		if inString {
			switch {
			case escaped:
				escaped = false
			case ch == '\\':
				escaped = true
			case ch == '"':
				inString = false
			}
			continue
		}
		switch ch {
		case '"':
			inString = true
		case '/':
			if i+1 < len(config) {
				switch config[i+1] {
				case '/':
					// Line comment: jump to its terminating newline (or EOF).
					if nl := strings.IndexByte(config[i:], '\n'); nl >= 0 {
						i += nl
					} else {
						i = len(config)
					}
				case '*':
					// Block comment: jump to the '/' of the closing "*/" (or EOF).
					if stop := strings.Index(config[i+2:], "*/"); stop >= 0 {
						i += 2 + stop + 1
					} else {
						i = len(config)
					}
				}
			}
		case '{':
			depth++
		case '}':
			depth--
			if depth == 0 {
				return i + 1, nil
			}
		}
	}
	return 0, fmt.Errorf("alloy external_labels block missing closing brace")
}
// escapeAlloyString escapes value for use between the double quotes of an
// Alloy string literal. Backslashes and double quotes are backslash-escaped,
// and newline, carriage return and tab are written as \n, \r and \t so a value
// can neither terminate the literal early nor split it across lines. All other
// bytes are copied unchanged, including invalid UTF-8.
func escapeAlloyString(value string) string {
	var b strings.Builder
	b.Grow(len(value))
	// Iterate bytes, not runes, so invalid UTF-8 is passed through untouched.
	for i := 0; i < len(value); i++ {
		switch c := value[i]; c {
		case '\\':
			b.WriteString(`\\`)
		case '"':
			b.WriteString(`\"`)
		case '\n':
			b.WriteString(`\n`)
		case '\r':
			b.WriteString(`\r`)
		case '\t':
			b.WriteString(`\t`)
		default:
			b.WriteByte(c)
		}
	}
	return b.String()
}
// validateAlloy checks that the Alloy unit is reported active by systemctl
// and that something accepts TCP connections on 127.0.0.1:metricsPort.
//
// The port is probed several times with a pause between probes. `systemctl
// restart` may return before Alloy has opened its listener, and a refused
// connection fails immediately, so a single probe can misreport a healthy
// restart as a failure.
func validateAlloy() error {
	const (
		probeAttempts = 10
		probeInterval = 500 * time.Millisecond
	)
	if err := runCommand("systemctl", "is-active", "--quiet", alloyServiceName); err != nil {
		return fmt.Errorf("alloy service is not active: %w", err)
	}
	addr := net.JoinHostPort("127.0.0.1", metricsPort)
	var lastErr error
	for probe := 1; probe <= probeAttempts; probe++ {
		conn, err := dialTimeout("tcp", addr, 3*time.Second)
		if err == nil {
			_ = conn.Close() // only reachability matters; a close error is irrelevant
			return nil
		}
		lastErr = err
		if probe < probeAttempts {
			sleepFunc(probeInterval)
		}
	}
	return fmt.Errorf("alloy is not listening on :%s: %w", metricsPort, lastErr)
}
// runSystemCommand runs name with args and waits for it to exit. On failure
// the returned error wraps the exec error and includes the command line and
// its trimmed combined stdout/stderr.
func runSystemCommand(name string, args ...string) error {
	cmd := exec.Command(name, args...)
	output, runErr := cmd.CombinedOutput()
	if runErr == nil {
		return nil
	}
	commandLine := strings.Join(args, " ")
	return fmt.Errorf("%s %s: %w: %s", name, commandLine, runErr, strings.TrimSpace(string(output)))
}
// localIPv4 returns the lexicographically smallest (string order) IPv4 address
// assigned to a local interface, skipping loopback and link-local addresses.
// It errors if the interface addresses cannot be read or none qualify.
func localIPv4() (string, error) {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", fmt.Errorf("read local addresses: %w", err)
	}

	best, found := "", false
	for _, addr := range addrs {
		ipNet, isIPNet := addr.(*net.IPNet)
		if !isIPNet {
			continue
		}
		v4 := ipNet.IP.To4()
		if v4 == nil || v4.IsLoopback() || v4.IsLinkLocalUnicast() {
			continue
		}
		if text := v4.String(); !found || text < best {
			best, found = text, true
		}
	}

	if !found {
		return "", fmt.Errorf("missing required container metadata: non-loopback IPv4")
	}
	return best, nil
}