0%

SpringBoot集成使用Akka

背景

需要做一个异步数据处理的任务,输入速度大于输出(推送形式)速度。脑子里面第一个想到的就是mq,mq香啊,rabbitmqkafkarocketmqpulsar。无奈资源有限,不能选择独立部署的中间件,排除掉独立部署的中间件之后,首先就可以想到一些其他处理技术:ForJoinPool,RxJava 当然还有就是今天的主角akka。首先,我以前简单用过akka,但是其实没有系统的了解过,这时候就强烈强烈强烈推荐看看官方网站

设定

假定我们的业务是一个数据采集回推的任务

1
2
3
4
graph
用户批量提交采集请求-->按照请求执行子采集任务
按照请求执行子采集任务-->执行回推结果

其中子采集任是比较耗时的任务,也是整个流程中相对的瓶颈点。

难点

Spring boot集成

或者跟准确的说是与spring-web-starter集成,这里以gradle+kotlin做例子,实际上想用的同学完全可以套用到maven+java组合里面去。

如果用官方的最新建议是加入下列依赖(有坑,稍后填)

build.gradle.kts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id("org.springframework.boot") version "2.4.3"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
kotlin("jvm") version "1.4.30"
kotlin("plugin.spring") version "1.4.30"
}

group = "org.chaos"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_1_8
val akkaVersion="2.6.3"

configurations {
compileOnly {
extendsFrom(configurations.annotationProcessor.get())
}
}

repositories {
mavenCentral()
}

dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("com.typesafe.akka:akka-actor-typed_2.13:${akkaVersion}")
testCompileOnly("com.typesafe.akka:akka-actor-testkit-typed_2.13:${akkaVersion}")
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}

tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "1.8"
}
}

tasks.withType<Test> {
useJUnitPlatform()
}

我们先定义一个接收用户请求分割为子任务的actor

TaskGenerateActor.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package org.chaos.springbootwebwithakka.akka.actors

import akka.actor.UntypedAbstractActor
import org.chaos.springbootwebwithakka.akka.msg.CollectResultMsg
import org.chaos.springbootwebwithakka.akka.msg.CollectTaskMsg
import org.chaos.springbootwebwithakka.service.TaskExecService
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.context.annotation.Scope
import org.springframework.stereotype.Component

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
class TaskExecActor(val taskExecService: TaskExecService) : UntypedAbstractActor() {
private val log: Logger = LoggerFactory.getLogger(this.javaClass)

override fun onReceive(message: Any?) {
if (message is CollectTaskMsg) {
val execRet = taskExecService!!.exec(message.target)
log.info("execRet:{}", execRet)
message.next.tell(CollectResultMsg(execRet, message.target), self)
} else {
unhandled(message)
}
}
}

这里注意,我们使用了@ComponentTaskGenerateActor配置为spring的一个bean,这也对我们按照传统模式获取actorRef提出了要求;另外这里必须将Scope设置为SCOPE_PROTOTYPE,避免单例限制actor的实例生成。

接下来编写一个IndirectActorProducer实现SpringActorProducer.kt用于配置actor的生成策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package org.chaos.springbootwebwithakka.config

import akka.actor.Actor
import akka.actor.IndirectActorProducer
import org.springframework.context.ApplicationContext

class SpringActorProducer : IndirectActorProducer {
private val applicationContext: ApplicationContext
private val actorBeanName: String
private val args: Array<Any>?

constructor(applicationContext: ApplicationContext, actorBeanName: String) {
this.applicationContext = applicationContext
this.actorBeanName = actorBeanName
args = null
}

constructor(applicationContext: ApplicationContext, actorBeanName: String, args: Array<Any>?) {
this.applicationContext = applicationContext
this.actorBeanName = actorBeanName
this.args = args
}

/**
* 所有actor的创建都会调用此方法,
* 此方法由Props.create会调用SpringActorProducer构造方法
* 并靠构造器中的传入参数来获取对应的Actor bean,达到actor交由spring托管的效果
* @return
*/
override fun produce(): Actor {
return if (args == null) {
applicationContext.getBean(actorBeanName) as Actor
} else {
applicationContext.getBean(actorBeanName, *args) as Actor
}
}

override fun actorClass(): Class<out Actor> {
return applicationContext.getType(actorBeanName) as Class<out Actor>
}
}

由这个类中的produce方法可以看出,akka集成进入spring托管的关键就在于所有获取对应actorBeanName的实例时,都应该从springapplicationContext.getBean获取。

下一步,编写akkaExtension实现SpringAkkaExtension.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package org.chaos.springbootwebwithakka.config

import akka.actor.Props
import akka.actor.typed.Extension
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Component

