OnlineSalesAutoCrop/Api/OnlineSalesAutoCrop.RMQ.Consumer/WorkerService.cs

114 lines
4.2 KiB
C#
Raw Normal View History

2026-06-14 12:46:29 +06:00
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
namespace OnlineSalesAutoCrop.RMQ.Consumer
{
/// <summary>
/// Hosted background service that connects to RabbitMQ, declares the exchange/queue/binding
/// it needs, and hands every received message to <see cref="DoProcess"/>.
/// </summary>
/// <param name="appSettings">Application settings providing the RabbitMQ host, port, user and password.</param>
/// <param name="loggerFactory">Factory used to create the logger for this service.</param>
public class WorkerService(IOptions<AppSettings> appSettings, ILoggerFactory loggerFactory) : BackgroundService
{
    private const string ExchangeName = "CRMQDEF_EXCHG";
    private const string QueueName = "CRMQDEF_QUEUE";
    private const string RoutingKey = "CRMQDEF_ROUTING_KEY";

    private readonly AppSettings settings = appSettings.Value;
    private readonly ILogger logger = loggerFactory.CreateLogger<WorkerService>();

    /// <summary>
    /// Logs the start-up message and delegates to <see cref="BackgroundService.StartAsync"/>,
    /// which kicks off <see cref="ExecuteAsync"/>.
    /// </summary>
    /// <remarks>
    /// This method must return promptly: the host awaits it before it finishes starting,
    /// so the long-running consumer loop lives in <see cref="ExecuteAsync"/> instead.
    /// </remarks>
    /// <param name="cancellationToken">Token signalling that the start process has been aborted.</param>
    /// <returns>A task that completes once the background work has been scheduled.</returns>
    public override async Task StartAsync(CancellationToken cancellationToken)
    {
        logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer is starting.");
        await base.StartAsync(cancellationToken);
    }

    /// <summary>
    /// Opens the RabbitMQ connection and channel, declares the topology, registers the consumer
    /// and then stays alive until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token cancelled when the host is shutting down.</param>
    /// <returns>A task representing the lifetime of the consumer.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            ConnectionFactory factory = new()
            {
                HostName = settings.RabbitMQHost,
                UserName = settings.RabbitMQUser,
                Password = settings.RabbitMQPwd,
                Port = settings.RabbitMQPort
            };

            // Connection and channel must outlive the consumer registration; they are disposed
            // when this method exits (shutdown or failure).
            using IConnection connection = await factory.CreateConnectionAsync(stoppingToken);
            using IChannel channel = await connection.CreateChannelAsync(cancellationToken: stoppingToken);

            // Create an exchange
            await channel.ExchangeDeclareAsync(exchange: ExchangeName, ExchangeType.Direct, cancellationToken: stoppingToken);

            // Create a durable quorum queue and bind it to the exchange
            await channel.QueueDeclareAsync(
                queue: QueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: new Dictionary<string, object?> { { "x-queue-type", "quorum" } },
                cancellationToken: stoppingToken);
            await channel.QueueBindAsync(queue: QueueName, exchange: ExchangeName, routingKey: RoutingKey, cancellationToken: stoppingToken);

            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.ReceivedAsync += (_, ea) =>
            {
                try
                {
                    string message = Encoding.UTF8.GetString(ea.Body.Span);
                    DoProcess(message: message);
                }
                catch (Exception ex)
                {
                    // Messages are auto-acknowledged, so a failure here cannot be retried by the
                    // broker; log it so the failure is at least visible.
                    logger.LogError(ex, "OnlineSalesAutoCrop.RMQ.Consumer failed to process a message: {Message}", ex.Message);
                }

                return Task.CompletedTask;
            };

            await channel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);
            logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer was started.");

            // Keep the connection and channel alive until shutdown is requested.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException ex) when (stoppingToken.IsCancellationRequested)
        {
            // When the stopping token is canceled, for example, a call made from services.msc,
            // we shouldn't exit with a non-zero exit code. In other words, this is expected...
            logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer was Cancelled: {Message}", ex.Message);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "OnlineSalesAutoCrop.RMQ.Consumer has error {Message}", ex.Message);
            // Terminates this process and returns an exit code to the operating system.
            // This is required to avoid the 'BackgroundServiceExceptionBehavior', which performs one of two scenarios:
            // 1. When set to "Ignore": will do nothing at all, errors cause zombie services.
            // 2. When set to "StopHost": will cleanly stop the host, and log errors.
            //
            // In order for the Windows Service Management system to leverage configured
            // recovery options, we need to terminate the process with a non-zero exit code.
            Environment.Exit(1);
        }
    }

    /// <summary>
    /// Handles a single message received from the queue. Currently it only logs the message.
    /// </summary>
    /// <param name="message">The message body decoded as UTF-8 text.</param>
    private void DoProcess(string message)
    {
        logger.LogInformation("{Message}", message);
        // This is the place to do actual work
    }
}
}