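Background
I needed to build an asynchronous data-processing task in which input arrives faster than output can be delivered (the output is push-based). The first thing that came to mind was a message queue: `rabbitmq`, `kafka`, `rocketmq` or `pulsar` would all do nicely. Unfortunately resources were limited, so a separately deployed middleware was not an option. With those ruled out, the remaining candidates were in-process techniques such as `ForkJoinPool`, `RxJava` and, of course, today's protagonist, `akka`. I had used akka briefly before but had never studied it systematically, so I strongly recommend reading the official website first.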
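Setting
Assume the business is a data-collection task whose results are pushed back.

```mermaid
graph TD
    submit[Users submit collection requests in batches] --> exec[Run a sub-collection task for each request]
    exec --> push[Push the results back]
```

The sub-collection task is the time-consuming step and the relative bottleneck of the whole flow.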
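Difficulties
The first difficulty is integrating with Spring Boot, or more precisely with `spring-boot-starter-web`. The example here uses `gradle` + `kotlin`, but the same approach carries over directly to a `maven` + `java` setup.

Following the latest official recommendation, you would add the dependencies below (there is a pitfall here, which is dealt with later).

`build.gradle.kts`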
```kotlin
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.4.3"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.4.30"
    kotlin("plugin.spring") version "1.4.30"
}

group = "org.chaos"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_1_8

val akkaVersion = "2.6.3"

configurations {
    compileOnly {
        extendsFrom(configurations.annotationProcessor.get())
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("com.typesafe.akka:akka-actor-typed_2.13:${akkaVersion}")
    // Test libraries must also be on the test runtime classpath, hence testImplementation.
    testImplementation("com.typesafe.akka:akka-actor-testkit-typed_2.13:${akkaVersion}")
    compileOnly("org.projectlombok:lombok")
    annotationProcessor("org.projectlombok:lombok")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "1.8"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
```
First, define the actor that executes a single sub-collection task.

`TaskExecActor.kt`

```kotlin
package org.chaos.springbootwebwithakka.akka.actors

import akka.actor.UntypedAbstractActor
import org.chaos.springbootwebwithakka.akka.msg.CollectResultMsg
import org.chaos.springbootwebwithakka.akka.msg.CollectTaskMsg
import org.chaos.springbootwebwithakka.service.TaskExecService
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.context.annotation.Scope
import org.springframework.stereotype.Component

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
class TaskExecActor(val taskExecService: TaskExecService) : UntypedAbstractActor() {

    private val log: Logger = LoggerFactory.getLogger(this.javaClass)

    override fun onReceive(message: Any?) {
        if (message is CollectTaskMsg) {
            // Run the (slow) collection step, then forward the result to the next actor.
            val execRet = taskExecService.exec(message.target)
            log.info("execRet:{}", execRet)
            message.next.tell(CollectResultMsg(execRet, message.target), self)
        } else {
            unhandled(message)
        }
    }
}
```

Note that `@Component` registers `TaskExecActor` as a `spring` `bean`. This affects how we obtain its `ActorRef`: the actor instance can no longer be constructed in the traditional way and has to come from Spring. In addition, the `Scope` must be set to `SCOPE_PROTOTYPE`. Akka needs a fresh instance every time it creates or restarts an `actor`, and a singleton bean would restrict that.
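The message classes `CollectTaskMsg` and `CollectResultMsg` are imported by the actor but are not listed in this post. A minimal sketch, inferred only from how they are used (`message.target`, `message.next`, and the `ret`/`param` fields that show up in the logs), could look like the following. The field types are assumptions, the real definitions in the repository may differ, and the sketch assumes `akka-actor` is on the classpath.

```kotlin
import akka.actor.ActorRef

// Assumed shape: the collection target plus the actor that should
// receive the result (used as message.target / message.next).
data class CollectTaskMsg(val target: String, val next: ActorRef)

// Assumed shape: the collected result and the original target, matching
// the logged form CollectResultMsg(ret=..., param=...).
data class CollectResultMsg(val ret: String, val param: String)
```

Keeping the messages as immutable data classes means they can be passed between actors without any shared mutable state.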
Next, write an `IndirectActorProducer` implementation, `SpringActorProducer.kt`, which configures how actor instances are created.
```kotlin
package org.chaos.springbootwebwithakka.config

import akka.actor.Actor
import akka.actor.IndirectActorProducer
import org.springframework.context.ApplicationContext

class SpringActorProducer : IndirectActorProducer {

    private val applicationContext: ApplicationContext
    private val actorBeanName: String
    private val args: Array<Any>?

    constructor(applicationContext: ApplicationContext, actorBeanName: String) {
        this.applicationContext = applicationContext
        this.actorBeanName = actorBeanName
        args = null
    }

    constructor(applicationContext: ApplicationContext, actorBeanName: String, args: Array<Any>?) {
        this.applicationContext = applicationContext
        this.actorBeanName = actorBeanName
        this.args = args
    }

    override fun produce(): Actor {
        return if (args == null) {
            applicationContext.getBean(actorBeanName) as Actor
        } else {
            applicationContext.getBean(actorBeanName, *args) as Actor
        }
    }

    override fun actorClass(): Class<out Actor> {
        return applicationContext.getType(actorBeanName) as Class<out Actor>
    }
}
```
As the `produce` method shows, the key to putting `akka` under `spring` management is that every instance for a given `actorBeanName` is obtained through `spring`'s `applicationContext.getBean`.
Next, write the `akka` `Extension` implementation, `SpringAkkaExtension.kt`.
```kotlin
package org.chaos.springbootwebwithakka.config

import akka.actor.Props
import akka.actor.typed.Extension
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Component

@Component("springAkkaExtension")
class SpringAkkaExtension : Extension {

    private var applicationContext: ApplicationContext? = null

    fun initialize(applicationContext: ApplicationContext?) {
        this.applicationContext = applicationContext
    }

    fun props(actorBeanName: String?): Props? {
        return Props.create(SpringActorProducer::class.java, applicationContext, actorBeanName)
    }

    fun props(actorBeanName: String?, vararg args: Any?): Props? {
        return Props.create(SpringActorProducer::class.java, applicationContext, actorBeanName, args)
    }
}
```
Extensions will only be loaded once per `ActorSystem`, which will be managed by Akka. You can choose to have your Extension loaded on-demand or at `ActorSystem` creation time through the Akka configuration. Details on how to make that happens are below, in the “Loading from Configuration” section.
Finally, the most important part: configuring the `ActorSystem` bean in `AkkaConfiguration.kt`.
```kotlin
package org.chaos.springbootwebwithakka.config

import akka.actor.ActorSystem
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.chaos.springbootwebwithakka.utils.ACTOR_SYSTEM_NAME
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AkkaConfiguration {

    @Autowired
    private lateinit var applicationContext: ApplicationContext

    @Autowired
    private lateinit var springAkkaExtension: SpringAkkaExtension

    // Terminate the actor system when the Spring context shuts down.
    @Bean(destroyMethod = "terminate")
    fun actorSystem(): ActorSystem {
        val actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, akkaConfig())
        // Hand the Spring context to the extension so it can build Props for actor beans.
        springAkkaExtension.initialize(applicationContext)
        return actorSystem
    }

    // Loads the Akka configuration from the classpath.
    @Bean
    fun akkaConfig(): Config {
        return ConfigFactory.load()
    }
}
```
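`ACTOR_SYSTEM_NAME` is imported from a `utils` package that this post does not show. A plausible sketch is a top-level constant; the value below is a made-up placeholder. As far as I know, Akka rejects actor system names that contain anything other than letters, digits, `-` and `_`, or that start with `-` or `_`, so the hypothetical helper `isValidActorSystemName` mirrors that rule for a quick sanity check.

```kotlin
// Hypothetical value; the real constant lives in the project's utils package.
const val ACTOR_SYSTEM_NAME = "collect-actor-system"

// Mirrors Akka's naming rule as I understand it: letters and digits,
// plus '-' or '_' anywhere except the first character.
val ACTOR_SYSTEM_NAME_PATTERN = Regex("^[a-zA-Z0-9][a-zA-Z0-9_-]*$")

fun isValidActorSystemName(name: String): Boolean =
    ACTOR_SYSTEM_NAME_PATTERN.matches(name)
```

Checking the name up front gives a clearer failure than an exception thrown while the Spring context is starting.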
Next, write the entry-point endpoint.
```kotlin
package org.chaos.springbootwebwithakka.controller

import org.chaos.springbootwebwithakka.akka.ActorSubmitService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class APIController {

    @Autowired
    private lateinit var actorSubmitService: ActorSubmitService

    // Accepts a comma-separated batch and returns immediately; the work runs in the actors.
    @PostMapping("/greet")
    fun greetActor(@RequestBody msg: String): String {
        actorSubmitService.submitTask(msg)
        return "success"
    }
}
```
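`ActorSubmitService` here is the entry point into the `actor` system. Its form does not matter, but it must be responsible for initializing all the `actor`s that are needed.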
```kotlin
package org.chaos.springbootwebwithakka.akka

import akka.actor.ActorRef
import akka.actor.ActorSystem
import org.chaos.springbootwebwithakka.akka.msg.CollectTaskMsg
import org.chaos.springbootwebwithakka.config.SpringAkkaExtension
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct

@Service
class ActorSubmitService {

    @Autowired
    private lateinit var actorSystem: ActorSystem

    @Autowired
    private lateinit var springAkkaExtension: SpringAkkaExtension

    private lateinit var taskExecActor: ActorRef
    private lateinit var dataPushActor: ActorRef

    @PostConstruct
    fun setUp() {
        // Create both actors from their Spring bean names.
        taskExecActor = actorSystem.actorOf(springAkkaExtension.props("taskExecActor"), "taskExecActor")
        dataPushActor = actorSystem.actorOf(springAkkaExtension.props("dataPushActor"), "dataPushActor")
    }

    fun submitTask(taskInfo: String) {
        // One message per comma-separated target; results are routed to dataPushActor.
        val batchTasks = taskInfo.splitToSequence(",").toList()
        batchTasks.forEach {
            taskExecActor.tell(CollectTaskMsg(it, dataPushActor), ActorRef.noSender())
        }
    }
}
```
Next, write the remaining actor.
```kotlin
package org.chaos.springbootwebwithakka.akka.actors

import akka.actor.UntypedAbstractActor
import org.chaos.springbootwebwithakka.akka.msg.CollectResultMsg
import org.chaos.springbootwebwithakka.service.GatewayPushService
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.context.annotation.Scope
import org.springframework.stereotype.Component

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
class DataPushActor(val gatewayPushService: GatewayPushService) : UntypedAbstractActor() {

    private val log: Logger = LoggerFactory.getLogger(this.javaClass)

    override fun onReceive(message: Any?) {
        if (message is CollectResultMsg) {
            log.info("actor pushed Msg:{}", message.toString())
            gatewayPushService.push(message.ret, message.param)
        } else {
            unhandled(message)
        }
    }
}
```
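An `actor`'s internal logic can take one of two forms: write the business logic directly in the `actor`, or call a business interface from the `service` package. I recommend the second, because it separates responsibilities: the `actor` system controls the order and flow of calls, and the concrete `service` carries out the business logic.

The business services involved are listed below.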
```kotlin
package org.chaos.springbootwebwithakka.service

interface TaskExecService {
    fun exec(args: String): String
}
```

```kotlin
package org.chaos.springbootwebwithakka.service

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class TaskExecServiceImpl : TaskExecService {

    private val log: Logger = LoggerFactory.getLogger(this.javaClass)

    override fun exec(args: String): String {
        log.info("handled exec with argument:{}", args)
        return "execresult"
    }
}
```

```kotlin
package org.chaos.springbootwebwithakka.service

interface GatewayPushService {
    // Two parameters, matching the call in DataPushActor: push(message.ret, message.param).
    fun push(msg: String, param: String)
}
```

```kotlin
package org.chaos.springbootwebwithakka.service

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class GatewayPushServiceImpl : GatewayPushService {

    private val log: Logger = LoggerFactory.getLogger(this.javaClass)

    override fun push(msg: String, param: String) {
        log.info("push " + msg + param)
    }
}
```
Start `SpringbootwebwithakkaApplication` and test it with `httpie`:

```shell
echo "haha,heihei,hoho,lolo" | http POST :8080/greet
```

```
HTTP/1.1 200
Connection: keep-alive
Content-Length: 7
Content-Type: application/json
Date: Mon, 08 Mar 2021 01:34:04 GMT
Keep-Alive: timeout=4
Proxy-Connection: keep-alive

success
```

The console log looks like this:

```
TaskExecServiceImpl : handled exec with argument:haha
TaskExecActor : execRet:execresult
TaskExecServiceImpl : handled exec with argument:heihei
DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=haha)
TaskExecActor : execRet:execresult
GatewayPushServiceImpl : push execresulthaha
TaskExecServiceImpl : handled exec with argument:hoho
DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=heihei)
TaskExecActor : execRet:execresult
GatewayPushServiceImpl : push execresultheihei
TaskExecServiceImpl : handled exec with argument:lolo
DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=hoho)
TaskExecActor : execRet:execresult
GatewayPushServiceImpl : push execresulthoho
DataPushActor : actor pushed Msg:CollectResultMsg(ret=execresult, param=lolo )
GatewayPushServiceImpl : push execresultlolo
```
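Note the stray whitespace in `param=lolo )` near the end of the log. The most likely cause is that `echo` appends a newline, which the comma split keeps as part of the final target. If that matters for your targets, a small normalizing step before dispatching avoids it. The helper name `parseTargets` is hypothetical and not part of the original project.

```kotlin
// Hypothetical helper: split a comma-separated batch into clean targets.
fun parseTargets(taskInfo: String): List<String> =
    taskInfo.split(",")
        .map { it.trim() }           // drop surrounding whitespace, including a trailing newline
        .filter { it.isNotEmpty() }  // ignore empty entries such as the gap in "a,,b"
```

Such a helper could be called from `submitTask` in place of the bare split, so that every target reaches the actors already trimmed.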
That is a working example of integrating `akka` with `springboot`.
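Unit testing
Most of the material you can find online covers basic usage, but we should hold ourselves to a higher standard: how do we test the logic of the akka system itself? (For now this deliberately leaves out the concrete business logic. It is also how I approach refactoring legacy code: very simple, but very practical.) If the business logic were included in the test scope, the tests might touch a database or an external network interface (`GatewayPush`). Those are methods with `side effect`s, and we should not write that kind of bloated integration test. It is better to concentrate effort and test only the targeted logic within a reasonable scope.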
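This is also where the pitfall planted in the dependencies above comes in. The dependencies `akka` officially recommends are all `*typed*` artifacts, but the `spring` integration above uses the classic `akka.actor.Actor` rather than the typed API under `akka.actor.typed`. The test kit therefore has to be the classic `akka-testkit` instead of `akka-actor-testkit-typed`, and the dependencies should be changed as follows.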
```kotlin
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.4.3"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.4.30"
    kotlin("plugin.spring") version "1.4.30"
}

group = "org.chaos"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_1_8

val akkaVersion = "2.6.3"

configurations {
    compileOnly {
        extendsFrom(configurations.annotationProcessor.get())
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("com.typesafe.akka:akka-actor-typed_2.13:${akkaVersion}")
    // Classic test kit, matching the classic actors used in this post.
    // Test libraries must also be on the test runtime classpath, hence testImplementation.
    testImplementation("com.typesafe.akka:akka-testkit_2.13:${akkaVersion}")
    testImplementation("io.mockk:mockk:1.10.2")
    compileOnly("org.projectlombok:lombok")
    annotationProcessor("org.projectlombok:lombok")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "1.8"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
```
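Next we write a test for `TaskExecActor`: given a `CollectTaskMsg` as input, we expect a `CollectResultMsg` back. Because we do not want to involve any of the logic in `TaskExecService`, we can `mock` it with `mockk`, or simulate it with a hand-written `MockClass` (which suits legacy projects particularly well).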
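`MockTaskExecServiceImpl`, which the test in this section relies on, is not listed in the post. A hand-written mock along the following lines would satisfy it. The member names `MOCKED_EXEC_RESULT` and `execTimes` come from the test itself, while the returned value and the atomic counter are my assumptions. The interface is repeated only so that the sketch compiles on its own.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Same contract as the project's TaskExecService, repeated for self-containment.
interface TaskExecService {
    fun exec(args: String): String
}

class MockTaskExecServiceImpl : TaskExecService {
    // Canned result the test compares against (the value is an assumption).
    val MOCKED_EXEC_RESULT = "mocked-exec-result"

    // The actor calls exec() on a dispatcher thread, so count atomically.
    private val counter = AtomicInteger(0)
    val execTimes: Int
        get() = counter.get()

    override fun exec(args: String): String {
        counter.incrementAndGet()
        return MOCKED_EXEC_RESULT
    }
}
```

A mock like this needs no extra library, which is why it fits older projects where adding a mocking framework is not straightforward.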
The test logic here mainly follows the official testing guide.
```kotlin
package org.chaos.springbootwebwithakka.akka.actors

import akka.actor.ActorSystem
import akka.actor.Props
import akka.testkit.javadsl.TestKit
import org.chaos.springbootwebwithakka.akka.mocks.MockTaskExecServiceImpl
import org.chaos.springbootwebwithakka.akka.msg.CollectResultMsg
import org.chaos.springbootwebwithakka.akka.msg.CollectTaskMsg
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest

@SpringBootTest
internal class TaskExecActorTest {

    @Autowired
    private lateinit var actorSystem: ActorSystem

    @Test
    fun `test TaskExecActor receive with valid data`() {
        object : TestKit(actorSystem) {
            init {
                // Build the actor directly with a mocked service, bypassing Spring.
                val service = MockTaskExecServiceImpl()
                val props = Props.create(TaskExecActor::class.java, service)
                val targetActor = actorSystem.actorOf(props)

                // The TestKit's own ref acts as both the sender and the next-level actor.
                targetActor.tell(CollectTaskMsg("a", ref), ref)

                val msg = expectMsgClass(CollectResultMsg::class.java)
                Assertions.assertNotNull(msg)
                Assertions.assertEquals(service.MOCKED_EXEC_RESULT, msg.ret)
                Assertions.assertEquals("a", msg.param)
                Assertions.assertEquals(1, service.execTimes)
            }
        }
    }
}
```
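If you prefer `mockk` to a hand-written mock, the same stubbing and call-count check can be sketched as below. This assumes `io.mockk:mockk` is on the test classpath. The function names `stubbedTaskExecService` and `verifyExecutedOnce` are hypothetical, and the interface is repeated only so that the sketch compiles on its own.

```kotlin
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify

// Same contract as the project's TaskExecService, repeated for self-containment.
interface TaskExecService {
    fun exec(args: String): String
}

// Builds a stub that returns a canned value for any argument.
fun stubbedTaskExecService(result: String): TaskExecService {
    val service = mockk<TaskExecService>()
    every { service.exec(any()) } returns result
    return service
}

// Stands in for the execTimes counter: fails unless exec(target) ran exactly once.
fun verifyExecutedOnce(service: TaskExecService, target: String) {
    verify(exactly = 1) { service.exec(target) }
}
```

In the actor test, the stub would be passed to `Props.create` in place of the hand-written mock, and `verifyExecutedOnce(service, "a")` would be called after `expectMsgClass` has returned, so that the actor has already processed the message.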
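That completes the example of unit testing akka integrated with Spring Boot. One more thing deserves a mention: `ActorContext.actorSelection`. In the example above, an `ActorRef` is passed inside the message to decide the destination of the next-level actor, and this is the approach I recommend. Alternatively, the business service behind an actor can expose a method that decides the actor address dynamically, which is where `ActorContext.actorSelection` comes in. What I strongly advise against is hard-coding the next-level actor's path inside the actor implementation. That ties the logic together and makes testing awkward: you would have to test every actor in the chain, which turns the test back into an integration test.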
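The `actorSelection` variant could be sketched as follows. Everything named here (`RoutingTaskExecService`, `RoutedTaskMsg`, `RoutedResultMsg`, `RoutedTaskExecActor`) is hypothetical and only illustrates the idea of letting the business service supply the next actor's path. It assumes the classic `akka-actor` API is on the classpath.

```kotlin
import akka.actor.UntypedAbstractActor

// Hypothetical service: does the work and also names the next actor's path.
interface RoutingTaskExecService {
    fun exec(target: String): String
    fun nextActorPath(target: String): String // for example "/user/dataPushActor"
}

// Hypothetical message shapes for this sketch only; no ActorRef is carried.
data class RoutedTaskMsg(val target: String)
data class RoutedResultMsg(val ret: String, val param: String)

class RoutedTaskExecActor(private val service: RoutingTaskExecService) : UntypedAbstractActor() {
    override fun onReceive(message: Any?) {
        if (message is RoutedTaskMsg) {
            val ret = service.exec(message.target)
            // Resolve the destination at runtime rather than hard-coding a path here.
            context.actorSelection(service.nextActorPath(message.target))
                .tell(RoutedResultMsg(ret, message.target), self)
        } else {
            unhandled(message)
        }
    }
}
```

Because the path comes from the injected service, a test can supply a mocked service that returns whatever path suits the test, and the actor itself never has to change.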
The source code for this post is available at github sourcecode.
References
Introduction to Spring with Akka
aliakh-demo-akka-spring-github sourcecode
akka_essentials
official testing guide
Akka-SpringBoot integration