Skip to content

Commit d91bf53

Browse files
TingjiaTingjia Cao
authored andcommitted
Leopard Implementation for NPFU
0 parents  commit d91bf53

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+5770
-0
lines changed

.gitignore

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
*.log
2+
*.json
3+
imgs/ol-min
4+
5+
ol
6+
min-image/spin
7+
8+
default-ol/*

BilliBenchClient/client.go

Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"log"
9+
"math"
10+
"net/http"
11+
"os"
12+
"strconv"
13+
"sync"
14+
"time"
15+
)
16+
17+
type LambdaFunction struct {
18+
MaxCPU int `json:"maxCPU"`
19+
MaxMemory int `json:"maxMemory"`
20+
IdleMemory int `json:"idleMemory"`
21+
ColdStartDuration int `json:"coldStartDuration"`
22+
23+
AvgCPU_orig float64 `json:"avgCPU_orig"`
24+
AvgCPU_uniform float64 `json:"avgCPU_uniform"`
25+
AvgCPU_norm_50 float64 `json:"avgCPU_norm_50"`
26+
AvgCPU_norm_70 float64 `json:"avgCPU_norm_70"`
27+
AvgCPU_norm_30 float64 `json:"avgCPU_norm_30"`
28+
AvgCPU_norm_90 float64 `json:"avgCPU_norm_90"`
29+
AvgCPU_norm_10 float64 `json:"avgCPU_norm_10"`
30+
31+
AvgMemory float64 `json:"avgMemory"`
32+
AvgRuntime float64 `json:"avgRuntime"`
33+
34+
TotalInvocationNumber int `json:"totalInvocationNumber"`
35+
}
36+
37+
type LambdaRegisterMsg struct {
38+
FuncName string `json:"funcName"`
39+
40+
// Knobs
41+
MaxCPU int `json:"maxCPU"`
42+
MaxMem int `json:"maxMem"`
43+
Preemptible bool `json:"preemptible"`
44+
45+
// ColdStart information
46+
IdleMemory int `json:"idleMemory"`
47+
ColdStartDuration int `json:"coldStartDuration"`
48+
49+
// Stat
50+
AvgCPUUtil float64 `json:"avgCPUUtil"`
51+
AvgMemUtil float64 `json:"avgMemUtil"`
52+
AvgRuntime float64 `json:"avgRuntime"`
53+
}
54+
55+
type LambdaCallMsg struct {
56+
FuncName string `json:"funcName"`
57+
IdealRuntime float64 `json:"idealRuntime"`
58+
EventStr string `json:"eventStr"`
59+
InvokeId string `json:"invokeId"`
60+
}
61+
62+
var Conf *Config
63+
64+
type Config struct {
65+
AdmissionControl string
66+
Mem2CPU int
67+
boss_addr string
68+
boss_port string
69+
requestLog *log.Logger
70+
load float64
71+
cpudist string
72+
}
73+
74+
func RegisterFunctions(lambda_functions map[string]LambdaFunction, batch_set []string, client *http.Client) {
75+
var maxCPU int
76+
var maxMemory int
77+
var preemptible bool
78+
79+
for lambda_name, lambda_function := range lambda_functions {
80+
lambda_name_int, err := strconv.Atoi(lambda_name)
81+
maxCPU = lambda_name_int%8 + 1
82+
maxMemory = lambda_function.MaxMemory
83+
if Conf.AdmissionControl == "StaticLinear" {
84+
preemptible = false
85+
if maxCPU*Conf.Mem2CPU < maxMemory {
86+
maxCPU = int(math.Ceil(float64(maxMemory) / float64(Conf.Mem2CPU)))
87+
}
88+
maxMemory = maxCPU * Conf.Mem2CPU
89+
} else if Conf.AdmissionControl == "Static" {
90+
preemptible = false
91+
} else {
92+
preemptible = false
93+
for _, str := range batch_set {
94+
if str == lambda_name {
95+
preemptible = true
96+
break
97+
}
98+
}
99+
}
100+
101+
avgCPU := 0.0
102+
switch Conf.cpudist {
103+
case "orig":
104+
avgCPU = lambda_function.AvgCPU_orig
105+
case "uniform":
106+
avgCPU = lambda_function.AvgCPU_uniform
107+
case "norm_10":
108+
avgCPU = lambda_function.AvgCPU_norm_10
109+
case "norm_30":
110+
avgCPU = lambda_function.AvgCPU_norm_30
111+
case "norm_50":
112+
avgCPU = lambda_function.AvgCPU_norm_50
113+
case "norm_70":
114+
avgCPU = lambda_function.AvgCPU_norm_70
115+
case "norm_90":
116+
avgCPU = lambda_function.AvgCPU_norm_90
117+
}
118+
119+
msg := LambdaRegisterMsg{
120+
FuncName: lambda_name,
121+
MaxCPU: maxCPU,
122+
MaxMem: maxMemory,
123+
Preemptible: preemptible,
124+
125+
IdleMemory: lambda_function.IdleMemory,
126+
ColdStartDuration: lambda_function.ColdStartDuration,
127+
128+
AvgCPUUtil: avgCPU * float64(lambda_name_int%8+1),
129+
AvgMemUtil: lambda_function.AvgMemory,
130+
AvgRuntime: lambda_function.AvgRuntime,
131+
}
132+
133+
jsonData, err := json.Marshal(msg)
134+
if err != nil {
135+
panic(fmt.Sprintf("Error marshaling JSON: %v", err))
136+
}
137+
138+
url := fmt.Sprintf("http://%s:%s/register/", Conf.boss_addr, Conf.boss_port)
139+
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
140+
if err != nil {
141+
panic(fmt.Sprintf("error creating request: %v", err))
142+
}
143+
req.Header.Set("Content-Type", "application/json")
144+
145+
fmt.Printf("%s\n", jsonData)
146+
resp, err := client.Do(req)
147+
if err != nil {
148+
panic(fmt.Sprintf("error sending request: %v", err))
149+
}
150+
resp.Body.Close()
151+
152+
fmt.Println("Response status:", resp.Status)
153+
}
154+
155+
return
156+
}
157+
158+
func SendLambdaCallReq(fname string, lambda_call map[string]interface{}, client *http.Client, wg *sync.WaitGroup, nextId int) {
159+
defer wg.Done()
160+
event := make(map[string]interface{})
161+
event["durationArray"] = lambda_call["durationArray"].([]interface{})
162+
event["memArray"] = lambda_call["memArray"].([]interface{})
163+
event["cpuArray"] = lambda_call[fmt.Sprintf("parallelArray_%s", Conf.cpudist)].([]interface{})
164+
165+
eventStr, err := json.Marshal(event)
166+
if err != nil {
167+
panic(fmt.Sprintf("Error marshaling JSON: %v", err))
168+
}
169+
170+
msg := LambdaCallMsg{
171+
FuncName: fname,
172+
IdealRuntime: lambda_call["IdealRuntime"].(float64),
173+
EventStr: string(eventStr),
174+
InvokeId: fmt.Sprintf("%sIK%d", fname, nextId),
175+
}
176+
177+
jsonData, err := json.Marshal(msg)
178+
if err != nil {
179+
panic(fmt.Sprintf("Error marshaling JSON: %v", err))
180+
}
181+
182+
url := fmt.Sprintf("http://%s:%s/run/", Conf.boss_addr, Conf.boss_port)
183+
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
184+
if err != nil {
185+
panic(fmt.Sprintf("error creating request: %v", err))
186+
}
187+
req.Header.Set("Content-Type", "application/json")
188+
fmt.Printf("%s\n", jsonData)
189+
Conf.requestLog.Printf("%s", jsonData)
190+
resp, err := client.Do(req)
191+
if err != nil {
192+
panic(fmt.Sprintf("error sending request: %v", err))
193+
}
194+
resp.Body.Close()
195+
196+
}
197+
198+
func conditionalMultiple(index, divisor, falseVal, trueVal int) int {
199+
if index%divisor == 0 {
200+
return falseVal
201+
}
202+
return trueVal
203+
}
204+
205+
func CallFunction(fname string, lambda_call_list [][]interface{}, client *http.Client, wg *sync.WaitGroup) {
206+
defer wg.Done()
207+
208+
var function_wg sync.WaitGroup
209+
// timeInterval := int(lambda_calls[0].(float64))
210+
nextId := 0
211+
multiple := 0
212+
for index, value := range lambda_call_list {
213+
timeInterval := int(value[0].(float64))
214+
time.Sleep(time.Duration(timeInterval) * time.Millisecond)
215+
216+
switch Conf.load {
217+
case 0.25:
218+
multiple = conditionalMultiple(index, 4, 1, 0)
219+
case 0.5:
220+
multiple = conditionalMultiple(index, 2, 0, 1)
221+
case 0.75:
222+
multiple = conditionalMultiple(index, 4, 0, 1)
223+
case 1:
224+
multiple = 1
225+
case 1.25:
226+
multiple = conditionalMultiple(index, 4, 2, 1)
227+
case 1.5:
228+
multiple = conditionalMultiple(index, 2, 1, 2)
229+
case 1.75:
230+
multiple = conditionalMultiple(index, 4, 1, 2)
231+
case 2:
232+
multiple = 2
233+
case 2.5:
234+
multiple = conditionalMultiple(index, 2, 2, 3)
235+
case 3:
236+
multiple = 3
237+
}
238+
239+
for i := 0; i < multiple; i += 1 {
240+
function_wg.Add(1)
241+
lambda_call := value[1].(map[string]interface{})
242+
go SendLambdaCallReq(fname, lambda_call, client, &function_wg, nextId)
243+
nextId += 1
244+
}
245+
246+
}
247+
248+
function_wg.Wait()
249+
}
250+
251+
type Addresses struct {
252+
Cpudist string `json:"cpudist"`
253+
Batchset string `json:"batchset"`
254+
AdmissionControl string `json:"admissionControl"`
255+
Controller string `json:"controller"`
256+
Workers []string `json:"workers"`
257+
Load float64 `json:"load"`
258+
Mem2CPU float64 `json:"mem2CPU"`
259+
}
260+
261+
func main() {
262+
file, err := os.Open("/users/Tingjia/project/addresses.json")
263+
if err != nil {
264+
log.Fatalf("Failed to open file: %s", err)
265+
}
266+
defer file.Close()
267+
268+
byteValue, err := io.ReadAll(file)
269+
if err != nil {
270+
log.Fatalf("Failed to read file: %s", err)
271+
}
272+
273+
var addresses Addresses
274+
err = json.Unmarshal(byteValue, &addresses)
275+
if err != nil {
276+
log.Fatalf("Failed to unmarshal JSON: %s", err)
277+
}
278+
279+
Conf = &Config{
280+
AdmissionControl: addresses.AdmissionControl, // StaticLinear Static FLEX
281+
Mem2CPU: int(addresses.Mem2CPU),
282+
boss_addr: addresses.Controller,
283+
boss_port: "5000",
284+
load: addresses.Load,
285+
cpudist: addresses.Cpudist,
286+
}
287+
288+
requestLogFile, _ := os.Create("request.log")
289+
requestLog := log.New(requestLogFile, "", 0)
290+
requestLog.SetFlags(log.Lmicroseconds)
291+
Conf.requestLog = requestLog
292+
293+
client := &http.Client{}
294+
295+
var lambda_functions map[string]LambdaFunction
296+
var lambda_calls map[string][][]interface{}
297+
// Get the function statistic and meta data
298+
jsonFile, err := os.Open("workloads/Functions.json")
299+
if err != nil {
300+
log.Fatalf("Failed to open JSON file: %s", err)
301+
}
302+
defer jsonFile.Close()
303+
304+
byteValue, err = io.ReadAll(jsonFile)
305+
if err != nil {
306+
log.Fatalf("Failed to read JSON file: %s", err)
307+
}
308+
309+
err = json.Unmarshal(byteValue, &lambda_functions)
310+
if err != nil {
311+
fmt.Println("Error parsing JSON:", err)
312+
return
313+
}
314+
315+
jsonFile, err = os.Open(fmt.Sprintf("workloads/Batch_set.json"))
316+
if err != nil {
317+
log.Fatalf("Failed to open JSON file: %s", err)
318+
}
319+
defer jsonFile.Close()
320+
321+
// Read the file content
322+
byteValue, err = io.ReadAll(jsonFile)
323+
if err != nil {
324+
log.Fatalf("Failed to read JSON file: %s", err)
325+
}
326+
327+
var batchFuncs map[string][]string
328+
329+
// Parse the JSON data
330+
if err := json.Unmarshal(byteValue, &batchFuncs); err != nil {
331+
log.Fatalf("Failed to parse JSON: %s", err)
332+
}
333+
334+
RegisterFunctions(lambda_functions, batchFuncs[addresses.Batchset], client)
335+
336+
// Get the request call for each function
337+
jsonFile, err = os.Open(fmt.Sprintf("workloads/AzureWorkloads_0.json"))
338+
if err != nil {
339+
log.Fatalf("Failed to open JSON file: %s", err)
340+
}
341+
defer jsonFile.Close()
342+
343+
byteValue, err = io.ReadAll(jsonFile)
344+
if err != nil {
345+
log.Fatalf("Failed to read JSON file: %s", err)
346+
}
347+
348+
err = json.Unmarshal(byteValue, &lambda_calls)
349+
if err != nil {
350+
fmt.Println("Error parsing JSON:", err)
351+
return
352+
}
353+
354+
var wg sync.WaitGroup
355+
for fname, lambda_call_list := range lambda_calls {
356+
wg.Add(1)
357+
go CallFunction(fname, lambda_call_list, client, &wg)
358+
}
359+
time.Sleep(20 * time.Minute)
360+
os.Exit(1)
361+
362+
// wg.Wait()
363+
}

0 commit comments

Comments
 (0)