Wednesday, December 28, 2011

WebLogic JMS Partitioned Distributed Topics and Shared Subscriptions

I had previously blogged about the challenges of scaling topics with durable subscribers. In JMS, with topics, one can typically have a single topic subscriber for a particular [Client ID, Subscription Name] tuple.

In the example shown below, the Order Topic has two durable subscribers, the Order Processor and the Auditor. Both receive every message sent to the topic, but at any given time there can be exactly one consumer of each. Unlike with queues, there cannot be competing consumers on a durable subscription. This translates to a scalability and availability challenge:
Oracle WebLogic has introduced the concept of partitioned distributed topics and shared subscriptions, which allow more than one consumer to be attached to a particular durable subscription. With a shared subscription, the consumers of an "application" compete for messages of a topic, which promotes scalability and availability. In the figure shown below there are two Order Processors competing for messages, and likewise two Auditors. A given message is never delivered to both consumers of the same "application":
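
To make the delivery semantics concrete, here is a minimal in-memory sketch in plain Java. It is not WebLogic or JMS code: the class names are hypothetical, and the round-robin choice of consumer is an assumption made only to keep the model deterministic. It shows that every subscription sees every message, while the consumers within one subscription split the messages between them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a topic with shared subscriptions (not WebLogic code). */
public class SharedSubscriptionModel {

    /** A named subscription whose consumers compete for each message. */
    public static final class Subscription {
        private final List<List<String>> inboxes = new ArrayList<>();
        private int next = 0;

        /** Registers a competing consumer and returns the list it receives into. */
        public List<String> addConsumer() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        /** Hands the message to exactly one consumer (round-robin is an assumption). */
        void deliver(String message) {
            if (inboxes.isEmpty()) {
                return; // a real durable subscription would retain the message instead
            }
            inboxes.get(next % inboxes.size()).add(message);
            next++;
        }
    }

    /** A topic fans each message out to every subscription, once per subscription. */
    public static final class Topic {
        private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

        /** Returns the subscription with the given name, creating it on first use. */
        public Subscription subscription(String name) {
            Subscription s = subscriptions.get(name);
            if (s == null) {
                s = new Subscription();
                subscriptions.put(name, s);
            }
            return s;
        }

        public void publish(String message) {
            for (Subscription s : subscriptions.values()) {
                s.deliver(message);
            }
        }
    }

    public static void main(String[] args) {
        Topic orderTopic = new Topic();
        List<String> processor1 = orderTopic.subscription("orderProcSubscription").addConsumer();
        List<String> processor2 = orderTopic.subscription("orderProcSubscription").addConsumer();
        List<String> auditor1 = orderTopic.subscription("auditorSubscription").addConsumer();
        List<String> auditor2 = orderTopic.subscription("auditorSubscription").addConsumer();

        for (int i = 1; i <= 4; i++) {
            orderTopic.publish("order-" + i);
        }
        System.out.println("processor1=" + processor1 + " processor2=" + processor2);
        System.out.println("auditor1=" + auditor1 + " auditor2=" + auditor2);
    }
}
```

In this model the two order processors end up with two orders each and no order in common, and the same holds for the two auditors.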

WebLogic introduces the concept of an UNRESTRICTED Client ID policy. With this setting on a connection factory, more than one connection in a cluster can share the same Client ID. The default, RESTRICTED, enforces that only a single connection with a particular Client ID can exist in a cluster.

Sharing Subscriptions:
There are two policies for Subscription sharing:

1. Exclusive - This is the default; subscribers created using the connection factory cannot share subscriptions with any other subscribers.

2. Sharable - Subscribers created using this connection factory can share their subscriptions with other subscribers. Consumers can share a durable subscription if they have the same [Client ID, Client ID policy and subscription name].

To promote HA, set the Subscription Sharing Policy on the connection factory to Sharable.
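
The sharing rule can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions and not the WebLogic API: `SubscriptionKey`, `SharingPolicy`, and `attach` are hypothetical names, and the way the real server treats a mix of Exclusive and Sharable consumers may be more nuanced than the simple rejection modelled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of the sharing rule described above; this is not the WebLogic API. */
public class SubscriptionSharingModel {

    public enum SharingPolicy { EXCLUSIVE, SHARABLE }

    /** The tuple that must match for two consumers to share a durable subscription. */
    public static final class SubscriptionKey {
        private final String clientId;
        private final String clientIdPolicy;
        private final String subscriptionName;

        public SubscriptionKey(String clientId, String clientIdPolicy, String subscriptionName) {
            this.clientId = clientId;
            this.clientIdPolicy = clientIdPolicy;
            this.subscriptionName = subscriptionName;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof SubscriptionKey)) {
                return false;
            }
            SubscriptionKey that = (SubscriptionKey) other;
            return clientId.equals(that.clientId)
                && clientIdPolicy.equals(that.clientIdPolicy)
                && subscriptionName.equals(that.subscriptionName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(clientId, clientIdPolicy, subscriptionName);
        }
    }

    private final Map<SubscriptionKey, Integer> consumerCounts = new HashMap<>();
    private final Map<SubscriptionKey, SharingPolicy> policies = new HashMap<>();

    /** Attaches a consumer; returns false when the subscription cannot be shared. */
    public boolean attach(SubscriptionKey key, SharingPolicy policy) {
        Integer current = consumerCounts.get(key);
        if (current == null) {
            // First consumer for this tuple: it always gets the subscription
            consumerCounts.put(key, 1);
            policies.put(key, policy);
            return true;
        }
        // Later consumers may join only if both sides allow sharing (modelling assumption)
        if (policy == SharingPolicy.SHARABLE && policies.get(key) == SharingPolicy.SHARABLE) {
            consumerCounts.put(key, current + 1);
            return true;
        }
        return false;
    }

    public int consumerCount(SubscriptionKey key) {
        Integer count = consumerCounts.get(key);
        return count == null ? 0 : count;
    }

    public static void main(String[] args) {
        SubscriptionSharingModel model = new SubscriptionSharingModel();
        SubscriptionKey first = new SubscriptionKey("orderConnectionId", "UNRESTRICTED", "orderProcSubscription");
        SubscriptionKey sameTuple = new SubscriptionKey("orderConnectionId", "UNRESTRICTED", "orderProcSubscription");

        System.out.println(model.attach(first, SharingPolicy.SHARABLE));      // first consumer
        System.out.println(model.attach(sameTuple, SharingPolicy.SHARABLE));  // same tuple, shares
        System.out.println(model.attach(sameTuple, SharingPolicy.EXCLUSIVE)); // rejected in this model
    }
}
```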

When creating a topic in WebLogic, set the Forwarding Policy to Partitioned. A message sent to a partitioned distributed topic is then delivered to a single physical member only. It is not forwarded to other members of the cluster, even if the member that received it has no consumers.
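
The consequence of the Partitioned policy can be sketched with another small plain-Java model. Again this is not WebLogic code: the `Member` class is hypothetical, and the round-robin placement of messages is an assumption standing in for whatever load balancing the server actually applies. The point is only that a message stays on the member that received it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of a partitioned distributed topic (not WebLogic code). */
public class PartitionedTopicModel {

    /** One physical member; it keeps messages until a locally attached consumer takes them. */
    public static final class Member {
        private final String name;
        private final Deque<String> backlog = new ArrayDeque<>();
        private final List<String> consumed = new ArrayList<>();
        private boolean hasConsumer;

        public Member(String name) {
            this.name = name;
        }

        /** Attaches a consumer to this member and drains anything already waiting. */
        public void attachConsumer() {
            hasConsumer = true;
            drain();
        }

        void accept(String message) {
            backlog.add(message);
            drain();
        }

        private void drain() {
            while (hasConsumer && !backlog.isEmpty()) {
                consumed.add(backlog.poll());
            }
        }

        public String name() { return name; }
        public int backlogSize() { return backlog.size(); }
        public int consumedCount() { return consumed.size(); }
    }

    private final List<Member> members;
    private int next = 0;

    public PartitionedTopicModel(List<Member> members) {
        this.members = members;
    }

    /** Each message lands on exactly one member and is never forwarded to another. */
    public void send(String message) {
        members.get(next % members.size()).accept(message);
        next++;
    }

    public static void main(String[] args) {
        Member server1 = new Member("Server1");
        Member server2 = new Member("Server2");
        PartitionedTopicModel topic = new PartitionedTopicModel(Arrays.asList(server1, server2));

        server1.attachConsumer(); // the subscriber connects to one member only
        for (int i = 1; i <= 6; i++) {
            topic.send("order-" + i);
        }
        System.out.println(server1.name() + " backlog=" + server1.backlogSize());
        System.out.println(server2.name() + " backlog=" + server2.backlogSize());

        server2.attachConsumer(); // connecting to the second member drains its backlog
        System.out.println(server2.name() + " backlog=" + server2.backlogSize());
    }
}
```

With a consumer on only one of the two members, half of the messages in this model wait on the other member until a consumer connects there, which is why the listeners need a connection to every member.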

For an HA solution, the listeners need to connect to each and every physical member across the cluster. Consider the following figure: Consumer Machine 1 hosts two consumers of Type A and two consumers of Type B, and the same applies to Consumer Machine 2. Note that the consumers of each type or application connect to both physical members of the cluster. This ensures that if one consumer machine dies unexpectedly, the other can continue consuming from every member, preserving availability:


The above can be achieved using Spring Framework's message listener container with some wrapper code, as shown below. The PartitionedTopicContainer is a container of containers, each connecting to one physical member of the topic with the same client ID and subscription name:
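import java.util.List;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

import com.google.common.collect.Lists;

public class PartitionedTopicContainer {
  private final List<DefaultMessageListenerContainer> containers;

  public PartitionedTopicContainer(String clientID, String subscriptionName, ConnectionFactory connectionFactory, Destination ...physicalTopicMembers) {
    this.containers = Lists.newArrayList();

    for (Destination physicalMember : physicalTopicMembers) {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

      container.setConnectionFactory(connectionFactory);
      container.setDestination(physicalMember);
      // Treat the destination as a topic and make the subscription durable
      container.setPubSubDomain(true);
      container.setSubscriptionDurable(true);
      container.setClientId(clientID);
      container.setDurableSubscriptionName(subscriptionName);

      // Keep track of the container so that the lifecycle methods below can reach it
      containers.add(container);
    }
  }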

  public void setMessageListener(Object listener) {
    for (DefaultMessageListenerContainer container : containers) {
      container.setMessageListener(listener);
    }
  }

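  public void start() {
    for (DefaultMessageListenerContainer container : containers) {
      // Outside a Spring context the container has to be initialized explicitly before it is started
      container.afterPropertiesSet();
      container.start();
    }
  }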

  public void shutdown() {
    for (DefaultMessageListenerContainer container : containers) {
      container.shutdown();
    }
  }
}
The above container could then be used as follows:
// Obtain each physical member of the partitioned distributed topic
Destination physicalMember1 = (Destination) context.lookup("Server1@orderTopic");
Destination physicalMember2 = (Destination) context.lookup("Server2@orderTopic");

// Container for the Order Processor
PartitionedTopicContainer subscriber1 = new PartitionedTopicContainer("orderConnectionId", "orderProcSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber1.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber order processor got a message...");
    ...
  }
});
// Container for the Auditor
PartitionedTopicContainer subscriber2 = new PartitionedTopicContainer("auditorConnectionId", "auditorSubscription", connectionFactory, physicalMember1, physicalMember2);
subscriber2.setMessageListener(new SessionAwareMessageListener<TextMessage>() {
  public void onMessage(...) {
    System.out.println(Thread.currentThread().getId() + " of subscriber auditor got a message...");
    ...
  }
});

subscriber1.start();
subscriber2.start();


The code above requires the consumer of the API to look up each and every physical member and hand it to the container. That is a lot of boilerplate, and it does not account well for physical members becoming available or unavailable. Luckily, WebLogic provides the JMSDestinationAvailabilityHelper API, which lets a client listen for events about physical member availability and unavailability. The PartitionedTopicContainer shown above could easily be augmented with the availability helper API so that it is notified of these events and starts or stops the internal container for the affected physical destination accordingly. Pseudo-code of how this can be achieved with the above container is shown below:

public class PartitionedTopicContainer implements DestinationAvailabilityListener {
  private final String clusterUrl;

  private final String clientID;

  private final String subscriptionName;

  private final String partDistTopicJndi;

  private final ConnectionFactory connectionFactory;

  @GuardedBy("containerLock")
  private final Map<String, DefaultMessageListenerContainer> containerMap;

  private final Object containerLock = new Object();

  // WebLogic Handle
  private RegistrationHandle registrationHandle;

  private final CountDownLatch startLatch = new CountDownLatch(1);

  public PartitionedTopicContainer(String clientID, String subscriptionName, String clusterUrl,
    ConnectionFactory connectionFactory, String partDistTopicJndi) {
    this.clusterUrl = clusterUrl;
    this.clientID = clientID;
    this.subscriptionName = subscriptionName;
    this.partDistTopicJndi = partDistTopicJndi;
    this.containerMap = Maps.newHashMap();
    this.connectionFactory = connectionFactory;
  }

  public void start() throws InterruptedException {
    Hashtable<String, String> jndiProperties = ...;
    jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
    jndiProperties.put(Context.PROVIDER_URL, clusterUrl);
    
    JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
   
    // Register this container as a listener for destination events
    registrationHandle = dah.register(jndiProperties, partDistTopicJndi, this);
   
    // Wait for listener notification to start container
    startLatch.await();
  }

  @Override
  public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
    synchronized (containerLock) {
      // For all Physical destinations, start a container
      for (DestinationDetail detail : physicalAvailableMembers) {
        Destination physicalMember = lookupPhysicalTopic(detail.getJNDIName());
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        
        container.setConnectionFactory(connectionFactory);
        container.setDestination(physicalMember);
        container.setClientId(clientID);
        container.setDurableSubscriptionName(subscriptionName);
        // The message listener also has to be set on the container before it is started (omitted here)
        System.out.println("Starting Container for physical Destination:" + detail);
        container.start();
        containerMap.put(detail.getJNDIName(), container);
      }
    }
    startLatch.countDown();
  }
  
  @Override
  public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
    synchronized (containerLock) {
      // Shutdown all containers whose physical members are no longer available
      for (DestinationDetail detail : physicalUnavailableMembers) {
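        DefaultMessageListenerContainer container = containerMap.remove(detail.getJNDIName());
        // No container may have been registered for this member, so guard against null
        if (container != null) {
          container.shutdown();
        }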
      }
    }
  }

  @Override
  public void onFailure(String destJndiName, Exception exception) {
    // Looks like a cluster wide failure
    shutdown();
  }

  public void shutdown() {
    // Unregister for events about destination availability
    if (registrationHandle != null) {
      registrationHandle.unregister();
    }

    // Shut down containers
    synchronized (containerLock) {
       for (Iterator<Map.Entry<String, DefaultMessageListenerContainer>> i = containerMap.entrySet().iterator(); i
          .hasNext();) {
        Map.Entry<String, DefaultMessageListenerContainer> entry = i.next();
        System.out.println("Shutting down container for:" + entry.getKey());
        entry.getValue().shutdown();
        i.remove();
      }
    }  
  }
}   
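
One design point in the pseudo-code above is that start() waits on the latch without a time limit, so it blocks for as long as no physical member is reported available. A bounded wait is one possible alternative. The sketch below uses only the JDK; the class and method names are hypothetical and it is not part of the container above, just an illustration of the latch pattern with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Minimal sketch of a start-up latch with a bounded wait (hypothetical helper). */
public class BoundedStartup {
    private final CountDownLatch startLatch = new CountDownLatch(1);

    /** Would be called from the availability callback once a container is running. */
    public void signalStarted() {
        startLatch.countDown();
    }

    /** Waits up to the timeout; returns false if nothing became available in time. */
    public boolean awaitStarted(long timeout, TimeUnit unit) {
        try {
            return startLatch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report that start-up did not complete
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedStartup startup = new BoundedStartup();
        System.out.println("started before signal: " + startup.awaitStarted(50, TimeUnit.MILLISECONDS));
        startup.signalStarted();
        System.out.println("started after signal: " + startup.awaitStarted(50, TimeUnit.MILLISECONDS));
    }
}
```

A caller could then decide whether to retry, log, or fail fast when the wait returns false, instead of hanging in start().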

Some of the things to remember when creating a partitioned distributed topic and shared subscription:

1. The connection factory being used should have its "Subscription Sharing Policy" set to "Sharable".
2. The forwarding policy on the distributed topic should be set to "Partitioned".
3. Message forwarding will not occur, so subscribers must ensure that connections exist to every physical member; otherwise messages can pile up for the subscription on that member.
4. If a server hosting a physical member is unavailable, the messages on that physical member will be unavailable until the server is made available again.

Partitioned Distributed Topics and Shared Subscriptions look promising. One thing I still need to sort out is how to handle error destinations at a per-subscription level with WebLogic. Any passers-by with thoughts, please do send them my way.