Event Sourcing

Semih Şahan
20 min readAug 14, 2023

--

Developing business logic using event sourcing

  • Event sourcing özetle eventlerin toplu şekilde bir yerde tutulmasıdır, event’lerin depolanmasıdır ve sonrasında bu eventleri sıralı şekilde kullanarak farklı işler yapabiliriz, yani bu eventleri sonradan kaynak(source) olarak kullanabiliriz
  • Event sourcing için her aggregate state değişiminde domain event publish edilmeli peki developerlar uygulamanın business logic tasarlarken event publish etmeyi unutursa ne olacak ?
  • Event sourcing ile aggregate’lerin history’sini tutabiliriz -> AVANTAJ
  • Event sourcing ile business logic oluşturmak zordur, bir öğrenme eğrisi vardır ama geleneksel yaklaşımdan bu yaklaşıma alışınca faydaları oldukça fazladır -> Öğrenmenin zor olması DEZAVANTAJ
  • Event aggregate’in her bir state değişikliğini ifade eder demiştik, bir aggregate üzerinde çalışan eventlerin historysini elimizde tutabilirsek, eventleri aksi yönde tekrar çalıştırarak aggregate’in state’ini geçmişteki bir hale alabiliriz -> AVANTAJ
  • Event store’u sorgulamak zordur, bunun yoluda CQRS pattern kullanmaktan geçer -> DEZAVANTAJ

The trouble with traditional persistence

Geleneksel yaklaşım, sınıfları veritabanı tablolarına, bu sınıfların field’larını tablo sütunlarına ve bu sınıfların instance’larını bu tablolardaki satırlar olarak düşünür.

Bu geleneksel yöntemin birçok dezavantajı vardır;

  • Object-Relational impedance mismatch.
  • Lack of aggregate history.
  • Implementing audit logging is tedious and error prone.
  • Event publishing is bolted on to the business logic.

Bu dezavantajları açıklayalım..

Object-Relational impedance mismatch

Veri tabanındaki tablo-kolon-satır yapısının, OOP’deki yapıyla aslında uyumsuz olması ORM frameworkleri ile bizi bunu kullanmaya zorlaması, projelerimizde OOP’nin nimetlerinden faydalanmamızı engelliyor, data driven development yapmamızı sağlıyor, DDD bilincini kazanamıyoruz

Lack of aggregate history

Geleneksel yaklaşımda aggregate’lerin history’sini tutmak istersek synchronize çalışan duplicate kod yazmamız gerekir

Implementing audit logging is tedious and error prone

Audit logging’ler de başka bir sorundur, çoğu uygulama audit log tutmak zorundadır ama geleneksel yöntemde business logic’den ayrı bir konu olduğu için audit log bu işlemleri ayrı yazmak gerekir buda buglara yol açabilir, Örneğin, uygulamanın iş mantığında yapılan bir değişiklik, audit logging koduna yansıtılmazsa, audit logları yanıltıcı veya eksik olabilir. Örneğin, bir kullanıcının hesabının silindiği ancak audit logging kodunun bunu kaydetmediği varsayalım. Böyle bir durumda, kullanıcının hesabı hala uygulamada görülebilir, ancak audit logları bunun gerçekleştiğini göstermez. Bu, uygulamanın güvenlik veya düzenleyici gereksinimlerini karşılamamasına ve hatta yasal sorunlara yol açabilir.

Event publishing is bolted on to the business logic

ORM frameworkleri aggregate state değişikliği olduğunda event publish eden bir mekanizma sağlamaz bu yüzden her state değişikliğinde event publish etmek business logic’in içinde olan bir şey olur ve maintainable düşük verimsiz bir durum ortaya çıkar

Overview of event sourcing

Event sourcing, aggregate’leri kalıcı hale getirmek için kullanılan event-centric (olay odaklı) bir tekniktir.. Bir aggregate, veritabanında bir dizi event (olay) olarak depolanır. Her bir event (olay), aggregate’in bir state (durum) değişikliğini temsil eder.

Event sourcing persists aggregates using events

Geleneksel yöntemdeki her bir instance’ın veri tabanı tablosundaki satıra denk gelmesinden farklı olarak event sourcing deki yaklaşım her bir aggregate’in EVENTS tablosundaki bir veya daha fazla satıra denk gelmesi şeklindedir.

Events tablosundan sorguladığımız event kayıtlarını aggregate üzerinde şu şekilde uygulayabiliriz;

Bir aggregate’in bellek durumunun event’leri load edilerek yeniden oluşturulması, garip ve alışılmadık gelebilir. Ancak bir açıdan bakıldığında, JPA veya Hibernate gibi bir ORM frameworkünün bir nesneyi load etmesiyle çok da farklı değildir. Bir ORM frameworkü, mevcut state’i almak için bir veya daha fazla SELECT query çalıştırarak bir nesneyi load eder ve nesnelerin default constructor’larını kullanarak oluşturur. Bu nesneleri initialize etmek için reflection kullanır. Event sourcing’in farklılığı, bellek durumunun event’ler kullanılarak yeniden oluşturulmasıdır.

Events represent state changes

Aggregate’lerde her state değişikliği olduğunda veritabanına kayıt atılmalıdır, atılacak olan kayıtta ilgili aggregate hakkında ne kadar detaylı bilgi tutulacağı uygulama gerekliliklerine göre değişmektedir ama aggregate’lerde her state değişikliği olduğunda event publish edilmeli ve consume eden tarafta veri tabanına kayıt atılmalıdır.

Aggregate’ler create veya update olduğunda domain event publish etmelidir ve aggregate’lerin üzerinde olan apply() methodları parametre olarak event dizisi almalı ve aldığı event listesini ilgili aggregate apply ederek, ilgili aggregate’in state bilgisini değiştirmemizi sağlamalıdır

