.NET并发编程-反应式编程
共 4045字,需浏览 9分钟
·
2021-03-11 00:43
你以为我没回头
我以为你没挽留
年轻的想法还是尽早安排落地
本系列学习在.NET中的并发并行编程模式,实战技巧
本小节开始学习反应式编程,以及在.NET中数据并行的实现方式。本系列保证最少代码呈现量,虽然talk is cheap, show me the code被奉为圭臬,我的学习习惯是,只学习知识点,代码不在当下立马要用的时候不会认真去读的,更何况在大多时候在手机阅读更不顺畅。
反应式编程
就是将事件作为数据流异步地监听和处理的一种编程范式。类似编辑Excel表格时,某个单个元的值是由其他单元格值决定的公式,一旦其他单元格的值发生变化,此公式单个格也会随之更新。
如果以声明式描述操作,那么就是函数式反应型编程。
反应式编程的好处就是通过描述一种简单且可维护的方法来处理异步、无阻赛计算和IO,从而增加了对多核和多CPU硬件上计算资源的使用。事件的控制从“请求”变为了“等待”。大数据实时流处理环境,都会使用到反应式编程,例如推文系统处理,日志系统收集等。
.NET中的工具
.NET中是有基于委托的模型事件的。订阅者的事件处理程序注册一系列事件,并在调用时触发事件。类似桌面应用程序中经常使用Button的Click事件,通过使用+=来注册绑定事件。所以传统意义上的事件就是来处理GUI用户界面的交互。这种命令式编程难以处理多个复杂事件的编排,并且容易造成内存泄漏。
在F#中使用|>管道操作符可以连接多个事件。不熟悉F#,就不多做说明。主要看看C#如果处理。
Reactive Extensions(Rx)反应式扩展
LINQ 是 对 序 列 数 据 进 行 查 询 的 一 系 列 语 言 功 能。 内 置 的 LINQ to Objects( 基 于 IEnumerable<T>) 和 LINQ to Entities( 基 于 IQueryable<T>) 是 两 个 最 常 用 的 LINQ 提 供 者。另外还有很多提供者,并且大多数都采用相同的基本架构。查询是延后执行的,只有在需要时才会从序列中获取数据。从概念上讲,这是一种拉取模式。 在查询过程中数据项是被逐个拉取出来的。
Reactive Extensions(Rx)把事件看作是依次到达的数据序列。因此,将 Rx 认作是 LINQ to events( 基 于 IObservable<T>) 也 是 可 以 的, 它 与 其 他 LINQ 提 供 者 的 主 要 区 别 在 于, Rx 采用“推送”模式。就是说,Rx 的查询规定了在事件到达时程序该如何响应。Rx 在 LINQ 的基础上构建,增加了一些功能强大的操作符,作为扩展方法。
通过nuget安装Rx
Observable和Enumerable是对偶存在,很多方法相似可以相互转换。像下面输出结果都是1-5,。
// Observable对象
Observable.Range(1, 5)
.Subscribe(x => Console.WriteLine(x));
// Enumerable对象
foreach (var x in Enumerable.Range(1, 5))
Console.WriteLine(x);
Rx里主要的接口有两个,IObervable<T>和 IObserver<T>,上面Observable遍历输出只是简化了IObserver对象的创建,Subscribe方法可以只传入OnNext委托,也可以传入完整的IObserver对象。
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
var observer = Observer.Create<int>(
x => Console.WriteLine(x), // onNext参数(delegate)
ex => { throw ex; }, // onError参数(delegate)
() => { }); // onCompleted参数(delegate)
Observable.Range(1, 5).Subscribe(observer);
Rx主要用处
Rx的有点主要在于能够将传统的异步编程方式从支离破碎的代码调用中解放出来,将各个事件的处理连接起来放在一个单独的方法中,增加代码可读和可维护。
GUI编程中合并事件
桌面编程中多个事件进行组合的情况,比如鼠标按下/移动/放开事件进行关联处理,一般可能需要定一个变量Flag来标记状态,管理比较混乱。Rx可以将它们合成一个事件。
var drag = from down in this.MouseDownAsObservable()
from move in this.MouseMoveAsObservable().TakeUntil(this.MouseUpAsObservable())
select new { move.X, move.Y };
// 利用扩展方法将Winform原有的事件变换为 IObservable<T> 对象
public static class FormExtensions
{
public static IObservable<MouseEventArgs> MouseMoveAsObservable(this Form form)
{
return Observable.FromEventPattern<MouseEventArgs>(form, "MouseMove").Select(e => e.EventArgs);
}
public static IObservable<MouseEventArgs> MouseDownAsObservable(this Form form)
{
return Observable.FromEventPattern<MouseEventArgs>(form, "MouseDown").Select(e => e.EventArgs);
}
public static IObservable<MouseEventArgs> MouseUpAsObservable(this Form form)
{
return Observable.FromEventPattern<MouseEventArgs>(form, "MouseUp").Select(e => e.EventArgs);
}
}
Timer通知事件
在一定的时间间隔监视某个值的场景。下面例子中就是每隔5s检查textbox值是否变化,变化了就更新label。比以往简介了很多
// 每隔1秒监视一下watchTarget.Value的值
var polling =
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(5))
.Select(_ => textBox1.Text)
.DistinctUntilChanged(); // 只有在值发生变化时才引发事件(polling)
polling.Subscribe(msg => this.Invoke(new Action<string>((e) => this.label1.Text = e),msg));
先这样草草结束吧,用到了再去详细研究下。像Rx是以往没听过的概念,有些场景中确实好用,但习惯了以往的方式,也比较难尝试,慢慢破圈慢慢生成吧。
你以为我没回头
我以为你没挽留
年轻的想法还是尽早安排落地