RabbitMQ
6 იანვარი, 2021 წელი
მესიჯების პარალელური დამუშავება მესიჯების რიგითობის დაცვით
(RabbitMQ-ის მაგალითზე)
ნებისმიერი მესიჯინგის სისტემის და მესიჯ ბროკერის მიმართ (მაგ. RabbitMQ, ActiveMQ, Kafka, ...) ერთ-ერთი ყველაზე მნიშვნელოვანი მოთხოვნა არის მესიჯის გამომქვეყნებლისგან (producer/publisher) მესიჯების იმ რიგითობით მიწოდება მესიჯის მიმღებ(ებ)ისთვის (consumer/subscriber), რა რიგითობითაც მოხდა მესიჯების გამოქვეყნება. ამ მოთხოვნის რეალიზებისთვის მესიჯინგის სისტემის შესაბამისი პროტოკოლების უმრავლესობაში (მაგ. AMQP, JMS, …) გვხვდება რიგის (Queue) ტიპის არხები. ასევე ერთ-ერთი მნიშვნელოვანი მოთხოვნა არის მესიჯების პარალელური დამუშავების შესაძლებლობა, რათა სისტემის ჰორიზონტალური მასშტაბირება და დატვირთვის ოპტიმალური გადანაწილება შეგვეძლოს. ამ სტატიაში სწორედ ამ ორი მოთხოვნის ერთდროულად რეალიზების შესაძლებლობებს განვიხილავთ, RabbitMQ მესიჯ ბროკერის გამოყენებით.
რატომ არის მნიშვნელოვანი მესიჯების რიგითობის დაცვა?
(ვისთვისაც ეს საკითხი ცხადია, შეგიძლიათ გამოტოვოთ ეს სექცია)
როგორც წესი მესიჯინგის სისტემების გამოყენების ერთ-ერთი მთავარი მიზეზი არის მესიჯების საიმედო და ასინქრონული დამუშავება.
წარმოვიდგინოთ ონლაინ მაღაზიის სისტემა, რომელიც შედგება ორი აპლიკაციისგან:
- შეკვეთების აპლიკაცია — სადაც მომხმარებელი არჩევს სასურველ პროდუქტს და ათავსებს შეკვეთას,
- საწყობის აპლიკაცია — სადაც განთავსებული შეკვეთის შესაბამისი პროდუქტის მოძიება და მისაწოდებლად მომზადება ხდება.
იმისთვის რომ ეს ორი აპლიკაცია ერთმანეთთან არ იყოს მჭიდროდ დაკავშირებული და საწყობის აპლიკაციის დროებითი გათიშვა (აპლიკაციის ხარვეზის, ქსელის გაუმართაობის თუ ახალი ვერსიის დანერგვის გამო) არ იწვევდეს შეკვეთის პროცესის შეფერხებას ან საერთოდ შეკვეთის ვერ განთავსებას, ამ აპლიკაციებს შორის ნაცვლად სინქრონული კომუნიკაციისა, უმჯობესია ასინქრონული კომუნიკაციის აწყობა. ეს ტექნიკურად გულისხმობს იმას, რომ შეკვეთების აპლიკაცია შეკვეთის განთავსების დროს ნაცვლად საწყობის აპლიკაციის API-ის სინქრონული გამოძახებისა, შეკვეთის განთავსების შესახებ მესიჯს ათავსებს მესიჯინგის სისტემის არხში და შემდეგ ამ არხიდან, ასინქრონულად კითხულობს საწყობის აპლიკაცია მესიჯს და ამუშავებს მას. მესიჯინგის სისტემა უზრუნველყოფს შეკვეთების აპლიკაციიდან შეკვეთის შესახებ მესიჯების მიღებას, მათ საიმედოდ შენახვას და საწყობის აპლიკაციისთვის მიწოდებას, ამ უკანასკნელის ხელმისაწვდომობის გათვალისწინებით, ისე რომ მესიჯი არ დაიკარგოს.
დავუშვათ მომხმარებელმა განათავსა შეკვეთა და შემდეგ ამ შეკვეთიდან ერთ-ერთი პროდუქტი ამოიღო. ეს მოქმედებები გამოიწვევს იმას, რომ შეკვეთის აპლიკაცია მესიჯინგის სისტემაში გამოაქვეყნებს ორ მესიჯს, თავდაპირველად შეკვეთის განთავსების მესიჯს და შემდეგ შეკვეთის ცვლილების მესიჯს. აუცილებელია რომ ეს მესიჯები იგივე რიგითობით მიუვიდეს საწყობის აპლიკაციას, რადგან წინააღმდეგ შემთხვევაში, თუ საწყობის აპლიკაციას ჯერ მიუვა შეკვეთის ცვლილების შესახებ (პროდუქტის შეკვეთიდან ამოღება) მესიჯი, ის ვერ დაამუშავებს ამ მესიჯს, მას ჯერ საერთოდ არ აქვს ინფორმაცია შეკვეთის არსებობის შესახებ, რადგან შეკვეთის განთავსების მესიჯი არ მისვლია. ამ დაუმუშავებელ მესიჯს საწყობის აპლიკაცია შეინახავს პრობლემურ/გაუგებარ მესიჯებში, შემდგომი დამუშავებისთვის, ან უარეს შემთხვევაში საერთოდ უგულებელყოფს მას, რაც გამოიწვევს იმას, რომ როცა საწყობის აპლიკაცია მიიღებს შეკვეთის განთავსების მესიჯს და დაამუშავებს მას, მომხმარებელს მიაწოდებენ იმ პროდუქტსაც, რომელიც მან ამოიღო შეკვეთიდან. ორივე შემთხვევაში მომხმარებელი უკმაყოფილო დაგვრჩება, რადგან ერთ შემთხვევაში პრობლემური/გაუგებარი მესიჯების დამუშავებას სჭირდება მეტი დრო და ძალისხმევა, რაც აჭიანურებს შეკვეთის დამუშავების პროცესს, ხოლო მეორე შემთხვევაში პრობლემური/გაუგებარი მესიჯების უგულებელყოფა არასწორი შეკვეთის მიწოდებას გამოიწვევს.
ამ მარტივი მაგალითითაც ნათელია მესიჯების რიგითობის დაცვის აუცილებლობა და რეალურ ბიზნეს აპლიკაციებში ეს საკითხი კიდევ უფრო მეტ მნიშვნელოვნებას იძენს.
როგორც ზემოთ აღვნიშნეთ მესიჯინგის სისტემების უმრავლესობაში არსებობს რიგის (Queue) ტიპის არხები, რომლებიც უზრუნველყოფენ მესიჯების იგივე რიგითობით მიწოდებას მიღებისთვის, რა რიგითობითაც მესიჯები გამოქვეყნდა ამ არხში.
მესიჯების რიგითობის დაცვა მესიჯინგის არხში, საკმარისია ამ მესიჯების რიგითობით დამუშავების უზრუნველსაყოფად?
პასუხი არის არა, არ არის საკმარისი. იმისთვის რომ მესიჯები რიგითობით დამუშავდეს, აუცილებელია მესიჯინგის სისტემის რიგის (Queue) ტიპის არხს ჰყავდეს ერთი (ექსკლუზიური) მესიჯის მიმღები.
ჩვენს ზემოთ მოყვანილ მაგალითში, თუ საწყობის აპლიკაციის ორი ინსტანსი გვაქვს პარალელურად გაშვებული, იმისთვის რომ საწყობის აპლიკაციის დატვირთვა გადავანაწილოთ რამდენიმე სერვერზე, მაშინ შეიძლება საწყობის აპლიკაციის ერთმა ინსტანსმა მიიღოს შეკვეთის განთავსების მესიჯი, ხოლო მეორემ შეკვეთის ცვლილების მესიჯი. თუ ეს მეორე ინსტანსი უფრო ნაკლებად დატვირთულია და დაასწრებს პირველ ინსტანსს მესიჯის დამუშავებას, იგივე შედეგს მივიღებთ რაც ზემოთ აღწერილ მაგალითში მესიჯების არათანმიმდევრული მიწოდების შემთხვევაში გვქონდა, მიუხედავად იმისა რომ ამ შემთხვევაში მესიჯები თანმიმდევრულად იყო განთავსებულ მესიჯინგის არხში.
ფაქტობრივად გამოდის, რომ მესიჯების რიგითობით დამუშავების მოთხოვნა, საშუალებას არ გვაძლევს აპლიკაციის მასშტაბირება მოვახდინოთ და დატვირთვა გადავანაწილოთ სხვადასხვა სერვერებზე, რაც მაღალი დატვირთვის მქონე აპლიკაციებში, სადაც წამში ასობით მესიჯის მიღება ხდება ან მესიჯის დამუშავება კომპლექსური ოპერაციაა, იწვევს წარმადობის დეგრადაციას და დამუშავების დროის ზრდას.
გვეშველება რამე?
პასუხი არის კი, ხშირ შემთხვევაში გვეშველება. საშველი მდგომარეობს იმაში, რომ როგორც წესი მესიჯების რიგითობით დამუშავება აუცილებელია მხოლოდ გარკვეული კრიტერიუმის მიხედვით.
ჩვენ მიერ ზემოთ მოყვანილი მაგალითის მიხედვით, აუცილებელია შეკვეთის მესიჯები რიგითობით დამუშავდეს ერთი მომხმარებლის ჭრილში, ანუ ყოველი კონკრეტული მომხმარებლის მიერ განთავსებული შეკვეთის მესიჯები უნდა დამუშავდეს თანმიმდევრულად, ხოლო სხვადასხვა მომხმარებლის მიერ განთავსებული შეკვეთის მესიჯები შეგვიძლია დავამუშავოთ პარალელურად, რადგან სხვადასხვა მომხმარებლის მიერ განთავსებულ შეკვეთებს ერთმანეთთან არაფერი აკავშირებთ. ამგვარად მიიღწევა ის, რომ შეკვეთის მესიჯები დამუშავდება რიგითობის დაცვით და იმავდროულად შეგვეძლება დატვირთვის გადანაწილება და პარალელური დამუშავება. ზემოთ მოყვანილი მაგალითის მიხედვით საწყობის აპლიკაციის ერთი ინსტანსი დაამუშავებს ლევანის, გიორგის და ალეკოს შეკვეთის მესიჯებს, შესაბამისი მიმდევრობის დაცვით, ხოლო მეორე ინსტანსი დაამუშავებს რეზოს, შოთას და ტატოს შეკვეთის მესიჯებს, ასევე შესაბამისი მიმდევრობის დაცვით.
როგორ დავაკონფიგურიროთ მესიჯინგის სიტემა, რათა შესაძლებელი იყოს მესიჯების პარალელური დამუშავება, კონკრეტული კრიტერიუმის მიხედვით მესიჯების რიგითობის შენარჩუნებით?
საბედნიეროდ რამდენიმე ჩემთვის ცნობილ მესიჯინგის სისტემას აქვს ამის საშუალება.
ერთ-ერთი მათგანი არის საკმაოდ პოპულარული open-source მესიჯინგის სისტემა, RabbitMQ და ამ პოსტში განვიხილავთ აღნიშნული ფუნქციონალის რეალიზებას სწორედ RabbitMQ-ში.
RabbitMQ-ში ამგვარი ფუნქციონალის რეალიზებისთვის გამოიყენება Consistent Hash Exchange-ის ტიპი. RabbitMQ საიტზე არსებულ ტუტორიალებში და დოკუმენტაციებში ამ ტიპის Exchange-ს ვერ ნახავთ, თუმცა ის მაინც არსებობს, მოყვება ოფიციალურ დისტრიბუტივს/საინსტალაციოს, თუმცა გათიშულია და მისი ჩართვა ხდება ამ მარტივი ბრძანების მეშვეობით:
rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
აღნიშნული ფლაგინი და Exchange-ის ტიპი იყენებს consistent hashing ალგორითმს, რომელიც საშუალებას იძლევა განაწილებულ/დისტრიბუციულ სისტემებში, სხვადასხვა მანქანებს შორის მონაცემების განაწილება მოხდეს იმგვარად, რომ ერთგვაროვანი მონაცემები, რომლის ერთგვაროვნებაც განისაზღვრება გარკვეულ კრიტერიუმზე დაყრდნობით (და ამ კრიტერიუმის შესაბამისად მონაცემის hash-ის გამოთვლით) ყოველთვის ერთ მანქანაზე განთავსდეს. ამ ალგორითმის მუშაობის დეტალებს შეგიძლიათ გაეცნოთ ამ ერთი, ორი, სამი და სხვა სტატიებში.
Consistent Hash Exchange-ის გამოყენებით შეგვიძლია გადავანაწილოთ მესიჯები სხვადასხვა Queue-ში, ისე რომ ყოველი კონკრეტული მომხმარებლის შეკვეთის მესიჯი ყოველთვის ერთ Queue-ში ხვდებოდეს, ხოლო სხვადასხვა მომხმარებლის მესიჯები სხვადასხვა Queue-ში. მაგ. ლევანის, გიორგის და ალეკოს შეკვეთის მესიჯები ხვდებოდეს orders-queue1-ში, ხოლო რეზოს, შოთას და ტატოს შეკვეთის მესიჯები orders-queue2-ში, რა თქმა უნდა მომხმარებლის ჭრილში რიგითობის დაცვით. orders-queue1-დან მიიღებს და დაამუშავებს მესიჯებს საწყობის აპლიკაციის ერთი ინსტანსი, ხოლო orders-queue2-დან მიიღებს და დაამუშავებს მესიჯებს საწყობის აპლიკაციის მეორე ინსტანსი, რის შედეგადაც შესაძლებელი გახდება მესიჯების პარალელური დამუშავება და დატვირთვის განაწილება საწყობის აპლიკაციის ორ ინსტანსს შორის.
Consistent Hash Exchange-ში, მესიჯის დაჯგუფების და პარალელიზაციის კრიტერიუმის (consistent hashing-ის კრიტერიუმის) მითითება შესაძლებელია მესიჯის Routing Key-ის, Header-ის ან Property-ის მეშვეობით. გაჩუმებით (By default) გამოიყენება მესიჯის Routing Key და მისი მნიშვნელობის მიხედვით ხდება hash-ის გამოთვლა და მესიჯის გადამისამართება შესაბამის Queue-ში. თუმცა ხშირად უფრო პრაქტიკულია Header-ის ან Property-ის გამოყენება, რადგან მესიჯს მხოლოდ ერთი Routing Key გააჩნია და ის შეიძლება გამოიყენებოდეს სხვა მიზნებისთვის (მესიჯის ტიპის აღნიშვნა და სხვა), ხოლო Header და Property შეიძლება რამდენიმე ჰქონდეს. მაგ. შეგვიძლია გამოვაქვეყნოთ მესიჯი Header პარამეტრით customer-id, რომლის მნიშვნელობაში მივუთითებთ მომხმარებლის იდენტიფიკატორს, შემდეგ კი ამ customer-id Header პარამეტრს გამოვიყენებთ დაჯგუფების და პარალელიზაციის კრიტერიუმად (consistent hashing-ის კრიტერიუმად).
Consistent Hash Exchange-ის ტიპის შექმნის დროს, Exchange-ის ტიპში ვუთითებთ x-consistent-hash მნიშვნელობას.
var connectionFactory = new ConnectionFactory
{HostName = Hostname, UserName = Username, Password = Password};
using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare("orders", type:"x-consistent-hash", durable:true, autoDelete: false);
თუ Header-ის ან Property-ის გამოყენება გვინდა hashing-ისთვის, მაშინ ამ Exchange-ის არგუმენტებში უნდა ჩავწეროთ:
- Header-ის შემთხვევაში, hash-header რომლის მნიშვნელობაში მივუთითებთ მესიჯის Header პარამეტრს (მაგ. customer-id)
var connectionFactory = new ConnectionFactory
{HostName = Hostname, UserName = Username, Password = Password};
using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare("orders", type:"x-consistent-hash", durable:true, autoDelete: false,
new Dictionary<string, object>
{
{ "hash-header", "customer-id" }
});
- Property-ის შემთხვევაში, hash-property რომლის მნიშვნელობაში მივუთითებთ მესიჯის Property პარამეტრს (მაგ. message_id, correlation_id)
ასევე უნდა შევქმნათ Queue-ები, რომლებსაც მივაბამთ ამ Exchange-ს. ბმის დროს Routing Key პარამეტრში იწერება ამ Queue-ს წონა, 1, 2 და ა.შ. რაც უფრო მაღალ რიცხვს მივუთითებთ, მით უფრო მაღალი წონა ექნება ამ Queue-ს სხვებთან მიმართებაში და მით მეტი მესიჯი მოხვდება ამ Queue-ში. მაგ. თუ orders-queue1-ის orders exchange-ზე მიბმის დროს Routing Key-ში მივუთითებთ 1-ს, ხოლო orders-queue2-ის orders exchange-ზე მიბმის დროს Routing Key-ში მივუთითებთ 2-ს, მაშინ orders-queue2-ში 2-ჯერ მეტი მომხმარებლის მესიჯი მოხვდება, ვიდრე orders-queue1-ში.
var connectionFactory = new ConnectionFactory
{HostName = Hostname, UserName = Username, Password = Password};
using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare("orders-queue1", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("orders-queue1", "orders", "1"); //Binding weight is 1
channel.QueueDeclare("orders-queue2", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("orders-queue2", "orders", "2"); //Binding weight is 2
თანაბარი განაწილებისთვის აუცილებელია რომ ყველა Queue-ს Consistent Hash Exchange-თან მიბმის დროს Routing Key-ში მითითებული ჰქონდეს ერთი და იგივე მნიშვნელობა (წონა).
სულ ესაა მთელი კონფიგურაცია და შედეგად შეგვიძლია მესიჯები დავამუშავოთ პარალელურად, სხვადასხვა Queue-ზე პარალელური მოსმენით და იმავდროულად მესიჯების გარანტირებული თანმიმდევრობის დაცვით.
RabbitMQ Consistent Hash Exchange Type-ზე უფრო მეტი ინფორმაცია შეგიძლია იხილოთ აქ: https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_consistent_hash_exchange
თიბისი ბანკში აქტიურად ვიყენებთ RabbitMQ მესიჯინგის სისტემას და ასევე Consistent Hash Exchange-ებს, მაღალი დატვირთვის მქონე სისტემებში მესიჯების დამუშავების პარალელიზაციისა და შესაბამისი დატვირთვის გადანაწილებისთვის, ჰორიზონტალური მასშტაბირებისთვის.