Aggregate methods are all about events

  • Mesela elinde bir command var dimi, burada value direkt update etmektense bir event serisi oluşturuyor, bu event seriside hangi fieldların değiştiğini göstermekte
  • Event sourcing için kullanılan iki farklı method tanıtmak isterim;

Process() -> parametre olarak bir Update object alınır, bu object validate edilip, ilgili update işlemi için event dizisi oluşturulur

Apply() -> ilgili event’i parametre olarak alır ve event ile yapılması gereken şeyi yapar, veri tabanına kayıt veya state değişikliği vs.

  • Yani transaction’lar series of event olarak veri tabanına kaydedilir
  • Yukarıdaki görselde esasında reviseOrder() methodunu process() ve apply() methodlarına ayırıyor

Aggregate Create Steps

  • Aggregate root’un default constructorı kullanılarak aggregate oluşturulur
  • Process() methodu çalıştırılır
  • Yeni eventleri iterate edip, her bir event için apply() methodunu çağırarak aggregate update edilir
  • Yeni event’ler event store’a eklenir

Aggregate Update Steps

  • Event store’dan ilgili aggregate’in event’leri yüklenir
  • Aggregate root constructor kullanılarak, aggregate initialize edilir
  • Event store’dan elde edilen eventler iterate edilerek, her biri için apply() methodu çalıştırılır
  • Process() methodu çalıştırılır ve yeni event oluşturulur
  • Yeni eventler iterate edilip her bir event için apply() methodu çalıştırılarak aggregate update edilir
  • Yeni event’ler event store’a kaydedilir

Event Sourcing-Based Order Aggregate

  • Event sourcing felsefesine geçince createOrder(), reviseOrder() methodları process() ve apply() methoduna dönüşür, process() methodu CreateOrder command parametre alır ve OrderCreated event publish eder, apply() method ise OrderCreated event consume eder ve Order’ın fieldlarını initialize eder.
  • Önceki business logic’de reviseOrder(), confirmRevision() ve rejectRevision() methodları vardı, bu methodlar yerine artık process() ve apply() methodları var.
  • Yukarıdaki kod örneğinde gördüğümüz üzere, reviseOrder() methodu yerine aynı işi yapan process(ReviseOrder) ve apply(OrderRevisionProposed) methodları var, confirmRevision() methodu yerine process(ConfirmReviseOrder) ve apply(OrderRevised) methodu var.

Handling concurrent updates using optimistic locking

  • Race condition olmaması için aggregate’lerde optimistic locking kullanılır(optimistic locking nedir bilindiği kabul ediliyor)

Event sourcing and publishing events

  • Event sourcing yaklaşımında aggregate’ler birtakım eventlerin toplamıdır, aggregate’in herhangi bir andaki state’ini elde etmek için birtakım eventleri çalıştırmamız gerekir
  • Event sourcing, güvenilir şekilde event publishing mekanizması olarak da kullanılabilir
  • Event sourcing de temel fark OUTBOX veritabanı tablosundaki kayıtları delete etmek yerine, EVENTS veritabanı tablosuna kaydedilir ve istenildiği zaman kullanılarak aggregate’lerin state’lerini istediğimiz bir T anına getirebiliriz

Using Polling To Publish Events

  • Event publish mekanizması şöyle çalışıyor, events tablosundaki kayıtları parça parça alıyor ve işliyor, en son işlediği eventId bilgisini kullanarak sonraki işleyeceği eventleri select ediyor, eventId otomatik artan olduğu için en son işlediği eventin id değerinden büyük olan kayıtları çekmek için şöyle bir sorgu çalıştırıyor; “select * from EVENTS where EVENT_ID > ….”
  • Bu senaryoda şöyle bir sorun çıkıyor; 2 tane transaction olduğunu düşünelim, event publish edicekler, A transaction EVENTS tablosuna eventId bilgisi 1010 olan kayıt oluşturdu ama commitlemedi, B transaction ise eventId değeri 1020 olan kayıt oluşturdu ve A transaction’dan önce commitledi ve event publisher EVENTS tablosunu sorguladı “select * from EVENTS where EVENT_ID > ….” ve ilgili event’leri brokera publish etti ve sonra A transaction eventId değeri 1010 olan kaydı insert etti ama event publisher en son publish ettiği eventin Id değerinden büyük kayıtları işliyor, EVENTS tablosuna attığı sorgu şöyle oluyor yani “select * from EVENTS where EVENT_ID > 1020” bu durumda da işte A transaction’ın commitlediği event ignore edildi ve işleme alınmadı, böyle bir case olabilir
  • Bu soruna çözüm olarak EVENTS tablosuna ilgili eventin publish edillip, edilmediğine dair bilgi tutan PUBLISHED isminde kolon ekleyebiliriz ve artık event publisher EVENTS tablosunu şu sorguyla sorgular; “select * from EVENTS where PUBLISHED = 0 order by EVENT_ID asc”
  • Event publisher event’leri publish ettikten sonra ilgili event’leri şu sorguyu kullanarak update eder; “update EVENTS set PUBLISHED=1 where EVENT_ID in”
  • Bu yaklaşım event publisher’ın bazı event’leri atlamasını engeller

Using Transaction Log Tailing To Reliably Publish Events

  • Daha gelişmiş event store mekanizmaları “transaction log tailing” yapısını kullanır, bu yapıyı bu yazımızda anlatmıştık; “https://medium.com/@semihshn/async-messaging-pattern-bf85f2a48fcb" özetle bu yapı EVENTS tablosuna yapılan insert işlemlerini database log’larından okur ve message brokera publish eder

