在本教程中,你将学习Kotlin中的反应式流,并使用两种类型的流——SharedFlow和StateFlow,构建一个应用程序。

事件流已经成为Android的标准配置。多年来,RxJava一直是反应式流的标准。现在,Kotlin提供了自己的反应式流实现,称为Flow。与RxJava一样,Kotlin Flow可以创建数据流并对其做出反应。也和RxJava一样,事件流可以来自冷或热发布者。两者之间的区别很简单,冷流只有在有订阅者的情况下才会发出事件,而热流即使没有任何订阅者对其订阅,也可以发出新的事件。在本教程中,你将了解Flow的热流实现,称为SharedFlow和StateFlow。更具体地说,你将学习下面的内容。

  • 什么是SharedFlow?
  • 什么是StateFlow以及它与SharedFlow的关系。
  • 这些热流与RxJava、Channels和LiveData的比较。
  • 你如何在Android上使用它们。

你可能会问自己。"为什么要使用Kotlin的SharedFlow和StateFlow而不是RxJava?" 虽然RxJava能很好地完成工作,但有些人喜欢把它描述为「用火箭筒来杀死蚂蚁」。换句话说,尽管这个框架是有效的,但它很容易被它的所有功能所迷惑。这样做会导致过于复杂的解决方案和难以理解的代码。Kotlin Flow为反应式流提供了更直接和具体的实现。

Getting Started

你将在一个名为CryptoStonks5000的应用程序上工作。这个应用程序有两个界面。第一个界面向用户显示一些加密货币,第二个界面显示一种加密货币在过去24小时内的价格走势。

为了了解StateFlow和SharedFlow,你需要:

  • 用SharedFlow实现一个事件流,处理多界面之间共享的事件。
  • 重构CryptoStonks5000,使用StateFlow来处理界面的视图状态。

该项目遵循Clean Architecture和MVVM模式。

Crypto Stonks 5000 - Project structure

建立并运行该项目,以确保一切正常。在这之后,是时候学习SharedFlow了!

User exploring stock information on the CryptoStonks5000 app

SharedFlow

在进入代码之前,你至少要知道什么是SharedFlow。

一个SharedFlow的核心是一个Flow。但它与标准的Flow实现有两个主要区别:

  • 即使你不对它调用collect(),也会产生事件。毕竟,它是一个热流实现。
  • 它可以有多个订阅者。

注意这里使用的术语是「订阅者」,而不是像你在普通Flow中看到的「收集者」。这种命名上的变化,主要是因为SharedFlow永远不会完成。换句话说,当你在一个SharedFlow上调用Flow.collect()时,你不是在收集它的所有事件。相反,你订阅的是在该订阅存在时被发出的事件。

尽管这也意味着对SharedFlow的Flow.collect()的调用不会正常完成,但订阅仍然可以被取消。正如你所期望的,这种取消是通过取消coroutine发生的。

注意:Flow.take(count: Int)等Flow截断操作符可以强制完成一个SharedFlow。

说完了这些,现在该是编码的时候了。

Handling Shared Events

你要mock一个假的价格通知系统来模仿虚拟币的价值变化。这必须是一个假的,因为真的东西太不稳定了。

用户应该感知这些变化,无论他们在哪个界面上。为了使之成为可能,你将在所有界面共享的ViewModel中创建一个SharedFlow。

在演示Demo中,找到并打开CoinsSharedViewModel.kt。

在开始前,你需要知道如何创建一个SharedFlow。好吧,今天是你的幸运日,因为你将连续创建两个,在类的顶部添加这段代码。

private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1

val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2

在这段代码中。

  • 你调用MutableSharedFlow创建了一个可变的SharedFlow,它发出SharedViewEffects类型的事件,这是一个简单的Sealed Class来模拟可能的事件。注意,这是一个私有属性。你将在内部使用这个来发射事件,同时公开一个不可变的SharedFlow,使它们在外部可见(这是一个常见的技巧,你在LiveData中应该也看见过)。
  • 你通过在可变的SharedFlow上调用asSharedFlow()来创建上述的公共不可变的SharedFlow。这样一来,不可变的公开属性总是反映出可变的私有属性的值。

