RabbitMQ.EventBus.AspNetCore 是一个基于官方 RabbitMQ.Client 的二次封装包,专门针对 .NET Core 项目开发,在微服务之间传递消息时使用起来比较方便。
特性
- 断线重连机制
- 可扩展
- 消费失败自动打回
使用说明
1. 注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/// <summary>
/// Registers MVC, the RabbitMQ event bus and the Butterfly client in the service container.
/// </summary>
/// <param name="services">The service collection that receives the registrations.</param>
public void ConfigureServices(IServiceCollection services)
{
    // Simple name of the assembly that contains Startup; passed to the event bus options below.
    var startupAssembly = typeof(Startup).GetTypeInfo().Assembly;
    string assemblyName = startupAssembly.GetName().Name;

    services
        .AddMvc()
        .SetCompatibilityVersion(CompatibilityVersion.Version_2_1);

    services.AddRabbitMQEventBus(
        () => "amqp://guest:guest@192.168.0.252:5672/",
        eventBusOptionAction: options =>
        {
            options.ClientProvidedAssembly(assemblyName);
            // NOTE(review): the meaning of the retry arguments is defined by the library
            // and is not visible in this sample — confirm against the package documentation.
            options.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
            options.RetryOnFailure(TimeSpan.FromMilliseconds(100));
            options.AddLogging(LogLevel.Warning);
        });

    services.AddButterfly(options =>
    {
        options.CollectorUrl = "http://192.168.0.252:6401";
        options.Service = "RabbitMQEventBusTest";
    });
}
|
2. 订阅消息
2.1 自动订阅消息
1 2 3 4 5 6 7 8 9
/// <summary>
/// Builds the request pipeline and enables automatic event bus subscription.
/// </summary>
/// <param name="app">The application builder the middleware is added to.</param>
/// <param name="env">The hosting environment, used to detect development mode.</param>
/// <param name="tracer">Injected tracer; not used inside this method.</param>
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IServiceTracer tracer)
{
    bool isDevelopment = env.IsDevelopment();
    if (isDevelopment)
    {
        // Detailed exception pages are only enabled while developing.
        app.UseDeveloperExceptionPage();
    }

    app.RabbitMQEventBusAutoSubscribe();

    app.UseMvc();
}
|
2.2 手动订阅消息
1 2 3 4 5 6 7 8 9
/// <summary>
/// Builds the request pipeline and manually subscribes a single event/handler pair
/// on the event bus.
/// </summary>
/// <param name="app">The application builder the middleware is added to.</param>
/// <param name="env">The hosting environment, used to detect development mode.</param>
/// <param name="eventBus">The RabbitMQ event bus on which the subscription is registered.</param>
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IRabbitMQEventBus eventBus)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    // Manual subscription: binds EventMessage to its handler.
    // The sample previously called "Serialize", which does not match this
    // section's purpose (manual subscription).
    eventBus.Subscribe<EventMessage, EventMessageHandler>();

    app.UseMvc();
}
|
3. 发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/// <summary>
/// API controller that publishes a sample message through the RabbitMQ event bus.
/// </summary>
[Route("api/[controller]")]
[ApiController]
public class EventBusController : ControllerBase
{
    private readonly IRabbitMQEventBus _eventBus;

    /// <summary>
    /// Creates the controller with the event bus used for publishing.
    /// </summary>
    /// <param name="eventBus">The event bus; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventBus"/> is null.</exception>
    public EventBusController(IRabbitMQEventBus eventBus)
    {
        if (eventBus == null)
        {
            throw new ArgumentNullException(nameof(eventBus));
        }

        _eventBus = eventBus;
    }

    /// <summary>
    /// Publishes a message with a text body and the current time, then returns HTTP 200.
    /// </summary>
    /// <returns>An empty OK result.</returns>
    [HttpGet]
    public IActionResult Send()
    {
        var payload = new
        {
            Body = "发送消息",
            Time = DateTimeOffset.Now
        };

        _eventBus.Publish(
            payload,
            exchange: "RabbitMQ.EventBus.Simple",
            routingKey: "rabbitmq.eventbus.test");

        return Ok();
    }
}
|
4. 定义消息与消息处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
/// <summary>
/// Event payload carried on the "RabbitMQ.EventBus.Simple" exchange.
/// Each <c>EventBus</c> attribute declares one exchange/routing-key pair for this event.
/// </summary>
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test")]
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test1")]
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test2")]
public class MessageBody : IEvent
{
    /// <summary>Text content of the message.</summary>
    public string Body { get; set; }

    /// <summary>Timestamp carried with the message.</summary>
    public DateTimeOffset Time { get; set; }
}

/// <summary>
/// Handler for <see cref="MessageBody"/> events that logs the delivery details and the body.
/// </summary>
public class MessageBodyHandle : IEventHandler<MessageBody>, IDisposable
{
    private readonly ILogger<MessageBodyHandle> _logger;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="logger">Logger used to record received messages; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public MessageBodyHandle(ILogger<MessageBodyHandle> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Writes a marker line to the console when the handler is disposed.
    /// </summary>
    public void Dispose()
    {
        Console.WriteLine("释放");
    }

    /// <summary>
    /// Logs the original payload, redelivery flag, exchange, routing key and event body.
    /// </summary>
    /// <param name="args">Delivery arguments wrapping the <see cref="MessageBody"/> event.</param>
    /// <returns>A completed task; the handler does no asynchronous work.</returns>
    public Task Handle(EventHandlerArgs<MessageBody> args)
    {
        // The argument type must be EventHandlerArgs<MessageBody> so that this method
        // implements IEventHandler<MessageBody>; "MessageBody1" is not defined in this sample.
        // NOTE(review): "Information" is not a built-in ILogger<T> method; presumably an
        // extension method supplied by the project — confirm.
        _logger.Information(args.Original);
        _logger.Information(args.Redelivered);
        _logger.Information(args.Exchange);
        _logger.Information(args.RoutingKey);

        _logger.Information(args.Event.Body);
        return Task.CompletedTask;
    }
}
|