Using snapshots to improve performance

  • Order aggregate’ini düşünelim, birde Account aggregate’ini düşünelim, Order aggregate, Account aggregate göre nispeten çok daha az event store edecektir çünkü bir order’ın ömrü maksimum 1–2 gün sürecektir ama account ömrü çok uzun sürebilir mesela facebook’daki kaydınızı düşünün 2004'de facebooka kayıt oldunuz diyelim o accountun ömrü 19 yıl olacak ve store edilen event miktarını düşünsenize ve event store etme miktarı arttıkça bu eventleri aggregate’lere load etmek ve event’leri store etmek çok zaman alacaktır
  • Buna çözüm olarak aggregate’in state’ini periyodik olarak veritabanına kaydedebilir, yani periyodik olarak aggregate’in state’inin snapshot’ını alabiliriz böylece uygulama aggregate’i güncellerken en son oluşturulan snapshot ve ondan sonraki event’leri load eder sadece
  • Aggregate’ler eventler’den oluştuğu için Aggregate’in state’inin snapshotunı alabiliriz derken periyodik olarak o ana kadarki oluşturulan o aggregate ile ilgili eventlerin snapshotu alınmakta aslında(bu kısım yanlış anlaşılabilir diye detaylandırmak istedim :) )
  • Events tablosundaki eventleri aggregate’e uyguladığımız kod parçamız şu şekilde değişir;
  • Yani aggregate’imizi initialize ederken default constructor kullanmak yerine artık snapshot kullanıyoruz
  • Basit aggregate’lerin snapshotını JSON serialization yöntemini kullanarak alabiliriz, complex aggregate’lerin snapshot için ise Memento pattern kullanabiliriz
  • EVENTS ve SNAPSHOTS tabloları arasında ilişki kurarız ve önce ilgili aggregate’ın snapshot data’sını deserialize ederiz ve sonra ilgili event’leri aggregate’e uygularız
  • Snapshot tablosundaki event_id değeri, o anki snapshot kaydının hangi event’e ait olduğunu belirtir. Örneğin, event_id değeri 103 olan bir snapshot kaydı, events tablosundaki event_id değeri 103 olan bir event ve ondan önceki ilgili aggregate ile ilgili tüm eventlerin oluşturduğu bir snapshot’u temsil eder.

Idempotent message processing

  • Bir message consumer kodu yazarken, idemptotency dikkat etmeliyiz, async messaging pattern yazımızda detaylı bahsetmiştik, consumer aynı event 2 defa publish edildiğinde 2 defa işlememelidir, message brokerlar event’ler konusunda tekillik sağlama yeteneğine sahip değildir bu yüzden consumerlarımız idempotent olmalıdır, bunu sağlamanın yolu kısaca gelen eventleri PROCESSED_MESSAGES ismindeki tabloya kaydederiz ve her yeni event consume edildiğinde bu veri tabanı tablosuna kaydı var mı kontrolü yapılır, eğer varsa ilgili event işleme alınmaz, event store mekanizmasında da bunu yapmalıyız ama kullandığımız veri tabanının RDBMS veya NoSql olmasına göre implementasyon detayları değişmektedir

Idempotent Message Processing With An RDBMS Based Event Store

  • Event’leri EVENTS tablosuna ekleyen işlemin bir parçası olarak event id bilgisini PROCESSED_MESSAGES tablosuna ekler ve duplicate event’leri işleme almaz.

Idempotent Message Processing When Using A NoSql Based Event Store

  • NoSql veritabanlarında ACID desteklenmediği için, NoSql veritabanlarında idempotentliği sağlamak biraz daha zordur, bu yüzden idempotent message handling mekanizması olarak farklı bir yöntem uygulanır.
  • Consumer event ve eventId bilgisini atomik olarak kaydetmelidir ama NoSql’de bu garanti edilmez, atomik kelimesinden kasıt şudur, yani bir event kaydedilirken, aynı anda diğer işlemlerle ilgili değişikliklerin kaydedilmesi ve tüm bu değişikliklerin tamamlanması gerekmektedir. Bu sayede, bir hata durumunda bile, tüm işlemler geri alınarak(rollback) sistemin tutarlılığı sağlanabilir.
  • Bu durumun çözümü şöyledir;
  • Event içerisinde message id iletilir ve consumer da her gelen event için event store’a event publish eder ve consumer’a yeni bir event geldiğinde gelen event daha önce işlendi mi diye event store’dan kontrol eder

Evolving domain events

  • Kavramsal olarak store edilen eventler sonsuza kadar saklanır ama bu riskli bir vaatdir.
  • Event source event’leri saklayarak uygulamanın audit loglarını da saklamış olur aslında
  • Diğer bir yandan event’lerin yapısı zamanla değişebilir bu yüzden tüm ilişkili eventleri bir arada tutmak zorluk yaratabilir

Event Schema Evolution

Kavramsal olarak event sourcing için 3 farklı şema oluşturulabilir;

  • Oluşturulan aggregate’lerin gösterildiği şema
  • Her aggregate’in publish ettiği eventlerin tanımlarının olduğu şema
  • Event yapısının tanımlandığı şema

Event şemalarımızda meydana gelebilcek değişiklikler ve bu değişikliklerin geriye dönük uyumlulukları olabilir mi ? İşte cevabı

  • Bu değişiklikler, bir servisin domain modeli zaman içinde geliştikçe doğal olarak gerçekleşir; örneğin, bir servisin gereksinimleri değiştiğinde veya geliştiricileri bir domain hakkında daha derin bilgiler edinip domain modelini iyileştirdiğinde. Bu değişikliklerden bazıları mesela evente yeni bir field eklendiğinde consumer bundan etkilenmez, consumer bilmediği fieldları ignore eder ama bir eventin adını değiştirirsek veya bir fieldın adını değiştirirsek consumerda değişiklik yapmamız gerekir.

