Tips: This article has been added Reading contents of series articles , you can click to view more related articles.
preface
Previous[ . Net Core microservice entry record (V) -- Ocelot API gateway (Part 2) ]The establishment of Ocelot + consult has been completed in. In this article, we will briefly talk about EventBus.
EventBus event bus
- First, what is an event bus?
Post a quote:
The event bus is an implementation of the observer (publish subscribe) pattern. It is a centralized event processing mechanism, which allows different components to communicate with each other without mutual dependence, so as to achieve the purpose of decoupling.
If you haven't contacted EventBus, you may not understand it very well. In fact, EventBus is widely used in client-side development (android, ios, web front-end, etc.), which is used for mutual communication between multiple components (or interfaces). Anyone who knows it knows it...
- So why do we use EventBus?
Take the current project for example. We have an order service and a product service. The client has an order placing function. When the user places an order, it calls the order placing interface of the order service. Then the order placing interface needs to call the inventory reduction interface of the product service, which involves calls between services. So how to call between services? Direct RESTAPI? Or more efficient gRPC? They may have their own usage scenarios, but they both have a coupling problem between services, or it is difficult to make asynchronous calls.
Imagine this: suppose we call the order service when placing an order, the order service needs to call the product service, the product service needs to call the logistics service, and the logistics service calls the xx service, and so on... If the processing time of each service needs 2s and asynchronous is not used, then this kind of experience can be imagined.
If you use EventBus, the order service only needs to send an "order event" to EventBus. The product service will subscribe to the "order event". When the product service receives the order event, it is good to reduce the inventory by itself. This avoids the coupling of direct invocation between the two services, and truly achieves asynchronous invocation.
Since asynchronous calls between multiple services are involved, distributed transactions have to be mentioned. Distributed transaction is not a unique problem of microservices, but a problem that exists in all distributed systems.
For distributed transactions, you can check the "CAP principle" and "BASE theory" to learn more. Today's distributed systems tend to pursue the ultimate consistency of transactions.
The following uses the excellent project "CAP" developed by Chinese people to demonstrate the basic use of EventBus. The reason why "CAP" is used is that it can not only solve the final consistency of distributed systems, but also is an EventBus. It has all the functions of EventBus!
Author introduction: https://www.cnblogs.com/savorboard/p/cap.html
CAP usage
- Environmental preparation
Prepare the required environment in Docker. The first is the database. I use PostgreSQL for the database, and I can use anything else. CAP support: SqlServer, MySql, PostgreSQL, MongoDB.
About running PostgreSQL in Docker, you can see my other blog: https://www.cnblogs.com/xhznl/p/13155054.html
Then MQ. Here I use RabbitMQ, and Kafka can also be used.
Docker runs RabbitMQ:
docker pull rabbitmq:management docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management
Default user: guest, password: Guest
The environment is ready. Docker is so convenient...
- Code modification:
In order to simulate the above businesses, a lot of code needs to be modified. If there is any omission in the following code, go to github directly.
NuGet installation:
Microsoft.EntityFrameworkCore Microsoft.EntityFrameworkCore.Tools Npgsql.EntityFrameworkCore.PostgreSQL
CAP related:
DotNetCore.CAP DotNetCore.CAP.RabbitMQ DotNetCore.CAP.PostgreSql
Order.API/Controllers/OrdersController.cs add an order interface:
[Route("[controller]")] [ApiController] public class OrdersController : ControllerBase { private readonly ILogger<OrdersController> _logger; private readonly IConfiguration _configuration; private readonly ICapPublisher _capBus; private readonly OrderContext _context; public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context) { _logger = logger; _configuration = configuration; _capBus = capPublisher; _context = context; } [HttpGet] public IActionResult Get() { string result = $"[Order service]{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}-" + $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}"; return Ok(result); } /// <summary> ///Order publishing order event /// </summary> /// <param name="order"></param> /// <returns></returns> [Route("Create")] [HttpPost] public async Task<IActionResult> CreateOrder(Models.Order order) { using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true)) { //Business code order.CreateTime = DateTime.Now; _context.Orders.Add(order); var r = await _context.SaveChangesAsync() > 0; if (r) { //Issue order event await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID }); return Ok(); } return BadRequest(); } } }
Order.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary> ///Order event message /// </summary> public class CreateOrderMessageDto { /// <summary> ///Product ID /// </summary> public int ProductID { get; set; } /// <summary> ///Purchase quantity /// </summary> public int Count { get; set; } }
Order.API/Models/Order.cs Order entity class:
public class Order { [Key] [DatabaseGenerated(DatabaseGeneratedOption.Identity)] public int ID { get; set; } /// <summary> ///Order time /// </summary> [Required] public DateTime CreateTime { get; set; } /// <summary> ///Product ID /// </summary> [Required] public int ProductID { get; set; } /// <summary> ///Purchase quantity /// </summary> [Required] public int Count { get; set; } }
Order.API/Models/OrderContext.cs database Context:
public class OrderContext : DbContext { public OrderContext(DbContextOptions<OrderContext> options) : base(options) { } public DbSet<Order> Orders { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { } }
Order.API/appsettings.json add database connection string:
"ConnectionStrings": { "OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;" }
Order.API/Startup.cs modify the ConfigureServices method and add the Cap configuration:
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext"))); //CAP services.AddCap(x => { x.UseEntityFramework<OrderContext>(); x.UseRabbitMQ("host.docker.internal"); }); }
The above is the modification of order service.
Product. API/Controllers/ProductsController. Csadd inventory reduction interface:
[Route("[controller]")] [ApiController] public class ProductsController : ControllerBase { private readonly ILogger<ProductsController> _logger; private readonly IConfiguration _configuration; private readonly ICapPublisher _capBus; private readonly ProductContext _context; public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context) { _logger = logger; _configuration = configuration; _capBus = capPublisher; _context = context; } [HttpGet] public IActionResult Get() { string result = $"[Product service]{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}-" + $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}"; return Ok(result); } /// <summary> ///Inventory reduction subscription order event /// </summary> /// <param name="message"></param> /// <returns></returns> [NonAction] [CapSubscribe("order.services.createorder")] public async Task ReduceStock(CreateOrderMessageDto message) { //Business code var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID); product.Stock -= message.Count; await _context.SaveChangesAsync(); } }
Product.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary> ///Order event message /// </summary> public class CreateOrderMessageDto { /// <summary> ///Product ID /// </summary> public int ProductID { get; set; } /// <summary> ///Purchase quantity /// </summary> public int Count { get; set; } }
Product.API/Models/Product.cs product entity class:
public class Product { [Key] [DatabaseGenerated(DatabaseGeneratedOption.Identity)] public int ID { get; set; } /// <summary> ///Product name /// </summary> [Required] [Column(TypeName = "VARCHAR(16)")] public string Name { get; set; } /// <summary> ///Inventory /// </summary> [Required] public int Stock { get; set; } }
Product.API/Models/ProductContext.cs database Context:
public class ProductContext : DbContext { public ProductContext(DbContextOptions<ProductContext> options) : base(options) { } public DbSet<Product> Products { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { base.OnModelCreating(modelBuilder); //Initialize seed data modelBuilder.Entity<Product>().HasData(new Product { ID = 1, Name = "Product 1", Stock = 100 }, new Product { ID = 2, Name = "Product 2", Stock = 100 }); } }
Product.API/appsettings.json add database connection string:
"ConnectionStrings": { "ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;" }
Product.API/Startup.cs modify the ConfigureServices method and add the Cap configuration:
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext"))); //CAP services.AddCap(x => { x.UseEntityFramework<ProductContext>(); x.UseRabbitMQ("host.docker.internal"); }); }
The above is the modification of product service.
This completes the modification of order service and product service. It seems that there are many modifications. In fact, the function is very simple. They add their own database tables, and then the order service adds an order interface. The order interface will send an "order event". The product service adds an inventory reduction interface, which will subscribe to "order event". Then, when the client calls the order interface to place an order, the product service will subtract the corresponding inventory. The function is so simple.
Basic uses such as EF database migration will not be introduced. Use Docker to rebuild the image, run the order service, product service:
docker build -t orderapi:1.1 -f ./Order.API/Dockerfile . docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060" docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061" docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062" docker build -t productapi:1.1 -f ./Product.API/Dockerfile . docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050" docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051" docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"
Finally ocelot APIGateway/ocelot. JSON adds a route configuration:
OK, so far, the whole environment is a little complicated. Ensure that our PostgreSQL, RabbitMQ, consult, Gateway and service instances are running normally.
After the service instance runs successfully, the database should look like this:
Product table seed data:
cap.published table and CAP The received table is automatically generated by the CAP. Internally, it uses the local message table +MQ to achieve asynchronous assurance.
Run test
This time, Postman is used as the client to call the ordering interface (9070 is the previous Ocelot gateway port):
Order library published table:
Order table of order Library:
Product library received table:
Product library product table:
Try again:
OK, done. Although the function is very simple, we have realized the decoupling of services, asynchronous invocation, and final consistency.
summary
Note that the above example is purely to illustrate the use of EventBus, and the actual order placing process will never do this! I hope you are not too serious...
Some people may say that if the order is placed successfully, but inventory reduction fails due to insufficient inventory, do you want to rollback the data in the order table? If such an idea arises, it means that the idea of final consistency has not been really understood. First of all, the inventory quantity must be checked before placing an order. Since the order is allowed, the inventory must be sufficient. The transaction here refers to saving the order to the database and the order event to the CAP The published table (saved to the cap.published table can theoretically be sent to MQ) either succeeds or fails together. If the transaction is successful, the business process can be considered successful. Whether the inventory reduction of the product service is successful is a matter of the product service (in theory, it should be successful, because the message has been sent to MQ, and the product service will inevitably receive the message). The CAP also provides a failure retry and failure callback mechanism.
If it is necessary to roll back data, it can be realized. The CAP's icappublisher The publish method provides a callbackName parameter. This callback can be triggered when inventory is reduced. Its essence is to publish and subscribe. This is not recommended. I won't go into detail. I'm interested in studying it myself.
In addition, CAP cannot guarantee that messages are not repeated. In actual use, you need to consider the repetition filtering and idempotency of messages.
There are a lot of contents in this article. I don't know whether it has been clearly expressed. If there are problems, please comment and exchange. If there are any mistakes, please point out.
The next article plans to write about the contents related to authorization and certification.
The code is placed in: https://github.com/xiajingren/NetCoreMicroserviceDemo
To be continued