作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
Diego Castorina's profile image

Diego Castorina

迭戈一直是意大利电信巨头Italtel等公司的高级自由工程师. 他还共同创立了一家基于网络的CRM业务.

Expertise

Years of Experience

17

分享

The challenge

写作 concurrent programs 是很困难的. Having to deal with threads, 锁, race conditions, 等等都非常容易出错,并可能导致代码难以阅读, 测试, maintain.

因此,许多人倾向于完全避免多线程. Instead, 它们专门使用单线程进程, 依赖于外部服务(如数据库), 队列, 等.)来处理任何需要的并发或异步操作. 虽然这种方法在某些情况下是一种合法的替代方法, 在许多情况下,它根本不是一个可行的选择. 许多实时系统-例如交易或银行应用程序, 或者实时游戏——没有等待单线程进程完成的奢侈(他们现在就需要答案)!). 其他系统是计算密集型或资源密集型的,如果没有在代码中引入并行化,它们将花费大量的时间(在某些情况下是数小时甚至数天)来运行.

一种相当常见的单线程方法(广泛用于 节点.js 例如,World)是使用基于事件的非阻塞范式. 这避免了上下文切换,从而提高了性能, 锁, blocking, 它仍然没有解决并发使用多个处理器的问题(这样做需要启动), coordinating between, multiple independent processes).

所以这是否意味着你别无选择,只能深入到线程的内部, 锁, 和竞争条件,以便构建并发应用程序?

感谢Akka框架,答案是否定的. 本教程介绍了Akka示例,并探讨了它促进和简化并发实现的方式, distributed applications.

What is the Akka Framework?

这篇文章介绍了Akka,并探讨了它促进和简化并发实现的方式, distributed applications.

Akka 用于构建的工具箱和运行时是高度并发的吗, distributed, 以及JVM上的容错应用程序. Akka is written in Scala,同时为Scala和Java提供了语言绑定.

Akka处理并发性的方法基于 演员 Model. In an 演员-based 系统, everything is an 演员, 就像在面向对象设计中,一切都是对象一样. A key difference, 但是,与我们的讨论特别相关的是,演员模型是专门设计和架构为并发模型的,而面向对象模型则不是. More specifically, in a Scala 演员 系统, 参与者交互并共享信息, 没有任何预设的顺序. 参与者相互共享信息的机制, t问 one another, is 消息 passing.

创建和调度线程的所有复杂性, 接收和发送消息, 处理竞争条件和同步, 是否降级到框架处理透明.

Akka在参与者和底层系统之间创建了一个层,这样参与者只需要处理消息. 创建和调度线程的所有复杂性, 接收和发送消息, 处理竞争条件和同步, 是否降级到框架处理透明.

Akka strictly adheres to the The Reactive Manifesto. 响应式应用旨在用满足以下一个或多个要求的架构取代传统的多线程应用:

  • Event-driven. Using 演员s, 可以编写异步处理请求的代码,并专门使用非阻塞操作.
  • Scalable. In Akka, 无需修改代码就可以添加节点, 多亏了消息传递和位置透明性.
  • Resilient. 任何应用程序都会在某个时间点遇到错误和失败. Akka提供了“监督”(容错)策略来促进系统的自我修复.
  • Responsive. 今天的许多高性能和快速响应应用程序需要向用户提供快速反馈,因此需要以极其及时的方式对事件做出反应. Akka的非阻塞、基于消息的策略有助于实现这一目标.

What is an 演员 in Akka?

参与者本质上只不过是接收消息并采取操作来处理消息的对象. 它与消息源解耦,它唯一的职责是正确识别它所接收到的消息类型并采取相应的操作.

在收到消息后,参与者可以采取以下一个或多个操作:

  • 自己执行一些操作(例如执行计算), persisting data, calling an external web service, so on)
  • 将消息或派生消息转发给另一个参与者
  • 实例化一个新的参与者并将消息转发给它

或者,参与者可以选择完全忽略该消息.e.(它可以选择不作为)如果它认为这样做是合适的.

要实现演员,必须扩展akka.演员.演员特征并实现接收方法. 当消息被发送到参与者时,参与者的receive方法将被调用(由Akka调用). 它的典型实现由模式匹配组成, 如下面的Akka示例所示, 要识别消息类型并作出相应的反应:

import akka.演员.演员
import akka.演员.道具
import akka.事件.Logging
 
class My演员 extends 演员 {
  def receive = {
    case value: String => doSomething(value)
    case _ => println("received unknown 消息")
  }
}

模式匹配是处理消息的一种相对优雅的技术, 比起基于回调的类似实现,哪一种倾向于生成“更干净”且更易于导航的代码. 例如,考虑一个简单的HTTP请求/响应实现.

