MSMQ with WCF

Today I am going to show you how use Message Queuing (MSMQ) with Windows Communication Foundation (WCF). Message queue comes to show in different reasons like decoupling processes , ordering the process, handle bulk messages.

Writing an message into MSMQ is not a different, but processing the messages with effective ways is the challenge. As an example we had a problem which delay of sending emails and SMS from our side. Later when we were investigating the issue, we found that we were taking all messages and try to iterate at once.  This solution can be used to prevent kind of scenarios.

Hope you guys have installed the MSMQ before starting the development. So first create an empty solution and add two console projects. In my solution I have added three projects.

solution

 

  • Dto – contains data transfer objects
  • MSMQ – project used to write messages into message queue
  • WcfServer – read messages from message queue and process

 

Dto project contains this class.

[DataContract]
    public class User
    {
        public User()
        {
            
        }
        [DataMember(IsRequired = true)]
        public int UserId { get; set; }

        [DataMember(IsRequired = true)]
        public string FirstName { get; set; }

        [DataMember(IsRequired = false)]
        public string LastName { get; set; }

        [DataMember(IsRequired = true)]
        public string UserName { get; set; }

        [DataMember(IsRequired = true)]
        public string Email { get; set; }
    }

MSMQ project contains the code for write message into message queue

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;

namespace MSMQ
{
    class Program
    {
        
        static void Main(string[] args)
        {
            try
            {
                WriteToQueue();
            }
            catch (Exception ex)
            {
                throw ex;
            }
           
        }

        /// <summary>
        /// References : Thanks to below URLs
        /// http://www.dotnetcurry.com/wcf/1156/using-wcf-msmq-message-service-oriented-solutions
        /// 
        /// </summary>
        public static void WriteToQueue()
        {
            int i = 1;

            while (true)
            {
                Console.WriteLine("Creating new user");

                var user = new Dto.User()
                {
                    UserId = i,
                    FirstName = "Damith" + i,
                    LastName = "Wanninayake" + i,
                    Email = "damith.uwu@gmail.com",
                    UserName = "damith" + i
                };


                // Create a Message and set the body to the order object above
                var msmqPath = ConfigurationManager.AppSettings["MessageQueuePath"];
                var msg = new Message {Body = user};

                // Create a transaction
                using (var ts = new TransactionScope(TransactionScopeOption.Required))
                {
                    MessageQueue queue = null;

                    if (MessageQueue.Exists(msmqPath))
                    {
                        queue = new MessageQueue(msmqPath);
                    }
                    else
                    {
                        // Create transactional message queue
                        MessageQueue.Create(msmqPath, true);
                        queue = new MessageQueue(msmqPath);
                    }
                    queue.Send(user,"User registration",MessageQueueTransactionType.Automatic); // send the message
                    ts.Complete(); // complete the transaction
                }

                i += 1;
                Console.WriteLine("Message Sent");

                Thread.Sleep(5000);
            }
            
        }

    }
}

Here the app.config file

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <startup> 
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
    </startup>
  <appSettings>
    <add key="MessageQueuePath" value=".\Private$\EmailQueue"/>
      <!--<add key="MessageQueuePath" value="FormatName:Direct=TCP:127.0.0.1\private$\emailqueue"/>-->
    </appSettings>
</configuration>

Please be kind to adjust the code per your requirement. Here I want to add more and more messages into message queue quickly.

Run the project and see whether you have messages in your queue or not. Please see the below images of my queue.
write_console

queue

message

Now we have messages in out queue and need to read this messages. My requirement is when I receive a new message into queue, without looking into queue, automatically read method should be fired like a trigger.

I have a interface like below

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.ServiceModel.MsmqIntegration;
using System.Text;
using System.Threading.Tasks;
using Dto;

namespace WcfServer.Interfaces
{
    [ServiceContract]
    [ServiceKnownType(typeof(User))]
    public interface IUserInboundMessageHandlerService
    {
        [OperationContract(IsOneWay = true, Action = "*")]
        void ProcessIncomingMessage(MsmqMessage incomingOrderMessage);
    }
}

and implemented class like below.

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.ServiceModel.MsmqIntegration;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dto;
using WcfServer.Interfaces;

namespace WcfServer
{
    [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Single, InstanceContextMode = InstanceContextMode.Single,
        ReleaseServiceInstanceOnTransactionComplete = false)]
    public class UserInboundMessageHandlerService : IUserInboundMessageHandlerService
    {
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void ProcessIncomingMessage(MsmqMessage<User> incomingOrderMessage)
        {
            Console.WriteLine("New Message Recieved.. Procession Output...");
            Console.WriteLine();
            var orderRequest = incomingOrderMessage.Body;
            Console.WriteLine(orderRequest.UserId);
            Console.WriteLine(orderRequest.FirstName);
            Console.WriteLine(orderRequest.LastName);
            Console.WriteLine(orderRequest.Email);
            Console.WriteLine(orderRequest.UserName);
            Console.WriteLine();
            Console.WriteLine("Procession completed...");

            //Thread.Sleep(1000);
        }
    }
}

Above method will be triggered from the MSMQ. But we have to create a Service host to run this wcf service since we are not using the IIS server.

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using System.Threading.Tasks;

namespace WcfServer
{
    class Program
    {
        static void Main(string[] args)
        {
            ServiceHost host = new ServiceHost(typeof(UserInboundMessageHandlerService));
            host.Faulted += host_Faulted;

            host.Open();
            Console.WriteLine("The service is ready");
            Console.WriteLine("Press  to terminate the service");
            Console.ReadLine();

            if (host != null)
            {
                if (host.State == CommunicationState.Faulted)
                {
                    host.Abort();
                }
                host.Close();
            }
        }

        static void host_Faulted(object sender, EventArgs e)
        {
            Console.WriteLine("Faulted!"); // Change to something more sensible – this is just an example showing what happens when the host has faulted.
        }
    }
}

Let’s see the config file which has end point and other configuarations.

  <system.serviceModel>
    <behaviors>
      <endpointBehaviors>
        <behavior name="IncludeExceptionDetails">
          <callbackDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </endpointBehaviors>
    </behaviors>
    <services>
      <service name="WcfServer.UserInboundMessageHandlerService">
        <endpoint address="msmq.formatname:DIRECT=OS:.\private$\EmailQueue" binding="msmqIntegrationBinding" bindingConfiguration="IncomingMessageHandlerBinding" contract="WcfServer.Interfaces.IUserInboundMessageHandlerService">
        </endpoint>
      </service>
    </services>
    <bindings>
      <msmqIntegrationBinding>
        <binding name="IncomingMessageHandlerBinding"
                 closeTimeout="00:30:00"
                 receiveTimeout="01:00:00"
                 retryCycleDelay="00:00:10"
                 receiveRetryCount="0"
                 exactlyOnce="true"
                 maxRetryCycles="3"
                 receiveErrorHandling="Move">
          <security mode="None"/>
        </binding>
      </msmqIntegrationBinding>
    </bindings>
  </system.serviceModel>

All set. We need to run the application now.

final

You can adjust the thread sleep time and check the speed of the reading process.

You can download the code from here

https://github.com/uwudamith/MSMQ

 

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s