Building Our Own ESB - Publish / Subscribe (Part 4)
Time to Rock and Roll
It's been a few weeks since the last part of the ESB series, but we can't build a decent ESB overnight, now can we? We left off with a draft of the basic framework. We have a few minor refactorings that are worth mentioning before we continue:
Listeners are now a first class citizen of our bus, rather than being spun up transparently by a runtime service.
Subscription and listener endpoints will now take object references to dispatchers and listeners instead of Types. This is a subtle change, but makes it much easier to configure various properties on listeners and dispatchers.
Message delivery will now take advantage of a ThreadPool for its work. A thread pool will allow much higher throughput than dedicated worker threads.
A few classes like dispatchers, listeners, and the bus itself now implement IDisposable to ensure proper cleanup.
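To make the thread pool change a bit more concrete, here's a minimal sketch of handing deliveries to the .NET ThreadPool. PooledDeliveryQueue, its deliver callback, and the OnError hook are hypothetical names used for illustration; they are not the classes from our framework.

```csharp
using System;
using System.Threading;

// Hypothetical illustration: each message is queued to the shared .NET
// ThreadPool instead of being handled by a dedicated worker thread.
public sealed class PooledDeliveryQueue
{
    private readonly Action<object> _deliver;
    private int _pending;

    public PooledDeliveryQueue(Action<object> deliver)
    {
        if (deliver == null) throw new ArgumentNullException("deliver");
        _deliver = deliver;
    }

    // Optional hook for failed deliveries (assumption: a real bus would
    // retry or dead-letter the message here).
    public Action<Exception> OnError { get; set; }

    // Number of deliveries that are queued or currently running.
    public int Pending { get { return Volatile.Read(ref _pending); } }

    public void Enqueue(object message)
    {
        Interlocked.Increment(ref _pending);
        ThreadPool.QueueUserWorkItem(state =>
        {
            try
            {
                _deliver(state);
            }
            catch (Exception ex)
            {
                // An unhandled exception on a pool thread would take the
                // process down, so failures are routed to the hook instead.
                Action<Exception> handler = OnError;
                if (handler != null) handler(ex);
            }
            finally
            {
                Interlocked.Decrement(ref _pending);
            }
        }, message);
    }
}
```

The pool decides how many threads actually run, so a burst of messages reuses existing threads rather than creating one per subscription.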
We're starting to fill out our class diagram a bit more:
Now, let's pick up where we left off...
Threading Support
We've added a few utility classes to our framework. Here's a quick rundown:
CountdownLatch - Allows us to signal an event when work on multiple threads has completed.
ReaderWriterLockedObject - It's easy to forget to acquire locks to shared resources. ReaderWriterLockedObject enforces that appropriate locks are acquired by eliminating our ability to directly access shared state.
ReadOnlyDictionary - Immutable counterpart of the Dictionary class (used to associate context information with a MessageDelivery).
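For readers who haven't seen these patterns before, here's a rough sketch of the first two utilities. The member names (Signal, Wait, Read, Write) are assumptions made for this example and may not match the classes in the repository. Newer versions of .NET also ship a built-in CountdownEvent that covers the latch case.

```csharp
using System;
using System.Threading;

// Sketch of a countdown latch: Wait() unblocks once Signal() has been
// called as many times as the count passed to the constructor.
public sealed class CountdownLatch : IDisposable
{
    private readonly ManualResetEvent _done = new ManualResetEvent(false);
    private int _remaining;

    public CountdownLatch(int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException("count");
        _remaining = count;
    }

    public void Signal()
    {
        // The thread that brings the counter to zero releases the waiters.
        if (Interlocked.Decrement(ref _remaining) == 0) _done.Set();
    }

    public bool Wait(TimeSpan timeout) { return _done.WaitOne(timeout); }

    public void Dispose() { _done.Dispose(); }
}

// Sketch of a lock-enforcing wrapper: the state is only reachable through
// Read or Write, so callers cannot touch it without holding the right lock.
public sealed class ReaderWriterLockedObject<T> : IDisposable
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private T _value;

    public ReaderWriterLockedObject(T initial) { _value = initial; }

    public TResult Read<TResult>(Func<T, TResult> reader)
    {
        _lock.EnterReadLock();
        try { return reader(_value); }
        finally { _lock.ExitReadLock(); }
    }

    public void Write(Func<T, T> writer)
    {
        _lock.EnterWriteLock();
        try { _value = writer(_value); }
        finally { _lock.ExitWriteLock(); }
    }

    public void Dispose() { _lock.Dispose(); }
}
```

A typical use is to have several pool threads update the locked object and call Signal() when they finish, while the coordinating thread blocks on Wait().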
MSMQ Message Delivery Queue
A key component of our service bus is the message delivery queue. MSMQ is readily available on Windows Server, so we'll start by implementing our queue interface for MSMQ. In addition to the queue interface, we'll provide a message formatter that uses the DataContractSerializer instead of the BinaryFormatter. Though our interface will allow us to pick which formatter our queue uses, a DataContract-based formatter ensures that messages stored in our queue version the same way as incoming messages, avoiding subtle bugs that could occur if the stored format and the wire format evolved differently.
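Here's a small sketch of the serialization step such a formatter would perform. It deliberately leaves out MSMQ itself: in an IMessageFormatter implementation, the stream returned by Write would be assigned to Message.BodyStream, and Read would consume that same stream. The OrderPlacedV1 and OrderPlacedV2 types and the DataContractBody helper are hypothetical and exist only to show two versions of one contract.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Two hypothetical versions of the same contract. They share a name and
// namespace, so data written by one can be read by the other.
[DataContract(Name = "OrderPlaced", Namespace = "urn:example:orders")]
public class OrderPlacedV1
{
    [DataMember] public string OrderId { get; set; }
}

[DataContract(Name = "OrderPlaced", Namespace = "urn:example:orders")]
public class OrderPlacedV2
{
    [DataMember] public string OrderId { get; set; }

    // Added in a later version; Order places it after the original members.
    [DataMember(Order = 1)] public string Note { get; set; }
}

public static class DataContractBody
{
    // Serializes the body into a rewound stream, ready to be stored.
    public static Stream Write(object body, Type type)
    {
        var stream = new MemoryStream();
        new DataContractSerializer(type).WriteObject(stream, body);
        stream.Position = 0;
        return stream;
    }

    // Deserializes a stored body as the requested contract type.
    public static object Read(Stream stream, Type type)
    {
        return new DataContractSerializer(type).ReadObject(stream);
    }
}
```

Reading a V2 body as V1 skips the unknown Note element, and reading a V1 body as V2 leaves Note unset, which is the same tolerance WCF applies to incoming messages.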
Adding WCF Support
First, let's take a look at adding the ability for our bus to receive WCF messages. Hosting a service using WCF only takes a few lines of code:
ServiceHost host = new ServiceHost(implementationClass);
host.Open();
We need to host a service and then forward the messages to our bus. Supposing we had a simple service contract like so:
using System.ServiceModel;

[ServiceContract]
public interface IPublish
{
    [OperationContract]
    void Publish(object message);
}
We could write a simple implementation that does this:
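public class PublishListener : IPublish
{
    // The bus runtime that receives the forwarded messages.
    ServiceBusRuntime Runtime;

    // Must be public to implement IPublish.Publish.
    public void Publish(object message)
    {
        // Forward the call to the bus, tagged with its contract type and action name.
        Runtime.Publish(typeof(IPublish), "Publish", message);
    }
}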
The drawback of a generic publish interface is that it tightly couples consumers to our ESB implementation and it isn't user friendly at all (imagine if all the classes elsewhere in your code could only have a single method that accepted an object parameter). Ideally, we want our bus to be able to expose meaningful WCF service contracts to subscribers to consume. Since the code contained in each method will be exactly the same in our listener implementations, we can dynamically emit these listener implementations given an interface and host our dynamically generated listener class. Here's some code that does exactly that:
Note that our listener base class has a Publish method that adds some security context to the message before publishing it. We need to capture caller security credentials if we want to enforce security on subscription endpoints. We have also provided a custom ServiceHost implementation. The custom ServiceHost lets us define configuration templates in our configuration file and reuse them for multiple listeners instead of predefining every scenario in our application's configuration file (we could also pull configuration information from a database or some other source, but for today we'll stick with standard config files).
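As a rough illustration of the emit step, here's a minimal sketch using System.Reflection.Emit. It is not the project's actual implementation: EmittedListenerBase, its Sink callback, ListenerEmitter, and the IOrderEvents contract are all hypothetical, the security context handling is omitted, and only one-way operations with a single argument are supported.

```csharp
using System;
using System.Reflection;
using System.Reflection.Emit;

// Hypothetical sample contract with a meaningful, strongly typed operation.
public interface IOrderEvents
{
    void OrderPlaced(string orderId);
}

// Hypothetical stand-in for the listener base class. Sink plays the role
// of the bus runtime that receives the published message.
public abstract class EmittedListenerBase
{
    public Action<Type, string, object> Sink { get; set; }

    public void Publish(Type contract, string action, object message)
    {
        Sink(contract, action, message);
    }
}

public static class ListenerEmitter
{
    private static readonly ModuleBuilder Module = AssemblyBuilder
        .DefineDynamicAssembly(new AssemblyName("EmittedListeners"), AssemblyBuilderAccess.Run)
        .DefineDynamicModule("EmittedListeners");

    // Emits a class deriving from EmittedListenerBase that implements each
    // contract method by calling Publish(typeof(contract), name, argument).
    public static Type Emit(Type contract)
    {
        TypeBuilder type = Module.DefineType(
            contract.Name + "Listener_" + Guid.NewGuid().ToString("N"),
            TypeAttributes.Public | TypeAttributes.Class,
            typeof(EmittedListenerBase),
            new[] { contract });
        type.DefineDefaultConstructor(MethodAttributes.Public);

        MethodInfo publish = typeof(EmittedListenerBase).GetMethod("Publish");
        MethodInfo getType = typeof(Type).GetMethod("GetTypeFromHandle");
        const MethodAttributes attributes =
            MethodAttributes.Public | MethodAttributes.Virtual | MethodAttributes.Final |
            MethodAttributes.HideBySig | MethodAttributes.NewSlot;

        foreach (MethodInfo method in contract.GetMethods())
        {
            ParameterInfo[] parameters = method.GetParameters();
            if (method.ReturnType != typeof(void) || parameters.Length != 1)
                throw new NotSupportedException("Sketch handles one-way, single-argument operations only.");

            Type argType = parameters[0].ParameterType;
            MethodBuilder body = type.DefineMethod(method.Name, attributes, typeof(void), new[] { argType });

            ILGenerator il = body.GetILGenerator();
            il.Emit(OpCodes.Ldarg_0);                // this
            il.Emit(OpCodes.Ldtoken, contract);      // typeof(contract)
            il.Emit(OpCodes.Call, getType);
            il.Emit(OpCodes.Ldstr, method.Name);     // action name
            il.Emit(OpCodes.Ldarg_1);                // the message argument
            if (argType.IsValueType) il.Emit(OpCodes.Box, argType);
            il.Emit(OpCodes.Call, publish);          // this.Publish(...)
            il.Emit(OpCodes.Ret);

            type.DefineMethodOverride(body, method);
        }
        return type.CreateType();
    }
}
```

Calling ListenerEmitter.Emit(typeof(IOrderEvents)) yields a type that can be created with Activator.CreateInstance and cast to IOrderEvents; every call on it ends up in Publish with the contract type and method name attached.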
Now that we can dynamically generate listener implementations, our WcfListener implementation code is drop-dead simple:
There still may be cases where someone wants to provide their own implementation class or service host instead of using ours, so we've left a few of the methods in our class virtual for those folks who like messing with the guts (of course, they could always define their own Listener class instead of deriving from WcfListener, but our WcfListener class should make most sense for just about everyone).
Now, for WCF subscriptions...
If you recall, Dispatchers take an incoming publish request and deliver it to their endpoint (you may note the revised method signature from some refactoring, but the idea stays the same). WCF has built-in support for dynamic proxy generation, which we'll take advantage of with our dispatcher implementation. At a high level, all we are doing is telling WCF to give us a proxy class and then dynamically invoking the method on the proxy class that our delivery should be dispatched to. WCF will take care of all the messy communication details for us.
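The dynamic invocation step can be sketched without any WCF plumbing. In the sketch below, ReflectionDispatcher and its createProxy callback are hypothetical names; with WCF, that callback would wrap a ChannelFactory&lt;T&gt; created for the contract type and return the result of CreateChannel(), and it would also own the lifetime of the channel.

```csharp
using System;
using System.Reflection;

// Hypothetical dispatcher sketch: looks up the target operation by name on
// the contract interface and invokes it on whatever proxy the factory returns.
public sealed class ReflectionDispatcher
{
    private readonly Type _contract;
    private readonly Func<Type, object> _createProxy;

    public ReflectionDispatcher(Type contract, Func<Type, object> createProxy)
    {
        if (contract == null) throw new ArgumentNullException("contract");
        if (createProxy == null) throw new ArgumentNullException("createProxy");
        if (!contract.IsInterface)
            throw new ArgumentException("Contract must be an interface.", "contract");
        _contract = contract;
        _createProxy = createProxy;
    }

    // Delivers the message to the operation named by the action.
    public void Dispatch(string action, object message)
    {
        MethodInfo operation = _contract.GetMethod(action);
        if (operation == null)
            throw new MissingMethodException(_contract.FullName, action);

        // The proxy implements the contract interface, so the interface
        // method can be invoked on it directly.
        object proxy = _createProxy(_contract);
        operation.Invoke(proxy, new[] { message });
    }
}
```

Because the dispatcher only depends on the contract type and an action name, the same class can deliver to any endpoint whose contract is known at runtime.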
Now, our ESB has everything it needs to dynamically host WCF endpoints and dispatch messages to other WCF endpoints. We haven't covered a lot of the new classes in our class diagram, but we'll continue with them in the next post. Until then, feel free to check out the code over on GitHub at:
http://github.com/jezell/iserviceoriented/tree/master/