Artikel ini adalah tentang praktik terbaik yang kami temukan saat menggunakan Flow dalam aplikasi Android Dev Summit (ADS) 2019; yang baru saja dibuat open source. Baca terus untuk mengetahui bagaimana setiap layer aplikasi kami menangani streaming data.



Arsitektur aplikasi ADS mengikuti panduan arsitektur aplikasi yang direkomendasikan, dengan penambahan layer domain (UseCases) yang membantu memisahkan persoalan, menjaga class agar tetap kecil, terfokus, dapat digunakan kembali, dan dapat diuji:








Arsitektur aplikasi ADS 2019

Seperti banyak aplikasi Android, aplikasi ADS memuat data dari jaringan atau cache; kami menganggapnya sebagai kasus penggunaan yang sempurna untuk Flow. Untuk operasi tunggal, fungsi suspend lebih cocok dipakai. Ada dua komitmen utama yang menyebabkan refactor aplikasi menggunakan Coroutine. Komitmen pertama melakukan migrasi operasi tunggal, dan yang kedua adalah melakukan migrasi ke streaming data.


Pada artikel ini, Anda bisa menemukan prinsip-prinsip yang kami ikuti untuk melakukan refactor aplikasi dari penggunaan LiveData di semua layer arsitektur menjadi hanya penggunaan LiveData untuk komunikasi antara View dan ViewModel, serta Coroutine untuk UseCase dan layer bawah arsitektur kami.






1. Memilih mengekspos streaming sebagai Flow (bukan Channel)

Ada dua cara untuk menangani streaming data di coroutine: Flow API dan Channel API. Channel adalah sinkronisasi sederhana sedangkan Flow dibangun untuk memodelkan streaming data: ini adalah pabrik berlangganan untuk streaming data. Namun, Channel bisa digunakan untuk mendukung Flow, seperti yang akan kita lihat nanti.
Utamakan Flow karena ia menyediakan lebih banyak fleksibilitas, serta operator dan kontrak yang lebih eksplisit daripada Channel
Flow secara otomatis menutup streaming data karena sifat dari operator terminal yang memicu eksekusi streaming data dan menyelesaikannya dengan sukses atau luar biasa bergantung pada semua operasi flow di sisi produser. Karena itu, Anda tidak bisa (tidak dengan mudah) membocorkan sumber daya di sisi produser. Ini lebih mudah dilakukan dengan Channel: produser mungkin tidak perlu membersihkan sumber daya berat jika Channel tidak ditutup dengan benar.

Layer data aplikasi bertanggung jawab menyediakan data dengan membaca dari database atau mengambil dari Internet. Sebagai contoh ini adalah antarmuka DataSource yang mengekspos streaming data event pengguna:
interface UserEventDataSource { fun getObservableUserEvent(userId: String): Flow< UserEventResult > }



2. Cara menggunakan Flow di arsitektur aplikasi Android Anda


UseCase dan Repositori

Layer di antara View/ViewModel dan DataSource (dalam kasus kami adalah UseCase dan Repositori) sering kali perlu menggabungkan data dari beberapa kueri atau mentransformasi data sebelum ia bisa digunakan oleh layer ViewModel. Sama seperti sekuens Kotlin, Flow mendukung banyak operator untuk mentransformasi data Anda. Ada banyak operator yang sudah tersedia, atau Anda bisa membuat transformasi sendiri (mis. menggunakan operator transform). Namun, Flow mengekspos lambda suspend pada banyak operator, sering kali kita tidak perlu melakukan transformasi khusus untuk menyelesaikan tugas-tugas kompleks, cukup panggil fungsi suspend dari dalam Flow.

