Simple distributed tracing with OpenTracing and Stackdriver

2019-12-24

Nothing much, just my variation/helloworld for opentelemetry in golang..its my variation of Opentelemetry-Distributed Tracing sample

This is a simple frontend-backend application you can run on your laptop which demonstrates distributed tracing between microservices.

What step 5 below shows is an inbound request to one microservice (/frontend) which emits some subspans, then makes an http call to a backend app (/backend) which also emits some spans. The final trace you see is a combined end-to-end trace between microservices.

What it does not demonstrate are the cloudstorage API calls made within a span. This used to work in opencensus tracing libraries but at the time of writing, i’m not sure if it is even possible with opentelemetry…

Anyway,

gcloud auth application-default login
export PROJECT_ID=`gcloud config get-value core/project`


gsutil mb gs://$PROJECT_ID-ot/
echo foo > file.txt
gsutil cp file.txt gs://$PROJECT_ID-ot/

2. Server

export PROJECT_ID=`gcloud config get-value core/project`
export OT_BUCKET=$PROJECT_ID-ot

cd server/
go run main.go

3. Client


export PROJECT_ID=`gcloud config get-value core/project`
export OT_BUCKET=gs://$PROJECT_ID-ot/

cd client/
go run main.go

4. Invoke

curl http://localhost:8080/frontend

5. See traces


images/trace_1.png

  • client.go
package main

import (
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"os"
	"time"

	"context"

	"cloud.google.com/go/storage"
	"golang.org/x/net/http2"

	"google.golang.org/api/iterator"

	"go.opentelemetry.io/otel/api/distributedcontext"
	"go.opentelemetry.io/otel/api/global"
	"go.opentelemetry.io/otel/api/key"
	"go.opentelemetry.io/otel/api/trace"
	"go.opentelemetry.io/otel/exporter/trace/stackdriver"
	"go.opentelemetry.io/otel/plugin/httptrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

var (
	httpport = flag.String("httpport", ":8081", "httpport")

	mytracer trace.Tracer
	fooKey   = key.New("ex.com/foo")
)

const (
)

type server struct{}

func backhandler(w http.ResponseWriter, r *http.Request) {
	log.Println("/backend called")

	bucketName := os.Getenv("OT_BUCKET")

	attrs, entries, spanCtx := httptrace.Extract(r.Context(), r)
	fmt.Println("extracted context")

	r = r.WithContext(distributedcontext.WithMap(r.Context(), distributedcontext.NewMap(distributedcontext.MapUpdate{
		MultiKV: entries,
	})))
	fmt.Println("request context modified")
	sctx, span := mytracer.Start(
		r.Context(),
		"Child-Trace",
		trace.WithAttributes(attrs...),
		trace.ChildOf(spanCtx),
	)

	err := mytracer.WithSpan(sctx, "child-operation", func(ctx context.Context) error {
		trace.CurrentSpan(ctx).SetAttributes(fooKey.String("no"))
		trace.CurrentSpan(ctx).AddEvent(ctx, "Event2", key.New("bogons").Int(100))

		time.Sleep(2 * time.Second)
		delay := rand.Intn(300) + 50
		time.Sleep(time.Duration(delay) * time.Millisecond)

		client, err := storage.NewClient(ctx)
		if err != nil {
			log.Fatal(err)
		}

		it := client.Bucket(bucketName).Objects(ctx, nil)
		for {
			attrs, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				return err
			}
			log.Println(attrs.Name)
		}

		return nil
	})
	if err != nil {
		log.Fatalf("Error making request %v", err)
	}

	defer span.End()
	fmt.Println("worker done ")
	_, _ = io.WriteString(w, "work-done\n")
	return

	fmt.Fprint(w, "ok")
}

func healthhandler(w http.ResponseWriter, r *http.Request) {
	log.Println("heathcheck...")
	fmt.Fprint(w, "ok from backend")
}