Managing Schema Changes Through Upcasting

  • Sql database dünyasında database değişiklikleri genellikle schema migration yöntemiyle ele alınır. Her bir schema değişikliği(migration) bir sql script kullanılarak yapılır. Schema migration scriptleri version kontrol sisteminde tutulur ve Flyway gibi bir araçla veritabanına uygulanır.
  • Event sourcing yaklaşımını uygularken olabilecek schema değişikliklerinde buna benzer bir yaklaşım uygulayabiliriz.
  • Bunun detaylarına daha sonra bakacağız…

Benefits of event sourcing

Event sourcing yaklaşımının avantaj ve dezavantajlarını inceleyelim

Faydalar

  • Reliably publishes domain events
  • Preserves the history of aggregates
  • Mostly avoids the O/R impedance mismatch problem
  • Provides developers with a time machine

Hadi bu faydaları detaylı açıklayalım…

Reliably Publishes Domain Events

  • Event sourcing aggregate’lerin state değişikliğinde event publish etmesini şart koştuğu için, event-driven microservice architecture için bir temel oluşturur, ayrıca bu yaklaşım sistemin güvenilir audit-log’lar tutmasını sağlar. Tutulan event’ler, istatistikler sistemi monitörize etmede, sistemin analitiğini yapmada ve başka amaçlarda bize çok faydalar sağlar

Preserves The History Of Aggregates

  • Event sourcing uygulandığında her aggregate’in history’si tutulur ve geçici sorgularla istediğimiz aggregate’in geçmişteki durumunu getirebiliriz kolaylıkla mesela sistemimizdeki bir customer’ın geçmişteki bir andaki kullanılabilir kredi limiti neydi bu ana, customer’ın o an daki kullanılabilir limitinin ne olduğunu görmeye geçmişe kolaylıkla gidebiliriz.

Mostly Avoids The O/R Impedance Mismatch Problem

  • Yazımızın başlarında Object-Relational mismatch problemini açıklamıştık, bu problemin önüne geçmek için periyodik olarak aggregate’lerimizin snapshot’larını alalım, bunu da basit aggregate’ler için aggregate’leri serialize edelim, complex aggregate’ler için ise memento pattern kullanalım demiştik, böylece yazılım geliştiriciler daha object mindset(obje kafasıyla) düşünüp sistemi tasarlar ve sistemin karmaşıklığı az olur, daha DDD olur, daha SOLID olur, daha iyi olur :)

Provides Developers With a Time Machine

  • Event sourcing yaklaşımıyla sistemin her anının kaydı tutuluyor, yani sistemin geçmişteki bir haline aynı video kaydını geriye sarar gibi sarabiliyoruz, yani elimizde bir zaman makinesi var :) şimdi şunu hayal etmenizi istiyorum, trendyol’da çalışıyorsunuz ve müşterilerin sepetine ürün ekleyip daha sonra o ürünü sepetinden kaldırmasıyla ilgili bir ister geldi ve sizde bu gereksinimler için yeni kodlar yazdınız veya kodunuzu düzenlediniz, eğer elinizde event sourcing gibi bir zaman makinesi yoksa uygulamanızın yeni versiyonu için farklı testlerinizi yapacaksınız ama hiçbir zaman bu testler size kodunuzun sağlıklı çalıştığını garanti etmicek çünkü canlı müşteriler kullanmadı henüz, daha sonra deploy ediceksiniz ve canlı ortamda kodunuzu test ediceksiniz bazen çalışıcak bazen sorun çıkıcak ve bugfix/hotfix yapıcaksınız ama eğer elinizde event sourcing gibi bir zaman makinesi varsa sisteminizi geçmişteki bir tarihe alır ve ilgili işlemlerin canlı kullanıcıların geçmiş davranışlarıyla sisteminizden geçmesini sağlayabilir ve sorun yoksa ondan sonra canlı ortama uygulamanızı deploy edebilirsiniz :)

Evet bu kadar övgü yeter biraz da gerçekleri konuşalım :)

Drawbacks of event sourcing

  • Event sourcing’in bazı dezavantajları şunlardır
  • İt has a different programming model that has a learning curve
  • It has the complexity of a messaging-based application
  • Evolving events can be tricky
  • Deleting data is tricky
  • Querying the event store is challenging
  • Bunlara detaylıca bakalım :)

Different Programming Model That Has a Learning Curve

  • Event sourcing yaklaşımı alışık olunmadık bir programlama modeli olduğundan öğrenmesi biraz zorlayabilir ayrıca mevcutta olan bir uygulamanın event sourcing kullanır hale dönüşmesi için business logic tekrar yazılmalıdır. Eğer uygulamanız monolitik bir uygulama ise microservislere geçiş yapılıyorsa yeni microservislerinizi event sourcing modeline uygun şekilde yazmak bu dezavantaj maddesinin etkisini biraz azaltacaktır :)

Complexity Of a Messaging Based Application

  • Event handler’lar idempotent olmalıdır bunu da eventId bakarak yaparlar çünkü message brokerlar duplicate olmuş event publish etmemeyi garanti etmez, bu durumda complexity artırır

Evolving Events Can Be Tricky

  • Eventler sonsuza kadar saklanmalıdır ve aggregate versionları her geçen gün artmaktadır, aggregate’lerin kendinin geçmiş versiyonuna ait olan eventleri de uygulaması için yapılan şeyler aggregate’leri şişirmektedir, yani backward compatiblity sağlamak için tricky(zorlu) yöntemler uygulanır bu da event sourcing dezavantajıdır

