ekrem özer

her yerde olan şeyler.

.Net Core RabbitMQ Kullanımı

Merhaba arkadaşlar bu makalemde .Net Core üzerinde RabbitMQ kullanımını anlatacağım. RabbitMQ bir mesaj kuyruğu sistemidir. Kafka gibi çeşitli alternatifleri de vardır. Response süresi uzun olan işlemlerde
son kullanıcıyı bekletmemek için işlemi kuyruğa atıp kullanıcıyı bekletmemek için kullanılır, tabi bu işlemlerde kullanıcının o yanıta ihtiyacı olmadığını varsayarsak. Örneğin bir eticaret sitesinde müşteri
alışverişi tamamladıktan sonra kullanıcıya "siparişiniz alınmıştır" mesajı verdikten sonra e-postasına pdf formatında fatura gönderme işlemi için kullanılabilir. Sipariş tamamlanır kuyruğa fatura gönderme işlemi eklenir
ve kuyruk dinlenip gerekli mesaj alınınca fatura oluşturulup müşteriye mail atılır.

RabbitMQ çalışma prensipleri ve makale içinde kullanacağımız bazı terimlerin açıklamarı;

  1. Producer: Yayıncı. Mesajı gönderen uygulamadır. Publisher olarakta tanımlanabilir. Yukarıdaki örnekte fatura oluştur komutunu veren uygulamadır.
  2. Exchange: Producer’dan gelen mesajları belli bir kurala göre(Exchange Types) Queue’ye yönlendiren yapıdır Mesajları yönlendiren rooting mekanizması gibi düşünebiliriz.
  3. Queue : Kuyruk. Kuyruğa gönderilen mesajlar öncelikle burada tutulur. Alıcı mesajı okuyana kadar mesajlar burada saklanır.
  4. Consumer: Dinleyici. Kuyruğa yayıncı tarafından gönderilen mesajları kuyruk belli bir sıralamaya göre Consumer'a gönderir. İşlemi yapan uygulamadır. Subscribe olarakta tanımlanabilir.
  5. Fifo: First in first out. Kuyruğun çalışma mantığı ilk kuyruğa giren mesaj ilk çıkar.

Yukarıda bahsettiğim gibi sipariş işlemi tamamlandıktan sonra kuyruğa mesaj gönderen ve bu mesajo dinleyip faturayı müşterinin e-posta adresine gönderen bir uygulama örneği yapacağım.

RabbitMQ Kurulumu

Makalemde RabbitMQ'nun ücretsiz cloud servisini anlatarak ilerleyeceğim. Öncelikle cloudamqp.com adresine girip üye olalım. Ben github hesabımla login oldum. İlk kez giriş yapıyorsanız
sizden bir takım ismi isteyecek onu doldurduktan sonra Create New Instance diyerek uygulamamız için instance ayağa kaldırıyoruz.

Sonraki aşamada bizden instance için isin ve plan istiyor, free'yi seçip devam ediyoruz.

Sonraki aşamada da sunucumuzun olacağı bölgeyi seçiyoruz.

Son aşamada ise bize seçimlerimizi hatırlatıp onay vermemizi istiyor. Create instance diyip oluşturuyoruz.

Artık instance'ım ayakta ve kullanıma hazır.

Şimdi konuyu daha detaylı inlemeden önce küçük bir hello world uygulması yapalım. NetCoreRabbitMQ adında bir solution oluşturuyorum ve içine NetCoreRabbitMQ.Producer ve NetCoreRabbitMQ.Consumer adında iki tane console uygulaması ekliyorum.

İki projemede RabbitMQ için gerekli olan nuget'ı ekliyorum.

RabbitMQ.Client

Sonrada https://customer.cloudamqp.com/instance adresine gidip makalenin başında oluşturduğun Invoice instance'ının detayına girip RabbitMQ Cloud ile iletişim kurmam için gerekli olan AMQP URL'i alıyorum.

Artık Producer uygulamamızın kodlarını yazabiliriz.

using RabbitMQ.Client;

namespace NetCoreRabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory
            {
                Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
            };

            using (var connection = connectionFactory.CreateConnection())
            {
                var channel = connection.CreateModel();
 
                channel.QueueDeclare(queue: "hello-world", durable: true, exclusive: false, autoDelete: false);
				
                Enumerable.Range(0, 60).ToList().ForEach(x =>
                {
                    var message = $"ekremozer.com | mesaj: {x}";
                    var messageBytes = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: string.Empty, routingKey: "hello-world", basicProperties: null, body: messageBytes);
                    Console.WriteLine(message);
                });
            }
            Console.ReadLine();
        }
    }
}

Kodlarımızı inceleyecek olursak;

var connectionFactory = new ConnectionFactory
{
	Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
};

Bu kısımda RabbitMQ ile iletişim kurmamızı sağlayacak connection nesnemizi oluşturduk.