首先,让我们在JavaScript中使用基于回调的范例来实现它:

route(url, function(request){
  var query = buildQuery(request);
  dbCall(查询功能(dbResponse) {
    var wsRequest = buildWebServiceRequest(dbResponse);
    wcall (wsRequest, function(wsResponse)) {
      发送Reply(wsResponse);
    });
  });
});

现在让我们将其与基于模式匹配的实现进行比较:

msg match {
  case HttpRequest(request) => {
    val query = buildQuery(request)
    dbCall(query)
  }
  case DbResponse(dbResponse) => {
    var wsRequest = buildWebServiceRequest(dbResponse);
    wsCall(dbResponse)
  }
  case WsResponse(wsResponse) => 发送Reply(wsResponse)
}

虽然基于回调的JavaScript代码确实很紧凑, 当然,阅读和导航难度更大. In comparison, 基于模式匹配的代码使正在考虑的情况以及如何处理每种情况更加明显.

The 演员 System

把一个复杂的问题递归地分解成更小的子问题,通常是一种可靠的问题解决技术. 这种方法在计算机科学中特别有益(与 单一责任原则), as it tends to yield clean, modularized code, with little or no redundancy, 这相对容易维护.

In an 演员-based design, 使用这种技术可以方便地将参与者的逻辑组织成一个层次结构,称为 演员 System. 参与者系统提供了参与者相互交互的基础设施.

参与者系统在Akka框架中工作的一个例子.

在Akka中,与演员交流的唯一方式是通过an 演员Ref. An 演员Ref 表示对参与者的引用,该引用阻止其他对象直接访问或操纵该参与者的内部和状态. 消息可以通过控件发送给参与者 演员Ref 使用以下语法协议之一:

  • ! (" tell ")——发送消息并立即返回
  • ? (" 问 ")——发送消息并返回 Future representing a possible reply

每个参与者都有一个邮箱,将传入的消息传递到该邮箱. 有多种邮箱实现可供选择, 默认实现是FIFO.

参与者包含许多实例变量,以便在处理多个消息时保持状态. Akka确保参与者的每个实例在自己的轻量级线程中运行,并且每次处理一个消息. In this way, 每个参与者的状态都可以可靠地维护,而开发人员无需显式地担心同步或竞争条件.

