The Android SDK uses the RxJava3 library to provide Live Object and Live Collection behavior. RxJava is a Java VM implementation of ReactiveX, a library for composing asynchronous and event-based programs using observable sequences. The building blocks of RxJava are Observables and Subscribers: an Observable emits items, and a Subscriber consumes them.
See the ReactiveX website for more information.

How it Works

The SDK handles a large amount of data from several sources. Data may already be present in the local cache, it may be queried from the server, or it may arrive through real-time events. As a result, the same data is constantly being updated, and the copy you are accessing at any moment can be changed by another source and fall out of sync. The Rx3 Data Stream keeps the data in sync so that you always receive the most recent version. Whenever the data changes, you are notified through Flowable Objects and Flowable Collections. New data is delivered automatically on every update, so users do not need to refresh to see the latest data.

Data Sources

Local Cache

Data present in local storage

Server Queries

Data queried from the server

Real-time Events

Data received from real-time events
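
As a rough illustration of the idea above, the sketch below combines three stand-in sources into a single stream and suppresses repeated values. It is not how the SDK is implemented internally: the functions localCache(), serverQuery(), realTimeEvents(), and liveValue() are hypothetical names invented for this example, and only the RxJava3 operators (concat, distinctUntilChanged) are real library calls.

```kotlin
import io.reactivex.rxjava3.core.Flowable

// Hypothetical stand-ins for the three data sources; none of these are SDK APIs.
fun localCache(): Flowable<String> = Flowable.just("cached v1")
fun serverQuery(): Flowable<String> = Flowable.just("server v2")
fun realTimeEvents(): Flowable<String> = Flowable.just("event v3", "event v3")

// Emit the cached value first, then the server result, then real-time updates.
// distinctUntilChanged() drops consecutive duplicates, so a subscriber is only
// notified when the value actually changes.
fun liveValue(): Flowable<String> =
    Flowable.concat(localCache(), serverQuery(), realTimeEvents())
        .distinctUntilChanged()

fun main() {
    liveValue().blockingForEach { println(it) }
}
```

A subscriber to liveValue() sees one value per real change, regardless of which source produced it.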

How to Retrieve Data from Rx3 Data Stream

To retrieve data from an Rx3 Data Stream, subscribe to the stream (Flowable, Single, or Completable) and specify the threads on which to subscribe and observe.
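import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers

fun subscribeToFlowable() {
    // Flowable initialization
    val flowableStream: Flowable<String> =
        Flowable.just("one", "two", "three")
    flowableStream
        .subscribeOn(Schedulers.io()) // run the upstream work on an IO (background) thread
        .observeOn(AndroidSchedulers.mainThread()) // deliver results on the main (UI) thread
        .doOnNext {
            // data is available here
        }
        .doOnError {
            // handle error here
        }
        // the empty error consumer keeps an error that was already handled above
        // from being forwarded to RxJava's global error handler
        .subscribe({}, {})
}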
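
Every call to subscribe() returns a Disposable, and a long-lived stream keeps delivering items until that Disposable is disposed. The following is a minimal sketch of tracking subscriptions so they can be released together. StreamHolder is a hypothetical class written for this example, and Flowable.interval stands in for an SDK stream; on Android, stop() would typically be called from a lifecycle callback such as onDestroy().

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import java.util.concurrent.TimeUnit

// Hypothetical holder class, used only to illustrate the pattern.
class StreamHolder {
    private val disposables = CompositeDisposable()

    // Start observing and keep the Disposable returned by subscribe().
    fun start() {
        val disposable = Flowable.interval(1, TimeUnit.SECONDS) // stand-in for an SDK stream
            .subscribe(
                { tick -> println("tick $tick") },
                { error -> println("error: ${error.message}") }
            )
        disposables.add(disposable)
    }

    // Dispose every tracked subscription and empty the container.
    fun stop() = disposables.clear()

    // Number of subscriptions currently tracked.
    fun activeCount(): Int = disposables.size()
}
```

CompositeDisposable.clear() disposes the tracked subscriptions but leaves the container reusable, which suits components that can be started and stopped more than once.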

Events a Data Stream can Emit

RxJava3 provides the following observable types. The callbacks listed under each type can be used to react to its events:

Flowable

Emits a stream of elements
  • doOnNext
  • doOnError

Single

Emits exactly one element
  • doOnSuccess
  • doOnError

Completable

Emits no data, only a “complete” event on success or an error on failure
  • doOnComplete
  • doOnError
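
The callbacks listed above can be seen side by side in the short sketch below. It uses only standard RxJava3 factory methods (just, complete) with made-up values; describeEvents() is a hypothetical helper written for this example, not an SDK function. Because no schedulers are applied, everything runs synchronously on the calling thread.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single

// Hypothetical helper: records which callback fires for each stream type.
fun describeEvents(): List<String> {
    val events = mutableListOf<String>()

    // Flowable: doOnNext fires once per emitted element.
    Flowable.just("one", "two")
        .doOnNext { events.add("Flowable next: $it") }
        .doOnError { events.add("Flowable error: ${it.message}") }
        .subscribe({}, {})

    // Single: doOnSuccess fires once with the single element.
    Single.just(42)
        .doOnSuccess { events.add("Single success: $it") }
        .doOnError { events.add("Single error: ${it.message}") }
        .subscribe({}, {})

    // Completable: doOnComplete fires without any data.
    Completable.complete()
        .doOnComplete { events.add("Completable complete") }
        .doOnError { events.add("Completable error: ${it.message}") }
        .subscribe({}, {})

    return events
}

fun main() {
    describeEvents().forEach(::println)
}
```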

Flow functions

The .asFlow() method converts functions of the Amity Android SDK that return Flowable<T> into functions that return a Kotlin Flow.
fun getAllUsers(): Flow<PagingData<AmityUser>> {
    return AmityCoreClient.newUserRepository()
        .getUsers()
        .build()
        .query()
        .asFlow()
}
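
To show the conversion in isolation, the sketch below applies asFlow() to a plain Flowable and collects the result in a coroutine. It assumes the asFlow() extension from the kotlinx-coroutines-reactive artifact (kotlinx.coroutines.reactive.asFlow); which artifact your project resolves the extension from may differ. numbersAsFlow() is a hypothetical function for this example only.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking

// Hypothetical example function: wraps a Flowable as a Kotlin Flow.
// Flowable implements the Reactive Streams Publisher interface, which is
// what the asFlow() extension is defined on.
fun numbersAsFlow(): Flow<Int> = Flowable.just(1, 2, 3).asFlow()

fun main() = runBlocking {
    // Collect every emitted item into a list inside a coroutine.
    println(numbersAsFlow().toList())
}
```

The resulting Flow is cold: the underlying Flowable is subscribed to only when collection starts, and the subscription is cancelled when the collecting coroutine is cancelled.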

Jetpack Compose compatibility

The Amity Android SDK works with Jetpack Compose, so you can use it directly in Compose-based projects. The following sections show how to consume the SDK's Flow-based data in composable functions.

Flow of PagingData in Compose

In Jetpack Compose, the collectAsLazyPagingItems() function collects a Flow<PagingData<T>> so that you can paginate and display its items in your composable functions. To use it, add the Paging Compose dependency to your app-level build.gradle file.
implementation "androidx.paging:paging-compose:x.y.z"
Then, in your composable functions, collect from the flow, display the data, and observe the load state.
@Composable
fun UserList(
    modifier: Modifier = Modifier
) {
    //  remember the flow so that recomposition does not create a new query,
    //  then collect it using collectAsLazyPagingItems
    val users = remember { getAllUsers() }.collectAsLazyPagingItems()

    //  display data in LazyColumn or LazyRow
    LazyColumn(
        modifier = modifier.fillMaxSize(),
        horizontalAlignment = Alignment.CenterHorizontally,
    ) {
        items(
            count = users.itemCount,
            key = users.itemKey { it.getUserId() }
        ) { index ->
            //  render each item here
            val user = users[index]
            Text(text = "UserId: ${user?.getUserId()}")
        }

        //  handle load state on loading first page
        when (val state = users.loadState.refresh) {
            is LoadState.Error -> {
                //  handle error
                item {
                    Text(text = "Error: ${state.error.message}")
                }
            }

            is LoadState.Loading -> {
                //  loading first page
                item {
                    Column(
                        modifier = Modifier.fillParentMaxSize(),
                        horizontalAlignment = Alignment.CenterHorizontally,
                        verticalArrangement = Arrangement.Center,
                    ) {
                        Text(text = "Refresh/First Loading")
                        CircularProgressIndicator(color = Color.Black)
                    }
                }
            }

            else -> {}
        }

        //  handle load state on loading next page
        when (val state = users.loadState.append) {
            is LoadState.Error -> {
                //  handle error
                item {
                    Text(text = "Error: ${state.error.message}")
                }
            }

            is LoadState.Loading -> {
                //  loading next page
                item {
                    Column(
                        modifier = Modifier.fillMaxWidth(),
                        horizontalAlignment = Alignment.CenterHorizontally,
                        verticalArrangement = Arrangement.Center,
                    ) {
                        Text(text = "Pagination Loading")
                        CircularProgressIndicator(color = Color.Black)
                    }
                }
            }

            else -> {}
        }
    }
}

Flow in Compose

The collectAsState() method collects a Flow as Compose State, so asynchronous data updates are delivered to your Compose UI components.
fun getPostAsFlow(): Flow<AmityPost> {
    return AmitySocialClient.newPostRepository()
        .getPost(postId = "postId")
        .asFlow()
}

@Composable
fun SinglePostItem() {
    // remember the flow so that recomposition does not restart the collection
    val post by remember { getPostAsFlow() }.collectAsState(initial = null)

    Text(text = "Post ID: ${post?.getPostId()}")
}

Best Practices