ABP+WorkflowCore+jsplumb实现工作流
前言
ABP目前已经是很成熟的开发框架了,它提供了很多我们日常开发所必须的功能,并且很方便扩展,让我们能更专注于业务的开发。但是ABP官方并没有给我们实现工作流。
在.net core环境下的开源工作流引擎很少,其中WorkflowCore是一款轻量级工作流引擎,对于小型工作流和责任链类型的需求开发很适合,但只能通过后台编码或者json的方式定义工作流程,看了源码后觉得扩展性还是挺好的,至少能满足我的需求,于是选择对它下手。
jsPlumb是一个开源的比较强大的绘图组件,这里不多介绍,我就是用它实现一个简单的流程设计器。
花了差不多一个月的时间,把这三者结合到一起实现一个简单而强大的工作流模块。
目录
ABP模块实现WorkflowCore持久化存储接口(IPersistenceProvider)
ABP中AbpWorkflow和AbpStepBody的自定义注册
设计器实现
设计器提交的流程数据转换成WorkflowCore支持的Json数据结构
总结
注:公众号阅读效果不佳,可以点击阅读全文,网页浏览
1.ABP模块实现WorkflowCore持久化存储接口(IPersistenceProvider)
这里我参考了WorkflowCore.Persistence.EntityFramework 持久化项目的实现方式 用ABP的方式实现了WorkflowCore的持久化。这样做有两个好处:
1.让工作流能支持ABP的多租户和全局数据过滤功能
2.数据库操作能使用统一的数据上下文,方便事务提交和回滚。
ABP实现的流程Workflow持久化存储所必须的实体类,其中PersistedWorkflowDefinition是用来持久化存储流程定义(在Workflow中流程定义在内存中)如下图:
实现IPersistenceProvider接口
1 public interface IAbpPersistenceProvider : IPersistenceProvider
2 {
3 TaskGetPersistedWorkflow(Guid id);
4
5 TaskGetPersistedExecutionPointer(string id);
6 TaskGetPersistedWorkflowDefinition(string id, int version);
7 }
8
9
10 public class AbpPersistenceProvider : DomainService, IAbpPersistenceProvider
11 {
12 protected readonly IRepository_eventRepository;
13 protected readonly IRepositorystring> _executionPointerRepository;
14 protected readonly IRepository_workflowRepository;
15 protected readonly IRepositorystring > _workflowDefinitionRepository;
16 protected readonly IRepository_eventSubscriptionRepository;
17 protected readonly IRepository_executionErrorRepository;
18 protected readonly IGuidGenerator _guidGenerator;
19 protected readonly IAsyncQueryableExecuter _asyncQueryableExecuter;
20 public IAbpSession AbpSession { get; set; }
21
22
23 public AbpPersistenceProvider(IRepositoryeventRepository, IRepository string> executionPointerRepository, IRepository workflowRepository, IRepository eventSubscriptionRepository, IGuidGenerator guidGenerator, IAsyncQueryableExecuter asyncQueryableExecuter, IRepository executionErrorRepository, IRepository string > workflowDefinitionRepository)
24 {
25
26 _eventRepository = eventRepository;
27 _executionPointerRepository = executionPointerRepository;
28 _workflowRepository = workflowRepository;
29 _eventSubscriptionRepository = eventSubscriptionRepository;
30 _guidGenerator = guidGenerator;
31 _asyncQueryableExecuter = asyncQueryableExecuter;
32 _executionErrorRepository = executionErrorRepository;
33 _workflowDefinitionRepository = workflowDefinitionRepository;
34
35
36 }
37 [UnitOfWork]
38 public virtual async Task<string> CreateEventSubscription(EventSubscription subscription)
39 {
40
41 subscription.Id = _guidGenerator.Create().ToString();
42 var persistable = subscription.ToPersistable();
43 await _eventSubscriptionRepository.InsertAsync(persistable);
44 return subscription.Id;
45 }
46 [UnitOfWork]
47 public virtual async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
48 {
49 workflow.Id = _guidGenerator.Create().ToString();
50 var persistable = workflow.ToPersistable();
51 if (AbpSession.UserId.HasValue)
52 {
53 var userCache = AbpSession.GetCurrentUser();
54 persistable.CreateUserIdentityName = userCache.FullName;
55 }
56 await _workflowRepository.InsertAsync(persistable);
57 return workflow.Id;
58 }
59 [UnitOfWork]
60 public virtual async Taskstring>> GetRunnableInstances(DateTime asAt)
61 {
62 var now = asAt.ToUniversalTime().Ticks;
63
64 var query = _workflowRepository.GetAll().Where(x => x.NextExecution.HasValue && (x.NextExecution <= now) && (x.Status == WorkflowStatus.Runnable))
65 .Select(x => x.Id);
66 var raw = await _asyncQueryableExecuter.ToListAsync(query);
67
68 return raw.Select(s => s.ToString()).ToList();
69 }
70 [UnitOfWork]
71 public virtual async Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
72 {
73
74 IQueryablequery = _workflowRepository.GetAll()
75 .Include(wf => wf.ExecutionPointers)
76 .ThenInclude(ep => ep.ExtensionAttributes)
77 .Include(wf => wf.ExecutionPointers)
78 .AsQueryable();
79
80 if (status.HasValue)
81 query = query.Where(x => x.Status == status.Value);
82
83 if (!String.IsNullOrEmpty(type))
84 query = query.Where(x => x.WorkflowDefinitionId == type);
85
86 if (createdFrom.HasValue)
87 query = query.Where(x => x.CreateTime >= createdFrom.Value);
88
89 if (createdTo.HasValue)
90 query = query.Where(x => x.CreateTime <= createdTo.Value);
91
92 var rawResult = await query.Skip(skip).Take(take).ToListAsync();
93 Listresult = new List ();
94
95 foreach (var item in rawResult)
96 result.Add(item.ToWorkflowInstance());
97
98 return result;
99
100 }
101 [UnitOfWork]
102 public virtual async TaskGetWorkflowInstance(string Id)
103 {
104
105 var uid = new Guid(Id);
106 var raw = await _workflowRepository.GetAll()
107 .Include(wf => wf.ExecutionPointers)
108 .ThenInclude(ep => ep.ExtensionAttributes)
109 .Include(wf => wf.ExecutionPointers)
110 .FirstAsync(x => x.Id == uid);
111
112 if (raw == null)
113 return null;
114
115 return raw.ToWorkflowInstance();
116
117 }
118 [UnitOfWork]
119 public virtual async Task> GetWorkflowInstances(IEnumerable<string> ids)
120 {
121 if (ids == null)
122 {
123 return new List();
124 }
125
126
127 var uids = ids.Select(i => new Guid(i));
128 var raw = _workflowRepository.GetAll()
129 .Include(wf => wf.ExecutionPointers)
130 .ThenInclude(ep => ep.ExtensionAttributes)
131 .Include(wf => wf.ExecutionPointers)
132 .Where(x => uids.Contains(x.Id));
133
134 return (await raw.ToListAsync()).Select(i => i.ToWorkflowInstance());
135
136 }
137 [UnitOfWork]
138 public virtual async Task PersistWorkflow(WorkflowInstance workflow)
139 {
140
141 var uid = new Guid(workflow.Id);
142 var existingEntity = await _workflowRepository.GetAll()
143 .Where(x => x.Id == uid)
144 .Include(wf => wf.ExecutionPointers)
145 .ThenInclude(ep => ep.ExtensionAttributes)
146 .Include(wf => wf.ExecutionPointers)
147 .AsTracking()
148 .FirstAsync();
149 var persistable = workflow.ToPersistable(existingEntity);
150 await CurrentUnitOfWork.SaveChangesAsync();
151 }
152 [UnitOfWork]
153 public virtual async Task TerminateSubscription(string eventSubscriptionId)
154 {
155
156 var uid = new Guid(eventSubscriptionId);
157 var existing = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == uid);
158 _eventSubscriptionRepository.Delete(existing);
159 await CurrentUnitOfWork.SaveChangesAsync();
160
161 }
162 [UnitOfWork]
163 public virtual void EnsureStoreExists()
164 {
165
166
167 }
168 [UnitOfWork]
169 public virtual async Task> GetSubscriptions(string eventName, string eventKey, DateTime asOf)
170 {
171
172 asOf = asOf.ToUniversalTime();
173 var raw = await _eventSubscriptionRepository.GetAll()
174 .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf)
175 .ToListAsync();
176
177 return raw.Select(item => item.ToEventSubscription()).ToList();
178
179 }
180 [UnitOfWork]
181 public virtual async Task<string> CreateEvent(Event newEvent)
182 {
183
184 newEvent.Id = _guidGenerator.Create().ToString();
185 var persistable = newEvent.ToPersistable();
186 var result = _eventRepository.InsertAsync(persistable);
187 await CurrentUnitOfWork.SaveChangesAsync();
188 return newEvent.Id;
189 }
190 [UnitOfWork]
191 public virtual async TaskGetEvent(string id)
192 {
193
194 Guid uid = new Guid(id);
195 var raw = await _eventRepository
196 .FirstOrDefaultAsync(x => x.Id == uid);
197
198 if (raw == null)
199 return null;
200
201 return raw.ToEvent();
202
203 }
204 [UnitOfWork]
205 public virtual async Taskstring>> GetRunnableEvents(DateTime asAt)
206 {
207 var now = asAt.ToUniversalTime();
208
209 asAt = asAt.ToUniversalTime();
210 var raw = await _eventRepository.GetAll()
211 .Where(x => !x.IsProcessed)
212 .Where(x => x.EventTime <= now)
213 .Select(x => x.Id)
214 .ToListAsync();
215
216 return raw.Select(s => s.ToString()).ToList();
217
218 }
219 [UnitOfWork]
220 public virtual async Task MarkEventProcessed(string id)
221 {
222
223 var uid = new Guid(id);
224 var existingEntity = await _eventRepository.GetAll()
225 .Where(x => x.Id == uid)
226 .AsTracking()
227 .FirstAsync();
228
229 existingEntity.IsProcessed = true;
230 await CurrentUnitOfWork.SaveChangesAsync();
231 }
232 [UnitOfWork]
233 public virtual async Taskstring>> GetEvents(string eventName, string eventKey, DateTime asOf)
234 {
235
236 var raw = await _eventRepository.GetAll()
237 .Where(x => x.EventName == eventName && x.EventKey == eventKey)
238 .Where(x => x.EventTime >= asOf)
239 .Select(x => x.Id)
240 .ToListAsync();
241
242 var result = new List<string>();
243
244 foreach (var s in raw)
245 result.Add(s.ToString());
246
247 return result;
248
249 }
250 [UnitOfWork]
251 public virtual async Task MarkEventUnprocessed(string id)
252 {
253
254 var uid = new Guid(id);
255 var existingEntity = await _eventRepository.GetAll()
256 .Where(x => x.Id == uid)
257 .AsTracking()
258 .FirstAsync();
259
260 existingEntity.IsProcessed = false;
261 await CurrentUnitOfWork.SaveChangesAsync();
262
263 }
264 [UnitOfWork]
265 public virtual async Task PersistErrors(IEnumerableerrors)
266 {
267
268 var executionErrors = errors as ExecutionError[] ?? errors.ToArray();
269 if (executionErrors.Any())
270 {
271 foreach (var error in executionErrors)
272 {
273 await _executionErrorRepository.InsertAsync(error.ToPersistable());
274 }
275 await CurrentUnitOfWork.SaveChangesAsync();
276
277 }
278
279 }
280 [UnitOfWork]
281 public virtual async TaskGetSubscription(string eventSubscriptionId)
282 {
283
284 var uid = new Guid(eventSubscriptionId);
285 var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == uid);
286
287 return raw?.ToEventSubscription();
288
289 }
290 [UnitOfWork]
291 public virtual async TaskGetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf)
292 {
293
294 var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf && x.ExternalToken == null);
295
296 return raw?.ToEventSubscription();
297
298 }
299 [UnitOfWork]
300 public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
301 {
302
303 var uid = new Guid(eventSubscriptionId);
304 var existingEntity = await _eventSubscriptionRepository.GetAll()
305 .Where(x => x.Id == uid)
306 .AsTracking()
307 .FirstAsync();
308
309 existingEntity.ExternalToken = token;
310 existingEntity.ExternalWorkerId = workerId;
311 existingEntity.ExternalTokenExpiry = expiry;
312 await CurrentUnitOfWork.SaveChangesAsync();
313
314 return true;
315
316 }
317 [UnitOfWork]
318 public virtual async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
319 {
320
321 var uid = new Guid(eventSubscriptionId);
322 var existingEntity = await _eventSubscriptionRepository.GetAll()
323 .Where(x => x.Id == uid)
324 .AsTracking()
325 .FirstAsync();
326
327 if (existingEntity.ExternalToken != token)
328 throw new InvalidOperationException();
329
330 existingEntity.ExternalToken = null;
331 existingEntity.ExternalWorkerId = null;
332 existingEntity.ExternalTokenExpiry = null;
333 await CurrentUnitOfWork.SaveChangesAsync();
334
335 }
336
337 public TaskGetPersistedWorkflow(Guid id)
338 {
339 return _workflowRepository.GetAsync(id);
340 }
341
342 public TaskGetPersistedWorkflowDefinition(string id, int version)
343 {
344 return _workflowDefinitionRepository.GetAll().AsNoTracking().FirstOrDefaultAsync(u => u.Id == id && u.Version == version);
345 }
346
347 public TaskGetPersistedExecutionPointer(string id)
348 {
349 return _executionPointerRepository.GetAsync(id);
350 }
351 }
服务注册添加AddWorkflow时把IPersistenceProvider提供的默认实现换成AbpPersistenceProvider
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
到此为止,ABP已经实现了WorkflowCore的默认的持久化存储。
2.ABP中AbpWorkflow和AbpStepBody的自定义注册
为了满足开发人员和用户的需求,我提供了两种流程注册方式,一种是开发人员后台编码定义固定流程另一种是用户通过流程设计器实现自定义业务流程。
开发人员后台编码定义固定流程
这里参考ABP的EventBus注册方式,实现IWindsorInstaller ,在组件注册时拦截并注册:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
|
到这里,把拦截器注册到模块类的Initialize中,开发人员定义流程只需要实现IAbpWorkflow接口,系统启动时会自动注册。如图:
自定义注册StepBody
这里参考ABP中标准的配置模式(不清楚的可以去看下ABP的源码,ABP的配置系统和权限系统都是这样配置的),将注册的StepBody存储在内存中提供给用户自定义组合流程节点使用,下列代码展示了注册指定用户审核的StepBody,执行方法体的实现:
1 public class DefaultStepBodyProvider : AbpStepBodyProvider
2 {
3 public override void Build(IAbpStepBodyDefinitionContext context)
4 {
5 var step1 = new AbpWorkflowStepBody();
6 step1.Name = "FixedUserAudit";
7 step1.DisplayName = "指定用户审核";
8 step1.StepBodyType = typeof(GeneralAuditingStepBody);
9 step1.Inputs.Add(new WorkflowParam()
10 {
11 InputType = new SelectUserInputType(),//定义前端输入类型,继承Abp.UI.Inputs.InputTypeBase
12 Name = "UserId",
13 DisplayName = "审核人"
14 });
15 context.Create(step1);
16
17 }
18 }
19
20
21
22 ///
23 /// 指定用户审批StepBody
24 ///
25 public class GeneralAuditingStepBody : StepBody, ITransientDependency
26 {
27 private const string ActionName = "AuditEvent";
28 protected readonly INotificationPublisher _notificationPublisher;
29 protected readonly IAbpPersistenceProvider _abpPersistenceProvider;
30 protected readonly UserManager _userManager;
31
32 public readonly IRepository_auditorRepository;
33
34 public GeneralAuditingStepBody(INotificationPublisher notificationPublisher, UserManager userManager, IAbpPersistenceProvider abpPersistenceProvider,
35 IRepositoryauditorRepository)
36 {
37 _notificationPublisher = notificationPublisher;
38 _abpPersistenceProvider = abpPersistenceProvider;
39 _userManager = userManager;
40 _auditorRepository = auditorRepository;
41 }
42
43 ///
44 /// 审核人
45 ///
46 public long UserId { get; set; }
47
48 [UnitOfWork]
49 public override ExecutionResult Run(IStepExecutionContext context)
50 {
51 if (!context.ExecutionPointer.EventPublished)
52 {
53 var workflow = _abpPersistenceProvider.GetPersistedWorkflow(context.Workflow.Id.ToGuid()).Result;
54 var workflowDefinition = _abpPersistenceProvider.GetPersistedWorkflowDefinition(context.Workflow.WorkflowDefinitionId, context.Workflow.Version).Result;
55
56 var userIdentityName = _userManager.Users.Where(u => u.Id == workflow.CreatorUserId).Select(u => u.FullName).FirstOrDefault();
57
58 //通知审批人
59 _notificationPublisher.PublishTaskAsync(new Abp.Notifications.TaskNotificationData($"【{userIdentityName}】提交的{workflowDefinition.Title}需要您审批!"),
60 userIds: new UserIdentifier[] { new UserIdentifier(workflow.TenantId, UserId) },
61 entityIdentifier: new EntityIdentifier(workflow.GetType(), workflow.Id)
62 ).Wait();
63 //添加审核人记录
64 var auditUserInfo = _userManager.GetUserById(UserId);
65 _auditorRepository.Insert(new PersistedWorkflowAuditor() { WorkflowId = workflow.Id, ExecutionPointerId = context.ExecutionPointer.Id, Status = Abp.Entitys.CommEnum.EnumAuditStatus.UnAudited, UserId = UserId, TenantId = workflow.TenantId, UserHeadPhoto = auditUserInfo.HeadImage, UserIdentityName = auditUserInfo.FullName });
66 DateTime effectiveDate = DateTime.MinValue;
67 return ExecutionResult.WaitForEvent(ActionName, Guid.NewGuid().ToString(), effectiveDate);
68 }
69 var pass = _auditorRepository.GetAll().Any(u => u.ExecutionPointerId == context.ExecutionPointer.Id && u.UserId == UserId && u.Status == Abp.Entitys.CommEnum.EnumAuditStatus.Pass);
70
71 if (!pass)
72 {
73 context.Workflow.Status = WorkflowStatus.Complete;
74 return ExecutionResult.Next();
75 }
76 return ExecutionResult.Next();
77 }
78 }
3.设计器实现
流程设计器我用的是Abp提供的Vue项目模板+jsplumb来实现的,话不多说直接上图把:
上图所示,每个节点执行操作选择的是我们后台注册的AbpStepBody。
注:开发人员可根据业务需求尽可能的给用户提供所需的StepBody。这样一来,整个流程的灵活性是非常好的。
4.设计器提交的流程数据转换成WorkflowCore支持的Json数据结构
前端传给后台的数据结构如下:
后台接收数据后转换成Workflow 支持的Josn字符串,再使用WorkflowCore.DSL提供的帮助类注册流程即可,转换后的Json如下:
1 {
2 "DataType": "System.Collections.Generic.Dictionary`2[[System.String, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.Object, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e",
3 "DefaultErrorBehavior": 0,
4 "DefaultErrorRetryInterval": null,
5 "Steps": [{
6 "StepType": "Abp.Workflows.DefaultSteps.NullStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
7 "Id": "start_1600248885360yurl0hgrvpd",
8 "Name": "start_1600248885360yurl0hgrvpd",
9 "CancelCondition": null,
10 "ErrorBehavior": null,
11 "RetryInterval": null,
12 "Do": [],
13 "CompensateWith": [],
14 "Saga": false,
15 "NextStepId": null,
16 "Inputs": {},
17 "Outputs": {},
18 "SelectNextStep": {
19 "step_1600248890720r3o927aajy8": "1==1"
20 }
21 }, {
22 "StepType": "Abp.Workflows.StepBodys.GeneralAuditingStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
23 "Id": "step_1600248890720r3o927aajy8",
24 "Name": "step_1600248890720r3o927aajy8",
25 "CancelCondition": null,
26 "ErrorBehavior": null,
27 "RetryInterval": null,
28 "Do": [],
29 "CompensateWith": [],
30 "Saga": false,
31 "NextStepId": null,
32 "Inputs": {
33 "UserId": "\"4\""
34 },
35 "Outputs": {},
36 "SelectNextStep": {
37 "end_16002488928403hmjauowus7": "decimal.Parse(data[\"Days\"].ToString()) <= 1",
38 "step_160032897781681o9ko9j3nr": "decimal.Parse(data[\"Days\"].ToString()) > 1"
39 }
40 }, {
41 "StepType": "Abp.Workflows.DefaultSteps.SendNotificationToInitiatorStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
42 "Id": "end_16002488928403hmjauowus7",
43 "Name": "end_16002488928403hmjauowus7",
44 "CancelCondition": null,
45 "ErrorBehavior": null,
46 "RetryInterval": null,
47 "Do": [],
48 "CompensateWith": [],
49 "Saga": false,
50 "NextStepId": null,
51 "Inputs": {
52 "Message": "\"您的流程已完成\""
53 },
54 "Outputs": {},
55 "SelectNextStep": {}
56 }, {
57 "StepType": "Abp.Workflows.StepBodys.GeneralAuditingStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
58 "Id": "step_160032897781681o9ko9j3nr",
59 "Name": "step_160032897781681o9ko9j3nr",
60 "CancelCondition": null,
61 "ErrorBehavior": null,
62 "RetryInterval": null,
63 "Do": [],
64 "CompensateWith": [],
65 "Saga": false,
66 "NextStepId": null,
67 "Inputs": {
68 "UserId": "\"5\""
69 },
70 "Outputs": {},
71 "SelectNextStep": {
72 "end_16002488928403hmjauowus7": "1==1"
73 }
74 }],
75 "Id": "c51e908f-60e3-4a01-ab63-3bce0eaedc48",
76 "Version": 1,
77 "Description": "请假"
78 }
总结
一句话,上面所写的一切都是为了将流程注册到WorkflowCore中而做的铺垫。
后面我会把代码整理一份作为一个ABP的独立模块开源出来供大家参考!
有四年没写博客了,很多东西写着写着觉得没意思,就不写了,这篇写得不好希望各位博友口下留情!
.NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划
【.NET Core微服务实战-统一身份认证】开篇及目录索引
Redis基本使用及百亿数据量中的使用技巧分享(附视频地址及观看指南)
.NET Core中的一个接口多种实现的依赖注入与动态选择看这篇就够了
用abp vNext快速开发Quartz.NET定时任务管理界面