using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

namespace OnlineSalesAutoCrop.RMQ.Consumer
{
    /// <summary>
    /// Hosted background service that connects to RabbitMQ, declares the exchange/queue/binding
    /// it needs, and hands every delivered message to <see cref="DoProcess(string)"/>.
    /// </summary>
    /// <param name="appSettings">Options wrapper whose value supplies the RabbitMQ host, port and credentials.</param>
    /// <param name="loggerFactory">Factory used to create this service's logger.</param>
    public class WorkerService(IOptions<AppSettings> appSettings, ILoggerFactory loggerFactory) : BackgroundService
    {
        private const string ExchangeName = "CRMQDEF_EXCHG";
        private const string QueueName = "CRMQDEF_QUEUE";
        private const string RoutingKey = "CRMQDEF_ROUTING_KEY";

        private readonly AppSettings settings = appSettings.Value;
        private readonly ILogger logger = loggerFactory.CreateLogger<WorkerService>();

        /// <summary>
        /// Logs that the consumer is starting and delegates to <see cref="BackgroundService.StartAsync"/>,
        /// which launches <see cref="ExecuteAsync"/>.
        /// </summary>
        /// <remarks>
        /// This method must return promptly: the host awaits it during startup, and the token it
        /// receives only signals that startup was aborted, not that the application is stopping.
        /// All long-running work therefore lives in <see cref="ExecuteAsync"/>.
        /// </remarks>
        /// <param name="cancellationToken">Signals that the start process has been aborted.</param>
        /// <returns>A task that completes once the background work has been launched.</returns>
        public override async Task StartAsync(CancellationToken cancellationToken)
        {
            logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer is starting.");
            await base.StartAsync(cancellationToken);
        }

        /// <summary>
        /// Opens the RabbitMQ connection and channel, declares the topology, registers the consumer,
        /// and then keeps the connection alive until the host requests shutdown.
        /// </summary>
        /// <param name="stoppingToken">Cancelled by the host when the service should stop.</param>
        /// <returns>A task that completes when the service has stopped consuming.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                ConnectionFactory factory = new()
                {
                    HostName = settings.RabbitMQHost,
                    UserName = settings.RabbitMQUser,
                    Password = settings.RabbitMQPwd,
                    Port = settings.RabbitMQPort
                };

                // Both disposables are scoped to this try block, so they live exactly as long as
                // the service is running and are released (channel first) on shutdown or failure.
                using IConnection connection = await factory.CreateConnectionAsync(stoppingToken);
                using IChannel channel = await connection.CreateChannelAsync(cancellationToken: stoppingToken);

                // NOTE(review): the exchange is declared with the library's default durability while
                // the queue below is explicitly durable - confirm that this combination is intended.
                await channel.ExchangeDeclareAsync(exchange: ExchangeName, ExchangeType.Direct, cancellationToken: stoppingToken);

                await channel.QueueDeclareAsync(
                    queue: QueueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: new Dictionary<string, object?> { { "x-queue-type", "quorum" } },
                    cancellationToken: stoppingToken);

                await channel.QueueBindAsync(queue: QueueName, exchange: ExchangeName, routingKey: RoutingKey, cancellationToken: stoppingToken);

                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.ReceivedAsync += (model, ea) =>
                {
                    HandleDelivery(ea);
                    return Task.CompletedTask;
                };

                // NOTE(review): autoAck is true, so a message counts as delivered before DoProcess
                // has run; switch to manual acks if messages must survive a processing failure.
                await channel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);

                logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer was started.");

                // Park here until the host asks us to stop; cancellation surfaces as an
                // OperationCanceledException handled below.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException ex) when (stoppingToken.IsCancellationRequested)
            {
                // When the stopping token is canceled, for example, a call made from services.msc,
                // we shouldn't exit with a non-zero exit code. In other words, this is expected...
                logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer was Cancelled: {Message}", ex.Message);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "OnlineSalesAutoCrop.RMQ.Consumer has error {Message}", ex.Message);

                // Terminates this process and returns an exit code to the operating system.
                // This is required to avoid the 'BackgroundServiceExceptionBehavior', which performs one of two scenarios:
                // 1. When set to "Ignore": will do nothing at all, errors cause zombie services.
                // 2. When set to "StopHost": will cleanly stop the host, and log errors.
                //
                // In order for the Windows Service Management system to leverage configured
                // recovery options, we need to terminate the process with a non-zero exit code.
                Environment.Exit(1);
            }
        }

        /// <summary>
        /// Decodes a delivery body as UTF-8 and passes it to <see cref="DoProcess(string)"/>,
        /// logging (rather than propagating) any failure so it is visible in this service's log.
        /// </summary>
        /// <param name="ea">The delivery raised by the consumer.</param>
        private void HandleDelivery(BasicDeliverEventArgs ea)
        {
            try
            {
                // The body is decoded synchronously inside the handler, so no copy is needed.
                string message = Encoding.UTF8.GetString(ea.Body.Span);
                DoProcess(message: message);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "OnlineSalesAutoCrop.RMQ.Consumer has error {Message}", ex.Message);
            }
        }

        /// <summary>
        /// Processes a single message received from the queue. Currently it only logs the message.
        /// </summary>
        /// <param name="message">The message body decoded as UTF-8 text.</param>
        private void DoProcess(string message)
        {
            logger.LogInformation("{Message}", message);
            // This is the place to do actual work
        }
    }
}