ลองใช้ Bigtable Server Stream ร่วมกับ Spring WebFlux พร้อมทำความเข้าใจ Backpressure ใน Project Reactor
ช่วงนี้ต้องวนๆอยู่กับงานที่ใช้ Bigtable ที่ต้องประมวลผลข้อมูลใน Bigtable ที่มีขนาดใหญ่มาก ประกอบกับงานที่ทำ ใช้ Spring WebFlux อยู่แล้ว ซึ่งต้องบอกว่าประสิทธิภาพของตัว Spring WebFlux ไม่ธรรมดาเลย แต่ก็ต้องแลกมาด้วย learning curve ที่ต้องใช้เวลาทำความเข้าใจพอสมควรครับ
ใช้ Google Bigtable ร่วมกับ Spring WebFlux
และแน่นอน รอบนี้เราจะใช้ Google Bigtable เป็น datasource ร่วมกับ Spring WebFlux นี่แหละ โดยรอบนี้เราจะเขียนด้วย Kotlin แทน Java เพราะด้วยภาษาที่เข้าใจง่ายกว่า เป็น functional ทำให้จัดการ data ได้ง่ายกว่า
แต่เนื่องจากเราต้องเรียกข้อมูลขนาดใหญ่มากจาก Bigtable แล้วนำมาประมวลผลบางอย่างต่อใน application ทำให้การเขียนโค๊ดจะต้องเป็นไปอย่างระวัง และต้องคิดให้รอบด้าน
แน่นอนว่า Google Bigtable มี library เป็นภาษา Java เพื่อเปิดโอกาสให้ application ที่เขียนด้วย Java สามารถเข้าไปจัดการข้อมูลบน Bigtable ได้ แต่โชคชตาไม่ได้เข้าข้างเรามากนัก เนื่องจาก library ดังกล่าง ไม่สนับสนุน Reactive ตั้งแต่แรก
แต่ยังมีความโชคดีอยู่ ที่ library Bigtable มีตัวเลือกให้เราสามารถใช้สิ่งที่เรียกว่า ServerStream
ได้ โดยตัวเลือกดังกล่าง จะเป็นการทำให้ตัว Bigtable ค่อยๆ stream ข้อมูลออกมาเรื่อยๆ ทำให้ไม่จำเป็นที่จะต้อง buffer ข้อมูลมาไว้ใน application ทั้งหมด ไม่งั้นระบบก็อ๊วกแตกกันพอดี หลังจากนั้นเราก็เอา ServerSream
มาเปลี่ยนเป็น Flux
ได้ ด้วยโค๊ดด้านล่าง
fun ServerStream<Row>.toFlux(): Flux<Row> =
Flux.create { sink ->
runCatching {
this.forEach(sink::next)
sink.complete()
}.getOrElse { error ->
sink.error(error)
}
}
Extension สำหรับ convert ServerStream ให้กลายเป็น Flux
เมื่อเรานำ extension method ด้านบนมาใช้ ก็จะประมาณนี้
fun invoke(): Mono<List<String>> {
return bigtableDataClient.use { client ->
val query = Query.create(TableId.of(TEST_TABLE))
val fluxResult = client.readRows(query).toFlux()
fluxResult.map { row ->
val key = row.key.toStringUtf8()
val value = row.cells[0].value.toStringUtf8()
"${key}-${value}"
}.collectList()
}
}
เมื่อเรานำ Extension method นี้มาใช้งาน
คราวนี้ เราก็สามารถใช้ Reactor Operation ในการจัดการข้อมูลภายใน pipeline ได้แล้ว แต่ด้วยความที่ Project Reactor เป็น Reactive Programming การทำงานทุกอย่างจะเป็น event loop โดยประกอบไปด้วย 2 ส่วน คือ ผู้ผลิตข้อมูล (Producer) และผู้บริโภคข้อมูล (Consumer)
ปัญหาต่อมาคือ เมื่อผู้ผลิตข้อมูล (Producer) ทำงานได้เร็วกว่าผู้บริโภค (Consumer) ทำให้ตัว Consumer ทำงานไม่ทัน หรือต้องใช้เวลาประมวลผลเป็นเวลานาน อาจจะทำให้เกิด Buffer Overflow, Memory Overflow จนทำให้ application ของเราหยุดทำงานไปเลยก็ได้
ยกตัวอย่างง่ายๆครับ เราดึงข้อมูลจาก Bigtable จำนวนมาก ประมาณ 1 ล้านรายการ แล้วตัว Bigtable สามารถทำงานได้เร็วมาก เอาเป็น 100,000 rows/sec ส่งต่อมายัง logic ของเรา แต่ logic ที่ application ของเรา ทำงานได้แค่ 1,000 rows/sec
เหตุการนี้ จะมี 2 ผู้กระทำเกิดขึ้น
- (Producer) ดึงข้อมูลจาก Bigtable แล้วส่งต่อออกไป
- (Consumer) application logic ของเรา ที่รับสิ่งที่ Bigtable ส่งมาและประมวลผลต่อ
เมื่อเหตุการณ์นี้เกิดขึ้น ทำให้ consumer ทำงานไม่ทันกับสิ่งที่ producer ส่งมา มีโอกาสทำให้เกิด Buffer Overflow ขึ้นได้ โดยพระเอกที่จะเข้ามาแก้ปัญหานี้ก็คือ Backpressure ครับ
แล้วไอ Backpressure มันคืออะไร?
เอาง่ายๆ มันก็คือตัวที่ช่วยควบคุมการส่งข้อมูลนั่นแหละครับ ไม่ให้ producer ไปถล่ม consumer มากเกินไป จน consumer ทำงานหนักเกินไป จบด้วยการที่ application มันหยุดทำงานไปในที่สุด
โดยหลักการจัดการ Backpressure ใน Reactive Streams จะมีด้วยกัน 3 แบบ
- Buffering : เก็บข้อมูลใน buffer ก่อน รอจนกว่า consumer จะดึงไปประมวลผล
- Dropping : ทิ้งของนั้นซะ! กรณีที่มันเกินความสามารถของ consumer
- Throttling : จำกัดอัตราการส่งข้อมูลของ producer
โดยตัว Project Reactor นั้น รองรับการทำ Backpressure อยู่แล้ว เรามาดูว่ามีตัวเลือกไหนให้ใช้งานได้บ้าง
Throttling มันซะ จำกัดอัตราการส่งข้อมูลของ Producer
ตัว Project Reactor มี operator ที่ชื่อว่า .limitRate()
เพื่อจำกัดอัตราการส่งข้อมูล เพื่อไม่ให้ consumer อ๊วกตายซะก่อน โดยเมื่อเรานำโค๊ดด้านบนมา refactor ใหม่ ก็จะประมาณนี้
fun invoke(): Mono<List<String>> {
return bigtableDataClient.use { client ->
val query = Query.create(TableId.of(TEST_TABLE))
val fluxResult = client.readRows(query).toFlux()
fluxResult
.limitRate(100)
.map { row ->
val key = row.key.toStringUtf8()
val value = row.cells[0].value.toStringUtf8()
"${key}-${value}"
}.collectList()
}
}
ใช้ limitRate(100) เพื่อ throttling ให้ Bigtable ส่งข้อมูลมาทีละ 100
Buffering ซะสิ ไอ logic ตรงนี้ มันมีช้ามีเร็วบ้างหนะ
ตัว Project Reactor มี operator ที่ชื่อว่า .onBackpressureBuffer()
เมื่อเรารู้ว่า consumer ทำงานได้ช้ากว่า producer นิดหน่อย หรือบางเวลา อาจจะด้วย dependency ภายใน logic ของเราหรืออะไรก็ตาม เราสามารถใช้ .onBackpressureBuffer()
เพื่อ buffer ข้อมูลเอาไว้ก่อนได้ โดยเมื่อเรานำโค๊ดด้านบนมา refactor ใหม่ ก็จะประมาณนี้
fun invoke(): Mono<List<String>> {
return bigtableDataClient.use { client ->
val query = Query.create(TableId.of(TEST_TABLE))
val fluxResult = client.readRows(query).toFlux()
fluxResult
.onBackpressureBuffer(10000)
.map { row ->
val key = row.key.toStringUtf8()
val value = row.cells[0].value.toStringUtf8()
"${key}-${value}"
}.collectList()
}
}
ใช้ onBackpressureBuffer(1000) เพื่อ buffer ข้อมูลจาก Bigtable เอาไว้ก่อน หาก consumer logic ทำงานไม่ทัน
แต่อย่าลืมว่าตัวเลขขนาดที่เราใส่ใน .onBackpressureBuffer()
จะต้องสอดคล้องกับความเป็นจริง และต้องสอดคล้องกับจำนวน memory ที่เรามีอยู่ด้วย พร้อมทั้งต้องเข้าใจว่า หากขนาดของข้อมูลมันเกินจากที่เราตั้งไว้ มันจะ drop ข้อมูลเก่าใน buffer ทิ้งทันที ฉะนั้น ลองนึกถึง Use Case ที่จะใช้งานให้ดีด้วยนะครับ
หรือเราสามารถรวบรวมข้อมูลให้เป็นก้อนด้วย operator .buffer()
เพื่อปล่อยข้อมูลจาก producer ออกไปเป็นชุดๆ ในรูปแบบของ batch processing แล้วแตกข้อมูล batch ออกเป็น Stream อีกครั้งด้วย operator .flatMapIterable()
ได้ด้วย ก็เป็นอีกวิธีในการทำ Buffering ที่ดี
fun invoke(): Mono<List<String>> {
return bigtableDataClient.use { client ->
val query = Query.create(TableId.of(TEST_TABLE))
val fluxResult = client.readRows(query).toFlux()
fluxResult
.buffer(500)
.flatMapIterable { it }
.map { row ->
val key = row.key.toStringUtf8()
val value = row.cells[0].value.toStringUtf8()
"${key}-${value}"
}.collectList()
}
}
ใช้ buffer(500) เพื่อแปลงข้อมูลเป็น batch processing ข้อมูลรอบละ 500 แล้วเปลี่ยนกลับเป็น Stream เพื่อทำ operation ต่อไป
Dropping ซะเลย สนใจอะไรที่มันใหม่ที่สุดอยู่แล้วหนะ
ตัว Project Reactor มี operator ที่ชื่อว่า .onBackpressureDrop()
จะใช้ในกรณีที่ consumer ทำงานได้ช้า แล้วเราต้องการข้อมูลล่าสุดใน stream นำไปใช้ใน consumer โดยจะเหมาะกับพวก Stock pricing หรืออะไรก็ตามที่สนใจเฉพาะข้อมูลล่าสุดเท่านั้น ไม่สนใจข้อมูลก่อนหน้านี้ เน้นความล่าสุดของข้อมูลเป็นหลัก เมื่อเรานำโค๊ดมา refactor ก็จะประมาณนี้
fun invoke(): Mono<List<String>> {
return bigtableDataClient.use { client ->
val query = Query.create(TableId.of(TEST_TABLE))
val fluxResult = client.readRows(query).toFlux()
fluxResult
.onBackpressureDrop()
.map { row ->
val key = row.key.toStringUtf8()
val value = row.cells[0].value.toStringUtf8()
"${key}-${value}"
}.collectList()
}
}
ใช้ onBackpressureDrop() ในกรณีที่ consumer ทำงานได้ช้า ให้เอาข้อมูลอันล่าสุดมาใช้ ไม่สนใจข้อมูลเก่าที่ค้างอยู่
หรือเราสามารถใช้วิธีการทั้ง 3 ผสมกันก็ได้ ตาม Use Case ที่เหมาะสมกับสิ่งที่เราต้องการครับ ลองนำไปปรับใช้กันดูได้ครับ
Happy coding ครับ