RX.NET

# [C#] Observer Pattern

def : 定義對象間一種一對多的依賴關係,當一個對象的狀態發生狀態改變時,所有依賴他的都得的到通知並被自動更新。

  • Observable (a.) ⇒ 可觀察的
  • Subscribe (v.) ⇒ 訂閱
  • Subject (n.) ⇒ 主題

需求: 有一個氣象站,有三個傳感器分別為:溫度、濕度、氣壓,有一個WeatherData對象,要從氣象站獲得這三個數據;並展示實時的數據。

e.g. 1.蘋果日報發行報紙;2.你訂閱報紙,每一天都會收到;3.取消訂閱就收不到;4.報社運營時就會有人一直去訂閱/取消

Publishers + Subscribers = Observer Pattern在观察者模式里, 我们把报社叫做被观察对象(Subject), 把订阅者叫做观察者(Observers)

https://yqfile.alicdn.com/img_85460a3a2d4caf32dee9fccbe171ad48.png

  • 老鼠出來,人跟貓(擴充點)都會被嚇跑

# Rx.net vs RxJS 筆記

RX (Reactive Extensions) 概括為解決複雜非同步的問題。

程式有Imperative(命令式)即線型編程風格和Declarative(聲明式)類似LINQ。

聲明式的好處是不用在乎細節或底層原理即可得到想要的結果, e.g. SQL、LINQ

Observer pattern如視窗被點擊時的通知、老鼠出現人和貓都怕的案例、燒熱水的通知的經典案例。

# Observable

單字解釋: 可被觀察的;可被訂閱的;明星;實況主

會一直有stream產生的;一直被push的;會頻繁發生的事件;資料流;時間序列上的一連串資料事件

e.g. window的click事件;

  • Observable.Return

# Observer

單字解釋: 觀察者👀;觀眾

# Subscribe(v.)

單字解釋: 訂閱

舉一個生活中的例子,如帳號A、B ; 知名實況主:張 ;

var streamer = Observable.Create<YT>(observer)=>{
		observer.OnNext(1);
				
}

# Operator

# Select

  • map
let a = [1,2,3,4,5,6,7,8];
let source = a.map(e => {
    return e + 10;
});
console.log(source); // [11, 12, 13, 14, 15, 16, 17, 18]
var num = [1,2,3,4]
source = num.Select(num => num + 1); 
// 2,3,4,5 

# Where

  • filter
[{"age":"18","name":"john"},{"age":"22","name":"lee"}]
var source = people.filter(function(item, index, array){
  return item.age < 20;      
});
// {"age":"18","name":"john"}
var source = people.Where(x=>x.age<20);
// {"age":"18","name":"john"}

# Do

  • Tap