using (var connection = connectionFactory.CreateConnection())
{
	var channel = connection.CreateModel();
    
    channel.QueueDeclare(queue: "hello-world", durable: true, exclusive: false, autoDelete: false);
	
	Enumerable.Range(0, 60).ToList().ForEach(x =>
	{
		var message = $"ekremozer.com | mesaj: {x}";
		var messageBytes = Encoding.UTF8.GetBytes(message);
		channel.BasicPublish(exchange: string.Empty, routingKey: "hello-world", basicProperties: null, body: messageBytes);
		Console.WriteLine(message);
	});
}

Bu kısımda RabbitMQ ile iletişim kurmamızı sağlayacak connection nesnemizi oluşturduk. Sonra CreateModel() kuyruklarımız için bir kanal oluşturduk. channel.QueueDeclare() metoduylada kanalda bir kuyruk oluşturduk. Enumerable.Range(0, 60) ile 0'dan 60'a kadar bir döngü oluşturdum. string bir mesaj üretip channel'in body'sinde mesajımı göndermek için mesajımı byteArray'e çevirdim. channel.BasicPublish() metoduylada mesajımı kuyruğa gönderdim.

QueueDeclare() ve BasicPublish() metodlarının altığı paremetleri inceleyelim.

channel.QueueDeclare(queue: "hello-world", durable: true, exclusive: false, autoDelete: false);

queue: Kuyruk adı.

durable: Mesajın nasıl saklanacağı. true olursa mesajları fiziksel olarak sunucuda tutar, false olursa memory'de tutar. Memoryde tutulan mesajlar sunucu bir şekilde down olduğunda kaybolur.

exclusive: Kuyruğa hangi kanallardan ulaşılabileceği. true olursa sadece burada oluşturduğumuz kanal ulaşabilir. false olursa diğer kanallarda bu kuyruğa erişebilir. Gerçek hayat uygulamalarında genelde false yapılır. Producer'lar ve Consumer ayrı yerlerde olabilir.

autoDelete: Kuyruğun silinme durumu. true olursa Kuyruğu son dinleyen consumer'da bağlantısını kopartırsa kuyruk otomatik olarak silinir. false olursa kuyruk ayakta kalır.

channel.BasicPublish(exchange: string.Empty, routingKey: "hello-world", basicProperties: null, body: messageBytes);

exchange: Exchange adı, boş bıraklırsa default exchange olur.

routingKey: exchange'i boş bırakığımız için routingKey'e kuyruğun ismini yazmamız gerekiyor. Exchange kullandığımız örneklerde konuya değineceğiz.

basicProperties: kuyruğun propertileri, default olarak null bırakılabilir. 

body: byte[] türünde mesaj gövdemiz.

Producer uygulamamız tamam, şimdi de Consumer uygulamamızı ayağa kaldıralım.

using RabbitMQ.Client.Events;

namespace NetCoreRabbitMQ.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
            };

            using var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            channel.QueueDeclare(queue: "hello-world", durable: true, exclusive: false, autoDelete: false);

            channel.BasicQos(prefetchSize: 0, prefetchCount: 5, global: false);

            var consumer = new EventingBasicConsumer(channel);

            channel.BasicConsume(queue: "hello-world", autoAck: false, consumer: consumer);

            consumer.Received += (sender, e) =>
            {
                var message = Encoding.UTF8.GetString(e.Body.ToArray());

                Console.WriteLine(message);

                channel.BasicAck(e.DeliveryTag, multiple: false);
            };
            Console.ReadLine();
        }
    }
}

Buradaki kodlarda channel.QueueDeclare() metoduna kadar ki olan kısım aynı, o yüzden tekrar anlatmıyorum. Burada bilmeniz gereken durum channel.QueueDeclare() metodu kuyruk yoksa oluşturur varsa hiç bir şey yapmaz. Ama aynı kuyruk için gönderilen parametreler aynı olmak zorunda Producer den faklı bir parametre ile kuyruk oluşturmayı deneseydik exception alırdık. Veya kuyruk oluşturmayıp olmayan bir kuyruğu dinlemeyi denediğimizde yine exception alırız.

Şimdi kodlarımızı inceleyelim;

 channel.BasicQos(prefetchSize: 0, prefetchCount: 5, global: false);

BasicQos metodu kuyruğa mesajların nasıl  geleceğini yönetir.

prefetchSize: mesaj boyutunu tanımlarız, 0 olursa tüm mesajlar gelebilir.

prefetchCount: Tek seferde her bir Consumer'a kaç mesaj göndereceği tanımlıyoruz.

global: true olursa prefetchCount'u Consumer'lara dağıtır, false olursa her bir Consumer'a prefetchCount kadar mesaj gönderir. örneğin prefetchCount'umuz 10 olsun ve kuyruğu dinleyen iki tane consumer'ımız olsun. global değeriimiz true olursa consumer'lara 5'er adet mesaj gönderecek, false olursa 10'ar adet mesaj gönderecek.

