Dynamic Topics pxGrid 2.0

Dynamic Topics consists of runing the customer server provider code for the publisher and the custom service consumer code for the subscriber. The ecosystem partner publishing the topic of information will run the customer service provider code. While ecosystem partmer connecting to the grid will run the custom service consumer code. In this examped,the pxGrid client Provider will publish the custom data topic, while the pxGrid Client consumer willl subscribe to this topic and consume the custom data information

Custom Service Provider

The Custom Service Provider code will act as the publisher. Note in the example below, CustomeServiceProvider3 is used for the public class. The default from the sample code will be CustomServiceProvider.

Main calls the ScheduleExecutor Service for sending the STOMP connect messages periodically to the pubsub service and for registering/reregistering the service. Main also parses the SampleConfiguration object which includes the pxGrid hostname, username or pxGrid client connection paramters, and the identity filename keystore (.jks) file and the truststore filename (.jks) values.

Under // AccountActivate, the pxGridcontrol object and waits for 60 seconds before the pxGrid client account is activated. The pxGrid controller version is also retrieved.

Under //pxGridServiceRegister, we register to the pubsubservice, and also register the custom topic for publishing, in this case, customTopic

Under // Schedule pxGrid ServiceReregister, we provide a time value for the pubsub service to re-register

Under // pxGrid ServiceLookup for pubsub service, we do a service lookup for com.cisco.ise.pubsub

Under // Use first service. Note that ServiceLookup randomize ordering of services, we get the ISE pxGrid node with the pubsub service that is available. If you have multiple ISE pxGrid nodes, the load will be distributed and the value returned may be the 2nd, 3rd, or 4th, ISE pxGrid node pending the ISE deployment. We also get the WebSocket URL properties.

Under // pxGrid AccessSecret, we get the access secret from the ISE pxGrid node

Under // Setup WebSocket client, we obtain the trust credentials in establishing connection to the ISE pxGrid node

Under // STOMP connect, we call a StompPubsubClientEndpoint to establish a connection with the STOMP messaging protocol over WebSockets. We connect the endpoint and under //STOP send periodically, the endpoint, otherwise known as the pxGrid client will publish the custom data for the published “topic/com/example.custom” topic.

Fot //STOMP disconnect, the client will send a STOMP disconnect and will receive an acknowledgement from the ISE pxGrid node that the endpoint has been disconnected, and the session will be closed.

Code

package com.cisco.pxgrid.samples.ise;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;

import javax.websocket.Session;

import org.apache.commons.cli.ParseException;
import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.client.ClientProperties; import org.glassfish.tyrus.client.SslEngineConfigurator; import org.glassfish.tyrus.client.auth.Credentials; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cisco.pxgrid.samples.ise.model.AccountState;
import com.cisco.pxgrid.samples.ise.model.Service;
import com.cisco.pxgrid.samples.ise.model.ServiceRegisterResponse;

/**
 * Demonstrates how to subscribe a topic from a custom service
 * 
 * The flow of the application is as follows:
 * 1. Parse arguments for configurations
 * 2. Activate Account. This will then require ISE Admin to approve this new node.
 * 3. pxGrid ServiceLookup for the custom service
 * 4. pxGrid ServiceLookup for ISE pubsub service
 * 5. pxGrid get AccessSecret for the ISE pubsub node
 * 6. Establish WebSocket connection with the ISE pubsub node
 * 7. Establish STOMP connection for pubsub messaging
 * 8. Subscribe to the topic in the custom service
 * 9. Wait for keyboard input for stopping the application
 */
public class CustomServiceProvider3 {
    private static Logger logger = LoggerFactory.getLogger(CustomServiceProvider3.class);
    
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // Parse arguments
                SampleConfiguration config = new SampleConfiguration();
                try {
                    config.parse(args);
                } catch (ParseException e) {
                    config.printHelp("CustomServiceProvider");
                    System.exit(1);
                }
                
        // AccountActivate
                PxgridControl control = new PxgridControl(config);
                while (control.accountActivate() != AccountState.ENABLED) {
                    Thread.sleep(60000);		
                
        }
        logger.info("pxGrid controller version={}", control.getControllerVersion());
        
