Building Our Own ESB - Publish / Subscribe (Part 3)

And now the code...

So now we are ready to start coding our framework. To start with, we are going to need a few classes. Here's a class diagram of our core:



We've already talked at a high level, so we'll dive straight into the core code now. Feel free to ask questions if you have them, since I'm going to let the code do most of the talking today.

MessageDelivery: Encapsulates information about a message. In addition to the message data, we need a place to store things like the destination of the message, how many attempts we've made to deliver it, and the maximum number of delivery attempts. We don't want to force everyone to stuff this information into their message classes, since this is really information that the service bus needs to track for its own purposes.

    [Serializable]
    public class MessageDelivery 
    {
        public MessageDelivery(SubscriptionEndpoint endpoint, string action, object message)
        {
            _messageId = Guid.NewGuid().ToString();
            _endpoint = endpoint;
            _action = action;
            _message = message;
        }

        public MessageDelivery(string messageId, SubscriptionEndpoint endpoint, string action, object message, int retryCount, DateTime? timeToProcess)
        {
            _messageId = messageId;
            _endpoint = endpoint;
            _action = action;
            _message = message;
            _retryCount = retryCount;
            _timeToProcess = timeToProcess;
        }

        private readonly string _messageId;

        public string MessageId
        {
            get { return _messageId; }
        }

        private readonly SubscriptionEndpoint _endpoint;

        public SubscriptionEndpoint Endpoint
        {
            get { return _endpoint; }
        } 

        private readonly string _action;

        public string Action
        {
            get { return _action; }
        }

        private readonly object _message;

        public object Message
        {
            get { return _message; }
        } 

        private readonly int _retryCount;

        public int RetryCount
        {
            get { return _retryCount; }
        } 

        private readonly DateTime? _timeToProcess;

        public DateTime? TimeToProcess
        {
            get { return _timeToProcess; }
        }

        private readonly int _maxRetries = 10;

        public int MaxRetries
        {
            get { return _maxRetries; }
        } 


        public bool RetriesMaxed
        {
            get
            {
                return _maxRetries < _retryCount;
            }
        }

        const int RETRY_DELAY_MS = 30000;

        public MessageDelivery CreateRetry()
        {
            int retryCount = (_retryCount + 1);
            
            return new MessageDelivery(_messageId, _endpoint, _action, _message, retryCount, DateTime.Now.AddMilliseconds(RETRY_DELAY_MS * retryCount * retryCount)); 
            
        }
      
    }
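
To make the retry timing in CreateRetry() concrete, here is a minimal sketch that prints the delay for each attempt. The class and method names (RetryScheduleSketch, DelayFor) are made up for illustration; only the constant and the formula come from the class above.

```csharp
using System;

static class RetryScheduleSketch
{
    // Same value as RETRY_DELAY_MS in MessageDelivery.
    const int RetryDelayMs = 30000;

    // Delay before the given retry (1-based), mirroring
    // RETRY_DELAY_MS * retryCount * retryCount in CreateRetry().
    public static TimeSpan DelayFor(int retryCount)
    {
        return TimeSpan.FromMilliseconds((double)RetryDelayMs * retryCount * retryCount);
    }

    static void Main()
    {
        for (int retry = 1; retry <= 10; retry++)
        {
            Console.WriteLine("retry {0}: wait {1}", retry, DelayFor(retry));
        }
    }
}
```

The delay grows with the square of the retry count: 30 seconds before the first retry, 2 minutes before the second, and 50 minutes before the tenth.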

MessageDeliveryQueue: Abstracts our message queueing infrastructure. We don't want to take a direct dependency on MSMQ by coding directly against the MessageQueue class in System.Messaging, since we want each part of our bus to be fully replaceable. Note the addition of the Dequeue-by-id method, which we will need later when we want to pull specific messages out of the dead-letter queue for reprocessing.

    public interface MessageDeliveryQueue 
    {
        void Enqueue(MessageDelivery value);
        MessageDelivery Dequeue(TimeSpan timeout);
        MessageDelivery Dequeue(string id, TimeSpan timeout);
    }
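
Because the queue is just an interface, a simple in-memory version is handy for unit tests. The sketch below is one possible implementation and not part of the framework: InMemoryMessageDeliveryQueue is a hypothetical name, the MessageDelivery class is a trimmed stand-in so the block compiles on its own, and returning null on timeout is an assumption (an MSMQ-backed queue may behave differently). It is neither transactional nor durable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Stand-in for the article's MessageDelivery, reduced to what this sketch needs.
public class MessageDelivery
{
    public MessageDelivery(string messageId) { MessageId = messageId; }
    public string MessageId { get; private set; }
}

public interface MessageDeliveryQueue
{
    void Enqueue(MessageDelivery value);
    MessageDelivery Dequeue(TimeSpan timeout);
    MessageDelivery Dequeue(string id, TimeSpan timeout);
}

// Hypothetical in-memory queue for tests.
public class InMemoryMessageDeliveryQueue : MessageDeliveryQueue
{
    readonly LinkedList<MessageDelivery> _items = new LinkedList<MessageDelivery>();
    readonly object _sync = new object();

    public void Enqueue(MessageDelivery value)
    {
        lock (_sync)
        {
            _items.AddLast(value);
            Monitor.PulseAll(_sync); // wake any callers blocked in Dequeue
        }
    }

    public MessageDelivery Dequeue(TimeSpan timeout)
    {
        return DequeueWhere(d => true, timeout);
    }

    public MessageDelivery Dequeue(string id, TimeSpan timeout)
    {
        return DequeueWhere(d => d.MessageId == id, timeout);
    }

    // Returns the oldest matching delivery, or null if none shows up before
    // the timeout (assumption). Intended for modest timeouts only.
    MessageDelivery DequeueWhere(Predicate<MessageDelivery> match, TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        lock (_sync)
        {
            while (true)
            {
                for (LinkedListNode<MessageDelivery> node = _items.First; node != null; node = node.Next)
                {
                    if (match(node.Value))
                    {
                        MessageDelivery found = node.Value;
                        _items.Remove(node);
                        return found;
                    }
                }
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero || !Monitor.Wait(_sync, remaining))
                {
                    return null;
                }
            }
        }
    }
}

static class QueueDemo
{
    static void Main()
    {
        MessageDeliveryQueue queue = new InMemoryMessageDeliveryQueue();
        queue.Enqueue(new MessageDelivery("a"));
        queue.Enqueue(new MessageDelivery("b"));
        // Pull a specific message out by id, the way a dead-letter replay would.
        Console.WriteLine(queue.Dequeue("b", TimeSpan.Zero).MessageId);
    }
}
```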

Endpoint: At this point, our ESB will contain two types of endpoints: listener endpoints and subscription endpoints. Listener endpoints will be hosted by the ESB, while subscription endpoints are where our ESB will be sending its messages.

    [Serializable]
    public abstract class Endpoint
    {
        Guid _endpointId = Guid.NewGuid();
        public Guid Id
        {
            get
            {
                return _endpointId;
            }
            set
            {
                _endpointId = value;
            }
        }


        Type _contractType;
        public Type ContractType
        {
            get
            {
                return _contractType;
            }
            set
            {
                _contractType = value;
            }
        }

        string _address;
        public string Address
        {
            get
            {
                return _address;
            }
            set
            {
                _address = value;
            }
        }

        string _configurationName;
        public string ConfigurationName
        {
            get
            {
                return _configurationName;
            }
            set
            {
                _configurationName = value;
            }
        }

        string _name;
        public string Name
        {
            get
            {
                return _name;
            }
            set
            {
                _name = value;
            }
        }
    }

Dispatcher: Once a message is ready to be delivered, we need something to send it to its endpoint. Dispatchers abstract the dispatch process, so we aren't bound tightly to WCF.

    public abstract class Dispatcher
    {
        [ThreadStatic]
        static DispatchContext _dispatchContext;
        public static DispatchContext DispatchContext
        {
            get
            {
                return _dispatchContext;
            }
        }

        internal void DispatchInternal(MessageDelivery delivery)
        {
            _dispatchContext = new DispatchContext(delivery);
            try
            {
                Dispatch(delivery.Endpoint, delivery.Action, delivery.Message);
            }
            finally
            {
                _dispatchContext = null;
            }

        }

        protected abstract void Dispatch(SubscriptionEndpoint endpoint, string action, object message);
    }
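
The [ThreadStatic] field is what lets each worker thread see only the delivery it is currently dispatching. The following sketch isolates that pattern with a plain string instead of a DispatchContext; AmbientContextSketch, Run, and Current are illustrative names, not part of the framework.

```csharp
using System;
using System.Threading;

static class AmbientContextSketch
{
    [ThreadStatic]
    static string _current; // every thread gets its own copy, initially null

    public static string Current
    {
        get { return _current; }
    }

    // Sets the context for the duration of body, like DispatchInternal does.
    public static void Run(string context, Action body)
    {
        _current = context;
        try
        {
            body();
        }
        finally
        {
            _current = null; // cleared even if body throws
        }
    }

    static void Main()
    {
        string seenOnOtherThread = "unset";
        Run("delivery-1", () =>
        {
            Console.WriteLine(Current); // prints delivery-1
            Thread other = new Thread(() => seenOnOtherThread = Current);
            other.Start();
            other.Join();
        });
        Console.WriteLine(seenOnOtherThread == null); // prints True: the other thread never saw our context
        Console.WriteLine(Current == null);           // prints True: cleared after Run returns
    }
}
```

One consequence of this design is that the context does not flow to other threads, so a dispatcher that hands work off to another thread would have to pass the delivery along explicitly.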

RuntimeService: We want to keep the core of the service bus as clean as possible, so things like persistence of subscribers or hosting of specific types of listeners will be provided by RuntimeServices. The service bus will notify each registered runtime service when significant events happen, and the runtime service can use these notifications to do things like start up a WCF ServiceHost or communicate with a SQL server.

    public class RuntimeService
    {
        protected ServiceBusRuntime Runtime
        {
            get;
            private set;
        }

        internal void SetRuntime(ServiceBusRuntime runtime)
        {
            Runtime = runtime;
        }

        volatile bool _started;
        protected bool Started
        {
            get
            {
                return _started;
            }
        }
        protected internal void Start()
        {
            if (!_started)
            {
                OnStart();
                _started = true;
            }
        }
        protected internal void Stop()
        {
            if (_started)
            {
                _started = false;
                OnStop();
            }
        }

        protected virtual void OnStart()
        {
        }

        protected virtual void OnStop()
        {
        }

        /// <remarks>
        /// Warning: Unhandled exception here could be fatal. Handle exceptions in this method carefully.
        /// </remarks>
        protected virtual internal void OnUnhandledException(Exception ex, bool terminating)
        {

        }

        protected virtual internal void OnListenerAdded(ListenerEndpoint endpoint)
        {
        }

        protected virtual internal void OnListenerRemoved(ListenerEndpoint endpoint)
        {
        }

        protected virtual internal void OnSubscriptionAdded(SubscriptionEndpoint endpoint)
        {
        }

        protected virtual internal void OnSubscriptionRemoved(SubscriptionEndpoint endpoint)
        {
        }

        protected virtual internal void OnMessageDelivered(MessageDelivery delivery)
        {            
        }

        protected virtual internal void OnMessageDeliveryFailed(MessageDelivery delivery, bool permanent)
        {
        }
    }
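
As a rough idea of what a runtime service might look like, here is a sketch of one that counts delivery outcomes. DeliveryStatsService is a hypothetical name, the base class and MessageDelivery are trimmed stand-ins so the block compiles on its own, and it assumes that permanent == true means the bus has given up on the message.

```csharp
using System;
using System.Threading;

// Trimmed stand-ins for the article's types.
public class MessageDelivery
{
    public MessageDelivery(string messageId) { MessageId = messageId; }
    public string MessageId { get; private set; }
}

public class RuntimeService
{
    protected virtual internal void OnMessageDelivered(MessageDelivery delivery) { }
    protected virtual internal void OnMessageDeliveryFailed(MessageDelivery delivery, bool permanent) { }
}

// Hypothetical service: keeps running totals of delivery outcomes.
public class DeliveryStatsService : RuntimeService
{
    int _delivered;
    int _retried;
    int _failedPermanently;

    public int Delivered { get { return _delivered; } }
    public int Retried { get { return _retried; } }
    public int FailedPermanently { get { return _failedPermanently; } }

    // Callbacks can arrive from several worker threads, so use Interlocked.
    protected internal override void OnMessageDelivered(MessageDelivery delivery)
    {
        Interlocked.Increment(ref _delivered);
    }

    protected internal override void OnMessageDeliveryFailed(MessageDelivery delivery, bool permanent)
    {
        if (permanent)
        {
            Interlocked.Increment(ref _failedPermanently);
        }
        else
        {
            Interlocked.Increment(ref _retried);
        }
    }
}

static class StatsDemo
{
    static void Main()
    {
        DeliveryStatsService stats = new DeliveryStatsService();
        // The bus would normally make these calls; we simulate them here.
        stats.OnMessageDelivered(new MessageDelivery("m1"));
        stats.OnMessageDeliveryFailed(new MessageDelivery("m2"), false);
        Console.WriteLine("{0} delivered, {1} retried", stats.Delivered, stats.Retried);
    }
}
```

A service living in a different assembly from the bus would declare these overrides as protected rather than protected internal.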

ServiceBusRuntime: ServiceBusRuntime is the heart of our ESB. It will handle essential functions like managing worker threads, scheduling deliveries, and providing a way for us to register subscriptions and listeners. Again, we will try to keep the runtime as slim as possible, letting additional functionality be packaged as runtime services.

This is a big class, so I'll point out a few significant things. First, multiple threads might be trying to access our service bus at the same time. In fact, the bus itself will host multiple threads of its own, so there is a bit of code devoted to keeping things thread-safe. Most of this can be handled by simple lock statements, but when it comes to collections like the subscription list, we use a reader/writer lock so that threads that only read don't block each other. A reader/writer lock lets any number of threads read an object at the same time, but only one thread at a time can be in write mode, and it must wait until it can gain exclusive access. Additionally, we have provided some helper functions that execute code blocks safely and automatically send unhandled exceptions to the unhandled exception handler, so that our runtime won't terminate unexpectedly and we don't just swallow exceptions.
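
Before the full class, here is a stripped-down sketch of the two ideas just described: a list guarded by ReaderWriterLockSlim, and a helper that routes exceptions to a handler. GuardedList and Safe are hypothetical names, and this DoSafely is only a guess at the shape of such a helper, not the runtime's own version.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper: many concurrent readers, one writer at a time.
public class GuardedList<T>
{
    readonly List<T> _items = new List<T>();
    readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();

    public void Add(T item)
    {
        _rwLock.EnterWriteLock(); // waits until no readers or writers remain
        try
        {
            _items.Add(item);
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }
    }

    public T[] Snapshot()
    {
        _rwLock.EnterReadLock(); // shared with other readers
        try
        {
            return _items.ToArray();
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }
}

public static class Safe
{
    // Runs action; on failure, reports the exception to onUnhandled and
    // returns false instead of letting it tear down the calling thread.
    public static bool DoSafely(Action action, Action<Exception> onUnhandled)
    {
        try
        {
            action();
            return true;
        }
        catch (Exception ex)
        {
            try
            {
                onUnhandled(ex);
            }
            catch
            {
                // The handler itself failed; there is nowhere left to report to.
            }
            return false;
        }
    }
}

static class SafetyDemo
{
    static void Main()
    {
        GuardedList<string> subscribers = new GuardedList<string>();
        subscribers.Add("orders");
        Console.WriteLine(subscribers.Snapshot().Length);

        bool ok = Safe.DoSafely(
            () => { throw new InvalidOperationException("boom"); },
            ex => Console.WriteLine("reported: " + ex.Message));
        Console.WriteLine(ok);
    }
}
```

Always pairing Enter and Exit calls in try/finally, as above, is what keeps a failed operation from leaving the lock held.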

    public class ServiceBusRuntime
    {
        public ServiceBusRuntime(MessageDeliveryQueue deliveryQueue, MessageDeliveryQueue retryQueue, MessageDeliveryQueue failureQueue)
        {
            _messageDeliveryQueue = deliveryQueue;
            _retryQueue = retryQueue;            
            _failureQueue = failureQueue;
        }
        
        object _startLock = new object();
        
        List<Thread> _workerThreads = new List<Thread>();
        object _workerThreadsLock = new object();
        
        public void RegisterService(RuntimeService service)
        {
            lock(_startLock)
            {
                if (_starting || _started)
                {
                    throw new InvalidOperationException("Services cannot be registered or unregistered while the bus is running.");
                }
                _runtimeServicesRWLock.EnterWriteLock();
                try
                {
                    _runtimeServices.Add(service);
                    service.SetRuntime(this);    
                }
                finally
                {
                    _runtimeServicesRWLock.ExitWriteLock();
                }
            }
        }
        
        public void UnregisterService(RuntimeService service)
        {
            lock(_startLock)
            {
                if (_starting || _started)
                {
                    throw new InvalidOperationException("Services cannot be registered or unregistered while the bus is running.");
                }

                _runtimeServicesRWLock.EnterWriteLock();
                try
                {
                    _runtimeServices.Remove(service);
                }
                finally
                {
                    _runtimeServicesRWLock.ExitWriteLock();
                }
            }
        }                
        
        public T GetRuntimeService<T>() where T : RuntimeService
        {
            _runtimeServicesRWLock.EnterReadLock();
            try
            {
                foreach(RuntimeService service in _runtimeServices)
                {
                    T s = service as T;
                    if(s != null)
                    {
                        return s;
                    }
                }
            }           
            finally
            {
                _runtimeServicesRWLock.ExitReadLock();
            }
            return null;
        }
        
        public IEnumerable<T> GetRuntimeServices<T>() where T : RuntimeService
        {
            _runtimeServicesRWLock.EnterReadLock();
            try
            {
                List<T> matching = new List<T>();
                foreach(RuntimeService service in _runtimeServices)
                {
                    T match = service as T;
                    if(match != null)
                    {
                        matching.Add(match);
                    }
                }
                return matching;
            }
            finally
            {
                _runtimeServicesRWLock.ExitReadLock();
            }            
        }
        
        List<RuntimeService> _runtimeServices = new List<RuntimeService>();
        ReaderWriterLockSlim _runtimeServicesRWLock = new ReaderWriterLockSlim();
        
        public void Start()
        {
            lock (_startLock)
            {
                _starting = true;
            
                _runtimeServicesRWLock.EnterReadLock();
                try
                {

                    int i = 0;
                    try
                    {
                        for (; i < _runtimeServices.Count; i++)
                        {
                            _runtimeServices[i].Start();
                        }
                    }
                    catch(Exception ex)
                    {
                        DoSafely(() =>
                            {
                                notifyUnhandledException(ex, true);
                            });
                        // try to stop any started services since we couldn't start all of them
                        for (int j = 0; j <= i; j++)
                        {
                            try
                            {
                                _runtimeServices[j].Stop();
                            }
                            catch(Exception ex2)
                            {
                                DoSafely(() =>
                                {
                                    notifyUnhandledException(ex2, true);
                                });
                            }
                        }
                        throw;
                    }
                    addWorker(deliveryWorker, "Delivery worker {0}");
                    addWorker(retryWorker, "Retry worker {0}");

                    InvokeSafely(Started, this, EventArgs.Empty);

                    _started = true;
                }
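                finally
                {
                    // Release the read lock and clear the starting flag whether or not
                    // startup succeeded, so a startup failure still reaches the caller.
                    _runtimeServicesRWLock.ExitReadLock();
                    _starting = false;
                }             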
            }
        }
        
        void addWorker(ParameterizedThreadStart start, string name)
        {
            lock(_workerThreadsLock)
            {
                Thread thread = new Thread(start);            
                thread.IsBackground = true;
                // The index this thread will occupy once it is added below.
                int threadIndex = _workerThreads.Count;
                if (name != null)
                {
                    thread.Name = String.Format(name, threadIndex);
                }
                _workerThreads.Add(thread);
                _stopWaitHandles.Add(new AutoResetEvent(false));
                thread.Start(threadIndex);
            }
        }
        void deliveryWorker(object param)
        {
            int threadIndex = (int)param;
            while(true)
            {
                DoSafely(() =>
                {
                    using (TransactionScope ts = new TransactionScope())
                    {
                        DeliverOne(_messageDeliveryQueue, _retryQueue);
                       
                        ts.Complete();
                    }
                });

                if (_stopping)
                {
                    _stopWaitHandles[threadIndex].Set();
                    break;
                }

            }
        }
        
        void retryWorker(object param)
        {
            int threadIndex = (int)param;
            while(true)
            {
                DoSafely(() =>
                {
                    using (TransactionScope ts = new TransactionScope())
                    {
                        DeliverOne(_retryQueue, _retryQueue);
                        ts.Complete();
                    }
                });
                if (_stopping)
                {
                    _stopWaitHandles[threadIndex].Set();
                    break;
                }
                Thread.Sleep(RETRY_SLEEP_MS); 
            }
        }
                
        public bool Stop()
        {
            bool clean = true;
            lock(_startLock)
            {        
                _stopping = true;

                _runtimeServicesRWLock.EnterReadLock();
                try
                {
                    clean = ForEachSafely(_runtimeServices, service =>
                    {
                        service.Stop();
                    });
                }
                finally
                {
                    _runtimeServicesRWLock.ExitReadLock();
                }

                lock(_workerThreadsLock)
                {
                    for(int i = 0; i < _stopWaitHandles.Count; i++)
                    {
                        _stopWaitHandles[i].WaitOne();
                    }
                    _workerThreads.Clear();
                    _stopWaitHandles.Clear();
                }

                _started = false;
                // All workers have exited; reset the flag so the bus can be started again.
                _stopping = false;
            }

            InvokeSafely(Stopped, this, EventArgs.Empty);                
            
            return clean;
        }

        volatile bool _starting;
        volatile bool _started;
        volatile bool _stopping = false;

        List<AutoResetEvent> _stopWaitHandles = new List<AutoResetEvent>();        
         
        public event EventHandler Started;
        public event EventHandler Stopped;        

        public event EventHandler<EndpointEventArgs> Subscribed;
        public event EventHandler<EndpointEventArgs> Unsubscribed;
        
        public event EventHandler<EndpointEventArgs> ListenerAdded;
        public event EventHandler<EndpointEventArgs> ListenerRemoved;
        
        List<Endpoint> _listenerEndpoints = new List<Endpoint>();
        object _listenerEndpointsLock = new object();

        public IEnumerable<Endpoint> ListeningEndpoints
        {
            get
            {
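                // Take the same lock AddListener uses so we never copy the list mid-update.
                lock (_listenerEndpointsLock)
                {
                    return _listenerEndpoints.ToArray();
                }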
            }
        }
        ReaderWriterLockSlim _subscriptionsRWLock = new ReaderWriterLockSlim();
        List<SubscriptionEndpoint> _subscriptions = new List<SubscriptionEndpoint>();
                        
        public void AddListener(ListenerEndpoint endpoint)
        {
            bool added = false;
            try
            {
                using (TransactionScope ts = new TransactionScope())
                {
                    lock (_listenerEndpointsLock)
                    {
                        _listenerEndpoints.Add(endpoint);
                        added = true;
                    }
                    
                    _runtimeServicesRWLock.EnterReadLock();
                    try
                    {
                        foreach (RuntimeService service in _runtimeServices)
                        {
                            service.OnListenerAdded(endpoint);
                        }
                    }
                    finally
                    {
                        _runtimeServicesRWLock.ExitReadLock();
                    }

                    EventHandler<EndpointEventArgs> listenEvent = ListenerAdded;
                    if (listenEvent != null) listenEvent(this, new EndpointEventArgs(endpoint));

                    ts.Complete();
                }
            }
            catch
            {
                if (added)
                {
                    // remove on failure
                    lock (_listenerEndpointsLock)