Configure Streaming of Events
Several APIs provide streaming of events. All of them use events.proto for configuration.
Recommended gRPC settings
Keep-Alive
The API Gateway hosting api.{region}.wgtwo.com (See API Environments) will silently drop connections if they are idle and keep-alive is not enabled.
That is, if the stream has low traffic you may experience that the stream is down without that being visible for your client.
Our event services is set up to allow keep-alive requests every minute. We strongly recommend that you configure a keep-alive period of most five minutes as that will ensure your connection is not silently dropped.
Recommended Settings
Setting | Recommended value |
---|---|
keep-alive period | 1 minute (Must be between 1 and 5 minutes) |
keep-alive timeout | 10 seconds |
permit keep-alive without calls | true |
Java Example
var managedChannel = ManagedChannelBuilder.forTarget("sandbox.api.shamrock.wgtwo.com:443")
.keepAliveWithoutCalls(true)
.keepAliveTime(1, TimeUnit.MINUTES)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.idleTimeout(1, TimeUnit.HOURS)
.build()
Max Connection Age
Max connection age for our servers is, in general, set to 60 minutes with a random jitter of ± 6 minutes. After this the connection will be gracefully terminated.
General Advice
Your client should be able to handle recovering from random disconnects. That is, the client should simply reconnect on errors.
In the case of authentication errors, you should wait for some period to avoid being rate limited.
Configure for Development
Using a RegularStream
instead of the default value will result in the following:
- reading position will not be stored in the server
- load is not spread between your clients
This will make you lose events when you restart your application, and if you have multiple connections using the same OAuth 2.0 client ID, they will all receive all events (load is not spread between the clients).
These settings are not recommended for production use.
Kotlin example:
val request = SubscriptionEventsProto.StreamHandsetChangeEventsRequest.newBuilder()
.setStreamConfiguration(
EventsProto.StreamConfiguration.newBuilder()
.setRegular(EventsProto.RegularStream.getDefaultInstance())
)
.build()
context.run {
subscriptionEvents.streamHandsetChangeEvents(request, HandsetChangeObserver)
}
Configure for gRPCurl
Writing application code you would normally acknowledge events as you receive them. This is not something you can easily do when streaming with gRPCurl. To fix this you must disable explicit acknowledgement using disable_explicit_ack
.
Example Usage
grpcurl \
-d '
{
"stream_configuration": {
"regular": {},
"disable_explicit_ack": {}
}
}
' \
sandbox.api.shamrock.wgtwo.com:443 \
wgtwo.subscription.v1.SubscriptionEventService/StreamHandsetChangeEvents
This example also uses RegularStream
.
See the events.proto > StreamConfiguration > disable_explicit_ack
for more information.
Configure For Production
The default configuration is production ready. This configuration remembers queue position, so you don't lose any events, and spreads load across all servers using the same OAuth 2.0 client ID.
See the events.proto > stream_type
for more information.
Manually Acknowledge Events
You also have to explicitly ack(nowledge) each event to let the streaming API know that you have indeed received the message, and it should not be sent again.
private val channel = ManagedChannelBuilder.forAddress("sandbox.api.shamrock.wgtwo.com", 443).build()
private val subscriptionEvents: SubscriptionEventServiceGrpc.SubscriptionEventServiceStub =
SubscriptionEventServiceGrpc.newStub(channel)
.withWaitForReady()
// ...
// v example of ack method to ack a StreamHandsetChangeEventsResponse
private fun ack(event: StreamHandsetChangeEventsResponse) {
// v event.metadata.ackInfo holds the data to pass to the ack request
val request = AckHandsetChangeEventRequest.newBuilder().setAckInfo(event.metadata.ackInfo).build()
subscriptionEvents.ackHandsetChangeEvent(request, AckObserver)
}
Relevant events.proto
messages:
Example of API using this message in rpc method:
Tweak Max Number Of In-Flight Events
You can also tweak the default max_in_flight
.
val request = SubscriptionEventsProto.StreamHandsetChangeEventsRequest.newBuilder()
.setStreamConfiguration(
EventsProto.StreamConfiguration.newBuilder()
.setMaxInFlight(10) // <- setting max_in_flight to 10
)
.build()
context.run {
subscriptionEvents.streamHandsetChangeEvents(request, HandsetChangeObserver)
}
See events.proto > StreamConfiguration > max_in_flight
for more information.