Deleting Data Is Tricky

  • Event sourcing kullanmanın amacı aggregate’lerin history’sini tutmak olduğundan event’ler sonsuza kadar saklanır, event silme işlemi soft delete olarak yapılır yani ilgili eventin deleted kolonu true değerine setlenir ve ilgili kayıt silinmiş olur gibi. General Data Protection Regulation (GDPR) bireylerin bilgilerini istediği zaman bilgilerini sistemden silme özgürlüklerinin olmasını şart koşar yani bir uygulama kullanıcının e-posta adresi gibi kişisel bilgilerini unutma yeteneğine sahip olmalıdır, event sourcing’in dezavantajı e-posta gibi bir kullanıcı bilgisi AccountCreated event de tutulabilir, uygulama bir şekilde bu event kaydını silmeden kullanıcıyı unutmalıdır.

Querying The Event Store Is Challenging

  • Mesela kredi limiti bitmiş customer’ların listesini getirmek istiyoruz, bunun için “select * from customer where credit_limit=0” gibi bir sorgu atamayız iç içe geçmiş karışık select sorguları atmamız gerekecektir, bu durum baya verimsizdir, bunun için CQRS gibi yaklaşımlarla bu dezavantaj hafifletilmektedir

Implementing an event store

  • Event sourcing kullanan bir uygulama event’leri event store da depolar
  • Event store veri tabanı ile message broker karışımı bir şeydir aslında
  • Bir veritabanı gibidir event store çünkü bir aggregate’in event’lerini primary key ile birlikte insert ve get yapmak için bir api sağlar
  • Event’lere subscribe olduğu için bir message broker gibi de diyebiliriz
  • Event store implementasyonunun birkaç farklı yolu vardır;
  • Kendi event store ve event sourcing frameworkünüzü yapmak, mesela eventleri bir RDBMS veritabanında tutarak, ilgili eventlere subscribe olup, belirli end-pointler sağlayan bir event store yapılabilir
  • Diğer bir seçenek ise hazır event store mekanizmaları kullanmak, bunlardan bazıları şunlardır;
  • Event Store: .NET ekosistemi için Greg Young tarafından oluşturulmuş açık kaynaklı bir projedir
  • Axon: event sourcing ve CQRS kullanarak event-driven uygulamalar geliştirmek için oluşturulan açık kaynaklı bir java frameworkdür
  • Eventuate: Bu framewokün cloud service olan haline Eventuate SaaS, açık kaynak olarak Apache Kafka ve RDBMS tabanlı olan haline ise Eventuate Local denilmekte
  • Bu frameworkler detaylarda farklılık gösterse de temel kavramları aynıdır

How the Local Event Store Works

  • Hazır bir event store framework olan eventuate framework’ün nasıl çalıştığını inceleyelim, diğer event store framework’leri Axon ve Event Store da benzer yöntemleri uyguluyor
  • Event’ler Mysql(RDBMS) veritabanında tutulmakta
  • Event store uygulamamız belirtilen aggregate Id bilgisiyle beraber ilgili aggregate’e ait event’leri retrieve etme veya ilgili aggregate’e ait yeni event kaydını mysql veritabanına insert etme özelliklerine sahip
  • Yazımızın önceki kısımlarında bahsettiğimiz “transaction log tailing” yapısı kullanılarak events tablosundan alınan event’ler message broker’lara publish edilir
  • Event store uygulamamız consume ettiği eventleri kendi mysql veri tabanına kaydeder

The Schema Of Local’s Event Database

Event store uygulamamızın veritabanı 3 tablodan oluşur

  • Events: trigggering_event ilgili eventi tetikleyen kişinin kim olduğu bilgisini tutar, bu sütun neden var ? Mesela orderId değeri 5 olan 2 tane event var biz orderId bilgisine göre idempotency sağlamaya çalışırsak, bu 2 eventten birini ignore ederiz ama bu 2 eventi tetikleyen yani publish eden kısım farklıysa ? Bu durumda iki eventide işlememiz gerekir bu yüzden triggering_event kolonu tutarız, event_data sütunu ise eventlerin json halini tutar
  • create table events (
    event_id varchar(1000) PRIMARY KEY,
    event_type varchar(1000),
    event_data varchar(1000) NOT NULL,
    entity_type VARCHAR(1000) NOT NULL,
    entity_id VARCHAR(1000) NOT NULL,
    triggering_event VARCHAR(1000)
    );
  • Entities: bu tablo optimistic lock yapısını kullanabilmek için yaratılılr. Her entity create edildiğinde bu tabloya yeni kayıt atılır ve ilgili entity güncellendiğinde versiyonu arttırılır
  • create table entities (
    entity_type VARCHAR(1000),
    entity_id VARCHAR(1000),
    entity_version VARCHAR(1000) NOT NULL,
    PRIMARY KEY(entity_type, entity_id)
    );
  • Snapshots: entity_type ve entity_id alanları ilgili snapshot kaydının hangi entity için olduğunu gösterir
  • create table snapshots (
    entity_type VARCHAR(1000),
    entity_id VARCHAR(1000),
    entity_version VARCHAR(1000),
    snapshot_type VARCHAR(1000) NOT NULL,
    snapshot_json VARCHAR(1000) NOT NULL,
    triggering_events VARCHAR(1000),
    PRIMARY KEY(entity_type, entity_id, entity_version)
    )

Event store find, create ve update operasyonlarını destekler

