稀有猿诉

十年磨一剑,历炼出锋芒,说话千百句,不如码二行。

专家之路上的Flow高级秘籍

『君不见,黄河之水天上来,奔流到海不复回。』

学习与河流一样,一方面学无止境,又是逆水行舟,不进则退,因为其他人都在卷。前文一篇文章讲了Flow的基础,大多数情况下够用了,但是不能停止卷,因为你不卷,就会被别人卷。一旦涉及到复杂的应用场景,就需要用到一些高级的API。今天就来学习一下Flow的高级特性,当遇到问题时也能更从容的应对。

上下文切换

Flow是基于协程的,是用协程来实现并发,前面也提到过像flow {…},在上游生产数据,以及中游做变幻时,都是可以直接调用suspend,耗时甚至是阻塞的函数的。而终端操作符如collect则是suspend的,调用者(也就是消费者)需要负责确保collect是在协程中调用。我们还知道Flow是是冷流,消费者终端才会触发上游生产者生产,所以对于flow {…}来说,它的上游和中游运行的上下文来自于终端调用者的上下文,这个叫做『上下文保留』(context preservation),我们可以用一个🌰 来验证一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun main() = runBlocking {
    // Should be main by default
    simple().collect { log("Got: $it") }

    // Collect in a specified context
    withContext(Dispatchers.Default) {
        simple().collect { log("Now got: $it") }
    }
}