@Component("springAkkaExtension")
class SpringAkkaExtension: Extension {
private var applicationContext: ApplicationContext? = null

/**
* 装填参数
* @param applicationContext springcontext上下文
*/
fun initialize(applicationContext: ApplicationContext?) {
this.applicationContext = applicationContext
}

fun props(actorBeanName: String?): Props? {
return Props.create(SpringActorProducer::class.java, applicationContext, actorBeanName)
}

fun props(actorBeanName: String?, vararg args: Any?): Props? {
return Props.create(SpringActorProducer::class.java, applicationContext, actorBeanName, args)
}
}

Extensions will only be loaded once per ActorSystem, which will be managed by Akka. You can choose to have your Extension loaded on-demand or at ActorSystem creation time through the Akka configuration. Details on how to make that happens are below, in the “Loading from Configuration” section.

最后最关键的配置ActorSystem Bean配置AkkaConfiguration.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.chaos.springbootwebwithakka.config

import akka.actor.ActorSystem
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.chaos.springbootwebwithakka.utils.ACTOR_SYSTEM_NAME
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration


@Configuration
class AkkaConfiguration {
@Autowired
private val applicationContext: ApplicationContext? = null

@Autowired
private val springAkkaExtension: SpringAkkaExtension? = null

@Bean(destroyMethod = "terminate")
fun actorSystem(): ActorSystem? {
val actorSystem: ActorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, akkaConfig())
springAkkaExtension!!.initialize(applicationContext)
return actorSystem
}

@Bean
fun akkaConfig(): Config? {
return ConfigFactory.load()
}
}

接下来编写入口接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.chaos.springbootwebwithakka.controller

import org.chaos.springbootwebwithakka.akka.ActorSubmitService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class APIController {

@Autowired
private val actorSubmitService: ActorSubmitService? = null

@PostMapping("/greet")
fun greetActor(@RequestBody msg: String): String {
actorSubmitService!!.submitTask(msg)
return "success"
}
}

这里的ActorSubmitService是一个actor的入口类,形式无所谓,但是它须负责将所需的actor都初始化好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package org.chaos.springbootwebwithakka.akka

import akka.actor.ActorRef
import akka.actor.ActorSystem
import org.chaos.springbootwebwithakka.akka.msg.CollectTaskMsg
import org.chaos.springbootwebwithakka.config.SpringAkkaExtension
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct


@Service
class ActorSubmitService {

@Autowired
private val actorSystem: ActorSystem? = null

@Autowired
private val springAkkaExtension: SpringAkkaExtension? = null

private var taskExecActor: ActorRef? = null
private var dataPushActor: ActorRef? = null

@PostConstruct
fun setUp() {
taskExecActor = actorSystem!!.actorOf(springAkkaExtension!!.props("taskExecActor"), "taskExecActor")
dataPushActor = actorSystem!!.actorOf(springAkkaExtension!!.props("dataPushActor"), "dataPushActor")
}

fun submitTask(taskInfo: String) {
val batchTasks = taskInfo.splitToSequence(",").toList()
batchTasks.forEach{
taskExecActor!!.tell(CollectTaskMsg(it, dataPushActor!!), null)
}
}
}

接下来编写相关的actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.chaos.springbootwebwithakka.akka.actors

import akka.actor.UntypedAbstractActor
import org.chaos.springbootwebwithakka.akka.msg.CollectResultMsg
import org.chaos.springbootwebwithakka.service.GatewayPushService
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.context.annotation.Scope
import org.springframework.stereotype.Component

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
class DataPushActor(val gatewayPushService: GatewayPushService) : UntypedAbstractActor() {

private val log: Logger = LoggerFactory.getLogger(this.javaClass)

override fun onReceive(message: Any?) {
if (message is CollectResultMsg) {
log.info("actor pushed Msg:{}", message.toString())
gatewayPushService.push(message.ret, message.param)
} else {
unhandled(message)
}
}
}

这里actor的内部逻辑由2种形式,第一种直接将业务逻辑写在actor中;第二种调用service包中的业务服务接口。这里我推荐第二种方式,将职责分离:让actor系统来负责调用顺序逻辑控制,让具体的service执行具体的业务逻辑。

下面是涉及到的具体的几个业务逻辑。

1
2
3
4
5
6
package org.chaos.springbootwebwithakka.service

interface TaskExecService {

fun exec(args: String): String
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package org.chaos.springbootwebwithakka.service

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class TaskExecServiceImpl : TaskExecService {

private val log: Logger = LoggerFactory.getLogger(this.javaClass)

override fun exec(args: String): String {
log.info("handled exec with argument:{}", args)
return "execresult"
}
}
1
2
3
4
5
6
package org.chaos.springbootwebwithakka.service

interface GatewayPushService {

fun push(msg: String, enhance: String, param: String)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package org.chaos.springbootwebwithakka.service

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class GatewayPushServiceImpl : GatewayPushService {
private val log: Logger = LoggerFactory.getLogger(this.javaClass)
override fun push(msg: String, enhance: String, param: String) {
log.info("push " + msg + enhance + param)
}
}

启动SpringbootwebwithakkaApplication,使用httpie来做测试

1
echo "haha,heihei,hoho,lolo" | http POST :8080/greet

HTTP/1.1 200
Connection: keep-alive
Content-Length: 7
Content-Type: application/json
Date: Mon, 08 Mar 2021 01:34:04 GMT
Keep-Alive: timeout=4
Proxy-Connection: keep-alive

success

console日志如下

TaskExecServiceImpl : handled exec with argument:haha
TaskExecActor : execRet:execresult
TaskExecServiceImpl : handled exec with argument:heihei
DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=haha)
TaskExecActor : execRet:execresult
GatewayPushServiceImpl : push execresulthaha
TaskExecServiceImpl : handled exec with argument:hoho
DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=heihei)
TaskExecActor : execRet:execresult
GatewayPushServiceImpl : push execresultheihei
TaskExecServiceImpl : handled exec with argument:lolo

DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=hoho)
TaskExecActor : execRet:execresult
GatewayPushServiceImpl : push execresulthoho
DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=lolo
)
GatewayPushServiceImpl : push execresultlolo

以上就是就是目前akkaspringboot集成使用的一个范例。

单元测试

基本上网上能找到的资料都把基础使用能覆盖到,但是我们要对自己要求提高一些,如何对于akka系统的逻辑进行测试(这里暂时先忽略具体的业务逻辑,这也是我对老代码的改造思路,非常简单,但是非常实用)。如果将业务逻辑也纳入测试范围,可能会涉及到db、外部网络接口(GatewayPush),这些都叫做有side effect的方法,我们不应该做这种臃肿的集成测试。应该集中力量办大事,只对特定的目标逻辑进行合理范围的测试。

这里也会说到上面依赖中预埋的一个坑。因为akka官方的建议的依赖都是*type*,但是上述我们做的spring集成都是时使用的akka.actor.Actor,而并非是akka.actor.typed.Actor。我们应该将依赖改成如下形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id("org.springframework.boot") version "2.4.3"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
kotlin("jvm") version "1.4.30"
kotlin("plugin.spring") version "1.4.30"
}

group = "org.chaos"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_1_8
val akkaVersion="2.6.3"

configurations {
compileOnly {
extendsFrom(configurations.annotationProcessor.get())
}
}

repositories {
mavenCentral()
}

dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("com.typesafe.akka:akka-actor-typed_2.13:${akkaVersion}")
testCompileOnly("com.typesafe.akka:akka-testkit_2.13:${akkaVersion}")
testCompileOnly("io.mockk:mockk:1.10.2")
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}

tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "1.8"
}
}

tasks.withType<Test> {
useJUnitPlatform()
}

接下来我们编写一个TaskExecActor的测试,我们期待输入CollectTaskMsg,获得一个CollectResultMsg,由于我们不想对TaskExecService的逻辑进行任何涉及,我们可以使用mockk对其进行mock,或则用一个自定义的MockClass(特别适用于老工程)来模拟。

这里主要的测试逻辑主要参考official testing guide

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package org.chaos.springbootwebwithakka.akka.actors

import akka.actor.ActorSystem
import akka.actor.Props
import akka.testkit.javadsl.TestKit
import org.chaos.springbootwebwithakka.akka.mocks.MockTaskExecServiceImpl
import org.chaos.springbootwebwithakka.akka.msg.CollectResultMsg
import org.chaos.springbootwebwithakka.akka.msg.CollectTaskMsg
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import java.time.Duration

@SpringBootTest
internal class TaskExecActorTest {

@Autowired
private val actorSystem: ActorSystem? = null

@Test
fun `test TaskExecActor receive with valid data`() {
object: TestKit(actorSystem) {
init {
val service = MockTaskExecServiceImpl()
val props = Props.create(TaskExecActor::class.java, service)
val targetActor = actorSystem!!.actorOf(props)
targetActor.tell(CollectTaskMsg("a", ref), ref)
val msg = expectMsgClass(CollectResultMsg::class.java)
Assertions.assertNotNull(msg)
// 香吗?
Assertions.assertEquals(service.MOCKED_EXEC_RESULT, (msg as CollectResultMsg).ret)
Assertions.assertEquals("a", msg.param)
// 这里像不像mock常用的exactOnce?
Assertions.assertEquals(1, service.execTimes)
}
}

}
}

以上就是spring boot集成akka做单元测试的例子。需要单独提一下ActorContext.actorSelection,可以看到上述例子里面是在消息里面传入了一个ActorRef来决定下一级actor的目的地址,这个方式是比较建议的,另外还可以由对应actor的业务服务提供一个方法来动态决定acotr地址,这里就可以用到ActorContext.actorSelection。但是非常不建议直接在actor实现中静态写死下一级actor的path。因为这样逻辑绑死了,测试都不好搞(必须将整个链路的actor全部测完,又变成了集成测试)。

本本对应源代码地址为github sourcecode

参考

Introduction to Spring with Akka

aliakh-demo-akka-spring-github sourcecode

akka_essentials

official testing guide

Akka-SpringBoot集成