Configure Streaming of Events

Several APIs provide streaming of events. All of them use events.proto for configuration.

Keep-Alive

The API Gateway hosting api.{region}.wgtwo.com (see API Environments) will silently drop connections if they are idle and keep-alive is not enabled.

That is, if the stream has low traffic, the connection may go down without your client noticing.

Our event services are set up to allow keep-alive requests every minute. We strongly recommend that you configure a keep-alive period of at most five minutes, as that ensures your connection is not silently dropped.

Setting                           Recommended value
keep-alive period                 1 minute (must be between 1 and 5 minutes)
keep-alive timeout                10 seconds
permit keep-alive without calls   true

Java Example

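import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;

var managedChannel = ManagedChannelBuilder.forTarget("sandbox.api.shamrock.wgtwo.com:443")
    .keepAliveWithoutCalls(true)             // permit keep-alive without calls
    .keepAliveTime(1, TimeUnit.MINUTES)      // keep-alive period
    .keepAliveTimeout(10, TimeUnit.SECONDS)  // keep-alive timeout
    .idleTimeout(1, TimeUnit.HOURS)
    .build();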
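
The bounds from the table above can be checked before a channel is built. The following is a minimal sketch using only the Java standard library; KeepAliveSettings and its method names are hypothetical helpers, not part of gRPC or of this API.

```java
import java.time.Duration;

/** Hypothetical helper (not part of gRPC): checks a keep-alive period against the documented bounds. */
final class KeepAliveSettings {
    // Bounds taken from the table above: the period must be between 1 and 5 minutes.
    static final Duration MIN_PERIOD = Duration.ofMinutes(1);
    static final Duration MAX_PERIOD = Duration.ofMinutes(5);

    /** Returns true if the period lies within the documented bounds (inclusive). */
    static boolean isValidPeriod(Duration period) {
        return period.compareTo(MIN_PERIOD) >= 0 && period.compareTo(MAX_PERIOD) <= 0;
    }

    /** Returns the period unchanged, or throws if it is outside the documented bounds. */
    static Duration checkedPeriod(Duration period) {
        if (!isValidPeriod(period)) {
            throw new IllegalArgumentException(
                "keep-alive period must be between 1 and 5 minutes, got " + period);
        }
        return period;
    }

    public static void main(String[] args) {
        System.out.println(isValidPeriod(Duration.ofMinutes(1)));
        System.out.println(isValidPeriod(Duration.ofSeconds(30)));
    }
}
```

The checked value could then be passed on to the channel builder, for example as keepAliveTime(period.toSeconds(), TimeUnit.SECONDS).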

Max Connection Age

Max connection age for our servers is, in general, set to 60 minutes with a random jitter of ± 6 minutes. After this the connection will be gracefully terminated.
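
As a worked example of these numbers, 60 minutes with a jitter of ± 6 minutes gives a termination window of 54 to 66 minutes. The sketch below shows how a client might use that window to tell an expected termination from an unexpected drop, for instance when deciding what to log. ConnectionAgeWindow is a hypothetical helper, and because the stated values apply only "in general", treat the result as a hint rather than a guarantee.

```java
import java.time.Duration;

/** Hypothetical helper: the window in which a graceful max-connection-age termination is expected. */
final class ConnectionAgeWindow {
    // Values from the text above: 60 minutes with a random jitter of +/- 6 minutes.
    static final Duration MAX_AGE = Duration.ofMinutes(60);
    static final Duration JITTER = Duration.ofMinutes(6);

    /** Earliest expected termination: 54 minutes. */
    static Duration earliest() {
        return MAX_AGE.minus(JITTER);
    }

    /** Latest expected termination: 66 minutes. */
    static Duration latest() {
        return MAX_AGE.plus(JITTER);
    }

    /** True if a disconnect at this connection age falls inside the expected window (inclusive). */
    static boolean isExpectedTermination(Duration connectionAge) {
        return connectionAge.compareTo(earliest()) >= 0
            && connectionAge.compareTo(latest()) <= 0;
    }

    public static void main(String[] args) {
        System.out.println("window: " + earliest().toMinutes() + " to " + latest().toMinutes() + " minutes");
    }
}
```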

General Advice

Your client should be able to recover from random disconnects. That is, the client should simply reconnect on errors.

In the case of authentication errors, wait for some period before reconnecting to avoid being rate limited.
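
One way to apply this advice is sketched below. The text above only says to reconnect on errors and to wait before retrying after authentication errors. The exponential backoff, the jitter, and all delay values are assumptions, and ReconnectBackoff is a hypothetical helper that is not part of gRPC or of this API.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: computes how long to wait before reconnecting a dropped stream. */
final class ReconnectBackoff {
    // Assumed values, not taken from the API documentation; tune them for your client.
    static final Duration BASE_DELAY = Duration.ofSeconds(1);
    static final Duration MAX_DELAY = Duration.ofMinutes(1);
    static final Duration AUTH_ERROR_DELAY = Duration.ofMinutes(1);

    /**
     * Delay before reconnect attempt number `attempt` (0 is the first retry), without jitter.
     * Authentication errors always use the longer fixed delay to avoid being rate limited.
     */
    static Duration delay(int attempt, boolean authError) {
        if (authError) {
            return AUTH_ERROR_DELAY;
        }
        // Clamp the exponent so the shift below cannot overflow.
        int shift = Math.min(Math.max(attempt, 0), 16);
        Duration exponential = BASE_DELAY.multipliedBy(1L << shift);
        return exponential.compareTo(MAX_DELAY) > 0 ? MAX_DELAY : exponential;
    }

    /** Same delay plus up to 20% random jitter, so that clients do not reconnect in lockstep. */
    static Duration delayWithJitter(int attempt, boolean authError) {
        Duration base = delay(attempt, authError);
        long jitterMillis = ThreadLocalRandom.current().nextLong(base.toMillis() / 5 + 1);
        return base.plusMillis(jitterMillis);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 8; attempt++) {
            System.out.println("attempt " + attempt + ": wait "
                + delay(attempt, false).toSeconds() + "s");
        }
        System.out.println("auth error: wait " + delay(0, true).toSeconds() + "s");
    }
}
```

In a gRPC client, this would typically be called from the stream observer's onError callback, resetting the attempt counter once events are flowing again. Mapping the UNAUTHENTICATED status code to the authentication case is an assumption about how the API reports such errors.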

Configure for Development

Using a RegularStream instead of the default value will result in the following:

  • reading position will not be stored on the server
  • load is not spread between your clients

As a result, you will lose events when you restart your application, and if you have multiple connections using the same OAuth 2.0 client ID, each of them will receive every event.

These settings are not recommended for production use.

Kotlin example:

val request = SubscriptionEventsProto.StreamHandsetChangeEventsRequest.newBuilder()
    .setStreamConfiguration(
        EventsProto.StreamConfiguration.newBuilder()
            .setRegular(EventsProto.RegularStream.getDefaultInstance())
    )
    .build()
context.run {
    subscriptionEvents.streamHandsetChangeEvents(request, HandsetChangeObserver)
}

Configure for gRPCurl

When writing application code, you would normally acknowledge events as you receive them. This is not easy to do when streaming with gRPCurl, so you must disable explicit acknowledgement by setting disable_explicit_ack.

Example Usage

grpcurl \
  -d '
  {
    "stream_configuration": {
      "regular": {},
      "disable_explicit_ack": {}
    }
  }
  ' \
  sandbox.api.shamrock.wgtwo.com:443 \
  wgtwo.subscription.v1.SubscriptionEventService/StreamHandsetChangeEvents

This example also uses RegularStream.

See events.proto > StreamConfiguration > disable_explicit_ack for more information.

Configure for Production

The default configuration is production ready. It remembers your queue position, so you don't lose any events, and spreads load across all of your clients that use the same OAuth 2.0 client ID.

See events.proto > stream_type for more information.

Manually Acknowledge Events

You also have to explicitly acknowledge (ack) each event. This lets the streaming API know that you have received the event and that it should not be sent again.

private val channel = ManagedChannelBuilder.forAddress("sandbox.api.shamrock.wgtwo.com", 443).build()
private val subscriptionEvents: SubscriptionEventServiceGrpc.SubscriptionEventServiceStub =
    SubscriptionEventServiceGrpc.newStub(channel)
        .withWaitForReady()

// ...

// Example ack method for a StreamHandsetChangeEventsResponse
private fun ack(event: StreamHandsetChangeEventsResponse) {
    // event.metadata.ackInfo holds the data to pass in the ack request
    val request = AckHandsetChangeEventRequest.newBuilder()
        .setAckInfo(event.metadata.ackInfo)
        .build()
    subscriptionEvents.ackHandsetChangeEvent(request, AckObserver)
}

Tweak Max Number of In-Flight Events

You can also change max_in_flight, the maximum number of in-flight events, from its default value.

val request = SubscriptionEventsProto.StreamHandsetChangeEventsRequest.newBuilder()
    .setStreamConfiguration(
        EventsProto.StreamConfiguration.newBuilder()
            .setMaxInFlight(10) // <- setting max_in_flight to 10
    )
    .build()
context.run {
    subscriptionEvents.streamHandsetChangeEvents(request, HandsetChangeObserver)
}

See events.proto > StreamConfiguration > max_in_flight for more information.

Read More