var consumer = new EventingBasicConsumer(channel);

channel.BasicConsume(queue: "hello-world", autoAck: false, consumer: consumer);

Burada EventingBasicConsumer türünden consumer'ımızı tanımlıyoruz, EventingBasicConsumer classının ctor metodunda hangi kanalden türeteceğimizi belirliyoruz. Sonra BasicConsume() metoduyla kanala consumer'ı ekliyoruzç

autoAck: Bu parametre kuyruktan mesajın silinip silinmeyeceğine karar verir. true olursa mesaj okunur okunmaz kuyruktan siler. false olursa mesajı silmek için bizden komut bekler.

consumer.Received += (sender, e) =>
{
	var message = Encoding.UTF8.GetString(e.Body.ToArray());

	Console.WriteLine(message);

	channel.BasicAck(e.DeliveryTag, multiple: false);
};

Burada consumer'ın Received eventini tanımlıyoruz. Yani consumer'a her mesaj geldiğinde tetiklenecek metod. İşlemleri bu kod bloğunun içinde yapabilriz veya event için aynı parametreleri alan başka bir metod tanımlayabilriz.

channel.BasicAck(e.DeliveryTag, multiple: false);

autoAck parametlerisini false verdiğimiz için burada mesajı işledikten sonra kuyruktan siliyoruz. true yapsaydık buna gerek kalmayacaktı.

multiple: true işlenmiş ama rabbitmq ya gitmemiş tüm işlemleri rabbitmqya bildirir, false sadece bu işlemin durumunu bildirir.

İki uygulamamızda hazır, şimdi Producer'i çalıştırıp kuyruğa mesaj gönderelim ve RabbitMQ Manager'dan mesajlarımızı kontrol ederlim.

hello-word kuyruğumda 60 adet bekleyen mesaj görünüyor. Şimdi Consumer'ı çalıştırıp mesajlarımızı dinleyelim.

Bekleyen mesajlarımız sayısı 0'a düşmüş. Consumer'ı uygulamanızı kuyruktaki yoğunluğa göre birden fazla çağırabilirsiniz. Hepsi tek tek kuyruğu dinleyip gelen mesajı işleyecektir. Temel seviyede RabbitMQ yapısını inceledikten sonra şimdi biraz detaylarına girelim.

Exchange tipleri

RabbitMQ'da 4 çeşit exchange tipi vardır.

  1. Fanout Exchange
  2. Direct Exchange
  3. Topic Exchange
  4. Header Exchange

Bu Exchange tiplerini tek tek inceleyelim.

Fanout Exchange

Mesajı ilgili exchange bağlı tüm kuyruklara iletilen exchange tipidir. Aynı mesajı birden fazla kuyruğa gönderme ihtiyacı duyduğumuzda bu exchange tipini kullanabiliriz. Örnek olarak bu exchange tipinde döviz kurlarını dönen bir consumer'ımız olduğunda mesajı tüm kuyruklara aynı anda iletecektir. Eğer exchange'e bağlı bir kuyruk yoksa mesajlar havada kalacaktır. Şimdi nasıl kodlarız ona bakalım. Projeme RabbitMqExchangeTypes adında bir class oluşturup örnek kodları onun üzerinde yapacağım.

Producer Kodları

using RabbitMQ.Client;

namespace NetCoreRabbitMQ.Producer.RabbitMQ
{
    public class RabbitMqExchangeTypes
    {
        public void FanoutExchange()
        {
            var connectionFactory = new ConnectionFactory
            {
                Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
            };

            using var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();
            
            channel.ExchangeDeclare(exchange: "FanoutExchange-example", type: ExchangeType.Fanout, durable: true);

            Enumerable.Range(0, 60).ToList().ForEach(x =>
            {
                var message = $"ekremozer.com | FanoutExchange: {x}";
                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "FanoutExchange-example", routingKey: string.Empty, basicProperties: null, body: messageBytes);
                Console.WriteLine(message);
            });
           Console.ReadLine();
        }
    }
}

Burada ilk örneğimizden farklı olarak 17. satırda bir exchange oluşturduk ve tipini ExchangeType.Fanout olarak belirledik.  23. satırda ise bu sefer exchange'imizin adını verip routeKeyde kuyruk adı kısmını boş bıraktık. Kuyruğu consumer tarafında oluşturacağımız için burada bir kuyruk declare etmedik.

Şimdi program.cs de bu metodumu çağırıyorum. Diğer yazdığım kodları aynı classta farklı bir metoda taşıdım.

using RabbitMQ.Client;

namespace NetCoreRabbitMQ.Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            RabbitMqExchangeTypes.FanoutExchange();
        }
    }
}

