# `conc`: better structured concurrency for go

`conc` is your toolbelt for structured concurrency in go, making common tasks
easier and safer.

The main goals of the package are:

1) Make it harder to leak goroutines
2) Handle panics gracefully
3) Make concurrent code easier to read

## Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's
really easy to fire off a `go` statement and fail to properly wait for it to
complete.

`conc` takes the opinionated stance that all concurrency should be scoped.
That is, goroutines should have an owner, and that owner should always
ensure that its owned goroutines exit properly.

In `conc`, the owner of a goroutine is always a `conc.WaitGroup`. Goroutines
are spawned in a `WaitGroup` with `(*WaitGroup).Go()`, and
`(*WaitGroup).Wait()` should always be called before the `WaitGroup` goes out
of scope.

In some cases, you might want a spawned goroutine to outlast the scope of the
caller. In that case, you can pass a `WaitGroup` into the spawning function.
```go
func main() {
    var wg conc.WaitGroup
    defer wg.Wait()

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() { ... })
}
```

For some more discussion on why scoped concurrency is nice, check out [this
blog post](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).

## Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling
panics. A goroutine spawned without a panic handler will crash the whole
process on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the
panic once you catch it? Some options:

1) Ignore it
2) Log it
3) Turn it into an error and return that to the goroutine spawner
4) Propagate the panic to the goroutine spawner

Ignoring panics is a bad idea since panics usually mean there is actually
something wrong and someone should fix it.

Just logging panics isn't great either, because then there is no indication to
the spawner that something bad happened, and it might just continue on as
normal even though your program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to
have an owner that can actually receive the message that something went wrong.
This is generally not true with a goroutine spawned with `go`, but in the
`conc` package, all goroutines have an owner that must collect them: any call
to `Wait()` will panic if any of the spawned goroutines panicked.
Additionally, `Wait()` decorates the panic value with a stacktrace from the
child goroutine so that you don't lose information about what caused the
panic.

Doing all of this correctly every time you spawn something with `go` is not
trivial, and it requires a lot of boilerplate that makes the important parts
of the code more difficult to read, so `conc` does this for you.
<table>
<tr>
<th>

`stdlib`

</th>
<th>

`conc`

</th>
</tr>
<tr>
<td>

```go
type caughtPanicError struct {
    val   any
    stack []byte
}

func (e *caughtPanicError) Error() string {
    return fmt.Sprintf("panic: %q\n%s", e.val, string(e.stack))
}

func spawn() {
    done := make(chan error)
    go func() {
        defer func() {
            if val := recover(); val != nil {
                done <- &caughtPanicError{
                    val:   val,
                    stack: debug.Stack(),
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    err := <-done
    if err != nil {
        panic(err)
    }
}
```

</td>
<td>

```go
func spawn() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    wg.Wait()
}
```

</td>
</tr>
</table>

## Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't
obfuscate what the code is actually doing is more difficult. The `conc`
package attempts to make common operations easier by abstracting as much
boilerplate complexity as possible.

Want to run a set of concurrent tasks with a bounded number of goroutines?
Use `pool.New()`. Want to process an ordered stream of results concurrently,
but still maintain order? Try `stream.New()`. What about a concurrent map
over a slice? Take a peek at `iter.Map()`.

Browse the examples below for comparisons with doing these by hand.

# Examples

Each of these examples forgoes propagating panics for simplicity. To see
what kind of complexity that would add, check out the "Goal #2" section
above.

Spawning a set of goroutines and waiting for them to finish:

<table>
<tr>
<th>

`stdlib`

</th>
<th>

`conc`

</th>
</tr>
<tr>
<td>

```go
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // if doSomething panics, the process crashes!
            doSomething()
        }()
    }
    wg.Wait()
}
```

</td>
<td>

```go
func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}
```

</td>
</tr>
</table>
Process each element of a stream in a static pool of goroutines:

<table>
<tr>
<th>

`stdlib`

</th>
<th>

`conc`

</th>
</tr>
<tr>
<td>

```go
func process(stream chan int) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range stream {
                handle(elem)
            }
        }()
    }
    wg.Wait()
}
```

</td>
<td>

```go
func process(stream chan int) {
    p := pool.New().WithMaxGoroutines(10)
    for elem := range stream {
        elem := elem
        p.Go(func() {
            handle(elem)
        })
    }
    p.Wait()
}
```

</td>
</tr>
</table>

Process each element of a slice in a static pool of goroutines:

<table>
<tr>
<th>

`stdlib`

</th>
<th>

`conc`

</th>
</tr>
<tr>
<td>

```go
func main() {
    values := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)

    wg.Wait()
}
```

</td>
<td>

```go
func main() {
    values := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    iter.ForEach(values, handle)
}
```

</td>
</tr>
</table>

Process an ordered stream concurrently:

<table>
<tr>
<th>

`stdlib`

</th>
<th>

`conc`

</th>
</tr>
<tr>
<td>

```go
func mapStream(input chan int, output chan int, f func(int) int) {
    tasks := make(chan func())
    taskResults := make(chan chan int)

    // Spawn the worker goroutines
    var workerWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    // Spawn the goroutine that reads results in order
    var readerWg sync.WaitGroup
    readerWg.Add(1)
    go func() {
        defer readerWg.Done()
        for taskResult := range taskResults {
            output <- <-taskResult
        }
    }()

    // Feed the workers with tasks
    for elem := range input {
        elem := elem
        resultCh := make(chan int, 1)
        taskResults <- resultCh
        tasks <- func() {
            resultCh <- f(elem)
        }
    }

    // We've exhausted input. Wait for everything to finish
    close(tasks)
    workerWg.Wait()
    close(taskResults)
    readerWg.Wait()
}
```

</td>
<td>

```go
func mapStream(input chan int, output chan int, f func(int) int) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range input {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { output <- res }
        })
    }
    s.Wait()
}
```

</td>
</tr>
</table>
