앞선 포스팅( [RxSwift] Reactive Programming & RxSwift )에서는 RxSwift가 무엇인지 그 본질에 대해 알아보았습니다.
제가 저번 포스팅을 하면서 느낀 것은 결국 RxSwift는 Rx를 Swift로 구현한 것일뿐, 절대 Rx와 별개의 것이 아니라는 것입니다.
그래서 오늘부터 그냥 Rx의 공식문서를 읽어보며 Rx 그 본질을 학습하는 시간들을 가져볼까합니다. (그래서 사실은 RxSwift보다는 Rx 학습에 가까울 수는 있습니다... 그래도 RxSwift의 예제도 함께 확인하면서 공부할거예요!)
오늘은 Observable과 Operator과 Operator에 대해 읽고 정리했습니다.
Observable
Observable은 특정 형태의 데이터의 snapshot을 전달하는 일련의 이벤트들을 비동기적으로 생성합니다. 그리고 이를 Observer라는 것에 전달합니다.
Observer는 Observable을 Subscribe합니다. 그러면 Observer는 Observable이 발행(emit)하는 아이템이나 일련의 아이템 시퀀스에 반응할 수 있게됩니다. 이러한 패턴은 concurrent operation이 가능하게 합니다. 왜냐하면 Observable이 아이템을 발행할 때까지 기다릴 필요없이 특정 아이템이 발행되면 그 시점을 감시하는 관찰자(sentry)를 Observer 내부에 두고 그 관찰자를 통해 발행 알림을 받으면 되기 때문입니다.
마블 다이어그램
아래 보여지는 그림은 "마블 다이어그램"으로, Observable의 흐름과 전환을 그림으로 표현한 것입니다. 이는 Observable을 이해하는데 큰 도움이 되니 잘 알아두어야 합니다.
- 위의 왼쪽에서 오른쪽으로 향하는 화살표는 Observable의 타임라인입니다. 왼쪽에서 오른쪽으로 흐릅니다.
- 위의 화살표에 위치하는 아이템들은 Observable에 의해 발행되는 아이템들입니다.
- 아이템들과 함께 화살표에 위치하는 수직 직선은 Observable이 성공적으로 completed 이벤트를 발행했음을 의미합니다.
- 위에서 아래로 향하는 점선 화살표와 박스는 Observable에서 행해지는 특정한 변환을 의미합니다. 박스 안쪽에 적힌 텍스트로부터 이것이 어떤 변환인지 알 수 있습니다.
- 아래의 왼쪽에서 오른쪽으로 향하는 화살표는 Observable의 변환 결과입니다.
- 아이템들과 함께 화살표에 위치하는 엑스 표시는 error를 의미합니다.
Observer 생성
Observer는 아래와 같은 메서드를 구현하고 사용합니다.
onNext
: Observable이 아이템을 발행할 때마다 해당 메서드가 호출됩니다. 해당 메서드는 Observable이 발행하는 아이템을 파라미터로 전달받습니다.onError
: Observable에서 오류가 발생할 경우 호출됩니다. 해당 메서드가 호출되면onNext
나onCompleted
는 더 이상 호출되지 않고 라이프사이클이 종료됩니다. 해당 메서드는 error 객체를 파라미터로 전달받습니다.onCompleted
: 성공적으로 오류발생없이 주어진 시퀀스를 모두 발행했다면 호출됩니다.
let one = 1
let two = 2
let three = 3
let observable = Observable.of(one, two, three) // 1
observable.subscribe( // 2
onNext: { (element) in // 3
print(element)
},
onCompleted: { // 4
print("Completed")
}
)
/* Prints:
1
2
3
completed
*/
- 1) of 메서드를 통해 Observable 객체를 생성합니다. 해당 객체는 one, two, three의 데이터 시퀀스를 보유하고 이를 순서대로 발행합니다.
- 2) subscribe를 통해 Observable에 대한 Observer를 만들 수 있습니다.
- 3) onNext 메서드로 발행된 값을 이용한 동작을 처리합니다.
- 4) onCompleted 메서드로 completed에서 수행할 동작을 처리합니다.
Unsubscribing
Rx의 구현체에는 Subscriber라는 특별한 인터페이스가 존재합니다. 이는 unscribe라는 메서드를 제공하여 현재 subscribe중인 Observer에 대한 subscribe를 해지할 수 있습니다. 이는 연산자 체인을 통해 Observer가 구독중인 Observable들이 더 이상 발행되지 못하도록 체인 내 연결 링크는 끊어버립니다.
Hot Observable, Cold Observable
Observable은 보유중인 데이터 시퀀스를 언제 발행할까요? 정답은 "Observable에 따라 다르다"입니다. Observer에는 Hot Observable과 Cold Observable이라는 개념이 존재합니다.
Hot Observable은 생성되자마자 데이터에 대한 발행을 시작합니다. 따라서 데이터 발행 중간부터 Observable을 subscribe할 수도 있습니다.
반면 Cold Observable은 Observer가 구독할 때까지는 데이터를 발행하지 않습니다. 따라서 이를 subscribe한 Observer는 전체 데이터 시퀀스를 전달받습니다.
Observable의 다양한 Operator
Observable과 Observer는 단순히 표준 Observer Pattern을 조금 확장한 정도에 불과한 기본 개념입니다. Rx가 진짜 유용한 이유는 다양한 Operator로부터 나옵니다. Operator는 Observable이 발행하는 연속된 아이템들을 변환, 결합, 조작하는 기능을 가졌습니다.
이 Operator들은 선언적인 방법을 통해 연속적인 비동기 호출을 구성할 수 있는 방법을 제공합니다. 이것은 일반적인 비동기 시스템이 가진 중첩 콜백 핸들러의 단점 (일명 콜백지옥)을 제거했다는 점에서 높이 평가받습니다. 또한 대부분의 Operator들이 Observable 상에서 동작하고 Observable을 리턴합니다. 이런 접근 방법은 하나의 Observable에 대해 연속적으로 Operator를 적용하는 것을 가능하게 합니다. 이들은 초기 Observable로부터 순서대로 처리됩니다.
아래는 공식문서에서 소개하는 다양한 Operator들입니다. 공식문서에서 각 Operator의 자세한 설명과 마블 다이어그램을 확인할 수 있습니다.
Creating Observables
새로운 Observable을 만드는 Operator들입니다.
- Create — 직접적인 코드 구현을 통해 Observable을 생성합니다.
- Defer — Observer가 구독하기 전까지는 Observable 생성을 지연하고 구독이 시작되면 Observer 별로 새로운 Observable을 생성합니다.
- Empty/Never/Throw — 정확하고 제한된 행동을 하는 Observable을 생성합니다.
- From — 다른 객체나 자료 구조를 Observable로 변환합니다.
- Interval — 특정 시간별로 연속된 정수형을 발행하는 Observable을 생성합니다.
- Just — 객체 하나 또는 객채집합을 Observable로 변환합니다. 변환된 Observable은 단순히 원본 객체들을 발행합니다.
- Range — 연속된 범위(Range)의 정수를 발행하는 Observable을 생성합니다.
- Repeat — 특정 아이템이나 연속된 아이템들을 반복적으로 발행하는 Observable을 생성합니다.
- Start — 함수의 실행 결과를 발행하는 Observable을 생성합니다
- Timer — 지정된 시간이 지나고 난 후 아이템을 하나 발행하는 Observable을 생성합니다
Transforming Observables
Observable이 emit한 아이템들을 변환하는 Operator들입니다.
- Buffer — Observable로부터 정기적으로 아이템들을 수집하고 묶음으로 만든 후에 묶음 안에 있는 아이템들을 한번에 하나씩 발행하지 않고 수집된 묶음 단위로 발행합니다.
- FlatMap — 하나의 Observable이 발행하는 아이템들을 여러개의 Observable로 변환하고, 아이템들의 발행을 차례차례 줄 세워 하나의 Observable로 전달합니다.
- GroupBy — 원본 Observable이 발행하는 아이템들을 키(Key) 별로 묶은 후 Observable에 담습니다. 이렇게 키 별로 만들어진 Observable들은 자기가 담고 있는 묶음의 아이템들을 발행합니다.
- Map — Observable이 발행한 아이템에 함수를 적용합니다.
- Scan — Observable이 발행한 아이템에 연속적으로 함수를 적용하고 실행한 후 성공적으로 실행된 함수의 리턴 값을 발행합니다.
- Window — 정기적으로 Observable의 아이템들을 더 작은 단위의 Observable 윈도우로 나눈 후에, 한번에 하나씩 아이템들을 발행하는 대신 작게 나눠진 윈도우 단위로 아이템들을 발행합니다.
Filtering Observables
기존 Observable에서 특정 조건에 맞는 아이템만 발행하는 Operator들입니다
- Debounce — Observable의 시간 흐름이 지속되는 상태에서 다른 아이템들은 발행하지 않고 특정 시간 마다 그 시점에 존재하는 아이템 하나를 Observable로부터 발행합니다.
- Distinct — Observable이 발행하는 아이템들 중 중복을 제거한 아이템들을 발행합니다.
- ElementAt — Obserable에서 n번째 아이템만 발행합니다.
- Filter — 테스트 조건을 만족하는 아이템들만 발행합니다.
- First — 맨 첫 번째 아이템 또는 조건을 만족하는 첫 번째 아이템만 발행합니다.
- IgnoreElements — 아이템들을 발행하지는 않고 종료 알림은 보냅니다.
- Last — Observable의 마지막 아이템만 발행합니다.
- Sample — 특정 시간 간격으로 최근에 Observable이 발행한 아이템들을 발행합니다.
- Skip — Observable이 발행한 처음 n개의 아이템들을 숨깁니다.
- SkipLast — Observable이 발행한 마지막 n개의 아이템들을 숨깁니다.
- Take — Observable이 배츨한 처음 n개의 아이템들만 발행합니다.
- TakeLast — Observable이 발행한 마지막 n개의 아이템들만 발행합니다.
Combining Observables
여러 개의 Observable들을 하나의 Observable로 만드는 Operator들입니다.
- And/Then/When — 두 개 이상의 Observable들이 발행한 아이템들을 'Pattern'과 'Plan' 중계자를 이용해서 결합합니다.
- CombineLatest — 두 개의 Observable 중 하나가 아이템을 발행할 때 발행된 마지막 아이템과 다른 한 Observable이 발행한 아이템을 결합한 후 함수를 적용하여 실행 후 실행된 결과를 발행합니다.
- Join — A Observable과 B Observable이 발행한 아이템들을 결합하는데, 이때 B Observable은 발행한 아이템이 타임 윈도우를 가지고 있고 이 타임 윈도우가 열린 동안 A Observable은 아이템의 발행을 계속합니다. Join 연산자는 B Observable의 아이템을 발행하고 발행된 아이템은 타임 윈도우를 시작시킵니다. 타임 윈도우가 열려 있는 동안 A Observable은 자신의 아이템들을 계속 발행하여 이 두 아이템들을 결합합니다.
- Merge — 복수 개의 Observable들이 발행하는 아이템들을 머지시켜 하나의 Observable로 만듭니다.
- StartWith — 소스 Observable이 아이템을 발행하기 전에 다른 아이템들을 앞에 추가한 후 발행합니다.
- Switch — Observable들을 발행하는 Observable을 싱글 Observable로 변환하다. 변환된 싱글 Observable은 변환 전 소스 Observable들이 발행한 아이템들을 발행합니다.
- Zip — 명시한 함수를 통해 여러 Observable들이 발행한 아이템들을 결합하고 함수의 실행 결과를 발행합니다.
Error Handling Operators
Observable로부터 오는 오류를 확인하고 복구할 수 있도록 도와주는 Operator들입니다.
- Catch — 오류를 무시하고 발행되는 아이템들을 계속 진행시켜 'onError'로부터 전달된 오류를 복구합니다.
- Retry — 만약 소스 Observable이 'onError' 알림을 보낼 경우, 오류 없이 실행이 완료되기를 기대하며 재구독을 시도합니다.
Observable Utility Operators
Obserable와 함께 동작하는 서포트 역할의 Operator들입니다.
- Delay — Observable의 발행을 특정 시간동안 미룹니다.
- Do — Observable의 생명주기 동안 발생하는 여러 이벤트에서 실행 될 액션을 등록합니다.
- Materialize/Dematerialize — 발행된 아이템이 어떤 알림을 통해 옵저버에게 전달 됐는지를 표현합니다. 그 반대 과정도 수행할 수 있습니다.
- ObserveOn — 옵저버가 어느 스케줄러 상에서 Observable을 관찰할지 명시합니다.
- Serialize — Observable이 직렬화된 호출을 생성하고 제대로 동작하도록 강제합니다.
- Subscribe — Observable이 발행하는 아이템과 알림을 기반으로 동작합니다.
- SubscribeOn — Observable을 구독할 때 사용할 스케줄러를 명시합니다.
- TimeInterval — 아이템들을 발행하는 Observable을, 아이템을 발행하는데 걸린 시간이 얼마인지를 가리키는 Observable로 변환합니다.
- Timeout — 소스 Obvservable을 그대로 전달하지만, 대신 특정 시간 동안 발행된 아이템이 없으면 오류 알림을 보냅니다.
- Timestamp — Observable이 발행한 아이템에 타임 스탬프를 추가합니다.
- Using — 소스 Observable과 동일한 생명주기를 갖는 Observable을 생성하는데, 이 Observable은 생명주기가 완료되면 리소스를 종료하고 반환합니다.
Conditional and Boolean Operators
하나 이상의 Observable 또는 Observable이 발행한 아이템을 평가하는 Operator들입니다.
- All — Observable이 발행한 전체 아이템들이 어떤 조건을 만족시키는지 판단합니다.
- Amb — 두 개 이상의 소스 Observable이 주어 질때, 그 중 첫 번째로 아이템을 발행한 Observable이 발행하는 아이템들을 전달합니다.
- Contains — Observable이 특정 아이템을 발행하는지 아닌지를 판단합니다.
- DefaultIfEmpty — 소스 Observable이 발행하는 아이템을 전달한다. 만약 발행되는 아이템이 없으면 기본 아이템을 발행합니다.
- SequenceEqual — 두 개의 Observable이 아이템을 같은 순서로 발행하는지 판단합니다.
- SkipUntil — 두 번째 Observable이 아이템을 발행하기 전까지 발행된 아이템들을 버립니다.
- SkipWhile — 특정 조건이 false를 리턴하기 전까지 Observable이 발행한 아이템들을 버립니다.
- TakeUntil — 두 번째 Observable이 아이템을 발행하기 시작햤거나 두 번째 Observable이 종료되면 그 때부터 발행되는 아이템들은 버립니다.
- TakeWhile — 특정 조건이 false를 리턴하기 시작하면 그 이후에 발행되는 아이템들을 버립니다.
Mathematical and Aggregate Operators
Observable이 발행하는 항목 전체를 대상으로 동작하는 Operator들입니다.
- Average — Observable이 발행한 아이템의 평균 값을 발행합니다
- Concat — 두 개 이상의 Observable들이 아이템을 발행할 때 Observable 순서대로 발행하는 아이템들을 하나의 Observable 발행로 연이어 발행합니다.
- Count — 소스 Observable이 발행한 아이템의 개수를 발행합니다.
- Max — Observable이 발행한 아이템 중 값이 가장 큰 아이템을 발행합니다.
- Min — Observable이 발행한 아이템 중 값이 가장 작은 아이템을 발행합니다.
- Reduce — Observable이 발행한 아이템에 함수를 순서대로 적용하고 함수를 연산한 후 최종 결과를 발행합니다.
- Sum — Observable이 발행한 아이템의 합계를 발행합니다.
Connectable Observable Operators
좀 더 세세한 조작이 가능한 특별한 Observable들입니다.
- Connect — 구독자가 아이템 발행을 시작할 수 있도록 Connectable Observable에게 명령을 내립니다.
- Publish — 일반 Observable을 Connectable Observable로 변환합니다.
- RefCount — 일반 Observable처럼 동작하는 Connectable Observable을 만듭니다.
- Replay — 비록 옵저버가 Observable이 아이템 발행을 시작한 후에 구독을 했다 하더라도 발행된 모든 아이템들을 볼 수 있도록 합니다.
Operators to Convert Observables
- To — Observable을 다른 객체나 자료 구조로 변환합니다.
참고
'🍎 Apple > Combine & Rx' 카테고리의 다른 글
[RxSwift] Scheduler (2) | 2022.02.24 |
---|---|
[RxSwift] Subject (0) | 2022.02.16 |
[Combine] share, multicast (0) | 2022.01.13 |
[Combine] timer (by Runloop, Timer Class, DispatchQueue) (2) | 2022.01.13 |
[Combine] Operator (4): Sequence Operator (0) | 2022.01.12 |