Find:

  • snapshots tablosundaki son kaydı getirir, eğer snapshot varsa entity_id ve entity_version bilgilerini kullanarak events tablosunu sorgular, ilgili entity_id sahip eventlerden entity_version bilgiside snapshot kaydımızdakinden büyük olanları çeker, çünkü snapshot alındıktan sonra oluşan eventler lazım bize
  • Snapshot sorgusundan kayıt dönmezse find sorgusunda belirtilen entity ait tüm eventler döndürülür
  • entity tablosuna ilgili entity’nin son versiyonunu öğrenmek için de bakılabilir

Create:

  • Entity tablosuna yeni kayıt atılır ve ilgili entity ait eventler events tablosuna kaydedilir

Update:

  • Gelen eventler events tablosuna kaydedilir
  • Ayrıca her update işleminde entities tablosundaki ilgili entity’nin version bilgisi artırılarak OPTIMISTIC LOCK mekanizması da kurulmuş olur
  • Her update operasyonunda entity_version bilgiside gelir requestte ve kıyaslama yapılır eğer version bilgisi değişmişse update işlemi gerçekleşmez ve hata mesajı dönülür

Optimistic lock kısmını biraz daha açıklamak istiyorum bazı eventler geldi mesela ve events tablosuna kaydedilecek, kaydedilmeden önce entity tablosuna bakılır kaydedilecek eventlere ait entity kaydı var mı, varsa version bilgisine bakılır ve eğer requestten gelen version’dan farklıysa hata atılır, farklı değilse ilgili eventler events tablosuna eklenir ve entity version 1 arttırılır

Consuming Events By Subscribing To Local’s Event Broker

  • Event store kullanan uygulamalar, belli topiclere subscribe olur
  • Her aggregate type için ayrı topic oluşturulur

Local Event Relay Propagates Events From The Database To The Message Broker

  • Event store da “event relay” denilen bir mekanizma vardır, bu mekanizma events tablosuna kaydedilen eventleri event store’a subscribe olan uygulamalara publish eder
  • Mesela event relay mekanizması mysql de yapılan güncellemelerin bir kaydı olan binlog kayıtlarını okur
  • Events tablosuna yapılan insert işlemlerinde ilgili topic’lere eventleri publish eder ve diğer mysql veritabanına yapılan işlemleri görmezden gelir
  • Özetle event store uygulaması event database, message broker ve event relay dan oluşur

Client Framework For Java

Client framework aggregate, event ve command class’lar için base class’lar sağlar

ReflectiveMutableCommandProcessingAggregate

  • Aggreagte’ler için base class’dır
  • Concrete aggregate class’lar implement ettiği gibi aggregate class’ların command class’ları da implement eder
  • Command ve eventleri uygun methoda göndermek için reflection kullanır
  • Command’lar process methoduna parametre geçilir
  • Event’ler apply methoduna parametre geçilir
  • Example
public class Order extends ReflectiveMutableCommandProcessingAggregate<Order, OrderCommand> {
public List<Event> process(CreateOrderCommand command) { … }
public void apply(OrderCreatedEvent event) { … }

}

Command

  • Aggregate’in commandları aggregate spesifik bir base interface’i implement etmelidir ve bu base interface de Command interface’ini implement etmelidir
  • Mesela CreateOrder -> OrderCommand -> Command

Event

  • Aggregate’in eventleri aggregate spesifik bir base interface’i implement etmelidir ve bu base interface de herhangi bir method barındırmayan Event marker interface’ini implement etmelidir
  • Mesela OrderCreated -> OrderEvent -> Event

Creating, Finding And Updating Aggregates With The AggregateRepository Class

Aggregate repository tüm aggregate’lerin repository işlemleri için generic bir yapı sunar, bu generic classımız generic parametre olarak aggregate class ve ilgili aggregate class’ın base command class’ını parametre olarak alır

Bu generic class 3 method sağlar save, find, update, bunlar aggregate kaydetme, aggregate bulma ve aggregate güncelleme operasyonlarını yapar

Save ve update operasyonları özellikle aggregate’leri oluşturmak için gerekli olan standart kodu kapsülledikleri için çok kullanışlıdır

Save()

  • Bir command nesnesi parametre olarak alır ve şunları yapar

İlgili aggregate’i default constructor ile init eder

Komutu işlemek için process() methodunu çalıştırır

Process() methodundan gelen eventler apply() methoduna parametre olarak geçilir

Oluşturulan event’ler event store’a kaydedilir

Update()

  • Aggregate id değeri ve ilgil aggregate ait command nesnesi parametre olarak alır ve şunları yapar

Event store’dan aggregate’i retrieve eder

Process() methodunu çalıştırır

Process() methodu ile oluşturulan event’ler apply() methoduna geçilir

Oluşturulan event’ler event store’a kaydedilir

  • Order Service AggregateRepository nasıl kullanıyor bakalım
public class OrderService {

private AggregateRepository<Order, OrderCommand> orderRepository;

public OrderService(AggregateRepository<Order, OrderCommand> orderRepository){
this.orderRepository = orderRepository;
}

public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
return orderRepository.save(new CreateOrder(orderDetails));
}
}

Using sagas and event sourcing together

  • Event sourcing, choreography-based sagas kullanmayı kolaylaştırır
  • Saga kümesi içerisindeki her step aggregate’ler tarafından publish edilen eventleri handle eder, bu şekilde choreography-based saga uygulanır
  • Ancak event sourcing tabanlı business logic de orchestration-based saga uygulamak zordur bunun sebebi event store’un transaction konseptinin sınırlı olmasıdır, bu konuyu biraz daha açarsak event store sadece 1 aggregate için create/update işlemi yapıp ortaya çıkan event’leri publish edebilir