Programı çalıştırıp RabbitMQ panelimi kontrol ettiğimde exchange'imizin oluştuğunu görüyorum.

Consumer Kodları

using RabbitMQ.Client.Events;

namespace NetCoreRabbitMQ.Consumer.RabbitMQ
{
    public static class RabbitMqExchangeTypes
    {
        public static void FanoutExchange()
        {
            var factory = new ConnectionFactory
            {
                Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
            };

            using var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            var randomQueueName = channel.QueueDeclare().QueueName;

            channel.QueueBind(queue: randomQueueName, exchange: "FanoutExchange-example", routingKey: null);

            channel.BasicQos(prefetchSize: 0, prefetchCount: 5, global: false);

            var consumer = new EventingBasicConsumer(channel);

            channel.BasicConsume(queue: randomQueueName, autoAck: false, consumer: consumer);

            consumer.Received += (sender, e) =>
            {
                var message = Encoding.UTF8.GetString(e.Body.ToArray());

                Console.WriteLine(message);

                channel.BasicAck(e.DeliveryTag, multiple: false);
            };
            Console.ReadLine();
        }
    }
}

Burada da ilk örnekten faklı olarak 17. satırda kuyruk ismini random ürettik, çünkü exchange'imize kaç tane kuyruk bağlanacak bilmediğimiz için her consumer kendi kuyruğunu oluşturup sadece orayı dinlemesini sağlamak için QueueDeclare() metodundan faydalanarak random bir kuyruk ismi ürettik. 19. satırda QueueBind() metodunu kullandık, bir önceki örnekte QueueDeclare() metodunu kullanmıştık.QueueBind metodund Consumerların işi bitince veya uygulama kapanınca kuyruklar exchance'den otomatik olarak silinir. 

Consumer'ı çalıştırdığımda RabbitMQ üzerinden exchance'imi kontrol ettiğimde exchange'i dinleyen kuyrukları görebiliyorum. Producer her mesaj gönderdiğinde buradaki kuyruklara ayrı ayrı aynı anda mesaj iletilecektir.

Direct Exchange

Producer'den gelen mesajları route bilgisine göre ilgili kuyruğa gönderen exchange yapısıdır. Örnek olarak bir sistemde kullanıcı rollerine göre mesaj göndermeniz gerekiyor, Admin ve Moderatörler için ayrı ayrı route'lar tanımlayıp o mesajları ilgili
kuyruğun dinlemesini sağlayabilirsiniz. Bu kurguya göre küçük bir örnekle konuyu daha iyi anlayalım.

Producer Kodları

public static void DirectExchange()
{
	var connectionFactory = new ConnectionFactory
	{
		Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
	};

	using var connection = connectionFactory.CreateConnection();
	var channel = connection.CreateModel();

	channel.ExchangeDeclare(exchange: "DirectExchange-example", type: ExchangeType.Direct, durable: true);

	channel.QueueDeclare(queue: "direct-queue-admin", durable: true, exclusive: false, autoDelete: false);
	channel.QueueBind(queue: "direct-queue-admin", exchange: "DirectExchange-example", routingKey: "direct-route-admin");

	channel.QueueDeclare(queue: "direct-queue-moderator", durable: true, exclusive: false, autoDelete: false);
	channel.QueueBind(queue: "direct-queue-moderator", exchange: "DirectExchange-example", routingKey: "direct-route-moderator");

	Enumerable.Range(0, 15).ToList().ForEach(x =>
	{
		var message = $"ekremozer.com | DirectExchange Admin: {x}";
		var messageBytes = Encoding.UTF8.GetBytes(message);
		channel.BasicPublish(exchange: "DirectExchange-example", routingKey: "direct-route-admin", basicProperties: null, body: messageBytes);
		Console.WriteLine(message);
	});

	Enumerable.Range(0, 15).ToList().ForEach(x =>
	{
		var message = $"ekremozer.com | DirectExchange Moderator: {x}";
		var messageBytes = Encoding.UTF8.GetBytes(message);
		channel.BasicPublish(exchange: "DirectExchange-example", routingKey: "direct-route-moderator", basicProperties: null, body: messageBytes);
		Console.WriteLine(message);
	});

	Console.ReadLine();
}

Burada DirectExchange-example adında bir exchange oluşturup bu exchange'e e bağlı iki tane kuyruk bind ettim. Burada dikkat etmemiz gereken nokta kuyruk isimleri ile birlikte routingKey değeride unique olmalıdır. İlgili mesajın hangi route'a gideceğine burada karar veriyoruz. Admin ve Moderator için ayrı ayrı routingler oluşturdukdan sonra iki tane döngüyle mesajlarımı kuyruğa gönderiyorum. Program.cs den metodumu çağırıp uygulamayı çalıştırıyorum.

