server.go

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/gorilla/mux"
)

// Basic options for the batching size and frequency
const batchSize int = 1000                               // maximum documents per batch
const batchPause time.Duration = time.Millisecond * 1000 // milliseconds between batch updates

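// Note: the processor below flushes at most once per batchPause, and the
// updates channel buffers batchSize entries, so sustained throughput is
// bounded at roughly batchSize documents per pause interval.
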
// Configure the server from the environment
var serverPort = os.Getenv("PORT")
var elasticsearchUrl = os.Getenv("BONSAI_URL")

// Buffered channel for updates. These should be strings in the Bulk API
// format.
var updates chan string = make(chan string, batchSize)

// Start up the processor and the HTTP server.
func main() {
	go processor()
	server()
}

// Processor receives batch updates from the updates channel and sends them to
// Elasticsearch, subject to a short cooldown interval. The strings in the
// updates channel should be formatted for the Bulk API.
func processor() {
	// Create a zero-length slice of strings, with enough capacity reserved to
	// prepare our bulk request.
	batch := make([]string, 0, batchSize)

	// A short interval to wait between requests. This gives a bit of time for
	// Elasticsearch to think, as well as our channel to refill.
	limiter := time.Tick(batchPause)

	// Infinite loop to pull updates out of the channel, and periodically send
	// them to Elasticsearch.
	for {
		select {
		// Pull as many updates as we can and batch them into a slice.
		case update := <-updates:
			batch = append(batch, update)

		// When the channel is empty, combine the batch, send it to ES, then
		// wait a bit.
		default:
			if len(batch) > 0 {
				log.Println("Processed", len(batch), "updates")

				// POST the batch to the Elasticsearch cluster _bulk handler
				resp, err := http.Post(
					elasticsearchUrl+"/_bulk", "application/json",
					strings.NewReader(strings.Join(batch, "")))
				if err != nil {
					log.Println("Error:", err)
				} else {
					// TODO: log something more interesting from the response
					body, _ := ioutil.ReadAll(resp.Body)
					resp.Body.Close()
					log.Println("Response", string(body))
				}

				// Reset the batch slice, keeping its capacity
				batch = batch[:0]
			}
			<-limiter
		}
	}
}

// Set up and run an HTTP server that intercepts updates and formats them for
// batches, and also proxies searches through to Elasticsearch.
func server() {
	r := mux.NewRouter()
	s := r.Methods("PUT", "POST").Subrouter()

	// Match the various _bulk handlers
	s.Path("/_bulk").
		HandlerFunc(queueClusterBulk)
	s.Path("/{index}/_bulk").
		HandlerFunc(queueIndexBulk)
	s.Path("/{index}/{type}/_bulk").
		HandlerFunc(queueTypeBulk)

	// Handle individual document updates
	r.Methods("POST", "PUT").
		Path("/{index}/{type}/{id}").
		HandlerFunc(queueUpdates)

	// Handle individual document deletes as well
	r.Methods("DELETE").
		Path("/{index}/{type}/{id}").
		HandlerFunc(queueDeletes)

	// Proxy everything else straight through to Elasticsearch
	r.PathPrefix("/").
		HandlerFunc(proxy)

	// Start the HTTP server
	http.Handle("/", r)
	fmt.Println("listening on " + serverPort + "...")
	err := http.ListenAndServe(":"+serverPort, nil)
	if err != nil {
		panic(err)
	}
}

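// Example usage, assuming the server is started with PORT=8080 and a valid
// BONSAI_URL (the index, type, and document below are illustrative):
//
//	curl -XPUT localhost:8080/myindex/mytype/1 -d '{"title":"hello"}'
//	=> {"status":"ok","message":"queued"}
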
//
// Request Handlers
//

// Proxy read requests straight through to Elasticsearch.
// TODO: parse the URL and initialize the proxy somewhere else?
func proxy(res http.ResponseWriter, req *http.Request) {
	log.Println("Proxied:", req.Method, req.RequestURI)

	// Set up a simple proxy for read requests
	targetUrl, err := url.Parse(elasticsearchUrl)
	if err != nil {
		log.Println("ERROR - Couldn't parse URL:", elasticsearchUrl, "-", err)
		res.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintln(res, "Couldn't parse BONSAI_URL:", elasticsearchUrl)
	} else {
		proxy := httputil.NewSingleHostReverseProxy(targetUrl)
		proxy.ServeHTTP(res, req)
	}
}

// Read cluster-level bulk updates and queue them. The Bulk API payload must
// end with a newline, and readBody strips any, so append exactly one here.
func queueClusterBulk(res http.ResponseWriter, req *http.Request) {
	updates <- readBody(req) + "\n"
	respond202(res)
}

// Index-level bulk requests are not queued yet. The index name may have been
// implicit from the URL and omitted in the payload, so until that parsing is
// worked out, log a warning and proxy the request through.
func queueIndexBulk(res http.ResponseWriter, req *http.Request) {
	log.Println("WARNING:", req.RequestURI, "is not currently optimized, use /_bulk instead")
	proxy(res, req)
}

// Type-level bulk requests are proxied for the same reason. The index and
// type names may have been implicit from the URL and omitted in the payload,
// so we have to deal with that before these can be queued.
func queueTypeBulk(res http.ResponseWriter, req *http.Request) {
	log.Println("WARNING:", req.RequestURI, "is not currently optimized, use /_bulk instead")
	proxy(res, req)
}

// Parse individual document deletes and queue them as bulk delete actions
func queueDeletes(res http.ResponseWriter, req *http.Request) {
	updates <- bulk("delete", req)
	respond202(res)
}

// Parse individual document updates and queue them as bulk index actions
func queueUpdates(res http.ResponseWriter, req *http.Request) {
	updates <- bulk("index", req)
	respond202(res)
}

//
// Request helpers
//

// Helper to read the body of a document update request, stripping any
// trailing newlines so that callers can append exactly one.
func readBody(req *http.Request) string {
	body, err := ioutil.ReadAll(req.Body)
	req.Body.Close()
	if err != nil {
		return "{}"
	}
	return strings.TrimRight(string(body), "\n")
}

// Helper to turn individual document updates into bulk JSON. The action name
// goes into the action line; delete actions carry no source document.
func bulk(action string, req *http.Request) string {
	vars := mux.Vars(req)
	msg := fmt.Sprintf(
		"{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}\n",
		action, vars["index"], vars["type"], vars["id"])
	if action == "delete" {
		return msg
	}
	return msg + readBody(req) + "\n"
}

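// For example, bulk("index", req) for a PUT to /myindex/mytype/1 with body
// {"title":"hello"} (illustrative names) yields the two-line bulk fragment:
//
//	{"index":{"_index":"myindex","_type":"mytype","_id":"1"}}
//	{"title":"hello"}
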
//
// Response writers
//

// Indicate that we've queued the message
func respond202(res http.ResponseWriter) {
	res.Header().Set("Content-Type", "application/json; charset=utf-8")
	res.WriteHeader(http.StatusAccepted)
	fmt.Fprintln(res, `{"status":"ok","message":"queued"}`)
}

// For the sake of shipping a prototype, refuse index and type bulk imports
// while the nuts and bolts of parsing them get worked out. (Currently unused:
// those handlers proxy through with a warning instead.)
func respond501(res http.ResponseWriter) {
	res.Header().Set("Content-Type", "application/json; charset=utf-8")
	res.WriteHeader(http.StatusNotImplemented)
	fmt.Fprintln(res, `{"status":"error","message":"this action is not implemented, pull requests welcome!"}`)
}