拥有这两个属性是一个好的做法,它不仅让你可以通过_sharedViewEffects在内部自由地产生任何你想要的东西,而且还使外部代码只能通过订阅sharedViewEffects来对这些事件做出反应。因此,调用者没有权力改变SharedFlow的内容,这是一个强大的设计和职责分离的巧妙方法,避免了突变性错误。

Event Emission With SharedFlow

好了,你有了你的Flow。现在,你需要用它们产生一些东西——价格变化。CoinsSharedViewModel在其init块中调用getPriceVariations(),但该方法还没有做任何事情。

在getPriceVariations()中加入以下代码。

viewModelScope.launch { // 1
  for (i in 1..100) { // 2
    delay(5000) // 3
    _sharedViewEffects.emit(SharedViewEffects.PriceVariation(i)) // 4
  }
}

这段代码做了几件不同的事情。

  • 启动一个coroutine。
  • 执行一个从1到100的for循环。
  • delay()用于检查协程是否被取消,所以如果协程被取消,它将停止循环。
  • 在可变的SharedFlow上调用emit,传递给它一个PriceVariation的实例,它是SharedViewEffects的一个事件。
    emit(value: T)是你可以在SharedFlow上调用的两个事件生产方法之一。另一个方法是使用tryEmit(value: T)。

两者之间的区别在于,emit是一个暂停的函数,而tryEmit不是。这个小小的区别导致了这两个方法之间巨大的行为差异。要解释这一点,你需要深入了解SharedFlow的Replay cache和buffering。系好安全带吧。

Replay and Buffering

MutableSharedFlow()接受三个参数。

public fun <T> MutableSharedFlow(
  replay: Int = 0, // 1
  extraBufferCapacity: Int = 0, // 2
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND // 3
): MutableSharedFlow<T>

下面是它们的用途。

  • replay:向新订阅者重放的数值的数量。它不能是负数,默认为零。
  • extraBufferCapacity:缓冲的值的数量。不能为负数,默认为零。这个值加上replay的总和,构成了SharedFlow的总缓冲区大小。
  • onBufferOverflow(缓冲区溢出):达到缓冲区溢出时采取的行为。它可以有三个值:BufferOverflow.SUSPEND, BufferOverflow.DROP_OLDEST或BufferOverflow.DROP_LATEST。它的默认值是BufferOverflow.SUSPEND。

Default Behavior

这可能会变得很难理解,所以这里有一个简短的动画,展示了与使用默认值构建的SharedFlow的可能行为。假设该SharedFlow使用emit(value: T)。

SharedFlow with default constructor parameters

一步一步地走下去。

  • 这个SharedFlow有三个事件和两个订阅者。第一个事件是在还没有订阅者的情况下发出的,所以它将永远丢失。
  • 当SharedFlow发出第二个事件时,它已经有了一个订阅者,这个订阅者得到了上述事件。
  • 在到达第三个事件之前,另一个订阅者出现了,但第一个订阅者被suspend,并保持这样直到获取该事件。这意味着emit()将无法将第三个事件传递给那个订阅者。当这种情况发生时,SharedFlow有两种选择,它要么缓冲该事件,并在恢复时将其发射给suspend的订阅者,要么在没有足够的缓冲区留给该事件时造成缓冲区溢出。
  • 在这种情况下,总的缓冲区为零-replay+extraBufferCapacity。换句话说,就是缓冲区溢出。因为onBufferOverflow是使用的BufferOverflow.SUSPEND,Flow将suspend,直到它能把事件传递给所有的订阅者。
  • 当订阅者恢复时,Flow也会恢复,将事件传递给所有订阅者并继续其工作。
注意:SharedFlow规范禁止你在缓冲区总值为零时使用onBufferOverflow = BufferOverflow.SUSPEND以外的任何东西。因为tryEmit(value: T)不会暂停,如果你用默认的replay和extraBufferCapacity值来使用它,它就不会工作。换句话说,用tryEmit(value: T)发射事件的唯一方法是,至少要有一个总缓冲区。

With Replay

好吧,这还不算太糟。但是,如果有一个缓冲区,会发生什么?下面是一个replay=1的例子。

SharedFlow with replay = 1