DirectExchange-example'ın rabbitmq tarafında oluştuğunu görüyorum, şimdi exchange'in içine girip kuyruklarımı kontrol ediyorum.

Oluşturduğum 2 adet kuyrukta burada. Bu kuyrukları consumer tarafında oluşturup ilgili exchange'e routingKey'imizle bağlanabilirdik, ben örnekte bu şekilde ilerledim. Tercihiniz senaryoya göre değişebilir.

Consumer Kodları

public static void DirectExchange()
{
	var factory = new ConnectionFactory
	{
		Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
	};

	using var connection = factory.CreateConnection();
	var channel = connection.CreateModel();

	channel.BasicQos(prefetchSize: 0, prefetchCount: 5, global: false);
	var consumer = new EventingBasicConsumer(channel);

	channel.BasicConsume(queue: "direct-queue-admin", autoAck: false, consumer: consumer);

	consumer.Received += (sender, e) =>
	{
		var message = Encoding.UTF8.GetString(e.Body.ToArray());

		Console.WriteLine(message);

		channel.BasicAck(e.DeliveryTag, multiple: false);
	};
	Console.ReadLine();
}

Burada da kuyruğa producer tarafında admin için oluşturuğum direct-queue-admin kuyruk ismi ile bağlanıyorum, ve uyguamamı çalıştıyorum.

Görüldüğü gibi admin için oluşturduğum mesajlar kuyruktan okundu, burada işlem hacmi çok olsaydı eğer aynı uygulamadan birden fazla ayağa kaldırabilirdim ve her bir uygulamaya sırasıyla mesaj giderdi. Fanout exchange'de ki gibi her kuyruğa aynı mesaklar değil farkı mesajlar gönderildi.

Topic Exchange

Producerin gönderdiği mesajların routeKey'inde kullanacaığımız ifadeleri . (nokta) işaretleriyle ayırarak daha detaylı gruplandırmamıza yarıyor. Örnek olarak 

  1. Admin.Message.High
  2. Mod.Message.High
  3. Admin.Message.Low
  4. Mod.Message.Low

adında 4 adet routingKey'imiz olsun, bu routingKey'leri aşağıdaki şekilde kullanabiliridim;

Admin.Message.High: birebir eşleşen routingKey
Admin.*.*:  Admin ile başlayan tüm route'lar
#.High: High ile biten tüm route'lar
*.Message.*: Message ifadesi içeren tüm route'lar

gibi varyasyonlarda bu kullanım şeklini türetebilirsiniz. Şimdi kodlama kısmını inceleyelim.

Producer Kodları

public static void TopicExchange()
{
	var connectionFactory = new ConnectionFactory
	{
		Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
	};

	using var connection = connectionFactory.CreateConnection();
	var channel = connection.CreateModel();

	channel.ExchangeDeclare(exchange: "TopicExchange-example", type: ExchangeType.Topic, durable: true);

	var routeKeys = new[] { "Admin.Message.High", "Mod.Message.High", "Admin.Message.Low", "Mod.Message.Low" };

	routeKeys.ToList().ForEach(routeKey =>
	{
		Enumerable.Range(1, 6).ToList().ForEach(x =>
		{
			var message = $"ekremozer.com | TopicExchange | {routeKey}: {x}";
			var messageBytes = Encoding.UTF8.GetBytes(message);
			channel.BasicPublish(exchange: "TopicExchange-example", routingKey: routeKey, basicProperties: null, body: messageBytes);
			Console.WriteLine(message);
		});

	});

	Console.ReadLine();
}

Örneğimde routeKeys adında bir dizi oluşturup içine 4 adet değer atatım ve bunları döngüye sokarak her bir routingKey için kuyruğa 6 adet mesaj gönderdim. Program.cs'den metodumu çağırıp rabbitmq'dan kontrol ettiğimde exchange'imin oluştuğunu görüyorum.

Bu örnekte kuyruğu producer tarafında oluşturmadık çünkü çok fazla varyasyon olabilir. Kuyrukları consumer tarafında oluşturup mesajlarımızı öyle dinleyeceğiz.

Consumer Kodları

public static void TopicExchange()
{
	var factory = new ConnectionFactory
	{
		Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
	};

	using var connection = factory.CreateConnection();
	var channel = connection.CreateModel();

	channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
	var consumer = new EventingBasicConsumer(channel);

	var randomQueueName = channel.QueueDeclare().QueueName;
	var routeKey = "#.High";
	channel.QueueBind(queue: randomQueueName, exchange: "TopicExchange-example", routingKey: routeKey);

	channel.BasicConsume(queue: randomQueueName, autoAck: false, consumer: consumer);

	consumer.Received += (sender, e) =>
	{
		var message = Encoding.UTF8.GetString(e.Body.ToArray());

		Console.WriteLine(message);

		channel.BasicAck(e.DeliveryTag, multiple: false);
	};
	Console.ReadLine();
}

