Channel 通道
Channel跟flow一樣,有傳送資料與接收資料的功能。
Channel的傳送資料是send(),接收資料是receive()。
儲存要傳送資料的地方,稱為Channel。
Channel跟Flow一樣都是順序(依序)傳送。
Flow | Channel | |
---|---|---|
傳送 | emit() | send() |
接收 | collect() | receive() |
傳送資料的地方 | Flow 資料流 | Channel 通道 |
Channel 語法
val 變數 = Channel<資料型態>()
val chanel = Channel<Int>()
傳送
chanel.send(1)
chanel.send("Test")
接收
val data = chanel.receive()
以下程式碼,一個是send,一個是receive。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
fun coroutine02() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
channel.send(1)
println("send1")
channel.send(2)
println("send2")
channel.send(3)
println("send3")
channel.close()
}
delay(1000)
println("receive1 = ${channel.receive()}")
delay(1000)
println("receive2 = ${channel.receive()}")
delay(1000)
println("receive3 = ${channel.receive()}")
}
receive1 = 1
send1
receive2 = 2
send2
receive3 = 3
send3
channel buffer
capacity是通道可以放資料的大小。
預設是capacity = 0,也就是傳送3筆資料不會一次傳送完畢,另一端要接收,才能再傳送下一筆資料。
capacity = 2,通道大小為2,可以先傳送2筆資料放在通道,但要傳送第3筆資料時,另一端要先接收一筆資料,通道才會有多餘的空間,才能再傳送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
fun coroutine01() = runBlocking<Unit> {
val channel = Channel<Int>(capacity = 2)
launch {
channel.send(1)
println("send1")
channel.send(2)
println("send2")
println("send3 暫停suspend")
channel.send(3)
println("send3 send")
channel.close()
}
delay(1000)
println("receive1 = ${channel.receive()}")
delay(1000)
println("receive2 = ${channel.receive()}")
delay(1000)
println("receive3 = ${channel.receive()}")
}
send1
send2
send3 暫停suspend
receive1 = 1
send3 send
receive2 = 2
receive3 = 3
傳送的時間比接收的時間快
即便傳送間隔只花50 ms,但channel會「暫停」等到「接收」了,才會再send,而不是每50 ms送1次。
執行結果是收到的時間相差約1000 ms。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
fun coroutine05() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
channel.send(1)
println("send1")
delay(50)
channel.send(2)
println("send2")
delay(50)
channel.send(3)
println("send3")
channel.close()
}
val startT = System.currentTimeMillis()
delay(1000)
println("receive1 = ${channel.receive()} time = ${System.currentTimeMillis() - startT}")
delay(1000)
println("receive2 = ${channel.receive()} time = ${System.currentTimeMillis() - startT}")
delay(1000)
println("receive3 = ${channel.receive()} time = ${System.currentTimeMillis() - startT}")
}
receive1 = 1 time = 1012
send1
receive2 = 2 time = 2029
send2
receive3 = 3 time = 3031
send3
即便傳送間隔只花100 ms,但channel會「暫停」等到「接收」了,才會再send,而不是每100 ms送1次。
執行結果是收到的時間相差約1000 ms。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
fun coroutine17() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (i in 1..5) {
delay(100)
channel.send(i)
}
channel.close()
}
launch {
val startT = System.currentTimeMillis()
for (data in channel) {
delay(1000)
println("data = $data time = ${System.currentTimeMillis() - startT} ")
}
}
}
data = 1 time = 1105
data = 2 time = 2125
data = 3 time = 3126
data = 4 time = 4131
data = 5 time = 5135
channel.close() 放在send
channel.close()都是放在傳送完後,要手動關閉通道。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
fun coroutine01() = runBlocking<Unit> {
val channel = Channel<Int>(capacity = 2)
launch {
// 傳送的區域
channel.send(1)
println("send1")
channel.send(2)
println("send2")
// close放在傳送的區域
channel.close()
}
// 接收的區域
delay(1000)
println("receive1 = ${channel.receive()}")
delay(1000)
println("receive2 = ${channel.receive()}")
}
channel 迴圈
channel for
使用for代替receive。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
fun coroutine16() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (i in 1..5) {
delay(1000)
channel.send(i)
}
channel.close()
}
launch {
for (data in channel) {
println("data = $data")
}
}
}
data = 1
data = 2
data = 3
data = 4
data = 5
channel iterator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
fun coroutine16() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (i in 1..5) {
delay(1000)
channel.send(i)
}
channel.close()
}
launch {
val iter = channel.iterator()
while (iter.hasNext()) {
val data = iter.next()
println("data = $data")
}
}
}
data = 1
data = 2
data = 3
data = 4
data = 5
多個接收者
以下有三個協程,一個協程是傳送者,另外二個是接收者協程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
fun coroutine03() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (i in 1 .. 5) {
channel.send(i)
}
channel.close()
}
// receiver1
launch {
for (data in channel) {
println("receiver1 data = $data")
}
}
// receiver2
launch {
for (data in channel) {
println("receiver2 data = $data")
}
}
}
receiver1 data = 1
receiver1 data = 2
receiver1 data = 4
receiver2 data = 3
receiver1 data = 5
produce與actor
produce與actor自動建立傳送訊息的channel通道,而且會自動關閉channel,不用手動關閉close。
produce 產生訊息
1
2
3
4
5
6
7
8
9
10
11
12
13
fun produceNumber() = GlobalScope.produce<Int> {
for (i in 1..5) {
delay(1000)
send(i)
}
}
@Test
fun coroutine18() = runBlocking<Unit> {
val channel = produceNumber()
for (data in channel) {
println("data = $data")
}
}
data = 1
data = 2
data = 3
data = 4
data = 5
actor 接收訊息
1
2
3
4
5
6
7
8
9
10
11
12
fun receiver() = GlobalScope.actor<Int> {
for (data in channel) {
println("data = $data")
}
}
@Test
fun coroutine19() = runBlocking<Unit> {
val channel = receiver()
for (i in 1..5) {
channel.send(i)
}
}
判斷Channel是否關閉
傳送端用isClosedForSend。
接收端用isClosedForReceive。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
fun coroutine21() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (i in 1..5) {
delay(1000)
channel.send(i)
}
channel.close()
println("isClosedForSend = ${channel.isClosedForSend}")
}
launch {
for (data in channel) {
println("data = $data")
}
println("isClosedForReceive = ${channel.isClosedForReceive}")
}
}
receiveCatching
邊檢查channel是否關閉,邊取出channel資料。
傳回值透過isClosed,判斷已關閉。
傳回值透過getOrNull(),取出channel中的資料。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
fun coroutine16() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (i in 1..5) {
delay(1000)
channel.send(i)
}
channel.close()
}
launch {
while (true) {
val result = channel.receiveCatching()
if (result.isClosed) break
println("data = ${result.getOrNull()}")
}
}
}
data = 1
data = 2
data = 3
data = 4
isClosedForSend = true
data = 5
isClosedForReceive = true
select
語法
- onReceive() 在select{}中,接收訊息。
- onSend() 在select{}中,傳遞訊息。
select<傳回值型態> {
多個channel
}
取得傳送速度最快的通道
select裡面放多個通道。
value(該 Channel 傳來的資料)
onReceive()接收資料。
select<傳回值型態> {
channel1.onReceive {value -> "傳回值" }
channel2.onReceive {value -> "傳回值" }
channel3.onReceive {value -> "傳回值" }
}
如果有多個Channel,要分辦那個通道先發送消息,使用select。
因為delay(50)的速度比較快,select會先接收到World。
後面加上coroutineContext.cancelChildren(),是因為阻塞,junit無法執行完畢。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
fun coroutine06() = runBlocking<Unit> {
val channel1 = Channel<String>()
val channel2 = Channel<String>()
launch {
delay(100)
channel1.send("Hello")
channel1.close()
}
launch {
delay(50)
channel2.send("World")
channel2.close()
}
val result = select<String> {
channel1.onReceive { value ->
"channel1 data = $value"
}
channel2.onReceive { value ->
"channel2 data = $value"
}
}
println("result = $result")
coroutineContext.cancelChildren()
}
result = channel2 data = World
判斷那個通道的buffer未滿
判斷那個通道的buffer未滿,就用那個通道傳送資料。
onSend語法
select<傳回值型態> {
channel1.onSend(傳送給通道的值) { 傳送成功的傳回值callback }
channel2.onSend("World") {"傳回值"}
}
以下程式碼是判斷那個通道有空間,就送那邊。
以下Unit是沒有傳回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
fun coroutine07() = runBlocking<Unit> {
val channel1 = Channel<String>(capacity = 1)
val channel2 = Channel<String>(capacity = 1)
// channel1先塞滿資料
channel1.send("Hello")
select<Unit> {
channel1.onSend("ABC") {
println("channel1 send")
}
channel2.onSend("World") {
println("channel2 send")
}
}
coroutineContext.cancelChildren()
}
channel2 send
有傳回值的寫法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
fun coroutine07() = runBlocking<Unit> {
val channel1 = Channel<String>(capacity = 1)
val channel2 = Channel<String>(capacity = 1)
// channel1先塞滿資料
channel1.send("Hello")
val result = select<String> {
channel1.onSend("ABC") {
"channel1 send"
}
channel2.onSend("World") {
"channel2 send"
}
}
println(result)
coroutineContext.cancelChildren()
}
channel2 send
判斷那一個job先完成onAwait
使用有傳回值的async,不能是launch,launch沒有傳回值。
async的傳回值Deferred要用onAwait()來接收。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
fun coroutine08() = runBlocking<Unit> {
val job1 = async {
delay(50)
"job1 finish"
}
val job2 = async {
delay(10)
"job2 finish"
}
val result = select<String> {
job1.onAwait { it }
job2.onAwait { it }
}
println(result)
coroutineContext.cancelChildren()
}
job2 finish
判斷那一個job先完成onJoin
使用launch,launch沒有傳回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
fun coroutine09() = runBlocking<Unit> {
val job1 = launch {
delay(50)
println("job1 finish")
}
val job2 = launch {
delay(10)
println("job2 finish")
}
select<Unit> {
job1.onJoin { println("select job1 finish") }
job2.onJoin { println("select job2 finish") }
}
coroutineContext.cancelChildren()
}
job2 finish
select job2 finish
比較
類型 | 監聽事件 | 說明 |
---|---|---|
channel.onReceive {} |
收資料 | 哪個 Channel 先有資料就先處理哪個 |
channel.onSend {} |
傳資料 | 哪個 Channel 有空間就先發送 |
deferred.onAwait {} |
等結果 | 哪個任務先完成就先用誰 |
job.onJoin {} |
等任務結束 | 等待最早結束的任務 |