        // pxGrid ServiceRegister
        Map<String, String> sessionProperties = new HashMap<>(); sessionProperties.put("wsPubsubService", "com.cisco.ise.pubsub"); sessionProperties.put("customTopic", "/topic/com.example.custom"); ServiceRegisterResponse response = control.serviceRegister("com.example.custom",
        sessionProperties);
        String registrationId = response.getId();
        long reregisterTimeMillis = response.getReregisterTimeMillis();
        
        // Schedule pxGrid ServiceReregister
                executor.scheduleWithFixedDelay(() -> {
                    try {
                        control.serviceReregister(registrationId);
                    } catch (IOException e) {
                        logger.error("Reregister failure");
                    }
                }, reregisterTimeMillis, reregisterTimeMillis, TimeUnit.MILLISECONDS);
        
        
        // pxGrid ServiceLookup for pubsub service
        Service[] services = control.serviceLookup("com.cisco.ise.pubsub"); if (services.length == 0) {
        logger.info("Pubsub service unavailabe"); return;
        }
            
        // Use first service. Note that ServiceLookup randomize ordering of services
        Service wsPubsubService = services[0];
        String wsURL = wsPubsubService.getProperties().get("wsUrl"); logger.info("wsUrl={}", wsURL);
        
        // pxGrid AccessSecret
        String secret = control.getAccessSecret(wsPubsubService.getNodeName());
        
        // Setup WebSocket client
                ClientManager client = ClientManager.createClient();
                SslEngineConfigurator sslEngineConfigurator = new SslEngineConfigurator(config.getSSLContext());
                client.getProperties().put(ClientProperties.SSL_ENGINE_CONFIGURATOR, sslEngineConfigurator);
                client.getProperties().put(ClientProperties.CREDENTIALS,
                        new Credentials(config.getNodeName(), secret.getBytes()));
        
        // WebSocket connect
        StompPubsubClientEndpoint endpoint = new StompPubsubClientEndpoint();
        URI uri = new URI(wsURL);
        Session session = client.connectToServer(endpoint, uri);

        // STOMP connect
        endpoint.connect(uri.getHost());
        
        // STOMP send periodically
                executor.scheduleWithFixedDelay(() -> {
                    try {
                        endpoint.publish("/topic/com.example.custom", "custom data".getBytes());
                    } catch (IOException e) {
                        logger.error("Publish failure");
                    }
                }, 0, 5, TimeUnit.SECONDS);

                SampleHelper.prompt("press <enter> to disconnect...");


        // STOMP disconnect
        endpoint.disconnect("ID-123");
        // Wait for disconnect receipt
        Thread.sleep(3000);
        
        session.close();
    }
 }

Output

------ config ------
  hostname = ise24fc3.lab10.com
  nodename = Provider
  password = (not specified)
  description = (not specified)
  keystorefilename = /Applications/master_rest_samples/sw1.jks
  keystorepassword = Cisco123
  truststorefilename = /Applications/master_rest_samples/sw1root.jks
  truststorepassword = Cisco123
--------------------
04:04:17.203 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccountActivate request={}
04:04:17.727 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccountActivate response={"accountState":"ENABLED","version":"2.0.0.13"}
04:04:17.727 [main] INFO com.cisco.pxgrid.samples.ise.CustomServiceProvider3 - pxGrid controller version=2.0.0.13
04:04:17.750 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceRegister request={"name":"com.example.custom","properties":{"customTopic":"/topic/com.example.custom","wsPubsubService":"com.cisco.ise.pubsub"}}
04:04:17.946 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceRegister response={"id":"444b920e-2402-452a-beee-96344d3b0361","reregisterTimeMillis":300000}
04:04:17.966 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceLookup request={"name":"com.cisco.ise.pubsub"}
04:04:17.982 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceLookup response={"services":[{"name":"com.cisco.ise.pubsub","nodeName":"ise-pubsub-ise24fc3","properties":{"wsUrl":"wss://ise24fc3.lab10.com:8910/pxgrid/ise/pubsub"}}]}
04:04:17.982 [main] INFO com.cisco.pxgrid.samples.ise.CustomServiceProvider3 - wsUrl=wss://ise24fc3.lab10.com:8910/pxgrid/ise/pubsub
04:04:17.983 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccessSecret request={"peerNodeName":"ise-pubsub-ise24fc3"}
04:04:18.045 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccessSecret response={"secret":"89A50tOvRNKsRYx2"}
04:04:19.573 [Grizzly(1)] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - WS onOpen
04:04:19.604 [main] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP CONNECT host=ise24fc3.lab10.com
04:04:19.610 [pool-1-thread-1] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP SEND topic=/topic/com.example.custom
press <enter> to disconnect...
04:04:19.620 [Grizzly(2)] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP CONNECTED version=1.2
04:04:24.616 [pool-1-thread-1] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP SEND topic=/topic/com.example.custom
04:04:29.622 [pool-1-thread-1] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP SEND topic=/topic/com.example.custom