Saga Creation

  • Mesela bir order service’deki createOrder() methodu; Order aggregate ve CreateOrderSaga oluşturmalıdır

Saga Orchestration

  • Eventleri consume etmeli ve aggregate state’ini update edip command message publish etmelidir

Saga Participants

  • Saga participants örnek olarak Order Service ve Kitchen Service verilebilir, publish edilen eventleri consume etmeli, duplicate eventleri ignore etmeli, aggregate’ler için create/update işlemi yapmalı ve event publish etmelidir
  • Eğer event store RDBMS türünde bir database kullanıyorsa, orchestraction base saga da ilgili aggregate’i ACID şekilde create/update edebilir ancak NoSql bir veritabanı kullanıyorsa farklı yöntemlere başvurmak gerekir bu yüzden ACID işlemler yapmak için event store’da RDBMS database tercih etmek daha iyidir diyebiliriz

Implementing choreography-based sagas using event sourcing

  • Bir aggregate’in state’i değiştiğinde bir event publish eder
  • Başka bir aggregate bu eventi handle edebilir ve kendi state’ini güncelleyebilir
  • Tüm event handler’ların idempotent olmasına dikkat edilmelidir
  • choreography-based saga uygulamak event sourcing kullanılan, event centric ortamda çok kolaydır ama choreography-based saga’nın da event source özgü bazı dezavantajları vardır bu sebeple orchestration base saga önerilir

Creating an orchestration-based saga

  • Saga orchestrator’leri service methodları ile generate edilir, mesela OrderService.createOrder()
  • Service methodları ilgili aggregate için create veya update işlemi yapmalı ve bir saga orchestrator create etmelidir
  • Create veya update işleminden saga orchestrator create ediceğini garanti etmelidir

CREATING A SAGA ORCHESTRATOR WHEN USING AN RDBMS-BASED EVENT STORE

  • Eğer event store RDBMS türünde bir database kullanıyorsa service methodumuzda tek bir ACID transaction içerisinde hem event store update işlemi yapabilir hem de orchestration saga create edebiliriz, mesela şöyle methodlar yazmalıyız service class’larında
class OrderService
@Autowired
private SagaManager<CreateOrderSagaState> createOrderSagaManager;

❶@Transactional
public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
EntityWithIdAndVersion<Order> order = orderRepository.save(new CreateOrder(orderDetails));
❷CreateOrderSagaState data = new CreateOrderSagaState(order.getId(), orderDetails);
❸createOrderSagaManager.create(data, Order.class, order.getId());

return order;
}

@Transactional anotasyonu ile createOrder() işleminin tek bir veritabanı ACID transaction içerisinde gerçekleştiğinden emin oluruz

Order agregate create edilir ve CreateOrderSaga create edilir

CREATING A SAGA ORCHESTRATOR WHEN USING A NOSQL-BASED EVENT STORE

  • Event store RDBMS türünde bir database kullandığında order created event persist işlemiyle CreateOrderSaga aynı transaction scope içerisinde yapılıyordu ama event store nosql database kullanırsa bunu yapamayız
  • Şöyle olur; order created event persist ettikten sonra event store OrderCreated event publish eder ve bu eventi handle ettiğimiz kısımda CreateOrderSaga create edilir
  • Ayrıca events tablosuna yapılan her persist işleminin event_id değeri farklı olduğu için çünkü PK, event_id değerini saga id olarak kullanabiliriz
  • Aslında bu yöntemi event store’umuz RDBMS türü bir database kullandığında da yapabiliriz, bu yöntem yukarıdaki yönteme göre daha loosely coupled bir yöntemdir

Implementing an event sourcing-based saga participant

Bu kısımda saga flowundaki participant’lara event sourcingi nasıl implemente ediceğimizi inceliyeceğiz

Event store’u communication için de kullansak ne olur ? Onu anlamaya çalışalım

Bunu yapabilmek için bazı sorunlara çözüm getirmek gerekir;

  • İdempotent command message handling
  • Atomatically sending a reply message

Idempotent Command Message Handling

  • Consumer gelen mesajların messageId bilgisini bir veritabanı tablosunda tutup, gelen her mesajın messageId bilgisini bu tabloda aratıp daha önce işlenmiş mi bu mesaj diye kontrol edebilir bu sayede aynı mesaj tekrar işlenmez

Atomically Sending Reply Message

  • Event sourcing felsefesinde event dediğimiz şeyin aggregate state’inde bir değişikliği yansıttığını söylemiştik ama state değişikliği olmayan durumlar olabilir ama saga participant’larının eventlerle birbirleriyle haberleşmesi lazım, mesela craftgate sistemine bir ödeme isteği gerçekleştirdik diyelim ve bu işlem sonucunda bu işlemi gerçekleştiren participant herhangi bir aggregate state değişikliğine ihtiyaç duymadı ama bir sonraki participantı tetiklemesi gerekiyor bu durumda içi boş bir event olan SagaReplyRequested event publish edebilir bir sonraki participant’ı tetiklemek için

Example Event Sourcing-Based Saga Participant

  1. Create Order Saga, bir mesajlaşma kanalı aracılığıyla AccountingService’e bir AuthorizeAccount command gönderir. SagaCommandDispatcher komut mesajını işlemek için AccountingServiceCommandHandler’ı çağırır.
  2. AccountingServiceCommandHandler, command’ı belirtilen Account aggregate’e gönderir.
  3. Account aggregate, AccountAuthorized ve SagaReplyRequestedEvent olmak üzere iki event publish eder.
  4. SagaReplyRequestedEventHandler, CreateOrderSaga’ya bir yanıt mesajı göndererek SagaReplyRequestedEvent’i işler.

