Graceful shutdowns are a fundamental trait of most distributed systems. Often, gracefully shutting down a server is straightforward: stop accepting new connections and let the open ones finish their job. This is usually handled by your server library of choice; Go's net/http package, for example, has a Shutdown method for exactly that. Yet many cases need a more thorough, hand-crafted graceful shutdown, especially services that are highly concurrent or that deal with partitioned or external services. Handling shutdown in these cases can get quite tricky, and forcefully killing the program can lead to weird bugs, most commonly data loss. In my experience, implementing a graceful shutdown mechanism at an early stage of a project is the best approach. Leaving it for later usually means performing surgical code changes in many places and potentially introducing new bugs that are hard to catch.

Here is a simple pattern to apply graceful shutdowns when the program has some level of concurrency in it. If you just want the code, you can skip the explanation and go straight to the full listings further down.

A simple scenario

Let’s consider a simple scenario where we have an HTTP endpoint that takes a job name, asynchronously starts 3 slow jobs related to it (could be writing to a DB, S3, Kafka, and whatnot), and then responds with a simple “job started” to the caller. Returning an answer to the caller doesn’t mean the 3 slow jobs have finished: they’re still running in the background, inside goroutines.

This is mock code that represents this scenario:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)


/////////////////////////////////////////////////////////////
/// 3 functions that take some time to run something fictitious
func slowJob1(name string) {
	fmt.Printf("starting job 1 for %s\n", name)
	time.Sleep(5 * time.Second)
	fmt.Printf("finished job 1 for %s\n", name)

}

func slowJob2(name string) {
	fmt.Printf("starting job 2 for %s\n", name)
	time.Sleep(4 * time.Second)
	fmt.Printf("finished job 2 for %s\n", name)
}

func slowJob3(name string) {
	fmt.Printf("starting job 3 for %s\n", name)
	time.Sleep(3 * time.Second)
	fmt.Printf("finished job 3 for %s\n", name)
}
/////////////////////////////////////////////////////////////

func handler(w http.ResponseWriter, r *http.Request) {
	jobName := r.URL.Path[1:]
	
	// Spawn three goroutines that will run in the background
	// even after this function returns
	go slowJob1(jobName)
	go slowJob2(jobName)
	go slowJob3(jobName)
	
	// Write back to the caller
	fmt.Fprintf(w, "job %s started", jobName)
}

func main() {
	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

Now let’s imagine that this endpoint gets called many times per second. That means we’re going to have many goroutines working in the background at any given time.

However, here’s an important contract we’ve implicitly made with our clients: once the job is started, it must be finished (well well well… what a surprise?). So, once we return “job $name started” to the client, they can be confident the service will execute it.

But, even assuming the server won’t ever crash and burn, in cases of scheduled maintenance or deployment rollouts, this service will be shut down at some point so that updated versions of it can be rolled out. So what happens with our service, right now, if we send a SIGTERM or SIGINT to its PID? Let’s see!

Oops. Seems like a CTRL-c (SIGINT) or a SIGTERM (coming from a docker stop or something) would force the service to exit before finishing the already started jobs; that’s not good.

Here’s where proper shutdown is crucial. In this scenario, when we receive a SIGTERM, what we want is:

  1. Stop accepting new requests
  2. Finish the jobs we already started
  3. Exit

Important: note that if the HTTP requests were being handled synchronously, i.e., keeping the connection open until the work is done, a simple call to Shutdown() on the HTTP server would suffice. It waits for the handlers that are still working before shutting down the server. The tricky part here, however, is that the HTTP endpoint acts as a trigger for longer-lived background jobs, which is a typical pattern. The HTTP connection between the client and server will be long gone before the jobs actually finish.
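To make the synchronous case concrete, here is a minimal sketch. The function name, the 200ms delay, and the loopback listener are assumptions made for illustration; they are not part of the service above. It calls Shutdown while a request is still being handled and returns what the client ends up receiving.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// shutdownDuringRequest starts a server whose handler does its slow work
// synchronously, calls Shutdown while one request is in flight, and returns
// the body that the client ends up receiving.
func shutdownDuringRequest() (string, error) {
	// Port 0 asks the OS for any free port (assumes loopback is available).
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}

	started := make(chan struct{})
	srv := &http.Server{
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Signal that the handler is now in flight.
			close(started)
			// The synchronous "slow job".
			time.Sleep(200 * time.Millisecond)
			fmt.Fprint(w, "job finished")
		}),
	}
	// Serve returns http.ErrServerClosed once Shutdown is called.
	go srv.Serve(ln)

	type result struct {
		body string
		err  error
	}
	resCh := make(chan result, 1)
	go func() {
		resp, err := http.Get("http://" + ln.Addr().String() + "/")
		if err != nil {
			resCh <- result{err: err}
			return
		}
		defer resp.Body.Close()
		b, err := io.ReadAll(resp.Body)
		resCh <- result{body: string(b), err: err}
	}()

	// Wait until the request is being handled (or failed early).
	select {
	case <-started:
	case res := <-resCh:
		return res.body, res.err
	}

	// Shutdown blocks until the in-flight handler has returned.
	if err := srv.Shutdown(context.Background()); err != nil {
		return "", err
	}

	res := <-resCh
	return res.body, res.err
}

func main() {
	body, err := shutdownDuringRequest()
	fmt.Println(body, err)
}
```

The client still receives its full response because the work happens inside the handler. Once the work moves into background goroutines, Shutdown has nothing left to wait for, which is the problem the rest of this post deals with.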

Signal handling and server shutdown

First, we need to capture these UNIX signals, such as SIGTERM and SIGINT. Go's os/signal package gives you a nice abstraction for this: signal.Notify takes a channel and the signals you want to listen to, and uses that channel to notify you. You then read from the channel to find out when one of those signals arrives.

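// Handle SIGTERM/SIGINT and await a signal on termChan.
// The channel is buffered because signal.Notify never blocks when sending:
// with an unbuffered channel, a signal that arrives while nobody is
// receiving would be dropped.
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)

go func() {
	<-termChan // Blocks here until interrupted
	log.Print("SIGTERM received. Shutdown process initiated\n")
	httpServer.Shutdown(context.Background())
}()

$ go-complex-shutdown go run main.go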
^C2020/10/21 17:42:52 SIGTERM received. Shutdown process initiated
2020/10/21 17:42:52 HTTP server shut down

Note the ^C, meaning I just hit CTRL-C, which sends a SIGINT to the process (the log line says SIGTERM only because the handler prints the same message for both signals). Now we’re capturing it and shutting down the server. But we’re still not waiting for the goroutines to finish their job.
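If you want to try the capture step in isolation, the following sketch may help. The function name is made up for this example, and sending the signal to our own process stands in for a `kill <pid>` from another terminal (Unix-like systems only).

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// catchOwnSIGTERM registers for SIGTERM and SIGINT, sends SIGTERM to the
// current process, and reports whether it arrived on the channel.
func catchOwnSIGTERM() bool {
	// Buffer of 1: signal.Notify does not block when sending, so a signal
	// arriving while nobody is receiving would otherwise be dropped.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)
	defer signal.Stop(termChan)

	// Simulate `kill <pid>` from another terminal.
	if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
		return false
	}

	select {
	case sig := <-termChan:
		return sig == syscall.SIGTERM
	case <-time.After(2 * time.Second):
		// Nothing arrived in time.
		return false
	}
}

func main() {
	fmt.Println("caught SIGTERM:", catchOwnSIGTERM())
}
```

Because the signal is registered with signal.Notify before it is sent, the default behaviour (terminating the process) no longer applies and the program gets to decide what happens next.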

Gracefully shutting down the server

Now we’re capturing the SIGTERMs and shutting down the HTTP server. Let’s now wait for the running goroutines to finish before exiting the program.

To do that, we’re going to use the good old sync.WaitGroup. We want to pass a WaitGroup into the handler itself. For that, we’ll create a custom handler struct that implements ServeHTTP(w http.ResponseWriter, r *http.Request). Then we move the logic from the previous handler into this new ServeHTTP method. (The job name is now read from a gorilla/mux route variable, as the full code further down shows.)

// Our custom handler that holds a wait group used to 
// block the shutdown while it's running the jobs.
type CustomHandler struct {
	wg *sync.WaitGroup
}

func NewCustomHandler(wg *sync.WaitGroup) *CustomHandler {
	// You can check for wg == nil if feeling paranoid
	return &CustomHandler{wg: wg}
}

func (h *CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	jobName := vars["jobName"]

	fmt.Fprintf(w, "job %s started", jobName)
	
	// Here's where the magic happens.
	// We'll use the *WaitGroup inside the handler struct to add 3 to it
	// and pass the WaitGroup to the slowJobs; each one calls wg.Done()
	// once it returns.
	// Also keep in mind that this happens for every incoming HTTP request,
	// so at any given time the "global" WaitGroup counts up to `3 * n`,
	// where `n` is the number of requests whose jobs are still running.
	h.wg.Add(3)
	go slowJob1(jobName, h.wg)
	go slowJob2(jobName, h.wg)
	go slowJob3(jobName, h.wg)
}

And now our slowJob functions will look like this:

func slowJob1(name string, wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Printf("starting job 1 for %s\n", name)
	time.Sleep(5 * time.Second)
	fmt.Printf("finished job 1 for %s\n", name)
}

And that’s about it. Let’s see how it behaves once we interrupt the program:

Awesome, working just as intended: no jobs interrupted while shutting down and no new jobs started! The idea behind this is fairly simple, and you can easily replicate the pattern elsewhere. In my experience, it gets trickier to implement at later stages of a project, when there are more things going on, more concurrency, more layers. Introducing this capability then becomes quite a surgical task; just imagine passing a WaitGroup (or context objects) deep into different layers. It’s not impossible, and if it makes our systems more reliable, predictable, and fault-tolerant, it’s usually worth the effort. But doing it early on is definitely the better approach.
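Stripped of the HTTP parts, the counting idea can be sketched like this. The function name and the 10ms job are placeholders for illustration, not code from the service.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runRequests simulates `requests` handler calls, each starting 3 background
// jobs, then waits for all of them the way main does on shutdown.
// It returns how many jobs ran to completion.
func runRequests(requests int) int64 {
	wg := &sync.WaitGroup{}
	var finished int64

	job := func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // stand-in for slow work
		atomic.AddInt64(&finished, 1)
	}

	for i := 0; i < requests; i++ {
		// Add before the `go` statements, never inside the goroutines:
		// otherwise Wait could run before a job has been counted.
		wg.Add(3)
		go job()
		go job()
		go job()
	}

	wg.Wait()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runRequests(4)) // prints 12
}
```

The detail worth copying is the placement of wg.Add: it runs in the caller, before the goroutines start, so the counter can never be observed as zero while jobs are still about to begin.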

Full code

Here’s the full code for what we’ve created so far:

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"log"

	"github.com/gorilla/mux"
)

func slowJob1(name string, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("starting job 1 for %s\n", name)
	time.Sleep(5 * time.Second)
	fmt.Printf("finished job 1 for %s\n", name)
}

func slowJob2(name string, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("starting job 2 for %s\n", name)
	time.Sleep(4 * time.Second)
	fmt.Printf("finished job 2 for %s\n", name)
}

func slowJob3(name string, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("starting job 3 for %s\n", name)
	time.Sleep(3 * time.Second)
	fmt.Printf("finished job 3 for %s\n", name)
}

// Our custom handler that holds a wait group used to block the shutdown 
// while it's running the jobs.
type CustomHandler struct {
	wg *sync.WaitGroup
}

func NewCustomHandler(wg *sync.WaitGroup) *CustomHandler {
	// You can check for wg == nil if feeling paranoid
	return &CustomHandler{wg: wg}
}

func (h *CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	jobName := vars["jobName"]

	fmt.Fprintf(w, "job %s started", jobName)

	h.wg.Add(3)
	go slowJob1(jobName, h.wg)
	go slowJob2(jobName, h.wg)
	go slowJob3(jobName, h.wg)
}

func main() {
	wg := &sync.WaitGroup{}
	customHandler := NewCustomHandler(wg)

	router := mux.NewRouter()
	router.Handle("/{jobName}", customHandler)

	httpServer := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}

	// Handle SIGTERM/SIGINT and await a signal on termChan.
	// signal.Notify needs a buffered channel: it doesn't block when sending.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)

	go func() {
		<-termChan // Blocks here until interrupted
		log.Print("SIGTERM received. Shutdown process initiated\n")
		httpServer.Shutdown(context.Background())
	}()

	// Blocking. ListenAndServe always returns a non-nil error;
	// http.ErrServerClosed is the expected one after Shutdown.
	if err := httpServer.ListenAndServe(); err != nil {
		if err != http.ErrServerClosed {
			log.Printf("HTTP server closed with: %v\n", err)
		}
		log.Printf("HTTP server shut down")
	}

	// This is where, once we're closing the program, we wait for all
	// jobs (they all have been added to this WaitGroup) to `wg.Done()`.
	log.Println("waiting for running jobs to finish")
	wg.Wait()
	log.Println("jobs finished. exiting")
}

A more complex scenario

Now, another typical pattern is to send the jobs straight to a shared job queue that’ll be consumed by a consumer process (likely in a separate goroutine). Now it looks like this:

Let’s not get into the discussion of keeping all jobs in memory for now — a better approach would be writing these jobs to a Kafka topic — but the principle of graceful shutdown works roughly the same way for both. Still, with an in-memory job queue, we can easily demonstrate how to gracefully shut it down.

Once we get a SIGTERM, we want to:

  1. Stop the HTTP handler from writing more jobs to the job queue
  2. Stop the HTTP server from accepting new connections
  3. Drain the job queue by running the leftover jobs in there
  4. Actually shut down the program

Fortunately, Go provides us with a nice construct for this: channels. Controlling the flow into a channel is just a matter of closing it.

This is what our consumer looks like:

func consumer(jobQueue chan string) {
	wg := &sync.WaitGroup{}

	for job := range jobQueue {
		wg.Add(3)
		go slowJob1(job, wg)
		go slowJob2(job, wg)
		go slowJob3(job, wg)
	}

	log.Println("Waiting for running jobs to finish")
	wg.Wait()
	log.Println("Done, shutting down the consumer")
}

The slowJobN functions are the same as in the previous scenario.

Now our handler simply writes to a channel that’s acting as our job queue:

func (h *CustomHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	jobName := vars["jobName"]

	h.jobQueue <- jobName

	fmt.Fprintf(w, "job %s started", jobName)
}

And our main gets a wee bit more complicated, now that the consumer should be the blocking part of this workflow:

func main() {
	jobQueue := make(chan string)

	customHandler := NewCustomHandler(jobQueue)

	router := mux.NewRouter()
	router.Handle("/{jobName}", customHandler)

	httpServer := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}

	// Handle SIGTERM/SIGINT and await a signal on termChan.
	// signal.Notify needs a buffered channel: it doesn't block when sending.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT)

	go func() {
		<-termChan // Blocks here until interrupted
		log.Println("SIGTERM received. Shutdown process initiated")

		log.Println("stopping the consumer")

		// This will force the consumer to stop its main loop
		close(jobQueue) 

		httpServer.Shutdown(context.Background())
	}()

	go func() {
		if err := httpServer.ListenAndServe(); err != nil {
			// http.ErrServerClosed is the expected error after Shutdown.
			if err != http.ErrServerClosed {
				log.Printf("HTTP server closed with: %v\n", err)
			}
			log.Printf("HTTP server shut down")
		}
	}()

	// Now the consumer is the blocking part
	consumer(jobQueue)
}

And the trick is simply grabbing the SIGTERM and closing the jobQueue:

close(jobQueue) 

From the producer's (the handler's) perspective, this means it can’t write to the channel anymore: sending on a closed channel panics. From the consumer's perspective, it can still read whatever is left in the channel after it was closed, and once the last item has been read, the for-loop ends.
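Both halves of that behaviour can be checked with a small sketch. The function names are invented for this example, and the queue is buffered here (unlike the jobQueue above) purely so that items can be left sitting in it when it gets closed.

```go
package main

import "fmt"

// drainAfterClose closes a queue that still holds items and returns
// everything a ranging consumer receives afterwards.
func drainAfterClose() []string {
	jobQueue := make(chan string, 3)
	jobQueue <- "a"
	jobQueue <- "b"
	close(jobQueue)

	var got []string
	// range keeps receiving the buffered items and stops once the
	// closed channel is empty.
	for job := range jobQueue {
		got = append(got, job)
	}
	return got
}

// sendOnClosedPanics reports whether sending on a closed channel panics.
func sendOnClosedPanics() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()

	ch := make(chan string, 1)
	close(ch)
	ch <- "late job" // panics: send on closed channel
	return false
}

func main() {
	fmt.Println(drainAfterClose())    // prints [a b]
	fmt.Println(sendOnClosedPanics()) // prints true
}
```

The second function is the reason closing a channel from anywhere other than the sending side needs care: any producer that is still running when close happens will bring the program down.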

Once we’ve started all the slowJobNs and the for-loop has ended, we’ll be waiting on that wg.Wait(), meaning that we’ve drained the jobQueue and waited for all the jobs to finish before exiting the program.

You can find the full code here: https://gist.github.com/digorithm/6ea1b0a129bea2ce4fac404143738cc5#file-jobqueue-go

Making this code better

Thanks to some comments, I’ve noticed a few issues with this last iteration, namely:

  1. Calling httpServer.Shutdown() in a goroutine can cause some weird issues: ListenAndServe returns as soon as Shutdown is called, not when it finishes, so the rest of the program can move on (or even exit) while Shutdown is still waiting for in-flight requests.
  2. There’s a race condition between the goroutine that closes the channel (close(jobQueue)), and the handler writing to this channel (h.jobQueue <- jobName). That means the handler could write to a closed channel and the program would panic.
  3. Context cancellation is a much better way to propagate shutdown signals.

So let’s do another iteration here and fix some loose ends.
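As an aside on the second point, one common way to keep a producer safe is to never close the queue and have the producer select on a context instead. The sketch below is only an illustration of that idea; enqueue, errShuttingDown, and demoEnqueue are hypothetical names, not part of the code in this post.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errShuttingDown is a hypothetical sentinel error for this sketch.
var errShuttingDown = errors.New("shutting down, job rejected")

// enqueue hands a job to the queue unless ctx has been cancelled.
// The queue is never closed, so a send can never panic.
func enqueue(ctx context.Context, jobQueue chan<- string, job string) error {
	// Check first so an already-cancelled context always wins,
	// even when the queue still has room.
	if ctx.Err() != nil {
		return errShuttingDown
	}

	select {
	case jobQueue <- job:
		return nil
	case <-ctx.Done():
		// Cancelled while waiting for the consumer to take the job.
		return errShuttingDown
	}
}

// demoEnqueue submits one job before and one job after cancellation.
func demoEnqueue() (beforeCancel, afterCancel error) {
	ctx, cancel := context.WithCancel(context.Background())
	// Buffered only so the demo needs no consumer goroutine.
	jobQueue := make(chan string, 1)

	beforeCancel = enqueue(ctx, jobQueue, "job-a")
	cancel()
	afterCancel = enqueue(ctx, jobQueue, "job-b")
	return beforeCancel, afterCancel
}

func main() {
	before, after := demoEnqueue()
	fmt.Println(before) // prints <nil>
	fmt.Println(after)  // prints shutting down, job rejected
}
```

In a handler, the error case would typically translate into an error response (for example a 503) instead of the “job started” message, so the caller knows the job was not accepted.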

In main, let’s create a cancellable context with context.WithCancel, plus a channel that we’ll pass to the consumer so it can tell us when the jobs are finished and it’s safe to exit the program:

ctx, cancel := context.WithCancel(context.Background())

// ...
// ... same as before
// ...

// doneChan will be the channel we'll be listening on
// to know all already started jobs have finished
// before we actually exit the program
doneChan := make(chan interface{})
go consumer(ctx, jobQueue, doneChan) // <- not blocking anymore

Now, to solve the first problem (“Calling httpServer.Shutdown() in a goroutine can cause some weird issues”), we won’t block on the consumer anymore. Instead, we’ll run the consumer in a separate goroutine and move our shutdown logic into main() itself:

// Wait for SIGTERM to be captured
<-termChan
log.Println("SIGTERM received. Shutdown process initiated")

// Shutdown the HTTP server
if err := httpServer.Shutdown(ctx); err != nil {
	log.Fatalf("Server Shutdown Failed:%+v", err)
}

// Cancel the context, this will make the consumer stop
cancel()

// Wait for the consumer's jobs to finish
log.Println("waiting for the consumer to finish its jobs")
<-doneChan
log.Println("done. returning.")

Now, upon a SIGTERM, we shut down the HTTP server first, then cancel() the context, which tells the consumer to stop, and finally wait on doneChan for the running jobs to finish. Our consumer now looks like this:

func consumer(ctx context.Context, jobQueue chan string, doneChan chan interface{}) {
	wg := &sync.WaitGroup{}

	for {
		select {
		// If the context was cancelled, a SIGTERM was captured.
		// So we wait for the jobs to finish,
		// write to the done channel and return.
		case <-ctx.Done():
			// Note that the waiting time here is unbounded
			// and can take a long time. If that's an issue you can:
			// (1) issue a SIGKILL after a certain time or
			// (2) use a context with timeout
			wg.Wait()
			fmt.Println("writing to done channel")
			doneChan <- struct{}{}
			log.Println("Done, shutting down the consumer")
			return
		case job := <-jobQueue:
			wg.Add(3)
			go slowJob1(job, wg)
			go slowJob2(job, wg)
			go slowJob3(job, wg)
		}
	}
}

Much cleaner and self-contained, isn’t it? Now, once we cancel() the context, the case <-ctx.Done() will be selected: we stop consuming from the jobQueue, wait for the running jobs to finish, and then let the main goroutine know the consumer is done. This removes the risk of writing to a closed channel, since the channel isn’t being closed anymore. And with a cancellable context coming from the entrypoint, we can pass it further down in more complex scenarios.
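The comment in the consumer mentions bounding the wait with a context that has a timeout. One possible shape for that, sketched with made-up names (waitOrTimeout, finishesInTime) and millisecond arguments chosen only for the demo:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrTimeout waits for wg, but gives up when ctx is done.
// It returns true if all jobs finished and false if the deadline hit first.
func waitOrTimeout(ctx context.Context, wg *sync.WaitGroup) bool {
	done := make(chan struct{})
	go func() {
		// This goroutine lives until the jobs finish, even after a timeout.
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-ctx.Done():
		return false
	}
}

// finishesInTime runs one job lasting jobMs milliseconds and waits for it
// for at most limitMs milliseconds.
func finishesInTime(jobMs, limitMs int) bool {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(jobMs) * time.Millisecond)
	}()

	ctx, cancel := context.WithTimeout(
		context.Background(), time.Duration(limitMs)*time.Millisecond)
	defer cancel()

	return waitOrTimeout(ctx, wg)
}

func main() {
	fmt.Println(finishesInTime(10, 1000)) // prints true
	fmt.Println(finishesInTime(500, 20))  // prints false
}
```

A false result means some jobs were still running when the limit was reached. Whether to exit anyway at that point is a policy decision: it trades the guarantee that every started job finishes for a bounded shutdown time.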

Full code here: https://gist.github.com/digorithm/fc68573db5618bb6d57d090fd66528bd


Thanks to Kenny Louie and epsleq0 for reviewing this piece.