package agenthttp

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/gorilla/websocket"

	"zlh-agent/internal/state"
)

/*
	websocket.go
	ZeroLagHub Agent — Real-time Log WebSocket
	Endpoint: GET /console/stream
*/

// maxPayloadSize is the read limit (64 KiB) applied to every inbound
// WebSocket message on the console stream.
const maxPayloadSize = 64 * 1024

// wsUpgrader upgrades console HTTP requests to WebSocket connections.
//
// NOTE(review): CheckOrigin accepts every Origin header, so any web page can
// open this socket from a browser. Confirm the endpoint is only reachable
// through a trusted proxy before relying on this.
var wsUpgrader = websocket.Upgrader{
	ReadBufferSize:  4096,
	WriteBufferSize: 4096,
	CheckOrigin:     func(r *http.Request) bool { return true },
}

/* --------------------------------------------------------------------------
   MAIN HANDLER — /console/stream
----------------------------------------------------------------------------*/

// handleConsoleStream upgrades the request to a WebSocket and bridges it to
// the console session returned by getConsoleSession.
//
// Flow:
//  1. Load the agent config; reply 400 if it cannot be loaded.
//  2. Upgrade the connection and start one writer goroutine (the only place
//     that writes to conn) and one reader goroutine (the only place that
//     reads from conn).
//  3. Poll getConsoleSession until it succeeds. Non-dev containers are told
//     once that the handler is waiting and polling continues; dev containers
//     get the error and the connection is closed.
//  4. Register the connection with the session and forward client input to
//     it until a read or write error ends the stream.
func handleConsoleStream(w http.ResponseWriter, r *http.Request) {
	const (
		// keepAliveInterval is the period of the empty text frame the writer
		// emits when there is no console output to send.
		keepAliveInterval = 5 * time.Second
		// sessionRetryDelay is the pause between getConsoleSession attempts.
		sessionRetryDelay = 500 * time.Millisecond
		// flushTimeout bounds how long the handler waits for the writer to
		// drain a final message before the connection is closed.
		flushTimeout = 2 * time.Second
	)

	cfg, err := state.LoadConfig()
	if err != nil {
		http.Error(w, "no config loaded", http.StatusBadRequest)
		return
	}

	log.Printf("[ws] console connect: vmid=%d type=%s runtime=%s game=%s",
		cfg.VMID, cfg.ContainerType, cfg.Runtime, cfg.Game)

	conn, err := wsUpgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so
		// writing a second response here would be superfluous (and may hit a
		// hijacked connection). Only log.
		log.Println("[ws] upgrade failed:", err)
		return
	}
	defer conn.Close()

	// done is closed when the handler returns so that both helper goroutines
	// can stop instead of blocking on channels nobody services any more.
	done := make(chan struct{})
	defer close(done)

	conn.SetReadLimit(maxPayloadSize)
	// Overrides the default close handler: the close frame is logged and no
	// close message is written back from the read path.
	conn.SetCloseHandler(func(code int, text string) error {
		log.Printf("[ws] close frame: vmid=%d code=%d text=%q", cfg.VMID, code, text)
		return nil
	})

	// Writer goroutine: drains sendCh to the socket and emits keep-alives.
	// It reports its exit reason on writeErrCh at most once (nil when sendCh
	// was closed and fully drained), so the buffer of 1 never blocks it.
	sendCh := make(chan []byte, 64)
	writeErrCh := make(chan error, 1)
	go func() {
		ticker := time.NewTicker(keepAliveInterval)
		defer ticker.Stop()
		for {
			select {
			case msg, ok := <-sendCh:
				if !ok {
					writeErrCh <- nil
					return
				}
				if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
					writeErrCh <- err
					return
				}
			case <-ticker.C:
				if err := conn.WriteMessage(websocket.TextMessage, []byte{}); err != nil {
					writeErrCh <- err
					return
				}
			case <-done:
				// Handler is gone; exit now rather than waiting for the next
				// keep-alive write to fail on the closed connection.
				return
			}
		}
	}()

	// Reader goroutine: forwards text/binary messages to inputCh. On a read
	// error it reports on readErrCh and then closes inputCh; the main loop
	// treats either signal as end of stream.
	inputCh := make(chan []byte, 32)
	readErrCh := make(chan error, 1)
	go func() {
		defer close(inputCh)
		for {
			msgType, msg, err := conn.ReadMessage()
			if err != nil {
				log.Printf("[ws] read error: vmid=%d err=%v", cfg.VMID, err)
				readErrCh <- err
				return
			}
			if msgType != websocket.TextMessage && msgType != websocket.BinaryMessage {
				continue
			}
			log.Printf("[ws] input: vmid=%d bytes=%d type=%d", cfg.VMID, len(msg), msgType)
			select {
			case inputCh <- msg:
			case <-done:
				// inputCh is not drained while the handler waits for a
				// session and never after it returns; without this case a
				// full buffer would leak this goroutine.
				return
			}
		}
	}()

	// Wait for a console session. sendCh is still owned exclusively by this
	// handler here, so every early return closes it to stop the writer.
	var sess *consoleSession
	waitingNotified := false
	for {
		sess, _, err = getConsoleSession(cfg)
		if err == nil {
			log.Printf("[ws] console attached: vmid=%d type=%s", cfg.VMID, cfg.ContainerType)
			break
		}

		if cfg.ContainerType == "dev" {
			log.Printf("[ws] dev console unavailable: vmid=%d err=%v", cfg.VMID, err)
			sendCh <- []byte(fmt.Sprintf("[error] %v", err))
			close(sendCh)
			// Give the writer a bounded chance to deliver the error before
			// the deferred conn.Close() runs; otherwise the close races the
			// write and the client may never see the message.
			select {
			case <-writeErrCh:
			case <-time.After(flushTimeout):
			}
			return
		}

		if !waitingNotified {
			sendCh <- []byte("[info] waiting for server console...")
			log.Printf("[ws] waiting for server console: vmid=%d type=%s err=%v",
				cfg.VMID, cfg.ContainerType, err)
			waitingNotified = true
		}

		select {
		case <-time.After(sessionRetryDelay):
		case <-readErrCh:
			close(sendCh)
			return
		case <-writeErrCh:
			close(sendCh)
			return
		}
	}

	// From here on sendCh is shared with the session through consoleConn, so
	// this handler no longer closes it.
	sess.addConn(conn, &consoleConn{send: sendCh})
	defer func() {
		if sess != nil {
			if sess.removeConn(conn) == 0 {
				sess.scheduleCleanupIfIdle()
			}
		}
	}()

	// Pump client input into the session until either side of the socket fails.
	for {
		select {
		case msg, ok := <-inputCh:
			if !ok {
				return
			}
			if err := sess.writeInput(msg); err != nil {
				sendCh <- []byte(fmt.Sprintf("[error] %v", err))
			}
		case <-readErrCh:
			return
		case err := <-writeErrCh:
			if err != nil {
				log.Printf("[ws] write error: vmid=%d err=%v", cfg.VMID, err)
			}
			return
		}
	}
}

/* --------------------------------------------------------------------------
   Register handler in NewMux()
----------------------------------------------------------------------------*/

// registerWebSocket attaches the console stream handler to m at
// /console/stream.
func registerWebSocket(m *http.ServeMux) {
	m.HandleFunc("/console/stream", handleConsoleStream)
}