把它分解开来。

  • 当SharedFlow到达第一个没有任何活动订阅者的事件时,它不再暂停。由于replay=1,所以现在总的缓冲区大小为1。因此,这个Flow缓冲了第一个事件并继续前进。
  • 当它到达第二个事件时,缓冲区没有更多的空间了,所以它suspend了。
  • Flow保持suspend,直到订阅者恢复。一旦它恢复,它就会得到缓冲的第一个事件,以及最新的第二个事件。SharedFlow恢复了,但第一个事件永远消失了,因为第二个事件现在在重放缓冲区中占据了位置。
  • 在到达第三个事件之前,一个新的订阅者出现了。由于replay,它也得到一份最新事件的副本。
  • 当流最终到达第三个事件时,两个订阅者都得到了它的副本。
  • SharedFlow缓冲了这第三个事件,同时抛弃了之前的事件。后来,当第三个订阅者出现时,它也得到了第三个事件的副本。

With extraBufferCapacity and onBufferOverflow

这个过程与extraBufferCapacity类似,但没有类似replay的行为。第三个例子显示了一个SharedFlow,extraBufferCapacity = 1,onBufferOverflow = BufferOverflow.DROP_OLDEST的场景。

SharedFlow with extraBufferCapacity = 1 and onBufferOverflow = DROP_LATEST

在这个例子中。

  • 这个行为和第一个例子是一样的。在一个suspend的订阅者和总缓冲区大小为1的情况下,SharedFlow缓冲了第一个事件。
  • 不同的行为开始于第二个事件的发射。由于onBufferOverflow = BufferOverflow.DROP_OLDEST,SharedFlow放弃了第一个事件,缓冲了第二个事件并继续进行。另外,注意到第二个订阅者没有得到缓冲事件的副本。记住,这个SharedFlow有extraBufferCapacity = 1,但replay = 0。
  • 这个Flow最终到达第三个事件,活动用户收到了这个事件。然后,Flow缓冲了这个事件,放弃了之前的事件。
  • 不久之后,suspend的用户恢复,触发SharedFlow向它发出缓冲的事件,并清理缓冲区。

Subscribing to Event Emissions

好了,到此为止,做得很好 你现在知道如何创建一个SharedFlow并定制其行为。现在只剩下一件事要做,那就是订阅一个SharedFlow。

在代码中,进入Demo中的coinhistory包,打开CoinHistoryFragment.kt。在该类的顶部,声明并初始化共享的ViewModel。

private val sharedViewModel: CoinsSharedViewModel by activityViewModels { CoinsSharedViewModelFactory }

由于你希望SharedFlow无论在哪个界面上都能产生数据,所以你不能把这个ViewModel绑定到这个特定的Fragment上。相反,你想把它绑定到Activity上,这样当你从一个Fragment到另一个Fragment时,它就能存活下来。这就是为什么代码中使用了by activityViewModels委托。至于CoinsSharedViewModelFactory,不用担心。应用程序中的每个ViewModel工厂都已经准备好正确地注入任何依赖关系。

Collecting the SharedFlow

现在你有了共享的ViewModel,你可以使用它。找到subscribeToSharedViewEffects()。通过添加以下代码在这里订阅SharedFlow。

viewLifecycleOwner.lifecycleScope.launchWhenStarted { // 1
  sharedViewModel.sharedViewEffects.collect { // 2
    when (it) {
      // 3
      is SharedViewEffects.PriceVariation -> notifyOfPriceVariation(it.variation)
    }
  }
}

这段代码有几个重要的细节。

  • coroutine的范围是View而不是Fragment。这确保了只有在View还活着的时候,coroutine才是活的,即使Fragment比它存活时间长。代码用launchWhenStarted创建了coroutine,而不是使用最常见的launch。这样,只有当生命周期至少处于STARTED状态时,coroutine才会启动,当它至少处于STOPPED状态时才会暂停,并在协程作用域取消时而取消。在这里使用launch会导致潜在的崩溃,因为即使在后台,coroutine也会继续处理事件。
  • 正如你所看到的,订阅一个SharedFlow与订阅一个普通流是一样的。代码在SharedFlow上调用collect()来订阅新事件。
  • 订阅者对SharedFlow事件作出反应。

在任何时候都要记住,即使使用launchWhenStarted,SharedFlow也会在没有订阅者的情况下继续产生事件。因此,你总是需要考虑是否在浪费资源。在这种情况下,事件产生的代码是无害的,但事情会变得很严重,特别是当你使用类似shareIn的东西将冷流变成热流时。

注意:将冷流变成热流不在本教程的范围内--说实话,它值得一个独立的教程。如果你有兴趣,请查看本教程的最后一节,了解有关该主题的参考资料。

