Skip to content

Commit 7180990

Browse files
committed
refactor(p2p): use buffered channels to communicate inside peer connection
1 parent 53244d8 commit 7180990

18 files changed

Lines changed: 190 additions & 51 deletions

faws/app/repository/notify.go

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@ import (
1111
"github.com/faws-vcs/faws/faws/app"
1212
"github.com/faws-vcs/faws/faws/repo/cas"
1313
"github.com/faws-vcs/faws/faws/repo/event"
14+
"github.com/faws-vcs/faws/faws/repo/p2p/peernet"
1415
)
1516

1617
var (
1718
stages_text = map[event.Stage]string{
18-
event.StagePullObjects: "Retrieve objects",
19-
event.StagePullTags: "Retrieve tags",
20-
event.StageCacheFiles: "Cache files",
21-
event.StageCacheFile: "Cache file",
22-
event.StageWriteTree: "Write tree",
23-
event.StageCheckout: "Checkout",
19+
event.StagePullObjects: "Retrieve objects",
20+
event.StagePullTags: "Retrieve tags",
21+
event.StageCacheFiles: "Cache files",
22+
event.StageCacheFile: "Cache file",
23+
event.StageWriteTree: "Write tree",
24+
event.StageCheckout: "Checkout",
25+
event.StageServeObjects: "Distribute objects",
2426
}
2527

2628
scrn activity_screen
@@ -54,6 +56,12 @@ type activity_screen struct {
5456

5557
connected_peers int
5658

59+
received_messages int64
60+
last_message_id peernet.MessageID
61+
object_uploads int64
62+
duplicate_object_downloads int64
63+
duplicate_object_download_size uint64
64+
5765
verbose bool
5866
}
5967

@@ -166,6 +174,14 @@ func notify(ev event.Notification, params *event.NotifyParams) {
166174
case event.NotifyPeerDisconnected:
167175
// app.Info("disconnected from", params.ID)
168176
scrn.connected_peers--
177+
case event.NotifyPeerNetMessage:
178+
scrn.received_messages++
179+
scrn.last_message_id = params.MessageID
180+
case event.NotifyPeerObjectUpload:
181+
scrn.object_uploads++
182+
case event.NotifyPeerObjectDuplicateDownload:
183+
scrn.duplicate_object_downloads++
184+
scrn.duplicate_object_download_size += uint64(params.Count)
169185
}
170186
guard.Unlock()
171187

@@ -230,6 +246,21 @@ func render_activity_screen(hud *console.Hud) {
230246
return
231247
}
232248

249+
// if scrn.received_messages > 0 {
250+
// var received_messages_text console.Text
251+
// received_messages_text.Stylesheet.Width = console.Width()
252+
// received_messages_text.Add(fmt.Sprintf("%d messages received. last message id: %s", scrn.received_messages, scrn.last_message_id), 0, 0)
253+
// hud.Line(&received_messages_text)
254+
// }
255+
256+
if scrn.connected_peers > 0 {
257+
var peers_text console.Text
258+
peers_text.Stylesheet.Margin[console.Left] = 1
259+
peers_text.Stylesheet.Width = console.Width()
260+
peers_text.Add(fmt.Sprintf("%d peers connected", scrn.connected_peers), 0, 0)
261+
hud.Line(&peers_text)
262+
}
263+
233264
var progress_bar console.ProgressBar
234265
progress_bar.Stylesheet.Sequence[console.PbCaseLeft] = console.Cell{'[', 0, 0}
235266
progress_bar.Stylesheet.Sequence[console.PbCaseRight] = console.Cell{']', 0, 0}
@@ -249,18 +280,23 @@ func render_activity_screen(hud *console.Hud) {
249280
progress_bar.Progress = float64(scrn.tags_received) / float64(scrn.tags_in_queue)
250281
hud.Line(&progress_bar)
251282
case event.StagePullObjects:
252-
if scrn.connected_peers > 0 {
253-
var peers_text console.Text
254-
peers_text.Stylesheet.Margin[console.Left] = 1
255-
peers_text.Stylesheet.Width = console.Width()
256-
peers_text.Add(fmt.Sprintf("%d peers connected", scrn.connected_peers), 0, 0)
257-
hud.Line(&peers_text)
283+
284+
if scrn.duplicate_object_downloads > 0 {
285+
var duplicate_objects_text console.Text
286+
duplicate_objects_text.Stylesheet.Margin[console.Left] = 1
287+
duplicate_objects_text.Stylesheet.Width = console.Width()
288+
duplicate_objects_text.Add(
289+
fmt.Sprintf("%d duplicate objects downloaded (%s wasted)", scrn.duplicate_object_downloads, humanize.Bytes(scrn.duplicate_object_download_size)),
290+
console.BrightYellow,
291+
0,
292+
)
293+
hud.Line(&duplicate_objects_text)
258294
}
259295

260296
var usage_text console.Text
261297
usage_text.Stylesheet.Width = console.Width()
262298
usage_text.Stylesheet.Margin[console.Left] = 1
263-
usage_text.Add(fmt.Sprintf("%d/%d objects received, %s total", scrn.objects_received, scrn.objects_in_queue, humanize.Bytes(scrn.bytes_received)), 0, 0)
299+
usage_text.Add(fmt.Sprintf("%d/%d objects processed, %s total", scrn.objects_received, scrn.objects_in_queue, humanize.Bytes(scrn.bytes_received)), 0, 0)
264300
hud.Line(&usage_text)
265301

266302
// if scrn.objects_received > 0 {
@@ -305,5 +341,10 @@ func render_activity_screen(hud *console.Hud) {
305341

306342
hud.Line(&file_name_text)
307343
hud.Line(&progress_text, &progress_bar)
344+
case event.StageServeObjects:
345+
var object_upload_count_text console.Text
346+
object_upload_count_text.Stylesheet.Width = console.Width()
347+
object_upload_count_text.Add(fmt.Sprintf("%d objects uploaded", scrn.object_uploads), 0, 0)
348+
hud.Line(&object_upload_count_text)
308349
}
309350
}

faws/app/repository/pull.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type PullParams struct {
1313
// If len(refs) == 0, download all tags from the origin
1414
Tags bool
1515
Verbose bool
16+
Quiet bool
1617
}
1718

1819
// Pull is the implementation of the command "faws pull"
@@ -23,6 +24,7 @@ func Pull(params *PullParams) {
2324
TrackerURL = params.TrackerURL
2425
}
2526

27+
quiet = params.Quiet
2628
app.Open()
2729
defer func() {
2830
app.Close()

faws/app/repository/seed.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type SeedParams struct {
1111
Sign string
1212
TrackerURL string
1313
TopicURI string
14+
Quiet bool
1415
}
1516

1617
func Seed(params *SeedParams) {
@@ -23,6 +24,8 @@ func Seed(params *SeedParams) {
2324
TrackerURL = params.TrackerURL
2425
}
2526

27+
quiet = params.Quiet
28+
2629
app.Open()
2730
defer func() {
2831
app.Close()

faws/app/repository/util.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/faws-vcs/faws/faws/app"
88
"github.com/faws-vcs/faws/faws/app/identities"
99
"github.com/faws-vcs/faws/faws/repo"
10+
"github.com/faws-vcs/faws/faws/repo/event"
1011
"github.com/faws-vcs/faws/faws/repo/p2p/tracker"
1112
)
1213

@@ -15,16 +16,26 @@ var Repo repo.Repository
1516

1617
var TrackerURL = tracker.DefaultURL
1718

19+
var quiet bool
20+
1821
// Open opens the repository located at directory
1922
func Open(directory string) (err error) {
23+
notify_func := notify
24+
25+
if quiet {
26+
notify_func = func(ev event.Notification, params *event.NotifyParams) {}
27+
}
28+
2029
err = Repo.Open(directory,
2130
repo.WithTrust(identities.NewRingTrust(app.Configuration.Ring())),
22-
repo.WithNotify(notify),
31+
repo.WithNotify(notify_func),
2332
repo.WithTracker(TrackerURL),
2433
)
2534

26-
console.RenderFunc(render_activity_screen)
27-
console.SwapInterval(time.Second / 3)
35+
if !quiet {
36+
console.RenderFunc(render_activity_screen)
37+
console.SwapInterval(time.Second / 3)
38+
}
2839
return
2940
}
3041

faws/cmd/pull/pull.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func init() {
2121
flag := pull_cmd.Flags()
2222
flag.BoolP("tag", "t", false, "pull the named tags instead of objects. If no tags are named, all tags from the origin will get pulled")
2323
flag.BoolP("verbose", "v", false, "display extra information")
24+
flag.BoolP("quiet", "q", false, "shut up the interactive Hud")
2425
root.RootCmd.AddCommand(&pull_cmd)
2526
}
2627

@@ -61,5 +62,10 @@ func run_pull_cmd(cmd *cobra.Command, args []string) {
6162
app.Fatal(err)
6263
return
6364
}
65+
params.Quiet, err = flag.GetBool("quiet")
66+
if err != nil {
67+
app.Fatal(err)
68+
return
69+
}
6470
repository.Pull(&params)
6571
}

faws/cmd/seed/seed.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ var seed_cmd = cobra.Command{
2020
func init() {
2121
flag := seed_cmd.Flags()
2222
flag.StringP("sign", "s", "", "use a signing identity to identify yourself with the P2P network")
23+
flag.BoolP("quiet", "q", false, "shut up the interactive Hud")
2324
root.RootCmd.AddCommand(&seed_cmd)
2425
}
2526

@@ -47,5 +48,10 @@ func run_seed_cmd(cmd *cobra.Command, args []string) {
4748
app.Fatal(err)
4849
return
4950
}
51+
params.Quiet, err = flag.GetBool("quiet")
52+
if err != nil {
53+
app.Fatal(err)
54+
return
55+
}
5056
repository.Seed(&params)
5157
}

faws/repo/event/event.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package event
33
import (
44
"github.com/faws-vcs/faws/faws/identity"
55
"github.com/faws-vcs/faws/faws/repo/cas"
6+
"github.com/faws-vcs/faws/faws/repo/p2p/peernet"
67
)
78

89
// A Notification signifies different types of repository events
@@ -29,6 +30,9 @@ const (
2930
// p2p
3031
NotifyPeerConnected
3132
NotifyPeerDisconnected
33+
NotifyPeerNetMessage
34+
NotifyPeerObjectUpload
35+
NotifyPeerObjectDuplicateDownload
3236
)
3337

3438
// A Stage represents a phase of operations within the repository, typically one that can take quite a long time.
@@ -45,6 +49,7 @@ const (
4549
StagePullTags
4650
StagePullObjects
4751
StageCheckout
52+
StageServeObjects
4853
)
4954

5055
// NotifyParams are extra information parameters shared along with the Notification
@@ -67,6 +72,8 @@ type NotifyParams struct {
6772
Child bool
6873
//
6974
ID identity.ID
75+
//
76+
MessageID peernet.MessageID
7077
}
7178

7279
// A NotifyFunc can be supplied to repo.Repository.Open to get notifications about the repository's actions

faws/repo/p2p/agent.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type Agent struct {
2020
}
2121

2222
func (agent *Agent) Init(options ...Option) (err error) {
23+
agent.options.requests_per_second = 128
2324
agent.options.tracker_url = tracker.DefaultURL
2425
agent.options.notify = func(n event.Notification, params *event.NotifyParams) {}
2526

faws/repo/p2p/agent_options.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ type agent_options struct {
1717
// URL of the tracker server
1818
tracker_url string
1919
// If true, forces the use of a TURN server
20-
use_turn bool
21-
notify event.NotifyFunc
20+
use_turn bool
21+
notify event.NotifyFunc
22+
requests_per_second int64
2223
}
2324

2425
type Option func(*agent_options)

faws/repo/p2p/job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (agent *Agent) Seed(topic tracker.Topic) (job Job, err error) {
105105
}
106106

107107
seed_job_ := new(seed_job)
108-
seed_job_.init()
108+
seed_job_.init(subscription)
109109

110110
subscription.set_current_job(seed_job_)
111111
job = seed_job_

0 commit comments

Comments
 (0)