[TIL-230626] Android Flow 알아보기 - 2

반응형

안녕하세요
오늘은 이전 글 이어서 Flow에 대해 정리해보려고 합니다.


1.  Exception 처리

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // 중간 연산자 catch
                // Exception이 발생하면, catch 처리하고 UI를 업데이트 합니다.
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // 최신 인기 뉴스로 업데이트
                }
        }
    }
}
  • 위의 코드에서 collect, Exception이 발생하면 collect lambda는 호출되지 않습니다

 

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // 오류가 발생하면 마지막 캐시된 값을 출력합니다
            .catch { exception -> emit(lastCachedNews()) }
}
  • catch에서 emit으로 값을 출력할 수 있습니다.

 

2.  다른 CoroutineContext에서 실행

  • 기본적으로, Flow의 Producer는 수집하는 coroutine의 CoroutineContext에서 실행됩니다.
  • 다른 CoroutineContext로 값을 보낼 수 없습니다.
  • 예를 들어, 예제 소스에서 repository 계층에서 viewModelScope에서 사용하는 Dispatchers.Main에서 작업을 수행할 수 없습니다.
  • Flow의 CoroutineContext를 변경하려면, 중간 연산자 flowOn을 사용하면 됩니다.
  • flowOn은 upstream flow의 CoroutineContext를 변경합니다.
    *upstream flow : flowOn 이전에 적용한 중간 연산자와 Producer
  • downstream flow는 영향을 받지않고 Flow로 부터 collect 된 CoroutineContext에서 실행됩니다.
    * downstream flow :  Consumer와 함께 flowOn 이후 중간 연산자
  • flowOn 이 여러개가 있다면, 현재 위치로부터 각각의 upstream을 변경합니다

 

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // defaultDispatcher 에서 실행
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // defaultDispatcher 에서 실행
                saveInCache(news)
            }
            // flowOn은 the upstream flow ↑ 에 영향을 줍니다
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ 는 영향을 받지 않습니다
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}
  • 위의 코드에서, onEach map 연산자는 defaultDispatcher를 사용합니다.
  • 그러나, catch 연산자와 Consumer는 viewModelScope에서 사용되는 Dispatchers.Main에서 실행됩니다.

 

3.  callback 기반 API를 Flow로 변환

  • callbackFlow 란 callback 기반 API를 Flow로 변환해주는 flow 빌더입니다.
  • 아래 예제는 FireStore 데이터베이스 관련 예제입니다.
class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // the Firestore database를 사용자 이벤트로 가져오는 메소드
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Firestore 참고
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
        	
            // Firebase를 초기화하지 못하면, 데이터 스트림을 닫습니다.
            // Flow consumer는 수집을 중지하고 coroutine은 재시작합니다.
        }

        // 새 이벤트에 호출될 firestore에 callback 등록합니다.
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Flow에 이벤트를 보냅니다. Consumer는 새 이벤트를 얻습니다.
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // 이벤트를 Flow에 보낼 수 없습니다.
            }
        }
        // awaitClose에 callback은 Flow가 닫히거나 취소되면 실행됩니다.
        // 이때, Firestore로부터 callback이 삭제 됩니다.
        awaitClose { subscription?.remove() }
    }
}
  • flow 빌더와 달리, callbackFlow는 값을 send 함수나 coroutine 밖에서 trySend를 이용하여 다른 CoroutineContext 으로부터 출력하는 것을 허용합니다.
  • 내부적으로, callbackFlow는 blocking queue와 비슷한 형태인 channel을 사용합니다.
  • channel은 버퍼링 할 수 있는 최대 숫자를 표기하는 capacity로 구성되어집니다. 
  • callbackFlow에서 생성된 channel은 기본적으로 64 element의 기본 capacity를 가집니다.
  • 전체 channel에 새로운 element를 추가하려면, send 사용합니다.
  • send는 새로운 element의 공간이 있을 때 까지 Producer를 중지합니다.
  • offer는 channel에 element를 추가하지 않고, 즉시 false를 리턴합니다.

오늘은 남아있는 Flow에 대한 내용에 대해 정리해보았습니다.

잘못된 내용이 있다면 댓글 부탁드리고, 내용이 좋았다면 공감, 구독 부탁드려요!

반응형