AccountingServiceCommandHandler aşşağıda görüldüğü gibi Account aggregate’ini update eder

public class AccountingServiceCommandHandler {
@Autowired
private AggregateRepository<Account, AccountCommand> accountRepository;

public void authorize(CommandMessage<AuthorizeCommand> cm) {
AuthorizeCommand command = cm.getCommand();
accountRepository.update(command.getOrderId(),command,
replyingTo(cm)
.catching(AccountDisabledException.class,
() -> withFailure(new AccountDisabledReply()))
.build());
}
  • AccountingServiceCommandHandler, AccountAggregate’i güncellemek için AggregateRepository.update() methodunu kullanarak AuthorizeAccount command mesajını işler.
  1. Mesajın tam olarak bir kez işlenmesini sağlamak için(idempotency) mesaj id bilgisini bir güvenlik anahtarı olarak kullanın.
  2. Event store’a kaydedilen event listesine bir SagaReplyRequestedEvent sözde event ekleyin. SagaReplyRequestedEventHandler, SagaReplyRequestedEvent sözde eventini aldığında, CreateOrderSaga’nın reponse channel’ına bir response gönderir.
  3. Aggregate bir AccountDisabledException oluşturduğunda, default exception response yerine bir AccountDisabledReply göndeririz

Implementing Saga Orchestrators Using Event Sourcing

Şimdiye kadar event-sourcinge dayalı microservislerin nasıl saga başlatabileceğini ve başlatılmış saga’lara nasıl katılacağını anlattık ama Saga orkestratörlerini implement etmek için event store’da kullanabiliriz bu şekilde tamamen event store mantığına dayalı uygulamalar geliştirmiş oluruz.

Saga orchestration pattern uygularken çözmemiz gereken 3 temel şey vardır;

  • How can you persist a saga orchestrator ?
  • How can you atomically change the state of the orchestrator and send command message ?
  • How can you ensure that a saga orchestrator processes reply messages exactly once ?

PERSISTING A SAGA ORCHESTRATOR USING EVENT SOURCING

  • SagaOrchestratorCreated ve SagaOrchestratorUpdated eventleri publish edilerek bu yapı sağlanır, bu eventler saga orchestrator’lerini yeniden yaratabilmek için gerekli verileri içerir

SENDING COMMAND MESSAGES RELIABLY

  • Komutları nasıl düzgün bir şekilde gönderebiliriz ? Event store içerisinde zaten transactional scope içerisinde olduğu için, RDBMS türünde db varsa birde, burda pek de fazla problem çıkarmıyor
  • Saga orchestrator’daki stepler event store’daki mesajları sırayla çalıştırmaktan ibaret, saga’nın mantığında şu vardır, saga’yı create edersin bu saga’nın içerisinde 1., 2., 3. stepler vardır sonra saga’yı bitiririz, bu ama kod seviyesinde bir şey, peki bunu event store’a taşıdığımızda ne olur ? Event store da artık bu stepler event’ler üzerinde ilerler yani yukarıdaki görselde gözüktüğü gibi SagaCommandEvent’in her biri saga orchestrator’deki bir stepe karşılık geliyor, event store SagaCommandEvent içerisindeki değerleri anlayıp ilgili microservisi çalıştırıyor daha önceden nasıl yapıyorduk her step için source code da 3–5 satır kod oluyordu

PROCESSING REPLIES EXACTLY ONCE

  • Aynı event 2 defa yollanırsa birini işlememek gerekir, yani consumerın idempotent olması gerekir bunun için message id bilgisini kullanarak her consume edilen eventin daha önce işlenip işlenmediğine bakılır

Event sourcing öğrenmek zordur ama öğrendikten sonra microservislerdeki iletişim patternlerindeki zorluklardan tamamen kurtulmamızı sağlar, o yüzden bunu öğrenmeye uygulamaya uygulatmaya çalışmalıyız

Summary

  • Her event, aggregate’in oluşturulmasını veya bir state değişikliğini temsil eder. Bir uygulama, eventleri yeniden load ederek bir aggregate’in state’ini yeniden oluşturur. Event store, bir domain object’in history’sini korur, düzgün bir log yapısı sağlar ve domain event’lerini güvenilir bir şekilde publish eder.
  • Snapshot’lar, yeniden load edilmesi gereken eventlerin sayısını azaltarak performansı artırır.
  • Eventler, bir veri tabanının ve bir message broker’ın karışımı olan bir event store’da saklanır. Bir microservice, bir eventi event store’a kaydettiğinde, event store eventi kendine subscribe olan microservice’lere iletir.
  • Event store kullanmanın zorluklarından biri, eventlerin değişimlerini handle edebilmektir. Bir uygulama, eventleri yeniden load ederken potansiyel olarak birden çok event versionunu işlemelidir. Bunun için iyi bir çözüm olarak, eventler event store’a yüklendiğinde ilgili eventin versionunu en son sürüme yükselten bir yapı kullanmaktır(veri tabanı şemalarında kullanılan flyway gibi bir şey)
  • Bir event store uygulamasındaki verileri silmek zordur. Bir uygulamanın, bir kişinin verilerini silmesini gerektiren Avrupa Birliği GDPR’si gibi düzenlemelere uymak için, bir uygulamanın encryption ve pseudonymization gibi teknikler kullanması gerekir.
  • Event sourcing de choreography-based sagas implement etmek çok kolaydır.
  • Event sourcing, saga orchestrators uygulamak için iyi bir yoldur. Sonuç olarak, yalnızca bir event store kullanan uygulamalar yazabiliriz.

Kaynak: https://microservices.io/book

--

--