通过Akka 演员 API,每个演员都提供了以下有用的信息来执行其任务:

  • 发送方: an 演员Ref 到当前正在处理的消息的发送方
  • context:与参与者运行的上下文相关的信息和方法(包括, for example, an 演员Of 方法(用于实例化新参与者)
  • supervisionStrategy:定义用于从错误中恢复的策略
  • 自我: 演员Ref for the 演员 it自我
Akka确保参与者的每个实例在自己的轻量级线程中运行,并且每次处理一个消息. In this way, 每个参与者的状态都可以可靠地维护,而开发人员无需显式地担心同步或竞争条件.

将这些教程联系在一起, 让我们考虑一个计算文本文件中单词数量的简单示例.

对于我们的Akka例子来说, we’ll decompose the problem into two subt问s; namely, (1)计算单行单词数的“子”任务;(2)计算每行单词数之和的“父”任务,得到文件中的单词总数.

父角色将从文件中加载每行,然后将计算该行字数的任务委托给子角色. 当子进程完成后,它将把结果发送回父进程. 父进程将接收带有单词计数的消息(每行),并为整个文件中的单词总数保留一个计数器, 然后在完成后返回给调用者.

(请注意,下面提供的Akka教程代码示例仅用于教学,因此不一定涉及所有边缘条件, performance optimizations, so on. 此外,本文还提供了下面所示代码示例的完整可编译版本 要点.)

让我们首先看一下子节点的样例实现 StringCounter演员 类:

case类ProcessStringMsg(string: string)
case类StringProcessedMsg(字:整型)
 
类StringCounter演员扩展演员 {
  def receive = {
    case ProcessStringMsg(string) => {
      val wordsInLine = string.split(" ").长度
      发送方 ! StringProcessedMsg(wordsInLine)
    }
    case _ => println("Error: 消息 not recognized")
  }
}

这个演员有一个非常简单的任务:消费 ProcessStringMsg 消息(包含一行文本), 计算指定行上的单词数, 并将结果通过a返回给发送方 StringProcessedMsg 消息. 请注意,我们已经将类实现为使用 ! (“tell”) method to 发送 the StringProcessedMsg 消息 (i.e.,发送消息并立即返回).

好了,现在让我们把注意力转向父母 WordCounter演员 类:

1.  case类StartProcessFileMsg()
2.
3.  类WordCounter演员(文件名:String)扩展演员 {
4.  
5.    private var running = false
6.    private var totalLines = 0
7.    private var linesProcessed = 0
8.    private var 结果 = 0
9.    private var fileSender: Option[演员Ref] = None
10.  
11.   def receive = {
12.     case StartProcessFileMsg() => {
13.       if (running) {
14.         // println仅用于示例目的;
15.         //应该使用Akka记录器
16.         println("警告:收到重复的启动消息")
17.       } else {
18.         running = true
19.         fileSender = Some(发送方) //保存对进程调用者的引用
20.         import scala.io.Source._
21.         fromFile(filename).getLines.foreach { line =>
22.           context.演员Of(道具(StringCounter演员)) ! ProcessStringMsg(line)
23.           totalLines += 1
24.         }
25.       }
26.     }
27.     case StringProcessedMsg(words) => {
28.       结果 += words
29.       linesProcessed += 1
30.       if (linesProcessed == totalLines) {
31.         fileSender.地图(_ ! Result) //向进程调用者提供结果
32.       }
33.     }
34.     case _ => println("消息 not recognized!")
35.   }
36. }

这里发生了很多事情,所以让我们更详细地检查每一个 (请注意,下面讨论中引用的行号是基于上面的代码示例的)

首先,注意要处理的文件的名称被传递给 WordCounter演员 constructor (line 3). 这表明参与者仅用于处理单个文件. 这也简化了开发人员的编码工作, 通过避免重置状态变量(running, totalLines, linesProcessed, 结果),因为实例只被使用一次(i.e.(处理单个文件),然后丢弃.

Next, observe that the WordCounter演员 h和les two types of 消息s:

  • StartProcessFileMsg (line 12)
    • 从初始化的外部参与者接收 WordCounter演员.
    • When received, the WordCounter演员 首先检查它是否没有接收到冗余请求.
    • If the request is redundant, WordCounter演员 生成一个警告,不再执行任何操作(第16行).
    • 如果请求不是多余的:
      • WordCounter演员 控件中存储对发送方的引用 fileSender 实例变量(注意,这是一个 Option[演员Ref] rather than an Option[演员] - see line 9). 这 演员Ref 是否需要在处理最终结果时访问和响应它 StringProcessedMsg (which is received from a StringCounter演员 child, as described below).
      • WordCounter演员 然后读取文件,在加载文件中的每一行时,a StringCounter演员 创建子节点,并将包含要处理的行的消息传递给它(第21-24行)。.
  • StringProcessedMsg (line 27)
    • Received from a child StringCounter演员 当它完成处理时,分配给它的行.
    • When received, the WordCounter演员 增加文件的行计数器,如果文件中的所有行都已处理(i.e.,当 totalLineslinesProcessed 是相等的),它将最终结果发送给原始 fileSender (lines 28-31).

再次注意,在Akka中,参与者间通信的唯一机制是消息传递. 消息是参与者之间唯一共享的东西, 因为参与者可能并发地访问相同的消息, 对它们来说,不可变是很重要的, 以避免竞争条件和意外行为.

Case classes 是通过模式匹配提供递归分解机制的常规类.

因此,通常以case类的形式传递消息,因为它们在默认情况下是不可变的,并且由于它们与模式匹配的无缝集成.

让我们用运行整个应用程序的代码示例来结束这个示例.

object Sample extends App {
 
  import akka.跑龙套.超时
  import scala.concurrent.duration._
  import akka.模式.问
  import akka.dispatch.ExecutionContexts._
  
  implicit val ec = global
  
  重载def main(args: Array[String]) {
    val 系统 = 演员System(" 系统 ")
    val 演员 = 系统.演员Of(道具(新WordCounter演员 (args (0))))
    隐val timeout =超时(25秒)
    val Future = 演员 ? StartProcessFileMsg()
    Future.map { 结果 =>
      println("总字数" +结果)
      系统.shutdown
    }
  }
}
In concurrent programming, “Future”本质上是一个占位符对象,用于表示未知的结果.

Notice how this time the ? 方法用于发送消息. 这样,调用者就可以使用返回的 Future 在可用时打印最终结果,并通过关闭演员System退出程序.

Akka容错和管理者策略

在演员系统中,每个演员都是其子系统的监督者. 如果参与者未能处理消息, 它将自己及其所有子进程挂起,并发送一条消息, 通常以异常的形式出现, to its supervisor.

In Akka, 管理器策略是定义系统容错行为的主要和直接的机制.

In Akka, 管理器对从子进程渗透到它的异常作出反应和处理的方式称为管理器策略. Supervisor strategies 定义系统容错行为的主要和直接的机制是什么.

当表示失败的消息到达主管时,它可以采取以下操作之一:

  • 恢复子节点(及其子节点),保持其内部状态. 当子状态没有被错误破坏并且可以继续正常工作时,可以应用此策略.
  • 重新启动子进程(及其子进程),清除其内部状态. 此策略可用于与刚才描述的相反的场景. 如果子状态已被错误损坏, 在将来使用它之前,有必要重置它的状态.
  • 永久停止子进程(及其子进程). 在错误条件被认为是不可纠正的情况下,可以采用这种策略, 但不会危及正在执行的其余操作, 在没有失败孩子的情况下,哪些可以完成.
  • 停止自身并升级错误. 当主管不知道如何处理失败并将其升级到自己的主管时使用.

Moreover, 演员可以决定只对失败的子节点或它的兄弟节点应用操作. 这里有两个预先定义的策略:

  • OneForOneStrategy:只对失败的子节点应用指定的操作
  • AllForOneStrategy:将指定的操作应用于它的所有子操作

下面是一个简单的例子 OneForOneStrategy:

import akka.演员.OneForOneStrategy
import akka.演员.SupervisorStrategy._
import scala.concurrent.duration._
 
overoverval overoverval superorstrategy =
 OneForOneStrategy() {
   case _: ArithmeticException      => Resume
   case _: NullPointerException     => Restart
   case _: IllegalArgumentException => Stop
   case _: Exception                => Escalate
 }

如果不指定策略,则采用如下默认策略:

  • 如果在初始化参与者时出现错误,或者参与者被杀死,则会停止该参与者.
  • 如果存在任何其他类型的异常,则简单地重新启动演员.

akka提供的这个默认策略的实现如下:

defaultStrategy: superorstrategy = {
  def defaultDecider: Decider = {
    case _: 演员InitializationException⇒停止
    case _: 演员killledexception⇒停止
    case _: Exception:重启
  }
  OneForOneStrategy () (defaultDecider)
}

Akka允许实现 custom supervisor strategies, 但正如阿卡文件所警告的那样, 这样做要谨慎,因为不正确的实现可能会导致诸如阻塞参与者系统(i.e. permanently suspended 演员s).

Location transparency

The Akka architecture supports location transparency,使参与者完全不知道他们收到的消息来自哪里. 消息的发送方可能与参与者位于相同的JVM中,也可能位于单独的JVM中(运行在相同节点或不同节点上)。. Akka允许以一种对参与者(因此对开发人员)完全透明的方式处理这些情况。. 唯一需要注意的是,跨多个节点发送的消息必须是可序列化的.

Akka架构支持位置透明性,使参与者完全不知道他们收到的消息来自哪里.

参与者系统被设计为在分布式环境中运行,而不需要任何专门的代码. Akka只需要配置文件(application.相依),它指定要发送消息的节点. 下面是一个简单的配置文件示例:

akka {
  演员{
    provider = "akka.远程.Remote演员RefProvider"
  }
  远程 {
    transport = "akka.远程.网状的.NettyRemoteTransport"
    网状的{
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}

A few parting tips…

我们已经看到Akka框架是如何帮助实现并发性和高性能的. However, as this tutorial pointed out, 为了充分利用Akka的力量,在设计和实施系统时要记住以下几点:

  • To the grea测试 extent possible, 每个参与者应该分配尽可能小的任务(如前所述), following the 单一责任原则)
  • 演员s should h和le 事件s (i.e., 异步处理消息,不应该阻塞, 否则将发生上下文切换,从而对性能产生不利影响. 具体来说,最好执行阻塞操作(IO等).) in a Future so as not to block the 演员; i.e.:

      case evt => blockingCall() // BAD
      case evt => Future {
        blockingCall() //好
      }
    
  • 确保您的消息都是不可变的, 因为相互传递它们的参与者都将并发地在自己的线程中运行. 可变消息很可能导致意外行为.
  • 因为在节点之间发送的消息必须是可序列化的, 重要的是要记住,消息越大, 序列化的时间就越长, 发送, deserialize them, 哪些会对性能产生负面影响.

Conclusion

Akka, written in Scala, 简化和促进高度并发的开发, distributed, 以及容错应用程序, 对开发人员隐藏了许多复杂性. 完全公正地对待Akka需要的远不止这一篇教程, 但希望这个介绍和它的例子足够吸引你,让你想要读更多.

亚马逊、VMWare和CSC只是积极使用Akka的领先公司的几个例子. Visit the official Akka website 以了解更多信息,并探索Akka是否也可以成为您项目的正确答案.

聘请Toptal这方面的专家.
Hire Now
Diego Castorina's profile image
Diego Castorina

Located in Prague, Czech Republic

Member since March 5, 2014

关于 the author

迭戈一直是意大利电信巨头Italtel等公司的高级自由工程师. 他还共同创立了一家基于网络的CRM业务.

Toptal作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.

Expertise

Years of Experience

17

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

Toptal Developers

Join the Toptal® community.