Bu consumerımızda random bir kuyruk adıyla sonu High ile biten tüm routingKey deki mesajları okuyoruz, Kuyruğu producer tarafında oluşturmadığımız için ilk önce producer'ı ayağa kaldırdığımızda gönderilen mesajlar hiç bir kuyruğa iletilmez. Şimdi uygulamamı çalıştırdığımız zaman, exchange'e bağlı bir adet kuyruk görüyoruz.

Producer'i çalıştırdığımızda ise consumer mesajları dinlemeye başlıyor.

Header Exchange

Direct exchange'den farklı olarak routing bilgilerini header'da key-value olarak gönderdiğimiz exchange tipidir, consumer tarafında parametrelere ek olarak x-match parametresi alır. Şimdi kodları inceleyerek konuyu daha iyi anlayalım.

Producer Kodları

public static void HeaderExchange()
{
	var connectionFactory = new ConnectionFactory
	{
		Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
	};

	using var connection = connectionFactory.CreateConnection();
	var channel = connection.CreateModel();

	channel.ExchangeDeclare(exchange: "HeaderExchange-example", type: ExchangeType.Topic, durable: true);

	var headers = new Dictionary<string, object>
	{
		{ "Role", "Admin" },
		{ "Level", "High" }
	};

	var properties = channel.CreateBasicProperties();
	properties.Headers = headers;
    properties.Persistent = true;

	var message = "ekremozer.com | HeaderExchange";
	var messageBytes = Encoding.UTF8.GetBytes(message);
	channel.BasicPublish(exchange: "HeaderExchange-example", routingKey: string.Empty, basicProperties: properties, body: messageBytes);

	Console.WriteLine(message);
	Console.ReadLine();
}

Diğer exchange türlerinden farklı olarak Dictionary<string, object> tipinde headers adında bir değişken olulşturdum ve buraya key-value olarak değerlerimi verdim. Sonra channel.CreateBasicProperties() fonksiyonuyla IBasicProperties tipinde nesnemi oluşturarak bu nesnemin headers propertisine oluşturduğum headers değişkenini verdim.  metoduyla basi Ve 24. satırda BasicPublish() metodunda parametre olarak verdim. Producer'ı ayağa kaldırdığımda  rabbitmq üzerinden exchange'imin oluştuğunu görüyorum.

Property'de makalemizde daha önce kullanmadığımız Persistent parametresini de kullandım. Bu parametre kuyruklarımızla birlikte kuyruktaki mesajlarıda kalıcı hale getirmek için kullanılır. Exchange ve Queue'larda durable:true yaparak exchange ve kuyrukları kalıcı hale getirebiliyorduk, içerisindeki mesajların kalıcı hale gelmesi içinde tüm exchange türlerinde bu parametreyi kullanabiliriz.

Consumer Kodları

public static void HeaderExchange()
{
	var factory = new ConnectionFactory
	{
		Uri = new Uri("amqps://fqykbkan:fiHTx9mjpDk33teVSASdjex-dk-vlbTe@fox.rmq.cloudamqp.com/fqykbkan")
	};

	using var connection = factory.CreateConnection();
	var channel = connection.CreateModel();

	channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
	var consumer = new EventingBasicConsumer(channel);

	var randomQueueName = channel.QueueDeclare().QueueName;
	var headers = new Dictionary<string, object>
	{
		{ "Role", "Admin" },
		{ "Level", "High" },
		{ "x-match", "all" }
	};

	channel.QueueBind(queue: randomQueueName, exchange: "HeaderExchange-example", routingKey: string.Empty, arguments: headers);

	channel.BasicConsume(queue: randomQueueName, autoAck: false, consumer: consumer);

	consumer.Received += (sender, e) =>
	{
		var message = Encoding.UTF8.GetString(e.Body.ToArray());

		Console.WriteLine(message);

		channel.BasicAck(e.DeliveryTag, multiple: false);
	};
	Console.ReadLine();
}

Burada da 19. satırda bahsettiğim x-match parametresini header'ımıza ekliyoruz. Parametremize all değerini verirsek headerda gönderdiğimiz tüm key ve valueların birebir eşit olduğu mesajları alır, eğer any dersek parametrelerden herhangi biri uyuşan tüm mesajları dinler. Consumer uygulamasını çalıştırıyorum ve ardından producer ile tekrar mesaj gönderdiğimde mesajı dinleyebiliyorum.

Exchange ve kuyruk oluşturma işlemleri senaryoya göre iki tarafrada yapılabilir, ancak producer veya consumer bir kuyruğa veya exchange eğer daha önce oluşturulmaışsa hata alırsınız.

