体验一下 Dapr Actor
Dapr 是什么#
Dapr 是分布式应用程序运行时(Distribution APplication Runtime)的缩写,是一个可移植的、事件驱动的运行时,它使得开发人员能够基于此运行时轻松构建出弹性的、无状态和有状态的应用程序,它可运行在云平台或边缘计算中,同时支持多种编程语言和开发框架。
Dapr 本身是开源的(https://github.com/dapr/dapr),目前也是 CNCF 孵化期的项目,采用 Golang 语言编写。在云原生技术越来越成熟和被重视的现在,Dapr 也是颇有前景的项目,值得技术人员投入和研究。
👉 详细的功能及搭建方式,参看官方文档。
Actor 是什么#
Actor 计算模型是一个上了年纪的通用并发编程模型,是由 Carl Hewitt 在 1973 年定义(论文地址), 然后由 Erlang OTP 发扬光大,其后大大小小的框架或者编程语言带来了多多少少与纯 Actor 模型有差异的实现,Actor 模型就慢慢的进入了麻瓜开发者们(不能手撸 Actor 模型的开发者)的世界,现在各位麻瓜们偶尔可见其施展魔法留下的痕迹并习以为常了。
那么 Actor 模型是用来干嘛的?就原始定义来说,Actor是一个并发处理的数学模型,用来解决并发计算控制中的各种问题。Actor 模式声明 Actors 为并发计算的通用原语,换句话说,我们可以将代码写入独立单元 ( 称为Actor) ,该单元接收消息并一次处理消息,而不进行任何类型的并发控制或线程处理,当代码处理一条消息时,它可以向其他 Actor 发送一条或多条消息,或者创建新的 Actors。 底层运行时将管理每个 Actor 的运行方式、时机和位置,并在 Actors 之间传递消息,大量 Actors 可以同时执行,Actors 彼此相互独立执行。
Dapr 包含专门实现 virtual actors 模式 的运行时,通过 Dapr 的实现,我们可以根据 Actors 模型编写 Dapr Actor,而 Dapr 利用底层平台提供可扩展性和可靠性保证。
Dapr Actor 的使用和实现#
适用场景#
Actor 设计模式可以很好试用于一些分布式系统问题和场景,但我们首先应该考虑的是模式的约束。 一般来说,在下列场景下可以考虑 Actor 模式来模拟你的问题或场景:
- 您的问题空间涉及大量(数千或更多) 的独立和孤立的微小计算单元和逻辑处理单元
- 您想要处理单线程对象,这些对象不需要外部组件的大量交互,例如在一组 Actors 之间查询状态
- 您的 Actor 实例不会通过发出 I/O 操作来阻塞调用方
Actors 生命周期#
Dapr Actors 是虚拟的,它们不需要显式创建或销毁,一切都交给 Dapr Actor 运行时来管理
- Dapr Actors 运行时在第一次接收到该 Actor ID 的请求时自动激活 Actor
- 如果 Actor 在一段时间内未被使用,那么 Dapr Actors 运行时将回收内存对象
- 重新启动该 Actor ID 的 Actor 时,它还将持有回收前的一切原有数据(虽然不是同一个 Actor 实例了)
- Dapr 运行时用来检查 Actor 是否需要回收时,Actor 空闲超时时间和运行时的扫描间隔是可以配置的
Dapr Actor 提供的功能#
Dapr Actor 提供了一下的功能来实现 Virtual Actor 模式
-
调用特定的 Actor 方法
shellPOST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method> -
Actor Timers/Reminders 来注册一个定时执行的方法
-
Actor 状态管理,用来保存 Actor 运行时的各种状态数据
初体验#
下面我们就用 Dapr 的 Java SDK 来小小体验一下 Dapr Actor,Dapr Java SDK 基于 Reactor 项目提供了异步无阻塞的响应式编程模型,使用之前需要大家提前学习和掌握这个框架,它也是 Spring Webflux 的核心框架哦,学就完了。
正式开始,按照步骤来
- 确保正确安装 Dapr,参看
root@vm1:~/dapr-actor-demo# dapr -v
CLI version: 1.8.1
Runtime version: 1.8.4-
编写一个提供服务的接口
javapackage io.github.wynn5a; import io.dapr.actors.ActorMethod; import io.dapr.actors.ActorType; import reactor.core.publisher.Mono; @ActorType(name = "some-actor") public interface DemoActor { //创建一个 Reminder void registerReminder(); //一个 Actor 方法 @ActorMethod(name = "say_something") String say(String something); //一个 Actor 方法,带有异步返回值 @ActorMethod(returns = Integer.class) Mono<Integer> incrementAndGet(int delta); } -
实现这些服务
javapackage io.github.wynn5a; import io.dapr.actors.ActorId; import io.dapr.actors.runtime.AbstractActor; import io.dapr.actors.runtime.ActorRuntimeContext; import io.dapr.actors.runtime.Remindable; import io.dapr.utils.TypeRef; import reactor.core.publisher.Mono; import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Random; public class DemoActorImpl extends AbstractActor implements DemoActor, Remindable<Integer> { public static final String COUNTER_KEY = "counter"; private final DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME; private final Random random = new Random(); /** * Instantiates a new Actor. * * @param runtimeContext Context for the runtime. * @param id Actor identifier. */ public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { super(runtimeContext, id); } /** * create reminder */ @Override public void registerReminder() { super.registerReminder("reminder-1", random.nextInt(1000), Duration.ofSeconds(5), Duration.ofSeconds(2)).block(); } @Override public String say(String something) { System.out.println("Get from client: " + something); return LocalDateTime.now().format(formatter) + " --> " + something; } /** * use dapr state manager to update and save counter * * @param delta amount to be added to counter * @return new counter value */ @Override public Mono<Integer> incrementAndGet(int delta) { return getActorStateManager().contains(COUNTER_KEY) .flatMap(exists -> exists ? super.getActorStateManager().get(COUNTER_KEY, int.class) : Mono.just(0)) .map(c -> c + delta).flatMap(c -> super.getActorStateManager().set(COUNTER_KEY, c).thenReturn(c)); } /** * return type of reminder to consume * @return type reference */ @Override public TypeRef<Integer> getStateType() { return TypeRef.INT; } /** * do something to consume reminders * * @param reminderName The name of reminder provided during registration. * @param state The user state provided during registration. * @param dueTime The invocation due time provided during registration. * @param period The invocation period provided during registration. * @return nothing */ @Override public Mono<Void> receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) { return Mono.fromRunnable(() -> { String message = String.format("Server received reminder from actor id:%s and name:%s with state: %d @ %s", this.getId(), reminderName, state, LocalDateTime.now().format(formatter)); // Handles the request by printing message. System.out.println(message); }); } } -
启动应用,并设置 Actor
javapackage io.github.wynn5a; import io.dapr.actors.runtime.ActorRuntime; import io.dapr.actors.runtime.ActorRuntimeConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.time.Duration; /** * How to run: <br/> * dapr run --components-path ./components --app-id demo-actor-server --app-port 8080 \ * -- java -jar target/dapr-actor-demo-1.0.0.jar */ @SpringBootApplication public class DemoActorApplication { public static void main(String[] args) throws Exception { ActorRuntimeConfig config = ActorRuntime.getInstance().getConfig(); // Idle timeout until actor instance is deactivated. config.setActorIdleTimeout(Duration.ofSeconds(30)); // How often actor instances are scanned for deactivation and balance. config.setActorScanInterval(Duration.ofSeconds(10)); // How long to wait until for draining an ongoing API call for an actor instance. config.setDrainOngoingCallTimeout(Duration.ofSeconds(10)); // Determines whether to drain API calls for actors instances being balanced. config.setDrainBalancedActors(true); // Register the Actor class. ActorRuntime.getInstance().registerActor(DemoActorImpl.class); SpringApplication.run(DemoActorApplication.class); } } -
编写一个 Client 去生成 Actor 并且调用 Actor 服务中的方法。测试流程:启动两个 Actor 来模拟并发,他们会分别注册 Reminder,分别调用
incrementAndGet和say两个 Actor 方法javapackage io.github.wynn5a; import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorClient; import io.dapr.actors.client.ActorProxyBuilder; import java.util.ArrayList; import java.util.List; /** * How to run client: <br/> * dapr run --components-path ./components --app-id actor-client \ * -- mvn compile exec:java -Dexec.mainClass="io.github.wynn5a.DemoActorClient" */ public class DemoActorClient { private static final int NUM_ACTORS = 2; public static void main(String[] args) throws InterruptedException { try (ActorClient client = new ActorClient()) { ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder<>(DemoActor.class, client); List<Thread> threads = new ArrayList<>(NUM_ACTORS); // Creates multiple actors. for (int i = 0; i < NUM_ACTORS; i++) { ActorId actorId = ActorId.createRandom(); DemoActor actor = builder.build(actorId); // Start a thread per actor. Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor)); thread.start(); threads.add(thread); } // Waits for threads to finish. for (Thread thread : threads) { thread.join(); } } System.out.println("Done."); } /** * Makes multiple method calls into actor until interrupted. * * @param actorId Actor's identifier. * @param actor Actor to be invoked. */ private static void callActorForever(String actorId, DemoActor actor) { // First, call register reminder. actor.registerReminder(); // Now, we run until thread is interrupted. while (!Thread.currentThread().isInterrupted()) { // Invoke actor method to increment counter by 1, then build message. actor.incrementAndGet(1).map(i -> String.format("Actor %s said message #%d", actorId, i)) .map(actor::say) .subscribe(s -> System.out.printf("Actor %s got a reply: %s%n", actorId, s)); try { // Waits for up to 2 second. Thread.sleep((long) (2000 * Math.random())); } catch (InterruptedException e) { // We have been interrupted, so we set the interrupted flag to exit gracefully. Thread.currentThread().interrupt(); } } } } -
需要注册一个 Dapr component 来存储状态,这个需要支持事务的存储后端来做,我们就用 Dapr 默认的 Redis
javaapiVersion: dapr.io/v1alpha1 kind: Component metadata: name: statestore spec: type: state.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: "" - name: actorStateStore value: "true" -
然后用 maven 把上面的东西放到一块,启动命令如下
-
启动 Service
shelldapr run --components-path ./components --app-id demo-actor-server --app-port 8080 -- java -jar target/dapr-actor-demo-1.0.0.jar -
启动 Client
shelldapr run --components-path ./components --app-id actor-client -- mvn compile exec:java -Dexec.mainClass="io.github.wynn5a.DemoActorClient"
-
总结#
上面的代码,我们简单的体验了 Dapr Actor 带来的方法调用和 Reminders 的功能,算是一个体验性质的玩具,还有很多进阶的玩法,需要我们下次去探索
此次示例,完整代码见下方连接