.NET Core使用Cap

.NET Core使用Cap
Fantasy-ke准备工作
CAP需要依赖消息队列和数据存储,支持情况如下:
消息队列
1 | Kafka |
数据存储
1 | SqlServer |
两者自行搭配选择即可,其中SqlServer和RabbitMQ在docker中的安装可参照我另一篇备忘开发常用的docker镜像
项目设定
- 两个服务,分别为
ServiceA,ServiceB SqlServer使用CapDemo数据库,默认sa账号,密码为Today_is_20200328,端口为默认的1433端口RabbitMQ安装地址为本机,默认端口,账号密码为admin/admin
创建ServiceA
在
Visual Studio 2019中创建空白解决方案,取名为Fantasy.CapDemo在解决方案中新建
Asp.net core项目,名字为Fantasy.CapDemo.ServiceA,.net core版本为3.1nuget安装以下4个包1
2
3
4DotNetCore.CAP
DotNetCore.CAP.Dashboard
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.SqlServer在
Startup.cs的ConfigureServices方法中加入以下代码1
2
3
4
5
6
7
8
9
10
11services.AddCap(options =>
{
options.UseSqlServer("Password=Today_is_20200328;Persist Security Info=True;User ID=sa;Initial Catalog=CapDemo;Data Source=127.0.0.1");
options.UseRabbitMQ(r =>
{
r.HostName = "127.0.0.1";
r.UserName = "admin";
r.Password = "admin";
});
options.UseDashboard();
});此时直接运行项目,如果数据库与消息队列均能正常连接,则会在数据库中生成以下两张表
1
2cap.Published
cap.Received在
Controllers/WeatherForecastController.cs中编写消息发布代码
6.1 通过构造函数依赖注入ICapPublisher对象
6.2 发布消息只需要调用ICapPublisher对象的PublishAsync或Publish方法,传入参数为队列名和消息值,全部代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
namespace Fantasy.CapDemo.ServiceA.Controllers
{
[]
[]
public class WeatherForecastController : ControllerBase
{
private static readonly string[] Summaries = new[]
{
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
};
private readonly ILogger<WeatherForecastController> _logger;
private readonly ICapPublisher _capPublisher;
public WeatherForecastController(ILogger<WeatherForecastController> logger,ICapPublisher capPublisher)
{
_logger = logger;
_capPublisher = capPublisher;
}
[]
public async Task<IEnumerable<WeatherForecast>> Get()
{
await _capPublisher.PublishAsync("Fantasy.cap.demo.show.time", DateTime.Now);
var rng = new Random();
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Date = DateTime.Now.AddDays(index),
TemperatureC = rng.Next(\-20, 55),
Summary = Summaries[rng.Next(Summaries.Length)]
})
.ToArray();
}
}
}编写接收消息代码,接收消息有两种方式,一种在控制器中,即代码写在
***Controller.cs中,另一种是在非控制器中,一般为***Service.cs中,以下分两步执行,因为CAP在默认情况下一个服务多个地方进行订阅,只会进行一次接收,除非进行分组(后面介绍),所以8/9两个步骤在测试时需要注释其中一个,只保留另一个编写在控制器中接收消息的代码,直接在控制器中写对应方法,方法参数为发送消息时传入的消息值类型,方法无返回值,再加上方法标签
CapSubscribe即可,具体代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Fantasy.CapDemo.ServiceA.Controllers
{
public class SubController : Controller
{
[]
public void ShowTime(DateTime value)
{
Console.WriteLine($"接受方:SubController.ShowTime 接收到值:{value}");
}
}
}做到这步可以进行一次测试,就可以发现访问
/WeatherForecast这个接口,会发送一条当前时间的消息出去,同时接收方也会在控制台打印出接收到的消息编写服务中接收消息的代码,需要注意接收消息的类需要继承
ICapSubscribe接口,同时这里为了方便进行依赖注入,这里我们也自己定义了一个ISubscriberService接口,同时服务需要在Startup.cs的ConfigureServices方法中进行注册
9.1ISubscriberService.cs代码如下1
2
3
4
5
6
7
8
9
10
11
12using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Fantasy.CapDemo.ServiceA.Services
{
public interface ISubscriberService
{
void ReceivedShowTimeMessage(DateTime value);
}
}9.2
SubscriberService.cs代码如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Fantasy.CapDemo.ServiceA.Services
{
public class SubscriberService: ISubscriberService, ICapSubscribe
{
[]
public void ReceivedShowTimeMessage(DateTime value)
{
Console.WriteLine($"接受方:SubscriberService.ReceivedShowTimeMessage 接收到值:{value}");
}
}
}9.3 服务注册代码如下
1
services.AddScoped<ISubscriberService, SubscriberService>();
注意:这行服务注册代码需要在
services.AddCap这个方法之前注册,否则CAP在进行接口扫描的时候找不到对应已经实现了ICapSubscribe接口的实现类,则无法进行订阅者注册
注释掉第8步,进行第9步的测试,效果应该与第8步一致有时候一个消息需要多个消费者同时消费,则可以使用
Group概念,消息发送代码不变,在标记CapSubscribe的时候,使用Group参数,多个Group订阅同个消息Id,消息则会对应分发到多个Group中,同个Group只能消费一次,即一个Group对一个消息Id订阅多次,也会只接收一次,具体代码如下
SubscriberService.cs中新增Group参数
1
[]
SubController.cs中新增Group参数1
[]
- 测试Group功能,将项目跑起来,此时再访问发送消息的api,会在控制台中打印出两条消息接收记录
创建ServiceB
ServiceB使用EFCore来进行配置,找ServiceA步骤2创建Fantasy.CapDemo.ServiceB项目,因为.Net Core 3.1没有自带EFCore,所以这里除了CAP的4个包之外,还需要nuget安装EFCore的两个包,汇总起来需要nuget安装的包如下:
1 | DotNetCore.CAP |
创建ServiceDbContext.cs文件,代码如下
1 | using System; |
在Startup.cs中配置EFCore和CAP,具体代码如下:
1 | services.AddDbContext<ServiceDbContext>(options => |
后续操作与ServiceA类似,需要注意:多个站点订阅同个消息Id,消息会往多个站点进行发送
附上CAP的Github地址:https://github.com/dotnetcore/CAP
还有一些配合EF的操作还没来得及写,有空再说吧