Applying the Stream Data to the View

回到代码中,你可以看到notifyOfPriceVariation()还不存在。把它也加进去。

private fun notifyOfPriceVariation(variation: Int) {
  val message = getString(R.string.price_variation_message, variation)
  showSnackbar(message)
}

简单易行。build并运行该应用程序。现在,当你进入虚拟币历史界面时,你会在底部看到一些周期性的Snackbar信息。不过,SharedFlow只有在你进入那个界面时才会开始产生数据。即使CoinsSharedViewModel实例被绑定到Activity上,它也只在你第一次访问虚拟币历史界面时被创建。

Crypto Stonks 5000 - Shared events in detail screen

而你希望所有的界面都能知道价格的变化,所以这并不理想。为了解决这个问题,在CoinListFragment中做完全相同的修改。

  • 用同样的方法创建CoinSharedViewModel实例。
  • 添加代码到subscribeToSharedViewEffects()。
  • 创建notifyOfPriceVariation()。

build并运行该应用程序。现在你会在CoinListFragment中也看到周期性的Snackbar信息。当你切换屏幕时,你会看到消息总是显示下一个事件,而不是之前的事件。CoinsSharedViewModel中的MutableSharedFlow()正在使用默认参数。但你可以随意玩一玩,看看它是如何影响SharedFlow的!

Crypto Stonks 5000 - Shared events between screens

SharedFlow and Channels

像SharedFlow一样,Channels代表热流。但这并不意味着SharedFlow将取代Channels API--至少不完全是。

SharedFlow被设计为完全取代BroadcastChannel。SharedFlow不仅使用起来更简单、更快速,而且比BroadcastChannel的功能更丰富。但请记住,在有意义的时候,仍然可以而且应该使用Channels API中的其他元素。

StateFlow

一个StateFlow的结构像一个SharedFlow。这是因为StateFlow只不过是SharedFlow的一个特殊化子类。事实上,你可以创建一个SharedFlow,它的行为完全像一个StateFlow。

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(InitialState()) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

上面的代码创建了一个SharedFlow,只向任何新的订阅者发送最新的值。由于底部的distinctUntilChanged,它只会在与之前的值不同的情况下发出任何值。这正是StateFlow所做的,这使得它非常适合保持和处理状态。

Handling App State

不过还有更简单的方法来创建StateFlow,你现在就可以使用。展开coinlist包,在里面打开CoinListFragmentViewModel.kt。这个简单的ViewModel使用LiveData来向CoinListFragment暴露一个视图状态类。状态类本身也相当简单,它有默认值来匹配初始视图状态。

data class CoinListFragmentViewState(
    val loading: Boolean = true,
    val coins: List<UiCoin> = emptyList()
)

然后,Fragment通过观察LiveData使用当前状态来更新视图。

// Code in CoinListFragment.kt
private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewModel.viewState.observe(viewLifecycleOwner) { updateUi(it, adapter) }
}

通过将MutableLiveData改为MutableStateFlow来开始重构。所以在CoinListFragmentViewModel中,从。

private val _viewState = MutableLiveData(CoinListFragmentViewState())

改为:

private val _viewState = MutableStateFlow(CoinListFragmentViewState())

请确保包括MutableStateFlow的必要导入。这就是你如何创建一个可变的StateFlow的方法。与SharedFlow不同,StateFlow需要一个初始值,或者换句话说,一个初始状态。但是因为StateFlow是SharedFlow的具体实现,你没有办法定制像replay或extraBufferCapacity这样的东西。但不管怎么说,SharedFlow的通用规则和约束仍然适用。

接下来,相应地更新不可变的LiveData。

val viewState: LiveData<CoinListFragmentViewState> get() = _viewState

改为:

val viewState: StateFlow<CoinListFragmentViewState> get() = _viewState

当然,你也可以这样做。

val viewState = _viewState.asStateFlow()

添加StateFlow的导入。无论是SharedFlow还是StateFlow,你都可以用这两个选项创建一个不可变的实例。使用asStateFlow()或asSharedFlow()的好处是,你可以得到额外的安全行为,即明确地创建一个不可变版本的流。这就避免了错误地创建另一个可变版本的事情。

Event Emission With StateFlow

