forked from ingyamilmolinar/doctorgpt
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
242 lines (215 loc) · 7.01 KB
/
main.go
File metadata and controls
242 lines (215 loc) · 7.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package main
import (
"flag"
"fmt"
"os"
"time"
"github.com/hpcloud/tail"
"go.uber.org/zap"
)
// main parses command-line flags, initializes the zap logger, validates the
// required inputs (log file, output dir, config file, OPENAI_KEY), builds the
// parsers via setup, and then enters the log-monitoring loop, which
// effectively never returns (the tail follows the file past EOF).
func main() {
	// Parse command-line arguments
	// TODO: Monitor all logs in a directory
	logFilePath := flag.String("logfile", "", "path to log file")
	outputDir := flag.String("outdir", "", "path to output directory")
	configFilePath := flag.String("configfile", "", "path to config file")
	debugFlag := flag.Bool("debug", true, "log debug flag")
	logBundlingTimeoutInSecs := flag.Int("bundlingtimeoutseconds", 5, "log bundling timeout duration in seconds")
	bufferSize := flag.Int("buffersize", 100, "max log entries per ring-buffer")
	maxTokens := flag.Int("maxtokens", 8000, "max tokens for context per API request")
	gptModel := flag.String("gptmodel", "gpt-4", "GPT model to use for diagnosis")
	flag.Parse()

	// Init logger: human-readable development config when debugging,
	// JSON production config otherwise.
	var err error
	var logger *zap.Logger
	if *debugFlag {
		logger, err = zap.NewDevelopment()
	} else {
		logger, err = zap.NewProduction()
	}
	if err != nil {
		// Logger is unavailable, so fall back to plain stdout.
		fmt.Printf("Failed to init logger: %v", err)
		os.Exit(1)
	}
	// TODO: Can this cause issues?
	// TODO: Handle a kill event gracefully
	// Best-effort flush of buffered log entries on exit; the error is
	// deliberately ignored (Sync on stderr commonly fails harmlessly).
	defer logger.Sync()
	log := logger.Sugar()

	// Validate required flags; Fatal exits the process.
	if *logFilePath == "" {
		log.Fatal("Log file path is required")
	}
	if *outputDir == "" {
		log.Fatal("Output directory path is required")
	}
	if *configFilePath == "" {
		log.Fatal("Config file path is required")
	}

	// Get ChatGPT API key from environment variable
	apiKey := os.Getenv("OPENAI_KEY")
	if apiKey == "" {
		log.Fatal("ChatGPT API key is required")
	}

	// Setup and build parsers.
	parsers, err := setup(log, *configFilePath, *outputDir, fileConfigProvider)
	if err != nil {
		// FIX: was log.Fatal(...), which does not interpolate format
		// verbs — "%v" would have been printed literally. Fatalf is the
		// Printf-style variant.
		log.Fatalf("Setup failed: %v", err)
	}

	// This will effectively never end (it doesn't handle EOF)
	timeoutDuration := time.Duration(*logBundlingTimeoutInSecs) * time.Second
	monitorLogLoop(log, *logFilePath, *outputDir, apiKey, *gptModel, *bufferSize, *maxTokens, parsers, handleTrigger, timeoutDuration, true)
}
// setup loads the configuration through configProvider, optionally overrides
// the package-level basePrompt, builds one parser per configured entry, and
// ensures outputDir exists (creating it if necessary).
// It returns the constructed parsers, or an error if the config cannot be
// loaded, a parser spec is invalid, or the output directory is inaccessible.
func setup(log *zap.SugaredLogger, configFile, outputDir string, configProvider configProvider) ([]parser, error) {
	config, err := configProvider(log, configFile)
	if err != nil {
		return nil, fmt.Errorf("config provider failed: %w", err)
	}
	// Let the config override the default base prompt.
	if config.Prompt != "" {
		basePrompt = config.Prompt
	}
	// Pre-size: one parser per config entry.
	parsers := make([]parser, 0, len(config.Parsers))
	// FIX: renamed the loop variable (was `parser`, shadowing both the range
	// variable and the parser type) for clarity.
	for _, pc := range config.Parsers {
		p, err := newParser(log, pc.Regex, pc.Filters, pc.Triggers, pc.Excludes)
		if err != nil {
			return nil, fmt.Errorf("invalid config file: %w", err)
		}
		log.Debugf("Appending parser (%s)", p.regex)
		parsers = append(parsers, p)
	}
	log.Infof("Initialized (%d) parsers", len(parsers))
	// Create dir if not exists.
	// FIX: renamed the local (was `exists`), which shadowed the exists()
	// helper and would have broken any later call to it in this scope.
	dirExists, err := exists(outputDir)
	if err != nil {
		return nil, fmt.Errorf("failed to open output directory: %w", err)
	}
	// TODO: If exists, check permissions
	if !dirExists {
		// FIX: MkdirAll (was Mkdir) also creates missing parent
		// directories and succeeds if the directory appears in the
		// meantime — robust against nested paths and create races.
		if err := os.MkdirAll(outputDir, 0755); err != nil {
			return nil, fmt.Errorf("failed to create output directory: %w", err)
		}
	}
	return parsers, nil
}
// monitorLogLoop tails fileName line by line, parsing each line with the
// provided parsers. When an entry is triggered (and not filtered), it keeps
// bundling subsequent related lines into the entry's buffer until either the
// timeout elapses or an unrelated line is seen, then dispatches the buffered
// context to handler in a goroutine. With follow=true the tail re-opens and
// follows the file, so this loop effectively never returns.
// NOTE(review): handler goroutines are fire-and-forget (no WaitGroup), so
// in-flight diagnoses may be lost if the process exits — confirm acceptable.
func monitorLogLoop(log *zap.SugaredLogger, fileName, outputDir, apiKey, model string, bufferSize, maxTokens int, parsers []parser, handler handler, timeout time.Duration, follow bool) {
	// Set up tail object to read log file
	tailConfig := tail.Config{
		Follow: follow,
		ReOpen: follow,
	}
	t, err := tail.TailFile(fileName, tailConfig)
	if err != nil {
		log.Fatalf("Failed to tail log file: %v", err)
	}
	// Map of log buffers, keyed by thread ID or routine name
	logBuffers := make(map[string]*logBuffer)
	// Loop to read new lines from the log file
	lineNum := 0
	for line := range t.Lines {
		lineNum++
	top:
		// Parse the log entry. parserMatched is the index of the parser
		// that matched; it is compared against later bundled lines.
		entry, parserMatched, err := parseLogEntry(log, parsers, line.Text, lineNum)
		if err != nil {
			log.Fatalf("Error parsing log entry (%s)", line)
		}
		// If entry is excluded, ignore it
		if entry.Excluded {
			continue
		}
		// TODO: Divide into buffers depending on granularity
		// All entries currently share a single buffer under this key.
		key := "DEFAULT"
		log.Debugf("Process key (%s)", key)
		// Create a new buffer if necessary. The token budget reserves
		// room for the base prompt within maxTokens.
		if _, ok := logBuffers[key]; !ok {
			logBuffers[key] = newLogBuffer(log, bufferSize, maxTokens-len(basePrompt))
		}
		// Buffer the log entry
		log.Debugf("Appending to buffer: (%s)", line)
		buffer := logBuffers[key]
		buffer.Append(entry)
		// Check if the log entry indicates an error
		log.Debugf("Should filter: %v", entry.Filtered)
		log.Debugf("Should diagnose: %v", !entry.Filtered && entry.Triggered)
		if !entry.Filtered && entry.Triggered {
			// Remember the triggering entry; later lines only add context.
			entryToDiagnose := entry
			log.Infof("Entry to diagnose: %s", entryToDiagnose.Text)
			// Append subsequent log entries to the buffer until a new log level is detected
			// Wait for input or timeout in N seconds
			timec := time.After(timeout)
		outer:
			for {
				select {
				case <-timec:
					// No related line arrived in time; ship what we have.
					log.Info("Timeout!")
					break outer // timed out
				// Process previous entry if exist
				case l, ok := <-t.Lines:
					if !ok {
						log.Debug("Log line channel closed or empty")
						break outer
					}
					// increment line number
					lineNum++
					// Parse lines until we hit a known log line that's not the generic one
					var matched int
					entry, matched, err = parseLogEntry(log, parsers, l.Text, lineNum)
					if err != nil {
						log.Fatalf("Error parsing log entry (%s)", l.Text)
					}
					// If entry is excluded, ignore it
					if entry.Excluded {
						continue
					}
					// TODO: Have an optional "bundle" line limit to avoid packing too much context after the error
					// TODO: Do not rely on location for the default parser
					// The last parser in the slice is treated as the
					// catch-all/default — presumably guaranteed by config
					// ordering; verify against setup().
					if matched == len(parsers)-1 || (matched == parserMatched && !entry.Filtered && entry.Triggered) {
						// Matched default parser OR
						// Matched the same parser and it was triggered
						log.Debugf("Default parser matched: (%v)", matched == len(parsers)-1)
						log.Debugf("Appending to buffer: (%v)", entry)
						buffer := logBuffers[key]
						buffer.Append(entry)
					} else {
						// An unrelated line ends the bundle: dispatch the
						// current buffer, then re-process this line from the
						// top of the outer loop as if it were freshly read.
						// Spoof line and go back to top
						log.Debugf("Spoofing: (%s)", l.Text)
						line = l
						// TODO: Deduplicate this logic
						// dump log context buffer and clear
						dumpedBuffer := logBuffers[key].Dump()
						logBuffers[key].Clear()
						// dumpedBuffer/entryToDiagnose are fresh per
						// iteration, so the closure capture is safe.
						go func() {
							err := handler(log, fileName, outputDir, apiKey, model, entryToDiagnose, dumpedBuffer)
							if err != nil {
								log.Errorf("Handler failed: %v", err)
							}
						}()
						goto top
					}
				}
			}
			// Reached on timeout or closed channel (not via goto top):
			// dump log context buffer and clear
			dumpedBuffer := logBuffers[key].Dump()
			logBuffers[key].Clear()
			// Async call the ChatGPT API
			// TODO: We need persistance to make sure all errors are reported
			// TODO: Expose N prompts and N diagnosis per error configuration
			go func() {
				err := handler(log, fileName, outputDir, apiKey, model, entryToDiagnose, dumpedBuffer)
				if err != nil {
					log.Errorf("Handler failed: %v", err)
				}
			}()
		}
	}
}
// exists reports whether path refers to an existing file or directory.
// A "does not exist" stat result is not treated as an error: the function
// returns (false, nil) in that case, and (false, err) only for other
// stat failures (e.g. permission problems).
func exists(path string) (bool, error) {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, statErr
	}
}