Exchange tipleri bu kadardı arkadaşlar, şimdide küçük bir web uygulaması yapıp bir web uygulamasının kuyruğu nasıl dinleyebileceğini inceleyelim. Basit bir ad soyad email alan bir form yapıp siparişi oluştur butonu koyacağım ve sipariş oluşunca kuyruğa bir mesaj gidecek. Kuyruğu dinleyen consumer mesajı alınca müşteriye faturayı mail olarak atacak. Eticaret sitelerinde yaptığınız alışverişlerde faturanın sonradan gelmeside buna benzer bir olay. Solution'uma NetCoreRabbitMQ.Web adında bir web projesi ekliyorum.

Sonra Services adlı bir klasör oluşturup içine RabbitMQClientService adında bir class ekliyorum.

namespace NetCoreRabbitMQ.Web.Services
{
    public class RabbitMQClientService : IDisposable
    {
        private readonly ConnectionFactory _connectionFactory;
        private IConnection _connection;
        private IModel _channel;
        public static string ExchangeName = "InvoiceDirectExchange";
        public static string RoutingInvoice = "invoice-route";
        public static string QueueName = "invoice-queue";

        public RabbitMQClientService(ConnectionFactory connectionFactory)
        {
            _connectionFactory = connectionFactory;
        }

        public IModel Connect()
        {
            _connection = _connectionFactory.CreateConnection();

            if (_channel is { IsOpen: true })
            {
                return _channel;
            }

            _channel = _connection.CreateModel();
            _channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false);
            _channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false);
            _channel.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: RoutingInvoice);

            return _channel;
        }

        public void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            _channel = default;

            _connection?.Close();
            _connection?.Dispose();
            _connection = default;
        }
    }
}

Bu classım bana RabbitMQ ile bağlantı kurmak için bir connection dönecek ve ben channel'ımı bu connection üzerinden oluşturacağım. Classımı oluşturduktan sonra Startupda servis olarak ekliyorum.

public void ConfigureServices(IServiceCollection services)
{
	services.AddSingleton(x => new ConnectionFactory { Uri = new Uri(Configuration["AmpqpUrl"]), DispatchConsumersAsync = true });
	services.AddSingleton<RabbitMQClientService>();
}

İlk olarak ConnectionFactory'i ekledim var parametre olarak Uri adresini apsettings'den okudum ve backgroun servisimde asenkron olarak çalışacağım için DispatchConsumersAsync parametresini true'ya set ettim. Producer için kodlarıma geçmeden önce Customer modelim için basit bir class ekliyorum.

public class Customer
{
	public string Name { get; set; }
	public string Surname { get; set; }
	public string Email { get; set; }
}

Şimdi Services klasörüme RabbitMQProducer adında kuyruğa mesaj gönderecek classımı yazıyorum.

namespace NetCoreRabbitMQ.Web.Services
{
    public class RabbitMQProducer
    {
        private readonly RabbitMQClientService _rabbitMQClientService;

        public RabbitMQProducer(RabbitMQClientService rabbitMqClientService)
        {
            _rabbitMQClientService = rabbitMqClientService;
        }

        public void Publish(Customer customer)
        {
            var channel = _rabbitMQClientService.Connect();

            var bodyJson = JsonSerializer.Serialize(customer);

            var bodyByte = Encoding.UTF8.GetBytes(bodyJson);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: RabbitMQClientService.ExchangeName, routingKey: RabbitMQClientService.RoutingInvoice, basicProperties: properties, body: bodyByte);
        }
    }
}

Bu classımda RabbitMQClientService'i kullanarak Publish metodunda customer nesnemi alıyorum, önce json'a sonrada rabbitmq ile gönderebilmek için byte[]'a parse ediyorum. Bu şekilde complex type'ları ve fiziksel dosyalarıda mesaj olarak kuyruğa gönderebilirsiniz.

Son olarak RabbitMQProducer classımı Startupda AddSingleton olarak tanımlıyorum.

public void ConfigureServices(IServiceCollection services)
{
	services.AddSingleton(x => new ConnectionFactory { Uri = new Uri(Configuration["AmpqpUrl"]), DispatchConsumersAsync = true });
	services.AddSingleton<RabbitMQClientService>();
	services.AddSingleton<RabbitMQProducer>();
}

Artık kuyruğa mesaj göndermek için gerekli kodlarım hazır, şimdi Index view'ıma basit bir form ekleyip buradaki bilgileri kuyruğa gönderelim.

Formum bu şekilde, HomeController'a RabbitMQProducer classımı ekliyorum ve constructor metodda içini dolduruyorum.

private readonly RabbitMQProducer _rabbitMQProducer;

public HomeController(RabbitMQProducer rabbitMqProducer)
{
	_rabbitMQProducer = rabbitMqProducer;
}

Şimdi sadece Index actionunun post metodunda modelimizi kuyruğa göndermek kalıyor.