private fun simple(): Flow<Int> = flow {
    log("Started the simple flow")
    for (i in 1..3) {
        delay(100)
        log("Producing $i")
        emit(i)
    }
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[main @coroutine#1] Started the simple flow
[main @coroutine#1] Producing 1
[main @coroutine#1] Got: 1
[main @coroutine#1] Producing 2
[main @coroutine#1] Got: 2
[main @coroutine#1] Producing 3
[main @coroutine#1] Got: 3
[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow
[DefaultDispatcher-worker-1 @coroutine#1] Producing 1
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 1
[DefaultDispatcher-worker-1 @coroutine#1] Producing 2
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 2
[DefaultDispatcher-worker-1 @coroutine#1] Producing 3
[DefaultDispatcher-worker-1 @coroutine#1] Now got: 3

从这个🌰 可以清楚的看到,Flow的context是来自于终端调用者的。

用flowOn来指定上下文

有时候使用终端调用者的上下文可能不太方便,因为生产者与消费者的模式其实是解耦的,它们不应该相互受制于对方,对于关键的并发的上下文更是如此。比如说在GUI的应用中,明显应该在工作线程中生产数据,在UI线程中消费数据,从上面的例子来看,由终端调用者来决定上游上下文明显不可取。有同学举手了,欺负我没学过协程是吧?我可以在Flow内部使用withContext来指定上下文啊,我们来试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun main() = runBlocking {
    // Should be main by default
    simple().collect { log("Got: $it") }
}

private fun simple(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        log("Started the simple flow")
        for (i in 1..3) {
            delay(100)
            log("Producing $i")
            emit(i)
        }
    }
}

这位同学可以直接出去了,因为你的代码crash 了:

1
2
3
4
5
6
[DefaultDispatcher-worker-1 @coroutine#1] Started the simple flow
[DefaultDispatcher-worker-1 @coroutine#1] Producing 1
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
      Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@545486c7, BlockingEventLoop@13bfcf14],
      but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@27015c5a, Dispatchers.Default].
      Please refer to 'flow' documentation or use 'flowOn' instead

意思大概是说Flow内部不让直接用withContext来切上下文,破坏了Flow的不变式,想切上下文要用flowOn。而且仔细看,异常是由emit函数抛出来的。

其实Flow的设计者已经考虑到了这个问题,并且给出了优雅的方式,如果想切换Flow内部(也即上游和中游)的运行上下文,要用flowOn函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun main() = runBlocking {
    // Should be main by default
    simple().collect { log("Got: $it") }
}

private fun simple(): Flow<Int> = flow {
    log("Started the simple flow")
    for (i in 1..3) {
        delay(100)
        log("Producing $i")
        emit(i)
        Thread.sleep(50)
    }
}.flowOn(Dispatchers.Default)
//[DefaultDispatcher-worker-1 @coroutine#2] Started the simple flow
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 1
//[main @coroutine#1] Got: 1
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 2
//[main @coroutine#1] Got: 2
//[DefaultDispatcher-worker-1 @coroutine#2] Producing 3
//[main @coroutine#1] Got: 3

这回就和谐多了,后台搞生产,UI只展示,完美!还需要特别注意的是函数flowOn只影响它的上游,不影响它的下游,更不会影响终端,终端永远都在其调用者的上下文中,来看一个🌰 :

1
2
3
4
5
6
7
8
withContext(Dispatchers.Main) {
    val singleValue = intFlow // will be executed on IO if context wasn't specified before
        .map { ... } // Will be executed in IO
        .flowOn(Dispatchers.IO)
        .filter { ... } // Will be executed in Default
        .flowOn(Dispatchers.Default)
        .single() // Will be executed in the Main
}

第一个flowOn切到IO,只影响到它前面的创建和map,第二次切换到Default,只影响filter。single是终端,是在Main,因为它的调用者是在Main里面。

注意,注意: Flow是一个数据流,保持其数据流的特点是相当重要的,无论是正常数据,异常数据,还是出错都是一种数据,应该让其自上而下的流动,在中游变幻时或者终端时通过操作符来处理。所以,像硬性的上下文切换,或者异常的try/catch都是不允许的。这就是所谓的流的不变性(Flow invariant)。后面讲异常时还会提到这点。

任意上下文的Flow builders

从前面的学习我们知道了,下下文保留的特性,终端会决定上游生产者的上下文,当然也可以通过flowOn来改变上下文。Flow builder其实就是一个生产者,异步的emit数据。但有些时候生产数据时的上下文,也就是调用emit时的上下文,是不确定的。比如说安卓 上面的各种回调(callback)有些是回调在调用者的线程里,有些则不是。flow {…}中的emit就不能在异步的回调里面调用,这时就要用callbackFlow {…}。callbackFlow专门适用于把现有的一些回调转为Flow,最典型的应用就是位置信息:

1
2
3
4
5
6
7
8
9
10
11
12
fun locationFlow(): Flow<Location> = callbackFlow {
  val listener = object : LocationListener {
      override fun onLocationUpdate(loc: Location) {
          trySend(location)
      }
  }
  locationManager.reqisterLocaitonUpdates(listener)
  
  awaitClose {
      locationManager.unregisterLocationUpdates(listener)
  }
}

如果这个Flow,用flow {}去创建会抛异常,因为emit没法在回调中使用。callbackFlow会在回调中发射数据,并在awaitClose代码块中反注册回调以清理资源。awaitClose会在这个流结束时(完成或者被取消)被回调到,以有机会进行资源清理。

其实,无论是flow {}还是callbackFlow {}都是channelFlow {}的简单化,channelFlow非常复杂,也超级强大,它可以自带buffer,自带并发,适用于创建一些非常复杂的Flow。在多数时候flow {}和callbackFlow {}就够我们用了。

扩展阅读:

副作用函数

Flow是一个数据流,核心思想是把数据的生产和处理和最终消费分开,上游只负责生产数据,各种操作都应该由中游操作符来做,最终数据由终端消费掉。需要加强数据的封装性,和流的不变性,不破坏管道,用各种转换器来对数据进行操作。那么,对于流何时开始,每个数据何时产生,流什么时候终止,这些事件对于调试来说是很有帮助的。Flow的设计者给出了一系列副作用函数来做之些事情。副作用的意思就是这些函数不会对流本身产生影响。

  • onStart Flow开始生产之前会调用此函数。
  • onEach 在生产(emit)每个数据之前调用此函数,这个函数最常用被用来打日志,以查看每个产生的数据。
  • onCompletion 当Flow终止时或者被取消后会调用此函数。
  • onSubscritpion 有消费者了时调用此函数(也就是有人collect了此Flow时)。

异常,取消和错误处理

这一小节重点来看看非正常代码逻辑的处理。先来看看异常处理(Exception handling)。

用catch函数来处理Flow过程中的异常

代码随时都可能抛出异常,所以异常处理是一个必须要考虑的事情。当然可以在Flow的各个节点如上游生产,中游变幻和下游终端的代码块里面各种try/catch。一来是不够优雅,再者这会破坏Flow的不变性或者说一致性,它就是管道,数据在里面流动,不应该加以过多的干扰,想要对数据处理应该用操作符。也就是说要让异常(包括其他错误也是如此)对Flow是透明的,意思是说Flow并不关心是否有异常。所以提供了一个catch函数,它的作用是捕获并处理上游操作中发生的异常:

1
2
3
simple()
    .catch { e -> emit("Caught $e") } // emit on exception
    .collect { value -> println(value) }

需要注意catch与flowOn一样,只影响上游发生的异常,管不了下游:

1
2
3
4
5
flow { emitData() }
    .map { computeOne(it) }
    .catch { ... } // catches exceptions in emitData and computeOne
    .map { computeTwo(it) }
    .collect { process(it) } // throws exceptions from process and computeTwo

取消Flow

Flow没有显式的取消函数。Flow是冷流,有消费者时才会去生产数据,消费者停止消费了,Flow自然也就被取消了。终端操作都是suspend的,也就是要在协程中调用,因此取消终端调用的协程,就会取消Flow。

错误处理

其实没有特别的错误处理函数,前面的异常算是一个,如果上游没有抛出异常,就不会有其他错误了,因为错误也是数据的一种类型,并且是由我们自己根据场景来定义的。比如说从网络获取新闻列表,正常时的数据当然是一个个的新闻条目。出错了,比如无网络,或者服务器无响应,这时可能返回一个空的条目,里面有错误的具体信息。但这都是由业务逻辑决定的,是业务逻辑层面的东西。对于Flow而言,都还是有数据的,都是一种数据,具体数据的解读,那是消费者终端的事情,Flow并不关心。

唯一算得上错误处理的函数就是onEmpty,它会在Flow是空的时候,也就是不生产任何数据的时候被回调。可以在onEmpty里面生产emit数据,比如产生一个带有错误信息的数据,或者产生一个默认值。因为Flow为空,不产生emit任何数据时,管子是空的数据没有流动,Flow的整个链路,特别是终端collect是不会被执行的,这时可能会有问题,比如UI根本无法做出任何react,除非你设置过了默认UI状态,否则可能会不对。这个时候如果用onEmpty去产生一些默认值或者错误信息的话,就能激活整个Flow,终端能做出预期的响应。

重试机制

另一个非常有用的函数就是retry,它可以预设一个条件,当条件满足时就会触发重新collect。Flow是冷流,有消费者collect时才会触发生产者emit数据,因此重新collect就能让Flow重新emit数据流。

背压

Flow是异步数据流,响应式编程范式,上游生产数据,下游终端消费数据。有时候可能会遇到这样一种情况,就是上游数据生产的速度超过了下游终端的消费速度,这会造成数据流积压在管道中,终端无法及时响应。这种情况称为『背压(Back pressure)』。想像一下一个水管,如果进水速度大于水龙头流出的速度,水就会积压在水管里,如果水管是比较薄弱的(如气球),那么它会膨胀,最后爆掉。

通常情况下,当上游是较为可控的生产者时,不会产生背压,但如果是一些不是开发人员可控的,如硬件(触摸事件,位置信息,传感器,摄像头),其他系统(系统框架的回调,或者服务器的Push)等等,就会产生背压,这时必须进行相应的处理。所有的FRP式异步数据流API都必须处理『背压』,Flow也有相应的API来处理:

  • buffer 把生产者的emit的数据缓存,然后用Channel以并发的方式流向中游和下游,可以简单理解为并发地调用collect。正常情况下Flow是顺序的(Sequentially),就是数据从上游到中游再到终端,按顺序流动,先生产的数据先流到collect,这就是顺序的数据流sequentially。用上buffer后,就是会是并发的流,先emit的数据不一定先到collect,这就是concurrently。明显,能用buffer的前提是终端处理数据时没有对数据顺序的依赖。
  • conflate 也会像buffer一样启动并发式emit数据,但未能及时被终端消费掉的数据会被丢弃,终端只处理最新数据。
  • collectLatest 当有新的数据流出来时,终端只处理最新的数据,此之的终端处理会被取消掉(如果还没有处理完)。

转为热流

常规的Flow都是冷的(cold flow),但有时热流(hot flow)也有它的应用场景,Flow API中也有创建热流的方法。

StateFlow

StateFlow是一个『状态持有』流,它仅包含一个当前元素value,可以用过update来更新此状态。它是一个热流,可以有多个终端colloctor,每次更新都会把当前的值emit给所有的终端。

可以用构造方法MutableStateFlow创建一个StateFlow,或者通过函数stateIn来把一个冷流转化为一个StateFlow。

StateFlow是比较常用的,在安卓开发中,几乎所有的ViewModel都会用StateFlow来暂存UI状态数据。

SharedFlow

比StateFlow更为通用的便是通用的热流SharedFlow。可以通过构造方法MutableSharedFlow来创建SharedFlow,或者通过函数sharedIn把一个冷流转为SharedFlow。

SharedFlow可以有多个终端collector,所以可以实现一对多的通知,如实现观察者模式,或者像设置/配置更新,或者广播等等就可以考虑用SharedFlow来实现。

扩展阅读:

Comments