What you see in ISE

Select Administration->**pxGrid Services->, you should see the registered provider Select Web Clients, you should see the Provider publishing the custom topic

Custom Service Consumer

The public class CustomServiceConsumer will call the private static class MessageHandler class that implements StompSubscription Handler and prints out the session contents

Main parses the SampleConfiguration object which includes the pxGrid hostname, username or pxGrid client connection paramters, and the identity filename keystore (.jks) file and the truststore filename (.jks) values.

Under // AccountActivate, the pxGridcontrol object and waits for 60 seconds before the pxGrid client account is activated. The pxGrid controller version is also retrieved.

Under //pxGridServiceRegister, we register to the pubsubservice, and also register the custom topic for publishing, in this case, customTopic

Under // Schedule pxGrid ServiceReregister, we provide a time value for the pubsub service to re-register

Under // pxGrid ServiceLookup for pubsub service, we do a service lookup for com.cisco.ise.pubsub

Under // Use first service. Note that ServiceLookup randomize ordering of services, we get the ISE pxGrid node with the pubsub service that is available. If you have multiple ISE pxGrid nodes, the load will be distributed and the value returned may be the 2nd, 3rd, or 4th, ISE pxGrid node pending the ISE deployment. We also get the WebSocket URL properties.

Under // pxGrid AccessSecret, we get the access secret from the ISE pxGrid node

Under // Setup WebSocket client, we obtain the trust credentials in establishing connection to the ISE pxGrid node

Under // STOMP connect, we call a StompPubsubClientEndpoint to establish a connection with the STOMP messaging protocol over WebSockets.

Under //STOMP subscribe, we subscribe to the custom topic

We connect the endpoint and under //STOP send periodically, the endpoint, otherwise known as the pxGrid client will publish the custom data for the published “topic/com/example.custom” topic.

Fot //STOMP disconnect, the client will send a STOMP disconnect and will receive an acknowledgement from the ISE pxGrid node that the endpoint has been disconnected, and the session will be closed

Output

/**
 * Demonstrates how to subscribe a topic from a custom service
 * 
 * The flow of the application is as follows:
 * 1. Parse arguments for configurations
 * 2. Activate Account. This will then require ISE Admin to approve this new node.
 * 3. pxGrid ServiceLookup for the custom service
 * 4. pxGrid ServiceLookup for ISE pubsub service
 * 5. pxGrid get AccessSecret for the ISE pubsub node
 * 6. Establish WebSocket connection with the ISE pubsub node
 * 7. Establish STOMP connection for pubsub messaging
 * 8. Subscribe to the topic in the custom service
 * 9. Wait for keyboard input for stopping the application
 */
public class CustomServiceConsumer {
    private static Logger logger = LoggerFactory.getLogger(CustomServiceConsumer.class);	

    // Subscribe handler class
    private static class MessageHandler implements StompSubscription.Handler {
        @Override
        public void handle(StompFrame message) {
            System.out.println(new String(message.getContent()));
        }
    }

