Using Google Cloud SDK Emulators

2021-12-15

This article shows how to attach to and use the various service emulators GCP offers for local testing and development.

For the most part, switching between the emulator and production is transparent and driven by a predefined environment variable. In some rare cases, though, you need to specify the host and TLS settings (or lack thereof) to connect to the emulator at all.

Finally, for many services (notably GCS), there are no emulators.

We’ll take an easy example for this article: PubSub.

For pubsub, if the following environment variable is set, Cloud SDK libraries will automatically attempt to contact the emulator instead of contacting GCP.

 export PUBSUB_EMULATOR_HOST=localhost:8085
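As a rough illustration of that switch, the sketch below mimics the kind of check a client library might perform: if PUBSUB_EMULATOR_HOST is set, connect to that address without TLS, otherwise use the production endpoint over TLS. The helper name `resolve_pubsub_target` and the dictionary it returns are made up for this article and are not part of any Google library; `pubsub.googleapis.com:443` is the public production endpoint.

```python
import os

PRODUCTION_TARGET = "pubsub.googleapis.com:443"


def resolve_pubsub_target(environ=None):
    """Hypothetical helper: choose endpoint and TLS setting from the environment."""
    environ = os.environ if environ is None else environ
    emulator = environ.get("PUBSUB_EMULATOR_HOST", "").strip()
    if emulator:
        # The emulator listens on a plaintext port, so TLS is turned off.
        return {"target": emulator, "use_tls": False}
    return {"target": PRODUCTION_TARGET, "use_tls": True}


if __name__ == "__main__":
    # Prints the emulator address if the variable is exported, else production.
    print(resolve_pubsub_target())
```

Printing the result before running a test is a cheap way to confirm that you are not about to create topics in a real project by accident.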

It gets a bit more complicated from there. For the official documentation, see PubSub: Testing apps locally with the emulator.

The following code snippets show how to construct clients that use the emulators, where one is available.


You can also emulate the GCE metadata server, but this technique is not supported by Google: GCE Metadata Server Emulator


Pubsub Emulators

The PubSub emulator can be launched by running:

 gcloud alpha emulators pubsub start

Then set the listener address (default: localhost:8085) in the PUBSUB_EMULATOR_HOST environment variable, as shown above, in the shell where you will run the GCP SDK.
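Before running any of the snippets, it can help to confirm that the emulator is actually listening. The following is a minimal sketch using only the standard library; the function name `wait_for_emulator` and its defaults are assumptions for illustration, and it only checks that the TCP port accepts connections, not that the service behind it is PubSub.

```python
import socket
import time


def wait_for_emulator(host="localhost", port=8085, timeout=10.0, interval=0.2):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    print("emulator reachable:", wait_for_emulator(timeout=2.0))
```

A check like this is mostly useful in scripts or CI jobs that start the emulator in the background and then immediately run tests against it.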

The following snippets will:

  • create a topic
  • list topics
  • create a pull subscription
  • create a push subscription
  • publish a message
  • subscribe to messages

Separately, if you run the push webserver shown further below, you can also receive the push messages.

Python:

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

project_id='fooproject'
topic_id="mytopic"
push_subscription_id="push_subscription"
pull_subscription_id="pull_subscription"


publisher = pubsub_v1.PublisherClient()
project_path = f"projects/{project_id}"
topic_path = publisher.topic_path(project_id, topic_id)
topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic: {topic.name}")
for topic in publisher.list_topics(request={"project": project_path}):
  print(topic.name)


sc1 = pubsub_v1.SubscriberClient()
pull_subscription_path = sc1.subscription_path(project_id, pull_subscription_id)
with sc1:
    try:
        subscription = sc1.create_subscription(
            request={"name": pull_subscription_path, "topic": topic_path}
        )
        print(f"Subscription created: {subscription}")
    except Exception as e:
        # Most likely AlreadyExists from a previous run; print the error to be sure.
        print(f"Subscription not created ({pull_subscription_id}): {e}")


endpoint='http://localhost:3000/messages'
push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint)

sc2 = pubsub_v1.SubscriberClient()
push_subscription_path = sc2.subscription_path(project_id, push_subscription_id)
with sc2:
   subscription = sc2.create_subscription(
        request={
            "name": push_subscription_path,
            "topic": topic_path,
            "push_config": push_config,
        }
   )

print(f"Push subscription created: {subscription}.")
print(f"Endpoint for subscription is: {endpoint}")


## The emulator does not populate a subscription with messages if the subscription
## does not exist yet, so we send messages only after the subscriptions are created.

for n in range(1, 10):
   data = f"Message number {n}"
   data = data.encode("utf-8")
   future = publisher.publish(topic_path, data)
   print(future.result()) 


pull_subscriber = pubsub_v1.SubscriberClient()

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = pull_subscriber.subscribe(pull_subscription_path, callback=callback)
print(f"Listening for messages on {pull_subscription_path}..\n")
timeout = 5.0

with pull_subscriber:
    try: 
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()

Push subscriber webserver

The following is a demo push subscriber HTTP listener (Python):

from http.server import HTTPServer, BaseHTTPRequestHandler
import simplejson
import pprint

class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):

    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'Hello, world!')

    def do_POST(self):
        content_length = int(self.headers['Content-Length'])

        print(self.headers)
        body = self.rfile.read(content_length)
        json_data = simplejson.loads(body)
        pprint.pprint(json_data)
        self.send_response(200)
        self.end_headers()
        return


httpd = HTTPServer(('localhost', 3000), SimpleHTTPRequestHandler)
httpd.serve_forever()
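
The listener above prints the request body as-is. Assuming the emulator uses the documented PubSub push envelope (a JSON object with a `message` field whose `data` is base64-encoded, plus a `subscription` field), the payload could be unpacked along these lines. The function name `decode_push_body` and the shape of the returned dictionary are illustrative choices, not part of any library.

```python
import base64
import json


def decode_push_body(body: bytes) -> dict:
    """Unpack a push request body into plain Python values."""
    envelope = json.loads(body)
    message = envelope.get("message", {})
    # The message payload is base64-encoded inside the JSON envelope.
    data = base64.b64decode(message.get("data", "")).decode("utf-8")
    return {
        "subscription": envelope.get("subscription"),
        "message_id": message.get("messageId"),
        "attributes": message.get("attributes", {}),
        "data": data,
    }


if __name__ == "__main__":
    # Hand-built sample body in the assumed envelope format.
    sample = json.dumps({
        "message": {
            "data": base64.b64encode(b"Message number 1").decode("ascii"),
            "messageId": "1",
        },
        "subscription": "projects/fooproject/subscriptions/push_subscription",
    }).encode("utf-8")
    print(decode_push_body(sample))
```

In the handler, this would be called on the bytes read from `self.rfile` in `do_POST` instead of printing the raw JSON.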

Go:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

const (
	projectID          = "fooproject"
	topicID            = "mytopic"
	pushSubscriptionID = "push_subscription"
	pullSubscriptionID = "pull_subscription"
)

func main() {

	ctx := context.Background()

	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		fmt.Printf("pubsub.NewClient: %v", err)
		return
	}
	defer pubsubClient.Close()

	t, err := pubsubClient.CreateTopic(ctx, topicID)
	if err != nil {
		fmt.Printf("CreateTopic: %v", err)
		return
	}
	fmt.Printf("Topic created: %v\n", t)

	pit := pubsubClient.Topics(ctx)
	for {
		topic, err := pit.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			fmt.Printf("Topics iteration error: %v", err)
			return
		}
		fmt.Printf("Topic Name: %s\n", topic.ID())
	}

	pullSub, err := pubsubClient.CreateSubscription(ctx, pullSubscriptionID, pubsub.SubscriptionConfig{
		Topic:       t,
		AckDeadline: 20 * time.Second,
	})
	if err != nil {
		fmt.Printf("CreateSubscription: %v", err)
		return
	}
	fmt.Printf("Created Pull subscription: %v\n", pullSub)

	pushSub, err := pubsubClient.CreateSubscription(ctx, pushSubscriptionID, pubsub.SubscriptionConfig{
		Topic:       t,
		AckDeadline: 20 * time.Second,
		PushConfig: pubsub.PushConfig{
			Endpoint: "http://localhost:3000/messages",
		},
	})
	if err != nil {
		fmt.Printf("CreateSubscription: %v", err)
		return
	}
	fmt.Printf("Created Push subscription: %v\n", pushSub)

	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("foo"),
	})
	id, err := result.Get(ctx)
	if err != nil {
		fmt.Printf("Get Error: %v", err)
		return
	}
	fmt.Printf("Published a message; msg ID: %v\n", id)

	var mu sync.Mutex
	received := 0
	sub := pubsubClient.Subscription(pullSubscriptionID)
	cctx, cancel := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		fmt.Printf("Got message: %q\n", string(msg.Data))
		msg.Ack()
		received++
		if received == 1 {
			cancel()
		}
	})
	if err != nil {
		fmt.Printf("Receive Error: %v", err)
		return
	}

}

Java, with maven:

 mvn -DPUBSUB_EMULATOR_HOST=localhost:8085 -DPUBSUB_PROJECT_ID=my-project-id clean install exec:java -q

Source

package com.test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient.ListTopicsPagedResponse;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class TestApp {
	public static void main(String[] args) {
		TestApp tc = new TestApp();
	}

	private static String projectID = "fooproject";
	private static String topicID = "mytopic";
	private static String pushSubscriptionID = "push_subscription";
	private static String pullSubscriptionID = "pull_subscription";

	public TestApp() {
		try {

			ManagedChannel channel = ManagedChannelBuilder.forTarget("127.0.0.1:8085").usePlaintext().build();
			TransportChannelProvider channelProvider = FixedTransportChannelProvider
					.create(GrpcTransportChannel.create(channel));
			// requires some credential provider
			TopicAdminClient topicAdminClient = TopicAdminClient.create(TopicAdminSettings.newBuilder()
					.setTransportChannelProvider(channelProvider)
					.setCredentialsProvider(NoCredentialsProvider.create()).build());

			String topicName = TopicName.format(projectID, topicID);
			Topic topic = topicAdminClient.createTopic(topicName);
			System.out.println("Created topic: " + topic.getName());

			ListTopicsRequest listTopicsRequest = ListTopicsRequest.newBuilder()
					.setProject(ProjectName.format(projectID))
					.build();

			System.out.println("Topics: ");
			ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest);
			Iterable<Topic> topics = response.iterateAll();
			for (Topic t : topics)
				System.out.println(t);

			SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient
					.create(SubscriptionAdminSettings.newBuilder()
							.setTransportChannelProvider(channelProvider)
							.setCredentialsProvider(NoCredentialsProvider.create()).build());

			SubscriptionName pullSubscriptionName = SubscriptionName.of(projectID, pullSubscriptionID);
			Subscription pullSubscription = subscriptionAdminClient.createSubscription(pullSubscriptionName.toString(),
					topicName, PushConfig.getDefaultInstance(), 10);
			System.out.println("Created pull subscription: " + pullSubscription.getName());

			SubscriptionName pushSubscriptionName = SubscriptionName.of(projectID, pushSubscriptionID);
			PushConfig pushConfig = PushConfig.newBuilder().setPushEndpoint("http://localhost:3000/messages").build();

			Subscription pushSubscription = subscriptionAdminClient.createSubscription(pushSubscriptionName.toString(),
					topicName, pushConfig, 10);
			System.out.println("Created push subscription: " + pushSubscription.getName());

			System.out.println("Subscriptions: ");
			for (String s : topicAdminClient.listTopicSubscriptions(topicName).iterateAll()) {
				System.out.println(s);
			}

			Publisher publisher = Publisher.newBuilder(topicName)
					.setCredentialsProvider(NoCredentialsProvider.create())
					.setChannelProvider(channelProvider).build();
			String message = "Hello World!";
			ByteString data = ByteString.copyFromUtf8(message);
			PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

			ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
			String messageId = messageIdFuture.get();
			System.out.println("Published message ID: " + messageId);

			MessageReceiver receiver = (PubsubMessage m, AckReplyConsumer consumer) -> {
				// Handle incoming message, then ack the received message.
				System.out.println("Id: " + m.getMessageId());
				System.out.println("Data: " + m.getData().toStringUtf8());
				consumer.ack();
			};

			ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(projectID, pullSubscriptionID);

			Subscriber subscriber = null;
			try {
				subscriber = Subscriber.newBuilder(projectSubscriptionName, receiver)
						.setCredentialsProvider(NoCredentialsProvider.create())
						.setChannelProvider(channelProvider).build();
				subscriber.startAsync().awaitRunning();
				System.out.printf("Listening for messages on %s:\n", projectSubscriptionName.toString());
				subscriber.awaitTerminated(10, TimeUnit.SECONDS);
			} catch (TimeoutException timeoutException) {
				subscriber.stopAsync();
			}

		} catch (Exception ex) {
			System.out.println("Error: " + ex);
		}
	}

}
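
NodeJS:

var log4js = require("log4js");
var logger = log4js.getLogger();
// log4js defaults to level OFF, so raise it or logger.info() prints nothing
logger.level = "info";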

const {PubSub} = require('@google-cloud/pubsub');

const projectID          = "fooproject";
const topicID            = "mytopic";
const pushSubscriptionID = "push_subscription";
const pullSubscriptionID = "pull_subscription";

async function main() {
	const pubSubClient = new PubSub({
 	 projectId: projectID
	});

	const [topic] = await pubSubClient.createTopic(topicID);
	console.log(`Topic ${topic.name} created.`);

	
	const [topics] = await pubSubClient.getTopics();
	topics.forEach(topic => logger.info(topic.name));

	const [pullSubscription] = await topic.createSubscription(pullSubscriptionID);
	console.log(`Subscription ${pullSubscription.name} created.`);

	const options = {
		pushConfig: {
		  pushEndpoint: `http://localhost:3000/messages`,
		},
	};
	const [pushSubscription] = await topic.createSubscription(pushSubscriptionID, options);
	console.log(`Subscription ${pushSubscription.name} created.`);

	const data = Buffer.from("foo");
    const messageId = await pubSubClient.topic(topicID).publishMessage({data});
    console.log(`Message ${messageId} published.`)

	const subscription = pubSubClient.subscription(pullSubscriptionID);
	let messageCount = 0;
	const messageHandler = message => {
	  console.log(`Received message ${message.id}:`);
	  console.log(`\tData: ${message.data}`);
	  console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
	  messageCount += 1;
	  message.ack();
	};
  
	subscription.on('message', messageHandler);
	const timeout = 10; // seconds to listen before detaching the handler
	setTimeout(() => {
	  subscription.removeListener('message', messageHandler);
	  console.log(`${messageCount} message(s) received.`);
	}, timeout * 1000);

}

main().catch(console.error);

C#:

using System;

using Google.Cloud.PubSub.V1;
using System.Threading.Tasks;

using Google.Api.Gax.ResourceNames;
using System.Collections.Generic;
using Grpc.Core;
using Google.Protobuf;

using Grpc.Core.Logging;
using System.Linq;


namespace main
{
    class Program
    {

        const string projectID = "fooproject";
        const string topicID = "mytopic";
        const string pushSubscriptionID = "push_subscription";
        const string pullSubscriptionID = "pull_subscription";

        [STAThread]
        static void Main(string[] args)
        {
            new Program().Run().Wait();
        }

        private Task Run()
        {

            GrpcEnvironment.SetLogger(new ConsoleLogger());

            string emulatorHostAndPort = Environment.GetEnvironmentVariable("PUBSUB_EMULATOR_HOST");

            PublisherServiceApiClient publisher = new PublisherServiceApiClientBuilder
            {
                Endpoint = emulatorHostAndPort,
                ChannelCredentials = ChannelCredentials.Insecure,
                //EmulatorDetection=  Google.Api.Gax.EmulatorDetection.EmulatorOnly,
            }.Build();

            ProjectName projectName = ProjectName.FromProject(projectID);


            var topicName = TopicName.FromProjectTopic(projectID, topicID);
            Topic topic = null;

            topic = publisher.CreateTopic(topicName);
            Console.WriteLine($"Topic {topic.Name} created.");

            foreach (Topic t in publisher.ListTopics(projectName))
                Console.WriteLine(t.Name);


            SubscriberServiceApiClient subscriber = new SubscriberServiceApiClientBuilder
            {
                Endpoint = emulatorHostAndPort,
                ChannelCredentials = ChannelCredentials.Insecure,
                //EmulatorDetection=  Google.Api.Gax.EmulatorDetection.EmulatorOnly,
            }.Build();

            SubscriptionName pullSubscriptionName = SubscriptionName.FromProjectSubscription(projectID, pullSubscriptionID);
            var ackDeadlineSeconds = 60;
            var pullSubscription = subscriber.CreateSubscription(pullSubscriptionName, topicName, new PushConfig(), ackDeadlineSeconds);

            SubscriptionName pushSubscriptionName = SubscriptionName.FromProjectSubscription(projectID, pushSubscriptionID);

            PushConfig pushConfig = new PushConfig { PushEndpoint = "http://localhost:3000/messages" };
            var pushSubscriber = subscriber.CreateSubscription(pushSubscriptionName, topicName, pushConfig, ackDeadlineSeconds);

            var message = new PubsubMessage()
            {
                Data = ByteString.CopyFromUtf8("hello world")
            };

            var messageList = new List<PubsubMessage>() { message };

            var response = publisher.Publish(topicName, messageList);
            Console.WriteLine("  Message ids published:");
            foreach (string messageId in response.MessageIds)
            {
                Console.WriteLine($"  {messageId}");
            }

            PullResponse pullResponse = subscriber.Pull(pullSubscriptionName, maxMessages: 20);
            foreach (ReceivedMessage msg in pullResponse.ReceivedMessages)
            {
                string text = System.Text.Encoding.UTF8.GetString(msg.Message.Data.ToArray());
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
            }

            // Acknowledge only if something was received; an empty ack list may be rejected.
            if (pullResponse.ReceivedMessages.Count > 0)
            {
                subscriber.Acknowledge(pullSubscriptionName, pullResponse.ReceivedMessages.Select(msg => msg.AckId));
            }

            return Task.CompletedTask;
        }
    }
}
