-
Notifications
You must be signed in to change notification settings - Fork 11
/
future.go
185 lines (163 loc) · 4.49 KB
/
future.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package async
import (
"context"
"sync"
)
// Future represents a value which may or may not currently be available,
// but will be available at some point, or an error if that value could
// not be made available.
type Future[T any] interface {
// Map creates a new Future by applying a function to the successful
// result of this Future.
Map(func(T) (T, error)) Future[T]
// FlatMap creates a new Future by applying a function to the successful
// result of this Future.
FlatMap(func(T) (Future[T], error)) Future[T]
// Join blocks until the Future is completed and returns either a result
// or an error.
Join() (T, error)
// Get blocks until the Future is completed or context is canceled and
// returns either a result or an error.
Get(context.Context) (T, error)
// Recover handles any error that this Future might contain using a
// resolver function.
Recover(func() (T, error)) Future[T]
// RecoverWith handles any error that this Future might contain using
// another Future.
RecoverWith(Future[T]) Future[T]
// complete completes the Future with either a value or an error.
// It is used by [Promise] internally.
complete(T, error)
}
// futureImpl implements the Future interface.
type futureImpl[T any] struct {
acceptOnce sync.Once
completeOnce sync.Once
done chan any
value T
err error
}
// Verify futureImpl satisfies the Future interface.
var _ Future[any] = (*futureImpl[any])(nil)
// newFuture returns a new Future.
func newFuture[T any]() Future[T] {
return &futureImpl[T]{
done: make(chan any, 1),
}
}
// accept blocks once, until the Future result is available.
func (fut *futureImpl[T]) accept() {
fut.acceptOnce.Do(func() {
result := <-fut.done
fut.setResult(result)
})
}
// acceptTimeout blocks once, until the Future result is available or until
// the context is canceled.
func (fut *futureImpl[T]) acceptContext(ctx context.Context) {
fut.acceptOnce.Do(func() {
select {
case result := <-fut.done:
fut.setResult(result)
case <-ctx.Done():
fut.setResult(ctx.Err())
}
})
}
// setResult assigns a value to the Future instance.
func (fut *futureImpl[T]) setResult(result any) {
switch value := result.(type) {
case error:
fut.err = value
default:
fut.value = value.(T)
}
}
// Map creates a new Future by applying a function to the successful result
// of this Future and returns the result of the function as a new Future.
func (fut *futureImpl[T]) Map(f func(T) (T, error)) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
var zero T
next.complete(zero, fut.err)
} else {
next.complete(f(fut.value))
}
}()
return next
}
// FlatMap creates a new Future by applying a function to the successful result
// of this Future and returns the result of the function as a new Future.
func (fut *futureImpl[T]) FlatMap(f func(T) (Future[T], error)) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
var zero T
next.complete(zero, fut.err)
} else {
tfut, terr := f(fut.value)
if terr != nil {
var zero T
next.complete(zero, terr)
} else {
next.complete(tfut.Join())
}
}
}()
return next
}
// Join blocks until the Future is completed and returns either
// a result or an error.
func (fut *futureImpl[T]) Join() (T, error) {
fut.accept()
return fut.value, fut.err
}
// Get blocks until the Future is completed or context is canceled and
// returns either a result or an error.
func (fut *futureImpl[T]) Get(ctx context.Context) (T, error) {
fut.acceptContext(ctx)
return fut.value, fut.err
}
// Recover handles any error that this Future might contain using
// a given resolver function.
// Returns the result as a new Future.
func (fut *futureImpl[T]) Recover(f func() (T, error)) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
next.complete(f())
} else {
next.complete(fut.value, nil)
}
}()
return next
}
// RecoverWith handles any error that this Future might contain using
// another Future.
// Returns the result as a new Future.
func (fut *futureImpl[T]) RecoverWith(rf Future[T]) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
next.complete(rf.Join())
} else {
next.complete(fut.value, nil)
}
}()
return next
}
// complete completes the Future with either a value or an error.
func (fut *futureImpl[T]) complete(value T, err error) {
fut.completeOnce.Do(func() {
if err != nil {
fut.done <- err
} else {
fut.done <- value
}
})
}