package agenthttp

import (
	"bufio"
	"crypto/sha1"
	"encoding/base64"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"zlh-agent/internal/provision"
	"zlh-agent/internal/state"
)

/*
	websocket.go
	ZeroLagHub Agent — Real-time Log WebSocket
	Endpoint: GET /console/stream
*/

const (
	// maxInitialTail is the number of bytes from the end of the log file
	// that are replayed to a client right after it connects.
	maxInitialTail = 4096 // 4 KB

	// maxStreamLine caps how many bytes of an unterminated line are held
	// back before they are flushed to the client as a message of their own.
	maxStreamLine = 64 * 1024

	// followPollInterval is how often the log file is checked for new data.
	followPollInterval = 500 * time.Millisecond
)

/* --------------------------------------------------------------------------
   Minimal WebSocket Upgrader (stdlib only)
----------------------------------------------------------------------------*/

// WebSocketConn is a hijacked HTTP connection on which the server side of
// the WebSocket opening handshake has already been written.
type WebSocketConn struct {
	Conn net.Conn          // raw connection; frames are written to it directly
	Rw   *bufio.ReadWriter // buffered reader/writer handed back by Hijack
}

// upgradeToWebSocket validates the upgrade request, hijacks the connection
// and writes the "101 Switching Protocols" handshake response.
//
// Nothing is written to w before the hijack succeeds, so for every error
// returned up to and including a failed Hijack the caller can still send a
// regular HTTP error response. If the handshake write itself fails, the
// hijacked connection is closed before the error is returned.
func upgradeToWebSocket(w http.ResponseWriter, r *http.Request) (*WebSocketConn, error) {
	if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") ||
		!strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
		return nil, fmt.Errorf("invalid websocket upgrade request")
	}

	key := r.Header.Get("Sec-WebSocket-Key")
	if key == "" {
		return nil, fmt.Errorf("missing Sec-WebSocket-Key")
	}

	hj, ok := w.(http.Hijacker)
	if !ok {
		return nil, fmt.Errorf("websocket: hijacking not supported")
	}

	// Build the response headers on a copy so that a failed hijack does not
	// leave upgrade headers behind on the caller's error response.
	hdr := w.Header().Clone()
	if hdr == nil {
		hdr = make(http.Header)
	}
	hdr.Set("Upgrade", "websocket")
	hdr.Set("Connection", "Upgrade")
	hdr.Set("Sec-WebSocket-Accept", computeAcceptKey(key))
	hdr.Set("Sec-WebSocket-Version", "13")

	conn, rw, err := hj.Hijack()
	if err != nil {
		return nil, fmt.Errorf("websocket hijack: %w", err)
	}

	// The hijacked connection may still carry read/write deadlines set by
	// the HTTP server; a long-lived stream must not inherit them.
	// Best effort: if this fails, a later write error ends the stream.
	_ = conn.SetDeadline(time.Time{})

	_, err = rw.WriteString("HTTP/1.1 101 Switching Protocols\r\n")
	if err == nil {
		err = hdr.Write(rw)
	}
	if err == nil {
		_, err = rw.WriteString("\r\n")
	}
	if err == nil {
		err = rw.Flush()
	}
	if err != nil {
		_ = conn.Close() // the handshake error is the one worth reporting
		return nil, fmt.Errorf("websocket handshake: %w", err)
	}

	return &WebSocketConn{Conn: conn, Rw: rw}, nil
}

// WriteText sends msg as a single unfragmented, unmasked text frame.
//
// Invalid UTF-8 in msg is replaced with U+FFFD, because text frames are
// required to carry valid UTF-8. The header and payload are sent with one
// Write call. It returns the error of the underlying connection write.
func (c *WebSocketConn) WriteText(msg string) error {
	msg = strings.ToValidUTF8(msg, "\uFFFD")
	n := len(msg)

	// Largest header for an unmasked frame: 2 bytes + 8-byte extended length.
	frame := make([]byte, 0, 10+n)

	// FIN + opcode(1 = text)
	frame = append(frame, 0x81)

	// Length encoding: 7-bit, 16-bit (marker 126) or 64-bit (marker 127).
	switch {
	case n < 126:
		frame = append(frame, byte(n))
	case n <= 0xFFFF:
		frame = append(frame, 126, byte(n>>8), byte(n))
	default:
		var ext [8]byte
		binary.BigEndian.PutUint64(ext[:], uint64(n))
		frame = append(frame, 127)
		frame = append(frame, ext[:]...)
	}

	frame = append(frame, msg...)
	_, err := c.Conn.Write(frame)
	return err
}

// Close closes the underlying network connection.
func (c *WebSocketConn) Close() error {
	return c.Conn.Close()
}

/* --------------------------------------------------------------------------
   SHA-1 + Base64 for Sec-WebSocket-Accept
----------------------------------------------------------------------------*/

// computeAcceptKey returns the Sec-WebSocket-Accept value for a client key:
// the base64 encoding of SHA-1(key + magic GUID).
func computeAcceptKey(key string) string {
	const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
	h := sha1.Sum([]byte(key + magic))
	return base64.StdEncoding.EncodeToString(h[:])
}

