flow

flow是有順序的把管子中的資料一個一個發射(emit)出去,接收端也是一個一個接收(collect)資料。

資料不是同時發射,也不是同時接收,皆是有順序性的發射與接收。

什麼時候發射(emit)資料,當呼叫collect時,才會發射(emit)資料。

import

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

簡單flow程式

當flow呼叫collect()時,才會執行flowtest1()。
使用emit()每一秒發射一筆資料,而collect()就會一筆一筆接收資料,接收到資料就會輸出在螢幕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun flowtest1() = flow<Int> {
  for (i in 1..3) {
    delay(1000)
    emit(i)
  }
}
@Test
fun coroutine10() = runBlocking {
  val flow = flowtest1()
  println("call collect..")
  flow.collect { value ->
    println(value)
  }
}
call collect..
1
2
3

證明flow與launch是同時執行

coroutine10()中有launch協程與flow流同時啟動,並同時執行。
可以看到執行結果互相交叉輸出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun flowtest1() = flow<Int> {
  for (i in 1..3) {
    delay(1000)
    emit(i)
  }
}
@Test
fun coroutine10() = runBlocking {
  launch {
    for (i in 10 .. 15) {
      delay(1000)
      println("other job i = $i")
    }
  }
  val flow = flowtest1()
  println("call collect..")
  flow.collect { value ->
    println(value)
  }
}
call collect..
1
other job i = 10
2
other job i = 11
3
other job i = 12
other job i = 13
other job i = 14
other job i = 15

多次collect

再一次collect就重新再發射(emit)一次資料。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun flowtest1() = flow<Int> {
  for (i in 1..3) {
    delay(1000)
    emit(i)
  }
}
@Test
fun coroutine11() = runBlocking {
  val flow = flowtest1()
  println("call collect..")
  flow.collect { value ->
    println(value)
  }
  println("collect..again")
  flow.collect { value ->
    println(value)
  }
}
call collect..
1
2
3
collect..again
1
2
3

asFlow()

asFlow()本身就有emit()的功能,asFlow使用在陣列或集合。
以下是asFlow()的原始碼,在forEach{}之間,一個一個發射(emit)。

1
2
3
4
5
public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

實際使用方式。

1
2
3
4
5
6
@Test
fun coroutine12() = runBlocking<Unit> {
  (1..3).asFlow().collect{
    println("data = $it")
  }
}
data = 1
data = 2
data = 3

filter

asFlow()結合filter(過濾器),僅保留符合條件的元素。

filter { 條件 }

以下程式碼,若除2的餘數不為0,也就是奇數,就會保留下來,偶數就會剔除掉。

1
2
3
4
5
6
@Test
fun coroutine12() = runBlocking<Unit> {
  (1..3).asFlow().filter { it % 2 != 0 }.collect{
    println("data = $it")
  }
}
data = 1
data = 3

flowOf

flowOf(參數是可變集合),原始碼針對每一個元素進行發射。

1
2
3
4
5
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

實際應用如下,字串陣列abc、def、ghi,逐一發射(emit)與接收(collect)。

1
2
3
4
5
6
@Test
fun coroutine13() = runBlocking<Unit> {
  flowOf("abc", "def", "ghi").collect {
    println("data = $it")
  }
}
data = abc
data = def
data = ghi

同一個執行緒與context

flow從emit()到collect()都是在同一個Context,同一個執行緒。

1
2
3
4
5
6
7
8
9
10
11
12
13
fun flowtest2() = flow<Int> {
  println("Flow Context = ${Thread.currentThread().name}")
  for (i in 1..3) {
    delay(1000)
    emit(i)
  }
}
@Test
fun coroutine14() = runBlocking<Unit> {
  flowtest2().collect { value ->
    println("Collected $value ${Thread.currentThread().name}")
  }
}
Flow Context = Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

flowOn

flowOn可以改變context。
假設要從網路下載圖片,要使用調度器IO,而UI是在Main Thread,不能互相影嚮,所以需要flowOn,來分成二個不同的執行緒,一個是main Thread,一個是background work thread。

錯誤的改變context

把flowtest2()調度器改成IO(硬碟輸出入、網路存取、資料庫存取)。
但執行完會有error,系統會告知你二個是不同的協程,不同的context。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun flowtest2() = flow<Int> {
  withContext(Dispatchers.IO) {
    println("Flow Context = ${Thread.currentThread().name}")
    for (i in 1..3) {
      delay(1000)
      emit(i)
    }
  }
}
@Test
fun coroutine14() = runBlocking<Unit> {
  flowtest2().collect { value ->
    println("Collected $value ${Thread.currentThread().name}")
  }
}
Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@26e1a71d, BlockingEventLoop@6e8fc98b],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@28717622, Dispatchers.IO]

正確使用flowOn

使用flowOn,可以發現flowtest2()使用的是Thread是DefaultDispatcher-worker-1 @coroutine#2,而coroutine14()使用的Thread是Test worker @coroutine#1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun flowtest2() = flow<Int> {
  println("Flow Context = ${Thread.currentThread().name}")
  for (i in 1..3) {
    delay(1000)
    emit(i)
  }
}.flowOn(Dispatchers.IO)