    public static void main(String [] args) throws Exception {
        // Parse arguments
        SampleConfiguration config = new SampleConfiguration();
        try {
            config.parse(args);
        } catch (ParseException e) {
            config.printHelp("CustomServiceConsumer");
            System.exit(1);
        }
        
        // AccountActivate
        PxgridControl control = new PxgridControl(config);
        while (control.accountActivate() != AccountState.ENABLED) {
            Thread.sleep(60000);
        }
        logger.info("pxGrid controller version={}", control.getControllerVersion());
        
        
        // pxGrid ServiceLookup for custom service
        Service[] services = control.serviceLookup("com.example.custom");
        if (services.length == 0) {
            logger.info("Service unavailabe");
            return;
        }
        
        // Use first service. Note that ServiceLookup randomize ordering of services
        Service customService = services[0];
        String wsPubsubServiceName = customService.getProperties().get("wsPubsubService");
        String customTopic = customService.getProperties().get("customTopic");
        logger.info("wsPubsubServiceName={} sessionTopic={}", wsPubsubServiceName, customTopic);
        
        // pxGrid ServiceLookup for pubsub service
        services = control.serviceLookup(wsPubsubServiceName);
        if (services.length == 0) {
            logger.info("Pubsub service unavailabe");
            return;
        }

        // Use first service
        Service wsPubsubService = services[0];
        String wsURL = wsPubsubService.getProperties().get("wsUrl");
        logger.info("wsUrl={}", wsURL);

        // pxGrid AccessSecret for the pubsub node
        String secret = control.getAccessSecret(wsPubsubService.getNodeName());

        // WebSocket config
        ClientManager client = ClientManager.createClient();
        SslEngineConfigurator sslEngineConfigurator = new SslEngineConfigurator(config.getSSLContext());
        client.getProperties().put(ClientProperties.SSL_ENGINE_CONFIGURATOR, sslEngineConfigurator);
        client.getProperties().put(ClientProperties.CREDENTIALS,
                new Credentials(config.getNodeName(), secret.getBytes()));
        
        // WebSocket connect
        StompPubsubClientEndpoint endpoint = new StompPubsubClientEndpoint();
        URI uri = new URI(wsURL);
        Session session = client.connectToServer(endpoint, uri);

        // STOMP connect
        endpoint.connect(uri.getHost());
        
        // Subscribe
        StompSubscription subscription = new StompSubscription(customTopic, new MessageHandler());
        endpoint.subscribe(subscription);

        SampleHelper.prompt("press <enter> to disconnect...");

        // STOMP disconnect
        endpoint.disconnect("ID-123");
        // Wait for disconnect receipt
        Thread.sleep(3000);
        
        session.close();
    }
}

Output

------ config ------
  hostname = ise24fc3.lab10.com
  nodename = Consumer
  password = (not specified)
  description = (not specified)
  keystorefilename = /Applications/master_rest_samples/sw1.jks
  keystorepassword = Cisco123
  truststorefilename = /Applications/master_rest_samples/sw1root.jks
  truststorepassword = Cisco123
--------------------
04:31:17.178 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccountActivate request={}
04:31:19.199 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccountActivate response={"accountState":"ENABLED","version":"2.0.0.13"}
04:31:19.199 [main] INFO com.cisco.pxgrid.samples.ise.CustomServiceConsumer - pxGrid controller version=2.0.0.13
04:31:19.201 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceLookup request={"name":"com.example.custom"}
04:31:19.220 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceLookup response={"services":[{"name":"com.example.custom","nodeName":"provider","properties":{"customTopic":"/topic/com.example.custom","wsPubsubService":"com.cisco.ise.pubsub"}}]}
04:31:19.220 [main] INFO com.cisco.pxgrid.samples.ise.CustomServiceConsumer - wsPubsubServiceName=com.cisco.ise.pubsub sessionTopic=/topic/com.example.custom
04:31:19.220 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceLookup request={"name":"com.cisco.ise.pubsub"}
04:31:19.232 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - ServiceLookup response={"services":[{"name":"com.cisco.ise.pubsub","nodeName":"ise-pubsub-ise24fc3","properties":{"wsUrl":"wss://ise24fc3.lab10.com:8910/pxgrid/ise/pubsub"}}]}
04:31:19.232 [main] INFO com.cisco.pxgrid.samples.ise.CustomServiceConsumer - wsUrl=wss://ise24fc3.lab10.com:8910/pxgrid/ise/pubsub
04:31:19.234 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccessSecret request={"peerNodeName":"ise-pubsub-ise24fc3"}
04:31:19.475 [main] INFO com.cisco.pxgrid.samples.ise.PxgridControl - AccessSecret response={"secret":"Lx95QSluWdSqiE3e"}
04:31:21.140 [Grizzly(1)] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - WS onOpen
04:31:21.168 [main] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP CONNECT host=ise24fc3.lab10.com
04:31:21.195 [main] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP SUBSCRIBE topic=/topic/com.example.custom
04:31:21.206 [Grizzly(2)] INFO com.cisco.pxgrid.samples.ise.StompPubsubClientEndpoint - STOMP CONNECTED version=1.2
press <enter> to disconnect...
custom data
custom data

What you see in ISE

Select Administrator->pxGrid Services, you should see the Consumer register

Select Administrator->pxGrid Services->Web Clients, you should see the Consumer subscribe to the Custom data topic