title$ = this.httpClient.get('').pipe(
	tap(data=>console.log(data)),
  map(data => data.title),
  tap(data=>console.log(data)
)
source
    .Do(value => Console.WriteLine($"Current: {value}")
    .Subscribe();

side effect 的操作都盡可能在 Do 裡面處理。

這裡的理解會是 console.log 負責debug,紀錄除錯

盡而不去影響到參考型別(Reference Type)的最終數據



# 響應式開發

  • 解決服務端實現的併發操作、在服務端通過平行計算解決請求,亦為「觀察者模式」。
  • Publisher (製片廠)
    • 生產者發布一個元素
  • Subscriber (觀眾)
    • 消費者從Publisher獲得元素
    • Error , Next , Complete
  • Subscription (電影院)
    • 對Publisher和Subscriber做一個解藕
    • cancel , request
  • Processor (區經理→經理→員工)
    • 既是Publisher又是Subscriber;同時扮演兩個角色

# Rx Net

  • Event-driven systems in the cloud , client , and IoT devices (事件驅動)
  1. Popular today in client-side code (使用客戶端代碼、訊息流)
  2. Created by Cloud Programmability team at Microsoft (電腦A至電腦B的計算)
  3. Useful in any system where things happen (事情發生)

# IObservable

A sequence of things (一系列的事件)、是抽象的(事件的方式),可以訂閱它;

  • 當新的流會通知你
  • Don't Call Us , We Will Call you.

Same concept as IEnumerable but ... reactive!

// IObservable<int> xs = ...? (沒有IMPL實現)
IObservable<int> xs = Observable.Range(1,10);
//foreach(int x in xs)
//{
//}
//Callback
xs.Subscrib(x=> Console.WriteLine(x));
static async Task Main(strings[] args)
{
		IObservable<long> xs = Observable
				.Timer(DateTimeOffset.Now.AddSeconds(1.5),TimeSpan.FromSeconds(0.5)) // 0.5秒給一個事件
				.Where(x => x% 2 ==0)
				.Take(5)
		xs.Subscrib(x=> Console.WriteLine(x));
		await new TaskCompletionSource<object>().Task;
}
static async Task Main(strings[] args)
{
		var xs = Keystrokes.Where(char.IsUpper);
		xs.Subscrib(x=> Console.WriteLine(x));
		await new TaskCompletionSource<object>().Task;
}

static IObservable<char> Keystrokes()
{
		return Observable.Create<char>(
		(obs , cancel) => 
		{
			return Task.Run(()=>
			{
					while (!cancel.IsCancellationRequested)
					{
							char c = Console.ReadKey.KeyChar;
							obs.OnNext(c);
					}
			});
		});
}

%E9%9F%BF%E6%87%89%E5%BC%8F%E9%96%8B%E7%99%BC%20151de94922f646c3bf91bc0c8332cfea/Untitled.png

static async Task Main(strings[] args)
{
		var xs = Keystrokes.Where(char.IsUpper).TakeUntil(c == 'Q');
		await new TaskCompletionSource<object>().Task;
}
 

private class Obs<T> : IObserver<T>
{
	 public void OnNext(T value)
	 {
			//foreach things.
			Console.WriteLine(value)
	 }
	 public void OnCompleted()
	 {
			Console.WriteLine("Done!");
	 }
	 public void OnError(Exception error)
	 {
			Console.WriteLine("Aaaaaaargh!!" + error);
	 }

}

static IObservable<char> Keystrokes()
{
		return Observable.Create<char>(
		(obs , cancel) => 
		{
			return Task.Run(()=>
			{
					while (!cancel.IsCancellationRequested)
					{
							char c = Console.ReadKey.KeyChar;
							obs.OnNext(c);
					}
			});
		});
}
  • LINQ operators (Where, Select , etc)
  • Scheduling (計時器正好運行我調用的loop,回調該計時器的實際運作方式)
    1. 時間敏感的操作: 基於計時器的操作、基於時間將事物分組、時間跑步
    2. 基於某個時間點、回到重前
  • Bridging (subjects , Observable.Create)
    • 任務(事件)、非同步流
    • 管理訂閱管理取消,拋出異常而不是自動處理它 (呼叫完成)
static async Task Main(strings[] args)
{
		var xs = Keystrokes.Where(char.IsUpper).TakeUntil(c == 'Q').Publish().RefCount();
		xs.Subscribe(new Obs<char>());
		IObservable<char> theLastChar = xs.LastAsync();
		await theLastChar;
}
 

private class Obs<T> : IObserver<T>
{
	 public void OnNext(T value)
	 {
			//foreach things.
			Console.WriteLine(value)
	 }
	 public void OnCompleted()
	 {
			Console.WriteLine("Done!");
	 }
	 public void OnError(Exception error)
	 {
			Console.WriteLine("Aaaaaaargh!!" + error);
	 }

}

static IObservable<char> Keystrokes()
{
		return Observable.Create<char>(
		(obs , cancel) => 
		{
			return Task.Run(()=>
			{
					while (!cancel.IsCancellationRequested)
					{
							char c = Console.ReadKey.KeyChar;
							obs.OnNext(c);
					}
			});
		});
}

WHY RX (opens new window)