버그 잡이

[RxJava] Operator란 무엇인가? #create #fromIterater() #fromCallable() #fromPublisher() 본문

모던 안드로이드/RxJava

[RxJava] Operator란 무엇인가? #create #fromIterater() #fromCallable() #fromPublisher()

버그잡이 2020. 4. 29. 11:47

Operator란?

 

아직은 operator를 직접 써보지 않아서 정의를 내리긴 힘들지만 나의 짧은 지식으로 정의를 내려보자면

 

 

"observable에서 나오는 data를 처리하는 연산자이다. "

 

*생성, 변환, 필터, 결합, 조건, 에러 처리 등 다양한 종류의 연산자가 있다.

 

 

아무튼 RxJava는 다양한 operator들을 잘 조합함으로써 완성되는 것 같다.

그렇기 때문에 operator들을 잘 이해하는 것이 곧 rxJava를 이해하는 길이라고 생각한다.

 

 

 

 

 

Operator의 종류

 

1. create : 데이터 받아서 observable로 만들어 주기 ex_ fromIterable() 

 

2. filter : 말 그대로 거르는 것  ex_ filter()

 

3. other format : 다른 형태로 바꿔주는 것  ex_ map()

 

 

 

 

 

이번 글에서는 create 기능을 수행하는 Operator를 중점으로 살펴보자.

 

 

 

 

create()

 

observable을 만들때 사용.

 

 

 

1) single observable 만들기

 

// Create the Observable
Observable<Task> singleTaskObservable = Observable
        .create(new ObservableOnSubscribe<Task>() {
            @Override
            public void subscribe(ObservableEmitter<Task> emitter) throws Exception {
                if(!emitter.isDisposed()){
                    emitter.onNext(task);
                    emitter.onComplete();
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

 

- 원리를 이해해보자.

- emitter라는 observable 을 반환하는 놈을 만든다. (ObservableOnSubscribe 는 emitter 생성 공장 같은 느낌인가?)

- emitter의 onNext 메서드에 task를 전달함으로써 task를 observable로 만든 후 observer에 전달한다.

 

- 명확히 이해는 안 되지만 일단 이런 흐름이라는 것을 이해해두자.

- 흠... 근데 너무 수다스러운 코드 아닌가? 나중에 다른 방법도 알려주겠지?

 

 

 

 

2) 2개 이상 observable 만들기

 

Observable<Task> taskListObservable = Observable
    .create(new ObservableOnSubscribe<Task>() {
        @Override
        public void subscribe(ObservableEmitter<Task> emitter) throws Exception {
        
            // Inside the subscribe method iterate through the list of tasks and call onNext(task)
            for(Task task: DataSource.createTasksList()){
                if(!emitter.isDisposed()){
                    emitter.onNext(task);
                }
            }
            // Once the loop is complete, call the onComplete() method
            if(!emitter.isDisposed()){
                emitter.onComplete();
            }

        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

- for문을 사용해서 여러개의 emitter를 만들었다.

 

 

 

 

 

fromArray, fromIterable, fromCallable, fromPublisher

 

- mitch said "가장 유용하게 쓸 operator일 것이다."

 

 

 

fromArray()

 

 : 이름 그대로 array형태의 data를 받을 수 있다.

 

 

fromIterable()

 

 : iterable한 모든 객체

    ex) List, ArrayList, Set 등등

 

    *iterable : member를 차례로 반환할 수 있는 객체

 

 

 

fromCallable()

 : SQLite 데이터 접근 및 결과를 반환 받을때.

 

 

*예제를 먼저 살펴보자

Observable.fromCallable{useRoom(); return data}.subscribeOn().observenOn().subscribe({},{},{})

 

- RoomDB에 접근(in Background)하고 결과 값을 반환한다. 

- 옵저버는 반환받은 값을 구독하여 이를 MainThread에서 특정 작업(UI최신화 등)에 반영한다. 

 

이렇게 fromCallable()을 사용하면 기존 AsyncTask로 하던 작업을 보다 쉽게 할 수 있다.

 

*예체 참고 블로그 : https://gamjatwigim.tistory.com/61

 

 

 

 

fromPublisher()

- LiveData <-> Observable 형태로 서로 바꿔줄때 사용

- Mvvm패턴에서 LiveData를 활용할때 유용하다.

 

 

*사용법

 

 

1. 라이브러리 추가

   

 - implementation "android.arch.lifecycle:reactivestreams:1.1.1"

 

  *추가적으로 retrofit으로 response를 받을때 observable로 받을 수 있게 해주는 라이브러리도 추가한다.

 

   - implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"

 

 

 

2. Retrofit response를 observable로 받기

 

.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) 를 추가해준다.

 

public class ServiceGenerator {

    public static final String BASE_URL = "https://jsonplaceholder.typicode.com";

    private static Retrofit.Builder retrofitBuilder =
            new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create());

    private static Retrofit retrofit = retrofitBuilder.build();

    private static RequestApi requestApi = retrofit.create(RequestApi.class);

    public static RequestApi getRequestApi(){
        return requestApi;
    }
}

 

 

3. 받은 observable을 LiveData로 변환

 

.fromPublisher : observable -> LiveData

.toPublisher : LiveData -> observable

 

    public LiveData<ResponseBody> makeReactiveQuery(){
        return LiveDataReactiveStreams.fromPublisher(ServiceGenerator.getRequestApi()
                .makeQuery()
                .subscribeOn(Schedulers.io()));
    }

 

*코드 출처 : https://codingwithmitch.com/courses/rxjava-rxandroid-for-beginners/rx-operators-from-publisher/

 

 

 

 

 

 

 

 

 

반응형
Comments