@Test
fun coroutine14() = runBlocking<Unit> {
  flowtest2().collect { value ->
    println("Collected $value ${Thread.currentThread().name}")
  }
}
Flow Context = DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

onEach

設定每一個元素要做什麼事。
以下程式碼設定每個元素都要暫停1秒,再發射。
執行結果,每筆資料的時間相差約1000ms。

1
2
3
4
5
6
7
8
9
@Test
fun coroutine13() = runBlocking<Unit> {
  flowOf("abc", "def", "ghi")
    .onEach { delay(1000) }
    .collect {
      val time = System.currentTimeMillis()
    println("time = $time  data = $it")
  }
}
time = 1759461086051  data = abc
time = 1759461087068  data = def
time = 1759461088070  data = ghi

onEach沒有collect功能

以下程式碼,若只有呼叫onEach(),最後沒有用collect(),就算資料發射,也沒有人去接收。

1
2
3
4
5
6
7
8
9
  fun events() = (1 .. 3)
    .asFlow().flowOn(Dispatchers.IO)
  @Test
  fun coroutine15() = runBlocking<Unit> {
    events()
      .onEach { event ->
        println("Event: $event ${Thread.currentThread().name}") }
      .collect {}
  }
Event: 1 Test worker @coroutine#1
Event: 2 Test worker @coroutine#1
Event: 3 Test worker @coroutine#1

asFlow()指派給函式

以下events()函式,指派= (1..3).asFlow() 給函式。
onEach印出event()發射方(emit)的Thread。

1
2
3
4
fun events() =
  (1..3).asFlow().onEach { event ->
    println("events() $event ${Thread.currentThread().name}")
  }.flowOn(Dispatchers.IO)

coroutine15()中的onEach印出接收方(collect)的thared。
真正接收資料是collect()函式。

1
2
3
4
5
6
7
8
9
10
@Test
fun coroutine15() = runBlocking<Unit> {
  events()
    .onEach { event ->
      println("coroutine15(): $event ${Thread.currentThread().name}")
    }
    .collect { value ->
      println("data= $value ${Thread.currentThread().name}")
    }
}
events() 1 DefaultDispatcher-worker-1 @coroutine#2
events() 2 DefaultDispatcher-worker-1 @coroutine#2
events() 3 DefaultDispatcher-worker-1 @coroutine#2
coroutine15(): 1 Test worker @coroutine#1
data= 1 Test worker @coroutine#1
coroutine15(): 2 Test worker @coroutine#1
data= 2 Test worker @coroutine#1
coroutine15(): 3 Test worker @coroutine#1
data= 3 Test worker @coroutine#1

launchIn()改變Scope

launchIn()可以改變Scope。
this代表目前的Thread。

launchIn(this)
1
2
3
4
5
6
7
8
9
10
11
12
fun events() =
  (1..3).asFlow().onEach { event ->
    println("events() $event ${Thread.currentThread().name}")
  }.flowOn(Dispatchers.IO)

@Test
fun coroutine15() = runBlocking<Unit> {
  events()
    .onEach { event ->
      println("coroutine15(): $event ${Thread.currentThread().name}")
    }.launchIn(this)
}
events() 1 DefaultDispatcher-worker-1 @coroutine#3
events() 2 DefaultDispatcher-worker-1 @coroutine#3
events() 3 DefaultDispatcher-worker-1 @coroutine#3
coroutine15(): 1 Test worker @coroutine#2
coroutine15(): 2 Test worker @coroutine#2
coroutine15(): 3 Test worker @coroutine#2

scope

因為變成別的scope,所以最後要使用join(),這樣runBlocking的Scope才會等待CoroutineScope結束才結束,不然印不出來結果。

.launchIn(CoroutineScope(Dispatchers.Default))
.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
  fun events() =
    (1..3).asFlow().onEach { event ->
      println("events() $event ${Thread.currentThread().name}")
    }.flowOn(Dispatchers.IO)

  @Test
  fun coroutine15() = runBlocking<Unit> {
    events()
      .onEach { event ->
        println("coroutine15(): $event ${Thread.currentThread().name}")
      }.launchIn(CoroutineScope(Dispatchers.Default))
      .join()
  }

launchIn 傳回值

launchIn()會傳回Job,因此可以去接收傳回值。

1
2
3
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

以下程式碼,接收launchIn()的傳回值,並且在2.5秒取消協程。
onEach設為1秒發射(emit)一次。
結果只有接收2筆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  fun events() =
    (1..3).asFlow().onEach {
      delay(1000)
    }.flowOn(Dispatchers.IO)

  @Test
  fun coroutine15() = runBlocking<Unit> {
    val job = events()
      .onEach { event ->
        val time = System.currentTimeMillis()
        println("event(): $event time = $time")
      }.launchIn(CoroutineScope(Dispatchers.Default))
    delay(2500)
    job.cancel()
    job.join()
  }
event(): 1 time = 1759476638927
event(): 2 time = 1759476639932

Cancel Flow

results matching ""

    No results matching ""