The way I write automation for personal projects nowadays seems to follow a common pattern:

  1. A command line that’s getting a bit long
  2. A bash script
  3. Rewrite in Go

Occasionally I add a step between 2 and 3 where I write it in Python, but that generally doesn’t actually gain me anything: Python’s concurrency primitives are pretty bad, and it’s pretty wasteful.

Maybe there’s an actually good scripting language somewhere.

I should remember that writing a bash script (step 2) almost never seems to be worth it. If it’s complicated enough that it doesn’t fit on one line, then it’ll soon grow complicated enough that it doesn’t work well in bash.

There are two main things that don’t work well. Maybe there are good solutions to these problems, but I’ve not found them.

1. Concurrency

There are no good primitives; basically only xargs -P and &. It’s annoying when you have an embarrassingly parallel problem where you want to run exactly $(nproc) jobs in parallel.

Error handling, especially, becomes terrible here.
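
The closest I usually get is something like this sketch, assuming GNU xargs and made-up paths:

find ./logs -name '*.txt' -print0 \
       | xargs -0 -P "$(nproc)" -n 1 gzip -9
# GNU xargs exits with status 123 if any invocation failed, but by
# default it keeps launching the remaining jobs, and it won't tell
# you which one failed.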

2. Error handling

You can handle errors in bash scripts in various ways:

  1. || operator. E.g. gzip -9 < a > a.gz || (echo "handling error…")
  2. set -e at the top of the script. Actually you should always have this at the top of your scripts; silently ignoring a failed return code is almost always a bug. (See the example below.)
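
As a minimal illustration of option 2, reusing the gzip example from above:

#!/usr/bin/env bash
set -e
gzip -9 < a > a.gz    # if this fails, set -e aborts the script here…
echo "compressed a"   # …so this line never runs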

But this doesn’t handle all failures. E.g. if you have a pipeline, and any command other than the last one fails.

(false | gzip -c > /dev/null) && echo yes

This is because by default the result of the whole pipeline is just the result of the last command.

You can fix this using set -o pipefail. This makes the exit code of the pipeline be the value of the last (rightmost) command to exit with a non-zero status, or zero if all commands exit successfully.
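
With that set, the earlier example now reports the failure:

set -o pipefail
(false | gzip -c > /dev/null) && echo yes   # prints nothing now
echo $?                                     # 1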

But even this is not good enough. A remaining problem is that a downstream command in a pipeline has no way to know if an upstream command failed.
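
A small demonstration: the shell reports the error afterwards, but by then the downstream command has already happily written and finalized its output.

set -o pipefail
(echo "partial data"; false) | gzip -9 > out.gz
echo $?       # 1: the upstream failure is reported…
zcat out.gz   # …but out.gz is a perfectly valid archive of the partial data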

Here’s an example command I tried to run:

gsutil cat gs://example/test.data \
       | sort -S300M \
       | gsutil cp - gs://example/test.data.sorted

If sort fails, for example because it runs out of space for temporary files, then all the second gsutil sees is that its input closed. As far as it knows, the data generation completed successfully, so it finalizes the upload.

So I want the second gsutil to be killed if anything earlier in the pipeline fails.

Yes, I could probably do an initial upload to a temporary object, marked with STANDARD storage class and with a TTL set to automatically delete it, and then if the pipeline succeeds rename it, set the final storage class, and change the TTL (sketched below).
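
Something like this sketch, with made-up bucket names, assuming a lifecycle rule on the tmp/ prefix that auto-deletes old objects, and still relying on pipefail to gate the rename:

set -o pipefail
gsutil cat gs://example/test.data \
       | sort -S300M \
       | gsutil cp -s STANDARD - gs://example/tmp/test.data.sorted \
  && gsutil mv gs://example/tmp/test.data.sorted gs://example/test.data.sorted
# on success one could also change the storage class afterwards,
# e.g. with gsutil rewrite -s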

But I shouldn’t have to! If only foo | bar killed bar (in a race-free way) when foo failed, this wouldn’t be a problem.

I could do this in bash, with something like:

SELF=$$
mkfifo pipe1 pipe2
gsutil cp - gs://example/test.data.sorted < pipe2 &
CMD3=$!
(
  sort -S300M || (
    kill $CMD3
    # TODO: wait for $CMD3 to exit, to avoid race condition
  )
) < pipe1 > pipe2 &
CMD2=$!
(
  gsutil cat gs://example/test.data || (
    kill $CMD2
    # TODO: wait for $CMD2 to exit, to avoid race condition
    kill $SELF
  )
) > pipe1

Ugh.

So I end up writing something like this:

package main

import (
	"context"
	"flag"
	"io"
	"os"
	"os/exec"
	"path"

	log "github.com/sirupsen/logrus"
)

var (
	indir  = flag.String("indir", "", "Input bucket and directory on GCS. E.g. for `gs://a/b/` say `a/b`.")
	outdir = flag.String("outdir", "", "Output bucket and directory on GCS. E.g. for `gs://a/b/` say `a/b`.")
)

func main() {
	flag.Parse()

	if *indir == "" || *outdir == "" || flag.NArg() == 0 {
		log.Fatalf("Need -indir, -outdir, and one arg")
	}

	fn := flag.Arg(0)

	inPath := "gs://" + path.Join(*indir, fn)
	outPath := "gs://" + path.Join(*outdir, fn)

	log.Infof("Running %q -> %q…", inPath, outPath)

	pipes := [][]string{
		{"gsutil", "cat", inPath},
		{"zcat"},
		{"awk", "-F,", `{print (15*int(($1%86400)/15)) "," $0}`},
		{"sort", "-t,", "-k", "1,2", "-n"}, // -S300M
		{"gzip", "-9"},
	}
	ctx := context.Background()
	storeCtx, cancelStore := context.WithCancel(ctx)

	var lastPipe io.Reader
	storeDone := make(chan struct{})
	for _, args := range pipes {
		args := args
		cmd := exec.CommandContext(ctx, args[0], args[1:]...)
		if lastPipe != nil {
			cmd.Stdin = lastPipe
		}
		r, w, err := os.Pipe()
		if err != nil {
			log.Fatal(err)
		}
		cmd.Stdout = w
		cmd.Stderr = os.Stderr
		lastPipe = r

		go func() {
			defer func() {
				if err := w.Close(); err != nil {
					log.Errorf("Closing for %q failed: %v", args[0])
					cancelStore()
					<-storeDone
				}
			}()
			if err := cmd.Run(); err != nil {
				log.Errorf("Failed to run %q: %v", args[0], err)
				cancelStore()
				<-storeDone
			}
		}()
	}

	store := exec.CommandContext(storeCtx, "gsutil", "cp", "-", outPath)
	store.Stdin = lastPipe
	store.Stderr = os.Stderr
	store.Stdout = os.Stdout
	if err := store.Run(); err != nil {
		close(storeDone)
		log.Fatalf("Failed to store results: %v", err)
	}
}
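
Built and invoked against made-up bucket paths (the binary name here is mine, not anything standard), usage looks something like:

go build -o gcssort .
./gcssort -indir=example-bucket/raw -outdir=example-bucket/sorted events.csv.gz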

Future work

It’s not nice to hard code the commands to run. So maybe the pipeline should be defined in JSON or something, and it’ll all be generic.

Update (2021-06-22): I have actually written this now.

$ cat > test.pipe.json
[
  ["gsutil", "cat", "gs://example/input.txt"]
  ["sort", "-S300M"],
  ["gzip", "-9"],
  ["gsutil", "cp", "-", "gs://example/input-sorted.txt.gz"]
]
$ ./goodpipe < test.pipe.json