114 lines
4.2 KiB
C#
114 lines
4.2 KiB
C#
|
|
using Microsoft.Extensions.Hosting;
|
||
|
|
using Microsoft.Extensions.Logging;
|
||
|
|
using Microsoft.Extensions.Options;
|
||
|
|
using RabbitMQ.Client;
|
||
|
|
using RabbitMQ.Client.Events;
|
||
|
|
using System.Text;
|
||
|
|
|
||
|
|
namespace OnlineSalesAutoCrop.RMQ.Consumer
|
||
|
|
{
|
||
|
|
public class WorkerService(IOptions<AppSettings> appSettings, ILoggerFactory loggerFactory) : BackgroundService
|
||
|
|
{
|
||
|
|
private readonly AppSettings settings = appSettings.Value;
|
||
|
|
private ILogger logger { get; } = loggerFactory.CreateLogger<WorkerService>();
|
||
|
|
|
||
|
|
|
||
|
|
/// <summary>
|
||
|
|
///
|
||
|
|
/// </summary>
|
||
|
|
/// <param name="cancellationToken"></param>
|
||
|
|
/// <returns></returns>
|
||
|
|
public override async Task StartAsync(CancellationToken cancellationToken)
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer is starting.");
|
||
|
|
|
||
|
|
ConnectionFactory factory = new() { HostName = settings.RabbitMQHost, UserName = settings.RabbitMQUser, Password = settings.RabbitMQPwd, Port = settings.RabbitMQPort };
|
||
|
|
using IConnection connection = await factory.CreateConnectionAsync(cancellationToken);
|
||
|
|
using IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||
|
|
|
||
|
|
// Create an exchange
|
||
|
|
string exchange = "CRMQDEF_EXCHG";
|
||
|
|
await channel.ExchangeDeclareAsync(exchange: exchange, ExchangeType.Direct, cancellationToken: cancellationToken);
|
||
|
|
|
||
|
|
// Create a queue
|
||
|
|
string queue = "CRMQDEF_QUEUE";
|
||
|
|
await channel.QueueDeclareAsync(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object?> { { "x-queue-type", "quorum" } }, cancellationToken: cancellationToken);
|
||
|
|
|
||
|
|
string routingKey = "CRMQDEF_ROUTING_KEY";
|
||
|
|
await channel.QueueBindAsync(queue: queue, exchange: exchange, routingKey: routingKey, cancellationToken: cancellationToken);
|
||
|
|
|
||
|
|
var consumer = new AsyncEventingBasicConsumer(channel);
|
||
|
|
consumer.ReceivedAsync += (model, ea) =>
|
||
|
|
{
|
||
|
|
byte[] body = ea.Body.ToArray();
|
||
|
|
string message = Encoding.UTF8.GetString(body);
|
||
|
|
DoProcess(message: message);
|
||
|
|
return Task.CompletedTask;
|
||
|
|
};
|
||
|
|
await channel.BasicConsumeAsync(queue: queue, autoAck: true, consumer: consumer, cancellationToken: cancellationToken);
|
||
|
|
await base.StartAsync(cancellationToken);
|
||
|
|
logger.LogInformation("OnlineSalesAutoCrop.RMQ.Consumer was started.");
|
||
|
|
|
||
|
|
while (!cancellationToken.IsCancellationRequested)
|
||
|
|
{
|
||
|
|
await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
catch (OperationCanceledException ex)
|
||
|
|
{
|
||
|
|
logger.LogError(ex, "OnlineSalesAutoCrop.RMQ.Consumer was Cancelled: {Message}", ex.Message);
|
||
|
|
}
|
||
|
|
catch (Exception ex)
|
||
|
|
{
|
||
|
|
logger.LogError(ex, "OnlineSalesAutoCrop.RMQ.Consumer has error {Message}", ex.Message);
|
||
|
|
Environment.Exit(1);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// <summary>
|
||
|
|
///
|
||
|
|
/// </summary>
|
||
|
|
/// <param name="stoppingToken"></param>
|
||
|
|
/// <returns></returns>
|
||
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||
|
|
{
|
||
|
|
try
|
||
|
|
{
|
||
|
|
//Nothing to do here, as the main work is done in StartAsync. This method is required to be overridden, but we can simply return a completed task.
|
||
|
|
}
|
||
|
|
catch (OperationCanceledException ex)
|
||
|
|
{
|
||
|
|
logger.LogError(ex, "Cancel: {Message}", ex.Message);
|
||
|
|
// When the stopping token is canceled, for example, a call made from services.msc,
|
||
|
|
// we shouldn't exit with a non-zero exit code. In other words, this is expected...
|
||
|
|
}
|
||
|
|
catch (Exception ex)
|
||
|
|
{
|
||
|
|
logger.LogError(ex, "{Message}", ex.Message);
|
||
|
|
|
||
|
|
// Terminates this process and returns an exit code to the operating system.
|
||
|
|
// This is required to avoid the 'BackgroundServiceExceptionBehavior', which performs one of two scenarios:
|
||
|
|
// 1. When set to "Ignore": will do nothing at all, errors cause zombie services.
|
||
|
|
// 2. When set to "StopHost": will cleanly stop the host, and log errors.
|
||
|
|
//
|
||
|
|
// In order for the Windows Service Management system to leverage configured
|
||
|
|
// recovery options, we need to terminate the process with a non-zero exit code.
|
||
|
|
Environment.Exit(1);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// <summary>
|
||
|
|
///
|
||
|
|
/// </summary>
|
||
|
|
/// <param name="message"></param>
|
||
|
|
private void DoProcess(string message)
|
||
|
|
{
|
||
|
|
logger.LogInformation("{Message}", message);
|
||
|
|
|
||
|
|
// This is the place to do actual work
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|