[HttpPost]
public IActionResult Index(Customer customer)
{
	_rabbitMQProducer.Publish(customer);

	ViewBag.Info = "Sipariş oluşturuldu.";
	return View(customer);
}

Formu doldurup RabbitMQ'yu kontrol ettiğimde exchange'imin oluştuğunu görüyorum.

Exchange'in içine girdiğimde ise bağlı olan bir kuyruk olduğunu görebiliyorum.

Şimdi kuyruğu dinleyecek BackgroundService'imizi yazalım. Projeme BackgroundServices adında bir klasör açıyorum ve içine InvoiceSenderBackgroundService adında bir class ekliyorum.

namespace NetCoreRabbitMQ.Web.BackgroundServices
{
    public class InvoiceSenderBackgroundService : BackgroundService
    {
        private readonly RabbitMQClientService _rabbitMQClientService;
        private IModel _channel;

        public InvoiceSenderBackgroundService(RabbitMQClientService rabbitMqClientService)
        {
            _rabbitMQClientService = rabbitMqClientService;
        }

        public override Task StartAsync(CancellationToken cancellationToken)
        {
            _channel = _rabbitMQClientService.Connect();
            _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
            return base.StartAsync(cancellationToken);
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var consumer = new AsyncEventingBasicConsumer(_channel);

            _channel.BasicConsume(queue: RabbitMQClientService.QueueName, autoAck: false, consumer: consumer);

            consumer.Received += Consumer_Received;

            return Task.CompletedTask;
        }

        private Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
        {
            try
            {
                var bodyArray = @event.Body.ToArray();
                var bodyString = Encoding.UTF8.GetString(bodyArray);
                var customer = JsonSerializer.Deserialize<Customer>(bodyString);

                //Pdf dönüştür ve e-mail gönder...

                _channel.BasicAck(@event.DeliveryTag, false);
            }
            catch (Exception e)
            {
                //Hatayı logla...
            }


            return Task.CompletedTask;
        }

        public override Task StopAsync(CancellationToken cancellationToken)
        {
            return base.StopAsync(cancellationToken);
        }
    }
}

Classımı Microsoft.Extensions.Hosting namespaceinin altındaki BackgroundService classından türetiyorum. Bu classın temelde 3 adet metodu var;

  1. StartAsync(); Uygulama ayağa kalktığında bir kez çalışır.
  2. ExecuteAsync(); Uygulama ayaktayken bir kez çalışır.
  3. StopAsync(); Uygulama kapantığında bir kez çalışır.

ExecuteAsync metodu abstract'dır yani implemente etmek zorundasınız. Diğer iki metod ise virtual'dır dilerseniz override edip kullanabilirsiniz.

RabbitMQClientService'imizi ve ExecuteAsync metodunda kullanacağımız channel'ımızı property olarak classıma ekliyorum ve RabbitMQClientService'i constructor metodda dolduruyourm.

StartAsync() metodumda  _rabbitMQClientService.Connect() ile channel'ımı oluşturuyorum ve _channel.BasicQos() mesajları nasıl dinleyeceğimi set ediyorum.

ExecuteAsync() metodumda consumer'ımı tanımlıyorum bu sefer askenkron çalışmasını istediğim için AsyncEventingBasicConsumer classından instance alıyorum ve Received eventi için ayrı bir metod oluşturuyorum.

Consumer_Received metodumda BasicDeliverEventArgs @event parametresinin boyd'sini toarray ile byte[]'a dönüştürüyorum ve Encoding.UTF8.GetString() ile de stringe dönüştürüyorum. Ve son olarakta  JsonSerializer.Deserialize metoduylada eriştiğim json'u modelime parse ediyorum.

BackgroundService için kodlarımda bu kadardı, pdf oluşturma ve email gönderme bu makalenin konusu oldmadığı ve konuyu daha da uzatacağı için yazmadım. BackgroundService'imle ilgili son olarak yapmam gereken Startupda servis olarak tanımlamak.

public void ConfigureServices(IServiceCollection services)
{
	services.AddSingleton(x => new ConnectionFactory { Uri = new Uri(Configuration["AmpqpUrl"]), DispatchConsumersAsync = true });
	services.AddSingleton<RabbitMQClientService>();
	services.AddSingleton<RabbitMQProducer>();
	services.AddHostedService<InvoiceSenderBackgroundService>();
	services.AddControllersWithViews();
}

Şimdi formu doldurup kuyruğa bir mesaj gönderelim ve BackgroundService'imizi break point koyarak kontrol edelim.

Gördüğünüz gibi web uygulaması kendi akışına devam ederken asenkron olarka background sevisimizle kuyruğu dinleyip istediğimiz işlemi yaptırabiliriz. Benim bu makalede anlatacaklarım bu kadar umarım faydalı olmuştur.

Projenin kaynak kodları: https://github.com/ekremozer/NetCoreRabbitMQ