Dalam contoh ADS, kami ingin menggabungkan UserEventResult dengan data sesi dalam layer Repositori. Kami menggunakan operator map untuk menerapkan lambda suspend ke setiap nilai Flow yang diambil dari DataSource:
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ class DefaultSessionAndUserEventRepository( private val userEventDataSource: UserEventDataSource, private val sessionRepository: SessionRepository ) : SessionAndUserEventRepository { override fun getObservableUserEvent( userId: String?, eventId: SessionId ): Flow> { // Handles null userId // Observes the user events and merges them with session data return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult -> // lambda of the map operator that can call suspend functions val event = sessionRepository.getSession(eventId) // Merges session with user data and emits the result val userSession = UserSession( event, userEventResult.userEvent ?: createDefaultUserEvent(event) ) Result.Success(LoadUserSessionUseCaseResult(userSession)) } } }


ViewModel

Saat melakukan komunikasi UI ↔ ViewModel dengan LiveData, layer ViewModel harus menggunakan streaming data yang berasal dari layer data menggunakan operator terminal (mis. collect, first atau toList). Lihat kode selengkapnya di sini.
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ // Simplified version of the real code class SessionDetailViewModel( private val loadUserSessionUseCase: LoadUserSessionUseCase, ... ): ViewModel() { private fun listenForUserSessionChanges(sessionId: SessionId) { viewModelScope.launch { loadUserSessionUseCase(sessionId).collect { loadResult -> // Update multiple LiveDatas to notify the View } } } }

Jika Anda mengonversi Flow ke LiveData, Anda bisa menggunakan extension function Flow.asLiveData() dari library lifecycle LiveData ktx androidX. Sangat mudah karena ia akan membagikan satu langganan pokok untuk Flow dan akan mengelola langganan berdasarkan siklus hidup pengamat. Selain itu, LiveData juga mempertahankan nilai terbaru untuk pengamat yang datang terlambat dan langganan aktif saat terjadi perubahan konfigurasi. Lihat kode yang lebih sederhana berikut yang menunjukkan bagaimana Anda bisa menggunakan extension function:
class SimplifiedSessionDetailViewModel( private val loadUserSessionUseCase: LoadUserSessionUseCase, ... ): ViewModel() { val sessions = loadUserSessionUseCase(sessionId).asLiveData() }
Disclaimer: Cuplikan kode di atas bukan bagian dari aplikasi; itu adalah kode versi sederhana yang menunjukkan bagaimana Anda bisa menggunakan Flow.asLiveData().



3. Kapan waktu yang tepat menggunakan BroadcastChannel atau Flow sebagai detail implementasi

Kembali ke implementasi DataSource, bagaimana kita bisa mengimplementasikan fungsi getObservableUserEvent yang dijelaskan di atas? Tim mempertimbangkan dua implementasi alternatif: builder flow atauBroadcastChannel API. Masing-masing ditujukan untuk kasus penggunaan yang berbeda.

Kapan menggunakan Flow

Flow adalah cold stream. Cold stream adalah sumber data yang produsernya akan dijalankan untuk setiap listener yang mulai mengonsumsi event, menghasilkan streaming data baru yang dibuat pada setiap langganan. Setelah pemakai berhenti mendengarkan atau blok produser selesai, streaming data akan ditutup secara otomatis.
Flow sangat sesuai ketika produksi data harus dimulai/dihentikan untuk mencocokkan dengan pengamat
Anda bisa mengeluarkan elemen dalam jumlah terbatas atau tidak terbatas menggunakan builder flow.
val oneElementFlow: Flow = flow { // producer block starts here, stream starts emit(1) // producer block finishes here, stream will be closed } val unlimitedElementFlow: Flow = flow { // producer block starts here, stream starts while(true) { // Do calculations emit(result) delay(100) } // producer block finishes here, stream will be closed }
Flow cenderung digunakan untuk tugas-tugas berharga karena menyediakan pembersihan otomatis melalui pembatalan coroutine. Perhatikan bahwa pembatalan ini bersifat kooperatif, flow yang tidak pernah ditangguhkan tidak pernah dapat dibatalkan: dalam contoh kami, karena delay adalah fungsi penangguhan yang memeriksa pembatalan, ketika pelanggan berhenti mendengarkan, Flow akan berhenti dan membersihkan sumber daya.


Kapan harus menggunakan BroadcastChannel

Channel adalah saluran primitif serentak untuk berkomunikasi antara coroutine. BroadcastChannel adalah implementasi dari Channel dengan kemampuan multicast.
Ada beberapa kasus di mana Anda mungkin ingin menggunakan implementasi BroadcastChannel di layer DataSource Anda:
Gunakan BroadcastChannel ketika produser dan konsumen memiliki masa pakai yang berbeda atau beroperasi sepenuhnya secara independen satu sama lain
BroadcastChannel API sangat cocok ketika Anda menginginkan produser mengikuti siklus hidup yang berbeda dan menyiarkan hasil terkini kepada siapa pun yang mendengarkan. Dengan cara ini, produser tidak perlu memulai setiap kali listener baru mulai mengonsumsi event.

Anda masih bisa mengekspos Flow ke pemanggil, mereka tidak perlu tahu tentang cara implementasinya. Anda bisa menggunakan fungsi ekstensi BroadcastChannel.asFlow() untuk mengekspos BroadcastChannel sebagai Flow.

Namun, menutup Flow tersebut tidak akan membatalkan langganan. Saat menggunakan BroadcastChannel, Anda harus menjaga siklus hidupnya. Mereka tidak tahu apakah ada listener atau tidak, dan akan menjaga sumber daya tetap aktif sampai BroadcastChannel dibatalkan atau ditutup. Pastikan untuk menutup BroadcastChannel ketika tidak lagi diperlukan. Selain itu, ingat bahwa saluran yang ditutup tidak dapat aktif lagi, Anda harus membuat instance baru.

Contoh cara penggunaan BroadcastChannel API bisa ditemukan di bagian berikutnya.


Disclaimer

Beberapa bagian dari Flow dan Channel API masih dalam tahap eksperimental, mereka kemungkinan akan berubah. Ada beberapa situasi di mana Anda saat ini menggunakan Channel, tetapi rekomendasi di masa mendatang mungkin berubah menggunakan Flow. Secara khusus, proposal StateFlow dan share operator Flow mungkin mengurangi penggunaan Channel di masa mendatang.



4. Mengonversi streaming data API berbasis callback ke Coroutine.

Beberapa library sudah mendukung coroutine untuk operasi streaming data, termasuk Room. Bagi yang tidak, Anda bisa mengonversi semua API berbasis callback ke Coroutine


Implementasi Flow

Jika Anda ingin mengonversi streaming API berbasis callback agar menggunakan Flow, Anda bisa menggunakan fungsi channelFlow (juga callbackFlow, yang memiliki implementasi yang sama). channelFlow membuat instance Flow yang elemennya dikirim ke Channel. Ini memungkinkan kami untuk menyediakan elemen yang berjalan dalam konteks yang berbeda atau secara serentak.

Dalam contoh berikut, kami ingin mengeluarkan elemen yang kami dapatkan dari callback ke dalam Flow:
  1. Buat aliran dengan builder channelFlow yang mendaftarkan callback ke library pihak ketiga.
  2. Keluarkan semua item yang diterima dari callback ke Flow.
  3. Ketika pelanggan berhenti mendengarkan, kami membatalkan pendaftaran langganan API menggunakan suspend fun awaitClose
  4. /* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow { // 1) Create Flow with channelFlow return channelFlow { val eventDocument = firestore.collection(USERS_COLLECTION) .document(userId) .collection(EVENTS_COLLECTION) .document(eventId) // 1) Register callback to the API val subscription = eventDocument.addSnapshotListener { snapshot, _ -> val userEvent = if (snapshot.exists()) { parseUserEvent(snapshot) } else { null } // 2) Send items to the Flow channel.offer(UserEventResult(userEvent)) } // 3) Don't close the stream of data, keep it open until the consumer // stops listening or the API calls onCompleted or onError. // When that happens, cancel the subscription to the 3P library awaitClose { subscription.remove() } } }
    Lihat kode selengkapnya di sini.


Implementasi BroadcastChannel

Untuk streaming data yang melacak autentikasi pengguna dengan Firestore, kami menggunakan BroadcastChannel API karena kami ingin mendaftarkan satu listener Authentication yang mengikuti siklus hidup yang berbeda dan menyiarkan hasil terkini kepada siapa pun yang mendengarkan.

Untuk mengonversi API callback ke BroadcastChannel, Anda membutuhkan lebih banyak kode daripada dengan Flow. Anda bisa membuat class di mana instance BroadcastChannel dapat disimpan dalam variabel. Selama inisialisasi, daftarkan callback yang mengirimkan elemen ke BroadcastChannel seperti sebelumnya:
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource { private val channel = ConflatedBroadcastChannel>() private val listener: ((FirebaseAuth) -> Unit) = { auth -> // Data processing logic // Send the current user for observers if (!channel.isClosedForSend) { channel.offer(Success(FirebaseUserInfo(auth.currentUser))) } else { unregisterListener() } } @Synchronized override fun getBasicUserInfo(): Flow> { if (!isListening) { firebase.addAuthStateListener(listener) isListening = true } return channel.asFlow() } }
Lihat kode selengkapnya di sini.




5. Tips pengujian

Untuk menguji transformasi Flow (seperti yang kami lakukan di UseCase dan layer Repositori), Anda bisa menggunakan builder flow untuk menampilkan data palsu. Misalnya:
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ object FakeUserEventDataSource : UserEventDataSource { override fun getObservableUserEvents(userId: String) = flow { emit(UserEventsResult(userEvents)) } } class DefaultSessionAndUserEventRepositoryTest { @Test fun observableUserEvents_areMappedCorrectly() = runBlockingTest { // Prepare repo val userEvents = repository .getObservableUserEvents("user", true).first() // Assert user events } }
Agar sukses menguji implementasi Flow, kami sarankan menggunakan operator take untuk mendapatkan beberapa item dari Flow dan operator toList sebagai operator terminal untuk menerima hasilnya dalam daftar. Lihat contohnya dalam pengujian berikut:
class AnotherStreamDataSourceImplTest { @Test fun `Test happy path`() = runBlockingTest { // Prepare subject val result = subject.flow.take(1).toList() // Assert expected result } }
Operator take sangat cocok untuk menutup Flow setelah Anda mendapatkan item. Tidak menutup Flow yang dimulai (atau BroadcastChannel) setelah setiap pengujian akan membocorkan memori dan membuat rangkaian pengujian yang kacau dan tidak konsisten.


Catatan: Jika implementasi DataSource dilakukan dengan BroadcastChannel, kode di atas tidaklah cukup. Anda harus mengelola siklus hidupnya dengan memastikan bahwa Anda memulai BroadcastChannel sebelum pengujian dan menutupnya setelah pengujian selesai. Jika tidak, Anda akan membocorkan memori. Anda bisa melihat pengujian seperti ini dalam contoh Flow berikut.
Menguji praktik terbaik Coroutine juga berlaku di sini. Jika Anda membuat coroutine baru dalam kode yang sedang diuji, Anda mungkin ingin menjalankannya di thread pengujian untuk eksekusi deterministik pengujian. Ketahui selengkapnya tentang hal ini dalam pembicaraan Testing Coroutines ADS 2019.


Ringkasan

  • Utamakan mengekspos Flow kepada konsumen daripada Channel karena semua kontrak eksplisit dan operator yang disediakan Flow.
  • Dengan Flow, blok produser akan dieksekusi setiap kali ada listener baru dan siklus hidup streaming data akan ditangani secara otomatis.
  • Dengan BroadcastChannel, Anda bisa berbagi produser tetapi Anda harus mengelola siklus hidupnya sendiri.
  • Pertimbangkan untuk mengubah API berbasis callback ke coroutine untuk integrasi API yang lebih baik dan idiomatis dalam aplikasi Anda.
  • Uji implementasi Flow secara mudah dengan menggunakan operator take dan toList.