RabbitMQ Integration using C# .Net 8 - Message producer
RabbitMq producer integration using c# .Net 8 (Worker service)
NuGet Package to install:
NuGet\Install-Package MassTransit -Version 8.1.3
MQ Helper (Sending Message):
namespace Util.Helper
{
using MassTransit;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using System.Threading.Tasks;
public class MQHelper
{
private readonly string _rootURI;
public MQHelper()
{
var configurationBuilder = new ConfigurationBuilder();
var path = Path.Combine("./", "appsettings.json");
configurationBuilder.AddJsonFile(path, false);
var root = configurationBuilder.Build();
var appSettings = root.GetSection("MQSettings");
{
_rootURI = appSettings.GetSection("RootURI").Value;
};
}
public async Task SendMessage(string messageJsonString, string queueName)
{
var result = false;
try
{
var uri = new Uri($"{_rootURI}/{queueName}");
var endPoint = await _bus.GetSendEndpoint(uri);
await endPoint.Send(new MQData { Data = messageJsonString });
result = true;
}
catch (Exception ex)
{
}
return result;
}
}
public class MQData
{
public string Data { get; set; }
}
}
MQ Helper ( Sending Message using Exchange and Routine Key ):
namespace Util.Helper
{
using MassTransit;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using System.Threading.Tasks;
public class MQHelper
{
private readonly string _rootURI;
public MQHelper()
{
var configurationBuilder = new ConfigurationBuilder();
var path = Path.Combine("./", "appsettings.json");
configurationBuilder.AddJsonFile(path, false);
var root = configurationBuilder.Build();
var appSettings = root.GetSection("MQSettings");
{
_rootURI = appSettings.GetSection("RootURI").Value;
};
}
//// send message using exchange and routing key
public bool SendMessage(string messageJsonString, string queueName, out string errMsg)
{
var result = false;
errMsg = string.Empty;
try
{
ConnectionFactory factory = new()
{
UserName = _userName,
Password = _password,
VirtualHost = _virtualHost,
HostName = _hostName,
Port = AmqpTcpEndpoint.UseDefaultPort,
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(messageJsonString);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: properties,
body: body);
result = true;
}
catch (Exception ex)
{
errMsg = ex.Message;
}
return result;
}
}
public class MQData
{
public string Data { get; set; }
}
}
appsettings.json (AppSettings configuration)
"MQSettings": {
"UserName": "username",
"Password": "password",
"VirtualHost": "/",
"HostName": "mq ip",
"RootURI": "rabbitmq://mq ip"
},
Calling Helper API:
private readonly MQHelper _mqHelper;
var data="123456"; //// Can be class object (dynamic) object
_ = await _mqHelper.SendMessage(data, "My Queue Name");
--- All the best ---