Go and my realization about what I'll call the 'Promises' pattern

September 24, 2024

Over on the Fediverse, I had a belated realization:

This is my face when I realize I have a situation that 'promises'/asynchronously waitable objects would be great for, but I would have to build them by hand in Go. Oh well.

(I want asynchronous execution but to report the results in order, as each becomes available. With promises as I understand them, generate all the promises in an array, wait for each one in order, report results from it, done.)

A common pattern with work(er) pools in Go and elsewhere is that you want to submit requests to a pool of asynchronous workers and you're happy to handle the completion of that work in any order. This is easily handled in Go with a pair of channels, one for requests and the other for completions. However, this time around I wanted asynchronous requests but to be able to report on completed work in order.

(The specific context is that I've got a little Go program to do IP to name DNS lookups (it's in Go for reasons), and on the one hand it would be handy to do several DNS lookups in parallel because sometimes they take a while, but on the other hand I want to print the results in command line order because otherwise it gets confusing.)

In an environment with 'promises' or some equivalent, asynchronous work with ordered reporting of completion is relatively straightforward. You submit all the work and get an ordered collection of Promises or the equivalent, and then you go through in order harvesting results from each Promise in turn. In Go, I think there are two plausible alternatives; you can use a single common channel for results but put ordering information in them, or you can use a separate reply channel for each request. Having done scratch implementations of both, my conclusion is that the separate reply channel version is simpler for me (and in the future I'm not going to be scared off by thoughts of how many channels it can create).

For the common reply channel version, your requests must include a sequence number and then the replies from the workers will also include that sequence number. You'll receive the replies in some random sequence and then it's on you to reassemble them into order. If you want to start processing replies in order before everything has completed, you have to do additional work (you may want, for example, a container/heap).
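As a hedged sketch of this version (the `request`, `reply`, and `orderedResults` names are my own illustration, not the post's code), the receiver can buffer out-of-order replies in a map keyed by sequence number and flush them forward whenever the next expected one arrives:

```go
package main

import "fmt"

type request struct {
	seq  int
	host string
}

type reply struct {
	seq    int
	result string
}

func worker(reqs <-chan request, replies chan<- reply) {
	for r := range reqs {
		// stand-in for the real (DNS) lookup work
		replies <- reply{seq: r.seq, result: "looked up " + r.host}
	}
}

// orderedResults reports results in submission order even though
// workers may complete them in any order.
func orderedResults(hosts []string) []string {
	reqs := make(chan request)
	replies := make(chan reply)
	for i := 0; i < 3; i++ {
		go worker(reqs, replies)
	}
	go func() {
		for i, h := range hosts {
			reqs <- request{seq: i, host: h}
		}
		close(reqs)
	}()

	var out []string
	pending := make(map[int]reply)
	next := 0
	for range hosts {
		rep := <-replies
		pending[rep.seq] = rep
		// flush as far forward in sequence order as we can
		for r, ok := pending[next]; ok; r, ok = pending[next] {
			out = append(out, r.result)
			delete(pending, next)
			next++
		}
	}
	return out
}

func main() {
	for _, r := range orderedResults([]string{"a.example", "b.example", "c.example"}) {
		fmt.Println(r)
	}
}
```

(A map is the simple version of this buffering; a container/heap keyed on the sequence number does the same job with less memory churn if replies can run far ahead of the next expected one.)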

For the separate reply channel version, you'll be creating a lot of channels (one per request) and passing them to workers as part of the request; remember to give each one a one-element buffer, so that workers never block when they 'complete' a request and send the answer down its reply channel. However, handling completed requests in order is simple once you've accumulated a block of them:

var replies []chan ...
for _, req := range worktodo {
	// 'pool' is your worker pool
	replies = append(replies, pool.submit(req))
}

for i := range replies {
	v := <-replies[i]
	// process v
}

If a worker has not yet finished processing request number X when you get to trying to use the reply, you simply block on the channel read. If the worker has already finished, it will have sent the reply into the (buffered, remember) channel and moved on, and the reply is ready for you to pick up immediately.
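To make the sketch above concrete, here is one plausible shape such a 'pool' could take; the `pool` type, `newPool`, and the exact `submit` signature are my assumptions, not the post's actual code:

```go
package main

import "fmt"

type request struct {
	host  string
	reply chan string // one-element buffer, created in submit
}

type pool struct {
	reqs chan request
}

// newPool starts a fixed set of worker goroutines reading from a
// shared request channel.
func newPool(workers int) *pool {
	p := &pool{reqs: make(chan request)}
	for i := 0; i < workers; i++ {
		go func() {
			for r := range p.reqs {
				// stand-in for the real (DNS) lookup work
				r.reply <- "resolved " + r.host
			}
		}()
	}
	return p
}

// submit queues the work and returns the per-request reply channel.
// The one-element buffer means a worker never blocks on its send,
// even if the caller hasn't gotten around to reading yet.
func (p *pool) submit(host string) chan string {
	r := request{host: host, reply: make(chan string, 1)}
	p.reqs <- r
	return r.reply
}

func main() {
	p := newPool(3)
	worktodo := []string{"a.example", "b.example", "c.example"}
	var replies []chan string
	for _, h := range worktodo {
		replies = append(replies, p.submit(h))
	}
	// results come out in submission order regardless of completion order
	for i := range replies {
		fmt.Println(<-replies[i])
	}
}
```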

(In both versions, if you have a lot of things to process, you probably want to handle them in blocks, submitting and then draining N items, repeating until you've handled all items. I think this is probably easier to do in the separate reply channel version, although I haven't implemented it yet.)
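One possible shape for the block-at-a-time approach in the separate reply channel version (a sketch only; `submit` here is a hypothetical stand-in that runs each item in its own goroutine rather than a real worker pool):

```go
package main

import "fmt"

// hypothetical submit: start the work asynchronously and return a
// one-element buffered channel that will eventually hold the result
func submit(item string) chan string {
	ch := make(chan string, 1)
	go func() {
		ch <- "done: " + item // stand-in for the real work
	}()
	return ch
}

// processInBlocks submits at most n items at a time, draining each
// block in order before submitting the next one.
func processInBlocks(items []string, n int) []string {
	var out []string
	for start := 0; start < len(items); start += n {
		end := start + n
		if end > len(items) {
			end = len(items)
		}
		var replies []chan string
		for _, item := range items[start:end] {
			replies = append(replies, submit(item))
		}
		for i := range replies {
			out = append(out, <-replies[i])
		}
	}
	return out
}

func main() {
	for _, r := range processInBlocks([]string{"a", "b", "c", "d", "e"}, 2) {
		fmt.Println(r)
	}
}
```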


Comments on this page:

By Jonathan Duck at 2024-09-26 13:22:52:

Another approach for the pool is to use a buffered channel for supplying the "promise" channels. Since the scanner goroutine will block once the buffer is full, it will stop spawning goroutines until the channel is read from. My knee-jerk guess is that this doesn't perform as well as a persistent pool of goroutines handling the tasks, but I haven't done any comparisons.

resultPromises := make(chan chan string, 3)
go func() {
	scanner := bufio.NewScanner(strings.NewReader(stdin))
	for scanner.Scan() {
		resultPromise := make(chan string)
		ips := scanner.Text()
		resultPromises <- resultPromise
		go func() {
			resultPromise <- checkIp(ips)
			close(resultPromise)
		}()
	}
	close(resultPromises)
}()

for resultPromise := range resultPromises {
	result := <-resultPromise
	fmt.Println(result)
}

This doesn't fit the use case as well, but I think it's also worth mentioning that if you're trying to simulate the behavior of `Promise.all` in JavaScript, you can do it without channels. Individual indexes of a slice or fields in a struct can be written to concurrently as long as no two goroutines are writing to the same index/field.

var wg sync.WaitGroup
results := make([]string, len(args))
wg.Add(len(args))
for i, ips := range args {
	go func() {
		// as of Go 1.22, i and ips are fresh variables each iteration
		results[i] = checkIp(ips)
		wg.Done()
	}()
}
wg.Wait()

You need to know how many elements you're dealing with ahead of time, and you have to wait for all of them to resolve before doing anything with the results.