/* --------------------------------------------------------------------------
   MAIN HANDLER — /console/stream
----------------------------------------------------------------------------*/

// consoleTail reads newline-terminated lines from a log file and keeps an
// unterminated trailing fragment between polls.
type consoleTail struct {
	rd      *bufio.Reader
	pending []byte // bytes read after the last newline seen so far
}

// newConsoleTail positions f so that roughly the last limit bytes are read
// first and returns a tail reader over it. When the file is larger than
// limit, reading starts one byte early and everything up to the first
// newline is dropped, so the first line delivered is always a whole line.
func newConsoleTail(f *os.File, limit int64) (*consoleTail, error) {
	info, err := f.Stat()
	if err != nil {
		return nil, err
	}

	skipPartial := info.Size() > limit
	if skipPartial {
		if _, err := f.Seek(info.Size()-limit-1, io.SeekStart); err != nil {
			return nil, err
		}
	}

	rd := bufio.NewReader(f)
	if skipPartial {
		if _, err := rd.ReadString('\n'); err != nil && !errors.Is(err, io.EOF) {
			return nil, err
		}
	}
	return &consoleTail{rd: rd}, nil
}

// forward sends every line that is currently available to ws and returns
// nil once the end of the file is reached. A trailing fragment without a
// newline stays pending until its newline arrives or it grows to
// maxStreamLine bytes. A bufio.Reader is used rather than a bufio.Scanner
// because a Scanner stops for good at the first EOF.
func (t *consoleTail) forward(ws *WebSocketConn) error {
	for {
		chunk, err := t.rd.ReadSlice('\n')
		t.pending = append(t.pending, chunk...) // chunk is only valid until the next read

		caughtUp := errors.Is(err, io.EOF)
		if err != nil && !caughtUp && !errors.Is(err, bufio.ErrBufferFull) {
			return err
		}

		if err == nil || len(t.pending) >= maxStreamLine {
			line := strings.TrimSuffix(strings.TrimSuffix(string(t.pending), "\n"), "\r")
			t.pending = t.pending[:0]
			if werr := ws.WriteText(line); werr != nil {
				return werr
			}
		}

		if caughtUp {
			return nil
		}
	}
}

// wsPeerClosed starts a goroutine that reads and discards everything the
// client sends and closes the returned channel on the first read error.
// Closing c makes the read fail, so the goroutine ends with the handler.
func wsPeerClosed(c *WebSocketConn) <-chan struct{} {
	gone := make(chan struct{})
	go func() {
		defer close(gone)
		buf := make([]byte, 512)
		for {
			if _, err := c.Rw.Reader.Read(buf); err != nil {
				return
			}
		}
	}()
	return gone
}

// handleConsoleStream upgrades the request to a WebSocket, replays the tail
// of logs/latest.log from the server directory and then streams new lines
// as they are appended. It returns when a write to the client fails, the
// client connection reports a read error, or the log file cannot be read.
//
// NOTE(review): truncation or replacement of latest.log is not detected;
// the handler keeps following the file it opened.
func handleConsoleStream(w http.ResponseWriter, r *http.Request) {
	cfg, err := state.LoadConfig()
	if err != nil {
		http.Error(w, "no config loaded", http.StatusBadRequest)
		return
	}

	ws, err := upgradeToWebSocket(w, r)
	if err != nil {
		log.Println("[ws] upgrade failed:", err)
		// If the failure happened after the hijack there is nothing left to
		// answer on; net/http then ignores this call.
		http.Error(w, "websocket upgrade failed", http.StatusBadRequest)
		return
	}
	defer ws.Close()

	logfile := filepath.Join(provision.ServerDir(*cfg), "logs", "latest.log")

	f, err := os.Open(logfile)
	if err != nil {
		// Best effort: the connection is closed right after.
		_ = ws.WriteText(fmt.Sprintf("[error] cannot open log: %v", err))
		return
	}
	defer f.Close()

	// 1) Position at the last 4 KB (initial tail)
	tail, err := newConsoleTail(f, maxInitialTail)
	if err != nil {
		// Best effort: the connection is closed right after.
		_ = ws.WriteText(fmt.Sprintf("[error] cannot read log: %v", err))
		return
	}

	gone := wsPeerClosed(ws)

	ticker := time.NewTicker(followPollInterval)
	defer ticker.Stop()

	// 2) Send the tail, then follow the file — stream new log lines live
	for {
		if err := tail.forward(ws); err != nil {
			log.Println("[ws] console stream ended:", err)
			return
		}

		select {
		case <-gone:
			return
		case <-ticker.C:
		}
	}
}

/* --------------------------------------------------------------------------
   Register handler in NewMux()
----------------------------------------------------------------------------*/

// registerWebSocket registers the console stream handler on m.
func registerWebSocket(m *http.ServeMux) {
	m.HandleFunc("/console/stream", handleConsoleStream)
}