func main() {

	flag.Parse()

	if *httpport == "" {
		fmt.Fprintln(os.Stderr, "missing -httpport flag (:8080)")
		flag.Usage()
		os.Exit(2)
	}

	projectID := os.Getenv("PROJECT_ID")

	exporter, err := stackdriver.NewExporter(
		stackdriver.WithProjectID(projectID),
	)
	if err != nil {
		log.Fatal(err)
	}

	tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
		sdktrace.WithSyncer(exporter))
	if err != nil {
		log.Fatal(err)
	}
	global.SetTraceProvider(tp)

	mytracer = global.TraceProvider().Tracer("stackdriver/example/starter")

	http.HandleFunc("/backend", backhandler)
	http.HandleFunc("/_ah/health", healthhandler)

	srv := &http.Server{
		Addr: *httpport,
	}
	http2.ConfigureServer(srv, &http2.Server{})
	//err := srv.ListenAndServeTLS("server_crt.pem", "server_key.pem")
	err = http.ListenAndServe(*httpport, nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
  • server.go
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"log"

	"net/http"
	"os"
	"time"

	"context"

	"golang.org/x/net/http2"
	"google.golang.org/grpc/codes"

	"go.opentelemetry.io/otel/api/distributedcontext"
	"go.opentelemetry.io/otel/api/global"
	"go.opentelemetry.io/otel/api/key"
	"go.opentelemetry.io/otel/api/trace"
	"go.opentelemetry.io/otel/exporter/trace/stackdriver"
	"go.opentelemetry.io/otel/plugin/httptrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

var (
	httpport = flag.String("httpport", ":8080", "httpport")

	mytracer trace.Tracer
	fooKey   = key.New("ex.com/foo")
)

const ()

type server struct{}

func fronthandler(w http.ResponseWriter, r *http.Request) {
	log.Println("/frontend called")
	client := http.DefaultClient
	ctx := distributedcontext.NewContext(r.Context(),
		key.String("username", "donuts"),
	)

	workerURL := "http://localhost:8081/backend"
	err := mytracer.WithSpan(ctx, "parent-tracer",
		func(ctx context.Context) error {
			err := mytracer.WithSpan(ctx, "operation", func(ctx context.Context) error {
				trace.CurrentSpan(ctx).AddEvent(ctx, "Event1", key.New("bogons").Int(100))
				trace.CurrentSpan(ctx).SetAttributes(fooKey.String("yes"))
				time.Sleep(2 * time.Second)
				return nil
			})

			req, _ := http.NewRequest("GET", workerURL, nil)

			ctx, req = httptrace.W3C(ctx, req)
			httptrace.Inject(ctx, req)

			res, err := client.Do(req)
			if err != nil {
				log.Fatalf("Unable to make request %v", err)
			}
			body, err := ioutil.ReadAll(res.Body)
			_ = res.Body.Close()
			log.Printf("Backend Response %s", string(body))
			trace.CurrentSpan(ctx).SetStatus(codes.OK)

			return err
		})

	if err != nil {
		log.Fatalf("Error making request %v", err)
	}

	fmt.Fprint(w, "ok from frontend")
}

func healthhandler(w http.ResponseWriter, r *http.Request) {
	log.Println("heathcheck...")
	fmt.Fprint(w, "ok from backend")
}

func main() {

	flag.Parse()

	if *httpport == "" {
		fmt.Fprintln(os.Stderr, "missing -httpport flag (:8080)")
		flag.Usage()
		os.Exit(2)
	}

	projectID := os.Getenv("PROJECT_ID")

	exporter, err := stackdriver.NewExporter(
		stackdriver.WithProjectID(projectID),
	)
	if err != nil {
		log.Fatal(err)
	}

	tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
		sdktrace.WithSyncer(exporter))
	if err != nil {
		log.Fatal(err)
	}
	global.SetTraceProvider(tp)

	mytracer = global.TraceProvider().Tracer("stackdriver/example/starter")

	http.HandleFunc("/frontend", fronthandler)
	http.HandleFunc("/_ah/health", healthhandler)

	srv := &http.Server{
		Addr: *httpport,
	}
	http2.ConfigureServer(srv, &http2.Server{})
	//err := srv.ListenAndServeTLS("server_crt.pem", "server_key.pem")
	err = http.ListenAndServe(*httpport, nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

This site supports webmentions. Send me a mention via this form.