# [C#] Observer Pattern
def : 定義對象間一種一對多的依賴關係,當一個對象的狀態發生狀態改變時,所有依賴他的都得的到通知並被自動更新。
- Observable (a.) ⇒ 可觀察的
- Subscribe (v.) ⇒ 訂閱
- Subject (n.) ⇒ 主題
需求: 有一個氣象站,有三個傳感器分別為:溫度、濕度、氣壓,有一個
WeatherData
對象,要從氣象站獲得這三個數據;並展示實時的數據。
e.g. 1.蘋果日報發行報紙;2.你訂閱報紙,每一天都會收到;3.取消訂閱就收不到;4.報社運營時就會有人一直去訂閱/取消
Publishers + Subscribers = Observer Pattern在观察者模式里, 我们把报社叫做被观察对象(Subject), 把订阅者叫做观察者(Observers)
- 老鼠出來,人跟貓(擴充點)都會被嚇跑
# Rx.net vs RxJS 筆記
RX (Reactive Extensions) 概括為解決複雜非同步的問題。
程式有Imperative(命令式)即線型編程風格和Declarative(聲明式)類似LINQ。
聲明式的好處是不用在乎細節或底層原理即可得到想要的結果, e.g. SQL、LINQ
Observer pattern如視窗被點擊時的通知、老鼠出現人和貓都怕的案例、燒熱水的通知的經典案例。
# Observable
單字解釋: 可被觀察的;可被訂閱的;明星;實況主
會一直有stream產生的;一直被push的;會頻繁發生的事件;資料流;時間序列上的一連串資料事件
e.g. window的click事件;
- Observable.Return
# Observer
單字解釋: 觀察者👀;觀眾
# Subscribe(v.)
單字解釋: 訂閱
舉一個生活中的例子,如帳號A、B ; 知名實況主:張 ;
var streamer = Observable.Create<張YT>(observer)=>{
observer.OnNext(1);
}
# Operator
# Select
- map
let a = [1,2,3,4,5,6,7,8];
let source = a.map(e => {
return e + 10;
});
console.log(source); // [11, 12, 13, 14, 15, 16, 17, 18]
var num = [1,2,3,4]
source = num.Select(num => num + 1);
// 2,3,4,5
# Where
- filter
[{"age":"18","name":"john"},{"age":"22","name":"lee"}]
var source = people.filter(function(item, index, array){
return item.age < 20;
});
// {"age":"18","name":"john"}
var source = people.Where(x=>x.age<20);
// {"age":"18","name":"john"}
# Do
- Tap
title$ = this.httpClient.get('').pipe(
tap(data=>console.log(data)),
map(data => data.title),
tap(data=>console.log(data)
)
source
.Do(value => Console.WriteLine($"Current: {value}")
.Subscribe();
side effect 的操作都盡可能在 Do 裡面處理。
這裡的理解會是 console.log
負責debug,紀錄除錯
盡而不去影響到參考型別(Reference Type)的最終數據
# 響應式開發
- 解決服務端實現的併發操作、在服務端通過平行計算解決請求,亦為「觀察者模式」。
- Publisher (製片廠)
- 生產者發布一個元素
- Subscriber (觀眾)
- 消費者從Publisher獲得元素
- Error , Next , Complete
- Subscription (電影院)
- 對Publisher和Subscriber做一個解藕
- cancel , request
- Processor (區經理→經理→員工)
- 既是Publisher又是Subscriber;同時扮演兩個角色
# Rx Net
- Event-driven systems in the cloud , client , and IoT devices (事件驅動)
- Popular today in client-side code (使用客戶端代碼、訊息流)
- Created by Cloud Programmability team at Microsoft (電腦A至電腦B的計算)
- Useful in any system where things happen (事情發生)
# IObservable
A sequence of things (一系列的事件)、是抽象的(事件的方式),可以訂閱它;
- 當新的流會通知你
- Don't Call Us , We Will Call you.
Same concept as IEnumerable
// IObservable<int> xs = ...? (沒有IMPL實現)
IObservable<int> xs = Observable.Range(1,10);
//foreach(int x in xs)
//{
//}
//Callback
xs.Subscrib(x=> Console.WriteLine(x));
static async Task Main(strings[] args)
{
IObservable<long> xs = Observable
.Timer(DateTimeOffset.Now.AddSeconds(1.5),TimeSpan.FromSeconds(0.5)) // 0.5秒給一個事件
.Where(x => x% 2 ==0)
.Take(5)
xs.Subscrib(x=> Console.WriteLine(x));
await new TaskCompletionSource<object>().Task;
}
static async Task Main(strings[] args)
{
var xs = Keystrokes.Where(char.IsUpper);
xs.Subscrib(x=> Console.WriteLine(x));
await new TaskCompletionSource<object>().Task;
}
static IObservable<char> Keystrokes()
{
return Observable.Create<char>(
(obs , cancel) =>
{
return Task.Run(()=>
{
while (!cancel.IsCancellationRequested)
{
char c = Console.ReadKey.KeyChar;
obs.OnNext(c);
}
});
});
}
static async Task Main(strings[] args)
{
var xs = Keystrokes.Where(char.IsUpper).TakeUntil(c == 'Q');
await new TaskCompletionSource<object>().Task;
}
private class Obs<T> : IObserver<T>
{
public void OnNext(T value)
{
//foreach things.
Console.WriteLine(value)
}
public void OnCompleted()
{
Console.WriteLine("Done!");
}
public void OnError(Exception error)
{
Console.WriteLine("Aaaaaaargh!!" + error);
}
}
static IObservable<char> Keystrokes()
{
return Observable.Create<char>(
(obs , cancel) =>
{
return Task.Run(()=>
{
while (!cancel.IsCancellationRequested)
{
char c = Console.ReadKey.KeyChar;
obs.OnNext(c);
}
});
});
}
- LINQ operators (Where, Select , etc)
- Scheduling (計時器正好運行我調用的loop,回調該計時器的實際運作方式)
- 時間敏感的操作: 基於計時器的操作、基於時間將事物分組、時間跑步
- 基於某個時間點、回到重前
- Bridging (subjects , Observable.Create)
- 任務(事件)、非同步流
- 管理訂閱管理取消,拋出異常而不是自動處理它 (呼叫完成)
static async Task Main(strings[] args)
{
var xs = Keystrokes.Where(char.IsUpper).TakeUntil(c == 'Q').Publish().RefCount();
xs.Subscribe(new Obs<char>());
IObservable<char> theLastChar = xs.LastAsync();
await theLastChar;
}
private class Obs<T> : IObserver<T>
{
public void OnNext(T value)
{
//foreach things.
Console.WriteLine(value)
}
public void OnCompleted()
{
Console.WriteLine("Done!");
}
public void OnError(Exception error)
{
Console.WriteLine("Aaaaaaargh!!" + error);
}
}
static IObservable<char> Keystrokes()
{
return Observable.Create<char>(
(obs , cancel) =>
{
return Task.Run(()=>
{
while (!cancel.IsCancellationRequested)
{
char c = Console.ReadKey.KeyChar;
obs.OnNext(c);
}
});
});
}