SharedFlow和StateFlow之间值得注意的一个区别是事件生成方式。你仍然可以在StateFlow中使用emit和tryEmit,但是...不要这样,相反,你应该这样做。

mutableState.value = newState

原因是因为,对value的更新总是混合在一起的,这意味着即使你的更新速度超过了订阅者的消费速度,他们也只能得到最新的值。需要记住的一点是,无论你给value分配什么,都必须是一个与之前的对象完全不同的对象。例如,以这段代码为例。

data class State(
  var name: String = "",
  var age: Int = -1
)

val mutableState = MutableStateFlow<State>(State())

// ...

// newState and mutableState.value will reference the same object
val newState = mutableState.value 

// Reference is the same, so this is also changing mutableState.value!
newState.name = "Marc"

mutableState.value = newState

在这种情况下,StateFlow将不会发出新的值。因为被引用的对象是相同的,所以Flow将假定它是相同的状态。

为了使其发挥作用,你需要使用不可变的对象。比如说。

data class State(
  val name: String = "",
  val age: Int = -1
)

val mutableState = MutableStateFlow<State>(State())

// ...

mutableState.value = State(name = "Marc")

这样一来,StateFlow将正确地发出状态更新。不变性再次拯救了世界。

回到代码中,用StateFlow替换LiveData的好处是,它们都使用一个叫做value的属性,所以那里没有什么变化。

在CoinListFragmentViewModel中,在requestCoinList()方法中,还有最后一个改动要做。你现在可以将开头的if条件更新为。

if (viewState.value.coins.isNotEmpty()) return

你不再需要「?」了,因为值不会是空的。另外,你通过使用isNotEmpty()而不是isNullOrEmpty()来反转条件,并在开头去掉!。这使得代码更容易阅读。

如果你试图构建这个应用程序,你会在CoinListFragment上得到一个错误,说明有一个未解决的引用要观察。StateFlow没有观察方法,所以你也需要重构它。

Subscribing to State Updates

打开CoinListFragment.kt。找到observeViewStateUpdates()并将其更新为。

private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewLifecycleOwner.lifecycleScope.launchWhenStarted {
    viewModel.viewState.collect { updateUi(it, adapter) }
  }
}

这段代码很像你用SharedFlow做的,它们有着相同的逻辑。尽管如此,你可能还是会担心当应用程序在后台时,StateFlow会产生出数值。但你不需要这样担心。确实,因为它的作用域是viewModelScope,只要ViewModel存在,即使没有任何订阅者,它也会产生数据。不管怎么说,StateFlow的数据生产是轻量级的操作,它只是更新值并通知所有订阅者。另外,你可能确实希望应用程序在进入前台时向你展示最新的UI状态。

build并运行该应用程序。一切都应该像以前一样工作,因为你刚刚重构了代码。在使用StateFlow方面做得很好!

Crypto Stonks 5000 - Shared events between screens

StateFlow and Channels

就像SharedFlow可以完全取代BroadcastChannel,StateFlow可以完全取代ConflatedBroadcastChannel。这有几个原因。StateFlow比ConflatedBroadcastChannel更简单、更高效。它也有更好的区分可变性和不可变性的MutableStateFlow和StateFlow。

Hot Flows, RxJava and LiveData

你现在知道SharedFlow和StateFlow是如何工作的了。但它们在Android上是否有用呢?

虽然它们可能没有带来什么 "新 "东西,但它们提供了更直接和有效的替代方案。例如,在你使用RxJava的PublishSubject的地方,你可以使用SharedFlow。或者在你使用BehaviorSubject的地方,你可以使用StateFlow。事实上,如果hot event emission不是一个问题,StateFlow甚至可以轻松地取代LiveData。

注意:你也可以通过lifecycle-livedata-ktx将SharedFlow和StateFlow对象转换为LiveData。该库提供了一个扩展方法asLiveData(),允许你转换Flow并将其作为LiveData公开,以便在你的视图中使用。更多细节,请参见Android开发者StateFlow和SharedFlow文章中的StateFlow、Flow和LiveData部分。

所以,用更简单的术语来说。

  • 如果你有某种状态管理,你可以使用StateFlow。
  • 只要你有一些事件流在进行,如果事件没有被所有可能的订阅者处理,或者过去的事件可能根本没有被处理,都不是问题,你可以使用SharedFlow。

翻译自原文:https://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow