使用Lambda拉伸Async/Await


如果使用集合流水线[ 1 ]map(…->…).reduce(…->…)而不是for(…){……}语句块,则不仅可以作为一种时尚决策,而且可以提高整体吞吐量(如果涉及异步操作)。我们将展示一个简单的用例,它请求一个URL列表并求和响应体的长度,以及使用或不使用lambda可能如何导致并发或顺序计算[ 5 ]

请注意,我们并不是在要求并行性[ 6 ],例如Stream.parallel()功能,而是可以用于“将工作负载分解为独立的,粗粒度的任务(例如用户请求)的并发性” ,目的是通过处理多个任务来提高吞吐量。请求“[ 7 ])。

我们将以4种不同的编程语言(Java,Javascript,C#和Kotlin)分析此用例,以及它们的异步习惯用法之间的细微差异如何使相似的用法产生完全不同的结果。

介绍 我完全同意lambda不是函数式编程的普遍接受的观点[ 3 ]。因此,我将不再关注陈述每种编程范例的形式主义,而是关注编写具有可读性,效率和可伸缩性之间的最佳关系的代码的能力。

因此,对于许多开发人员而言,编写numbers.filter(::isPrime).map(::toString).forEach(::doSomething)而不是编写for(var nr in numbers){ if(isPrime((nr)) doSomething(nr)}只是一种风格上的决定。有些人可能同意前者具有更好的可读性,而另一些人则声称后者的效率。实际上,使用收集管道[ 1 ](就像前面的示例一样)可能会导致额外的性能开销。然而,许多其他人则认为,在衡量现实生活中的表现时,这些差异几乎不会引起注意,我们应该专注于哪种方法可以利用功能组合提供更多的灵活性和可扩展性。

在这里,我们带来另一个考虑:这些计算是否异步?如果它们是异步的,那么我们应该担心执行进度。每种选择都可能导致并发或顺序计算,进而影响总体吞吐量。

如果顺序无关紧要,并且我们正在处理具有数百万个元素的大型集合,那么同时进行计算可能会导致总执行时间相差几个数量级。正如Bryan Goetz所明确指出的那样,“并发主要是异步的-允许活动在等待I / O完成的同时放弃CPU。 ” [ 7 ]。

为了评估使用不同的编程习惯来处理并发的影响,我们采用了问题“ C#async / await的Java等价” [ 10 ]中提出的用例,该用例获得了对HTTP请求的主体响应的长度。 。

现在,考虑一下,例如,我们要对通过URL序列的HTTP获取得到的主体长度求和。使用收集管道方法,我们可以通过以下Javascript示例(简化版本)实现所需的行为:

const sum = urls.map(url => fetch(url)).reduce((prv, cur) => prv.then(p => cur.then(c => p.length + c.length)))

在此,fetch操作返回Promise[ 4 ](以下的Javascript相当于爪哇CompletableFuture)与方法then(提供的两个行为thenApply和thenCompose的CompletableFuture)。我们仍然可以利用async/await惯用法来简化每个管道的结果来简化每个管道的结果Promise,而不是使用链式then调用:

const sum = urls.map(url => fetch(url)).reduce(async (prv, cur) => await prv.length + await cur.length)

两种版本的管道fetch同时执行所有操作,并且在分派所有请求之后,它将通过reduce操作累积主体长度。

但是,现在,如果我们将此管道转换为等效for循环,则可能会出现意外情况。以下程序的执行顺序执行fetch操作,而不是同时执行。

let sum = 0; 
for (const url of urls) { 
  const body = await fetch(url); 
  sum += body.length 
}

如果我们按顺序处理1,000个URL ,并且每个请求的平均延迟为20毫秒,那么我们将花费大约20秒的时间来计算所有正文的长度。而请求这些网址的同时,为我们与没有收集管道版本可能需要大约20毫秒。

之所以在使用收集管道习惯用法和for循环之间观察不同的行为,是因为这些程序并不是真正等效的。实际上,我们正在使用来自JavaScript数组的收集管道API,该API正在处理Eagerly元素。因此,每个中间操作(例如map,filter等)遍历整个集合的所有元素应用所述下一个操作之前。为了实现等效执行,我们应该编写以下版本:

const resps = []
for (const url of urls) { 
    resps.push(fetch(url))
}
let sum = 0;
for (const r in resps) {
    const body = await r
    sum += body.length
}

在下面的内容中,我将解释不同的编程习惯会如何影响应用程序进度,以及如何使用不同的编程语言(包括Java,Javascript,C#Kotlin),每种语言都会建立不同的抽象模型来处理异步编程。Java提供了CompletableFutureAPIJavascript和C#提供了async/await习惯用语,而Kotlin引入了suspend功能。

无论哪种编程语言都具有异步模型特质,这4种语言都在Promise [ 4 ]的概念之上构建了它们的异步抽象,Promise [ 4 ]代表了异步计算的结果。

对于Kotlin而言,这并非完全正确,因为可以将原生Kotlin操作实现为原生挂起函数,从而避免使用promises。但是,在Kotlin和Java之间的大多数互操作性场景中,我们将看到最内部的异步操作返回a CompletableFuture。这也发生在我们的用例中,HttpClient在Java和Kotlin程序中都使用(包含在Java 11中)来获取URL。

简而言之,我们将观察到不同的效果,具体取决于每种异步抽象的使用以及是否使用lambda来实现一个对HTTP响应的主体长度进行求和的函数fetchAndSum(urls)。下表总结了我们将在本文中介绍的实验性观察路线图:

FETCHANDSUM(URLS: ARRAY) COMPLETABLEFUTURE ASYNC/AWAIT SUSPEND
collection pipeline and lambdas Concurrent Concurrent Sequential
single loop with async/await NA Sequential Sequentia

请注意,该表仅恢复了我们对的特定用例的观察,fetchAndSum(urls)并且几乎仅是本文的目录。您不应将此表视为建立由将不同的异步模型与lambda结合使用而产生的必然结果。

CompletableFuturefetchAndSum(urls)使用Java 11提供的异步操作进行实现之前,HttpClient我们将首先介绍同步实现。为此,我们将使用前者java.net.URL执行HTTP get请求。

这样,在下面的实现中fetchAndSumBlocking(urls)我们可以识别出四个操作:1)获取URL; 2)获取URL。2)读取HTTP响应正文,3)获取正文的长度,以及4)累加长度。操作1和2执行I / O,而操作3和4涉及读取,添加和设置计算。

static int fetchAndSumBlocking(String...urls) throws MalformedURLException, IOException {
    int sum = 0;
    for(var url : urls) {
        InputStream resp = new URL(url).openStream();                 // 1 - Fetch the url
        String body = new BufferedReader(new InputStreamReader(resp)) // 2 - Read the body
            .lines()
            .collect(joining("\n"));
        int length = body.length();  // 3 – Get body’s length
        sum += length;               // 4 - Sum lengths
    }
    return sum;
}

在前一个示例中,所有操作都是同步进行的。当我们调用诸如此类的函数openStream()时,它仅在该动作完成后才返回,并且可以返回其结果。这意味着迭代循环仅在完成第四个操作后才前进到下一步sum += length。因此,每个操作执行的动作是一个接一个地发生的,每个操作都消耗前一个操作产生的结果

另一方面,当我们调用异步操作时,它会在封闭操作完成之前返回。因此,它没有返回操作结果,而是返回表示可能在某个点完成并产生结果的异步操作的承诺。操作完成后,将通知promise并访问结果(例如,来自HTTP get请求的响应)。为了与Promise进行交互,我们可以使用其方法,该方法注册回调以接收其最终结果。考虑到产生结果的Promise ,以前我们可能会使用它来注册一个回调then()prp.then(r -> ...)r -> ...接收r来自的结果p。在这里,我们作两个简化:

无论不同的方法前缀then(如thenApply,thenCompose,thenAccept,等),我们将简单地表示他们作为then()。 我们不会处理错误和异常完成。 我们还可以使用该方法then()注册一个回调,该回调接收来自两个promise的结果。在这种情况下,考虑p1与p2这产生两种结果的两个承诺r1和r2,我们可以使用p1.then(p2, (r1, r2) -> ...)注册一个回调函数(r1, r2) -> ...接收两个承诺的结果p1p2(这是法的作用thenCombineCompletableFuture)。请注意,当两个诺言都得到满足时,才调用回调。

因此,使用异步API执行HTTP get请求可能会导致对响应的承诺,而不是具体的响应本身。为了用Java执行异步HTTP请求,我们可以使用java 11 HttpClient,它也可用于Kotlin。这些方法返回一个实例,CompletableFuture该实例实现了promise [ 4 ]的概念。

为简化起见,请在下面的示例中考虑我们可以访问以下静态成员:

static HttpClient httpClient = HttpClient.newHttpClient();
static HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
static HttpRequest request(String url) {
     return requestBuilder.uri(URI.create(url)).build();
}

利用HttpClient和CompletableFutureAPI,我们可以fetchAndSum(urls)通过以下方式编写的异步实现:

CompletableFuture<Integer> fetchAndSum(String...urls) {
    var sum = CompletableFuture.completedFuture(0);
    for(var url : urls) {
        var prev = sum;
        sum = httpClient
            .sendAsync(request(url), BodyHandlers.ofString()) // 1 - Fetch the url
            .thenApply(HttpResponse::body)                    // 2 - Read the body
            .thenApply(String::length)                        // 3 – Get body’s length
            .thenCombine(sum, (length, val) -> {              // 4 - Sum lengths
                return val + length;
            });
    }
    return sum;
}

由于所有请求都是异步的,因此迭代循环会在前一个请求完成之前前进到下一个请求,因此可以并行执行。通过sendAsync()结果执行的每个请求都会产生一个新请求CompletableFuture,并...::body在响应完成后继续执行。最后,我们使用thenCombine组合(aka zip)的先前结果sum与的新结果body.length()。

该fetchAndSum可以根据收集管道被改写:

CompletableFuture<Integer> fetchAndSum(String...urls) {
    var sum = CompletableFuture.completedFuture(0);
    return Stream
            .of(urls)
            .peek(url -> out.printf("FETCHING from %s\n", url))
            .map(url -> httpClient
                .sendAsync(request(url), BodyHandlers.ofString()) // 1 - Fetch the url
                .thenApply(HttpResponse::body)                    // 2 - Read the body
                .thenApply(String::length)                        // 3 – Get body’s length
                .whenComplete((l, err) -> out.printf("=======> from %s\n", url)))
            .reduce(sum, (prev, curr) -> prev
                .thenCombine(curr, (p, c) -> p + c));             // 4 - Sum lengths
}

如果out.printf("FETCHING from %s\n", url);在fetch语句之前插入,并out.printf("=======> from %s\n", url);在添加length到之前,则可以明显观察到每种方法的顺序和并发结果sum。

使用一系列方法来运行两种方法 `["https://stackoverflow.com/","https://github.com/",

"http://dzone.com/"] `我们可能会得到以下输出:

fetchAndSumBlocking(urls): fetchAndSum(urls):
FETCHING from https://stackoverflow.com/ =======> from https://stackoverflow.com/ FETCHING from https://github.com/ =======> from https://github.com/ FETCHING from http://dzone.com/ =======> from http://dzone.com/ FETCHING from https://stackoverflow.com/ FETCHING from https://github.com/ FETCHING from http://dzone.com/ =======> from http://dzone.com/ =======> from https://stackoverflow.com/ =======> from https://github.com/

您可以在这里找到此示例的源代码:https : //github.com/javasync/async-await-idioms/tree/master/java

async/await idiom 如今,许多编程语言(例如Javascript,Python或C#)都提供async/await关键字,使您可以隐藏对promises的明确使用,并允许您编写不带lambda的“伪同步”代码。通常,“ async/ await”允许以类似于普通同步功能的方式构造异步,非阻塞功能” [ 8 ]。关于Javascript和由Marijn Haverbeke在其显着的书中解释能言善辩的JavaScript:“一个async函数是一个函数,隐式返回一个承诺,并且可以在其身上,await对方兑现承诺的方式,看起来是同步的。 ” [ 9 ]。

因此,我们可以轻松地将Java实现转换为fetchAndSumBlocking(urls)利用的asyncJavascript函数fetchAndSum(urls),await而不会导致阻塞使用。我们可以从使用异步fetch()函数开始执行HTTP get请求开始,此后,我们可以使用await来获得所产生的Promise的实现值。这等效于传递给上一部分中的then()方法的lambda执行的操作,但此处未使用lambda,如以下的实现所示fetchAndSum(urls)

async function fetchAndSum(urls) {
    let sum = 0
    for (const url of urls) {
        const res = await fetch(url)  // 1 - Fetch the url
        const body = await res.text() // 2 - Read the body
        const length = body.length    // 3 – Get body’s length
        sum += length                 // 4 - Sum lengths
    }
    return sum
}

每次操作返回承诺时,我们都会使用await来获得其结果。此实现放弃了对promise和lambda的显式使用,它们以类似于同步功能的方式构造,但保留了异步功能的非阻塞性质。运行以下程序可以很容易地观察到异步行为:

const urls = [ "https://stackoverflow.com/", "https://github.com/", "http://dzone.com/"]
fetchAndSum(urls).then(sum => console.log(`Sum = ${sum}`))
console.log("fetchAndSum(urls) dispatched!")

可能产生输出:

fetchAndSum(urls) dispatched!
Sum = 338742

我们可以看到最后一条console.log("fetchAndSum(urls) dispatched!")语句的结果是在完成之前产生的fetchAndSum(urls),代表该函数的非阻塞行为,否则我们将以相反的顺序观察消息。乍一看,这是async/await使用带来的不错结果。但是,存在一个局限性:不能同时获取url

在第一个await语句上,函数将立即继续将a返回Promise给其调用方。之后,fetchAndSum仅当第一个promise解析后,才会继续执行will,然后继续执行下一条语句。

为了实现并发执行,我们必须将await最后一个fetch操作之后的第一条语句推迟到给定的urls数组中。这样,当方法fetchAndSumfirst上返回时,所有提取操作都可以同时进行await。要实现此行为,我们可以先从中间数组中的所有提取操作中收集所得的Promise,然后再await针对的下一个实现中提供的响应进行收集fetchAndSumConcur

async function fetchAndSumConcur(urls) {
    let sum = 0
    let promises = []
    for (const url of urls) {
        promises.push(fetch(url))       // 1 - Fetch the url
    }
    for (let i = 0; i < urls.length; i++) {
        const resp = await promises[i]  
        const body = await resp.text()  // 2 - Read the body
        const length = body.length      // 3 – Get body’s length
        sum += length                   // 4 - Sum lengths
   }
    return sum
}

如果在fetch语句之前插入a和在添加到之前将a插入,我们可以很容易地观察到这些函数之间的差异。用一个数组运行两个函数console.log(FETCHING from ${url})console.log(=======> from ${urls[i]}) lengthsum["https://stackoverflow.com/","https://github.com/",

"http://dzone.com/"]可能会产生以下输出:

fetchAndSum(urls): fetchAndSumConcur(urls):
FETCHING from https://stackoverflow.com/ =======> from https://stackoverflow.com/ FETCHING from https://github.com/ =======> from https://github.com/ FETCHING from http://dzone.com/ =======> from http://dzone.com/ FETCHING from https://stackoverflow.com/ FETCHING from https://github.com/ FETCHING from http://dzone.com/ =======> from https://stackoverflow.com/ =======> from https://github.com/ =======> from http://dzone.com/
`

查看该块,for (const url of urls) promises.push(fetch(url))我们意识到这等效于map()在urls数组上使用该函数,例如,urls.map(url => fetch(url))使其变得不太冗长。这是使用收集管道而不是两个for块的起点。因此,我们也可以for用另一个块替换第二个块,map该块将每个响应转换为相应的长度。最后,我们可以sum通过reduce()实现以下实现的操作来收集fetchAndSumλ

async function fetchAndSumλ(urls) {
    return urls
        .map(url => fetch(url))            // 1 - Fetch the url
        .map(async (promise, i) => {
            const resp = await promise
            const body = await resp.text() // 2 - Read the body
            return body.length             // 3 – Get body’s length
        })
        .reduce(async (l1, l2) => {
            return await l1 + await l2     // 4 - Sum lengths
        })
}

请注意,第二个lambda是一个async函数,而前一个不是。在async当我们需要使用,需要await得到一个承诺,这不前拉姆达发生的最终值。第一个map()只分派HTTP get请求,而没有指定如何处理它们的响应。在第二秒的后面,map()我们继续用an处理每个响应,await然后它返回带有每个响应的正文长度的新promise。最后,由于前一个map()结果产生了一个promise数组,所以下一个reduce()也应该使用一个async函数来累加长度。

我们可以进一步合并两个map操作,map并实现以下实现,fetchAndSumλ该实现仍然是并发的,并且布局与前者非常相似fetchAndSum。重要的是要注意,fetchAndSumλ在返回之前将收集管道进行到完成。在这种情况下,await指令是相对于内部asynclambda而不是相对于外部fetchAndSumλ

对于每次迭代,map()我们都调用一个内部async函数,该函数立即在每个上返回await。从最内部的async函数返回后,外部函数将恢复到的下一个迭代,map()然后继续进行另一个迭代,fetch依此类推,从而导致并发请求。

async function fetchAndSumλ(urls) {
    return urls
        .map(async (url, i) => {
            const resp = await fetch(url)  // 1 - Fetch the url
            const body = await resp.text() // 2 - Read the body
            return body.length             // 3 – Get body’s length
        })
        .reduce(async (l1, l2) => {
            return await l1 + await l2     // 4 - Sum lengths
        })
}

fetchAndSum并且fetchAndSumλ具有相同的行数和相似的操作顺序。两者都利用了async/await惯用语,但后者fetchAndSumλ仍在收集管道中使用lambda。尽管乍一看它们看起来是解决相同问题的两个等效选项,但是现在我们知道它们实现了不同的进展,如果处理大量的url,则吞吐量可能会有所不同。

如果我们在C#中复制两个实现,则可以观察到相同的行为,C#还提供了async/await功能。以下清单显示了与Java语言非常接近的C#对应实现。主要区别在于命名map(…).reduce(…)与对应的位置Select(…).Aggregate(….)。返回类型Task是.NetPromise概念的实现,等效于Javascript Promise

static async Task<int> FetchAndSum(string[] urls) {
    int sum = 0;
    using(HttpClient httpClient = new HttpClient()) {
        foreach(var url in urls) {
            var body = await httpClient.GetStringAsync(url);
            sum += body.Length;
        }
    }
    return sum;
}

static async Task<int> FetchAndSumλ(string[] urls) {
    using(HttpClient httpClient = new HttpClient()) {
        return await urls
            .Select(async url => {
                var body = await httpClient.GetStringAsync(url);
                return body.Length;
            })
            .Aggregate(async (l1, l2) => await l1 + await l2);
    }
}

两种实现都显示了与JavaScript实现相同的结果。出于相同的原因,前者FetchAndSum(urls)顺序FetchAndSumλ(urls)执行,而同时执行。我们可以通过在同一urls数组中运行这些函数来轻松观察这种行为。

您可以从以下位置提供的源中尝试两种实现:

Suspend 功能 Kotlin通过协程提供异步功能。这不仅导致在使用for块而不是在收集管道之间做出二元决策,而且还导致了协程作用域范围的边界。而且,正如我们将在下面看到的那样,Kotlin的内联功能也可能会影响最终的行为。

因此,首先从fetchAndSum(urls)fetchAndSumλ(urls)Kotlin的最接近的翻译开始,并使用其对应suspend函数代替async。然后,我们将观察到与Java,Javascript和C#中看到的结果的第一个主要区别。现在,这两种实现都按顺序执行。对于那些不太熟悉kotlin的人,以下列表中的主要特殊之处与使用在{...}和之间表示的lambda有关.await(),这是一种扩展方法,而不是关键字:

suspend fun fetchAndSum(urls: Array<String>): Int {
    var sum = 0
    for (url in urls) {
       val resp = httpClient
            .sendAsync(request(url), BodyHandlers.ofString())
            .await()
        sum = sum + resp.body().length
    }
    return sum
}
suspend fun fetchAndSumλ(urls: Array<String>): Int {

    return urls
            .map { url ->
                val resp = httpClient
                        .sendAsync(request(url), HttpResponse.BodyHandlers.ofString())
                        .await()
                resp.body().length
            }
            .reduce { l1, l2 -> l1 + l2 }
}

的实现fetchAndSum(urls)和都会fetchAndSumλ(urls)导致类似的行为,因为集合的方法(例如Array)是在编译时内联的。因此,两种实现方式的翻译都是从kotlin源代码的翻译产生的。这意味着lambda的转换不会产生任何函数,{url -> …}并且.await()调用它是相对于外部函数的fetchAndSumλ,这与前者完全相同fetchAndSum(urls)

但是,如果将urls类型更改为Sequence而不是,则会Array得到不同的效果。由于序列表示延迟评估的集合,因此无法内联其效用方法,并且{url -> …}必须将lambda转换为匿名函数。然而,这种修改引起另一个问题。由于map()接收到常规函数(不是a suspend),因此我们无法.await()在该lambda内部使用。

为此,我们需要使用协程生成器,该协程生成器将在新协程中运行给定的lambda并返回其执行的承诺。在这种情况下,我们将使用生成async()器,该生成器返回的实例Deferred,这是promise概念的kotlin实现。相同的修改出现在传递给的lambda中,reduce()如以下清单所示:

suspend fun CoroutineScope.fetchAndSumλ(urls: Sequence<String>): Int {
    return urls
            .map{url -> async {
                    val resp = httpClient
                            .sendAsync(request(url), HttpResponse.BodyHandlers.ofString())
                            .await()
                    resp.body().length
            }}
            .reduce{ l1, l2 -> async {
                l1.await() + l2.await()
            }}
            .await();
}

最后,async()构建器是扩展方法,CoroutineScope它需要正确调用特定的目标(在kotlin中也称为接收器)target.async{…}。如果我们也将自己声明fetchAndSumλ为的扩展方法,CoroutineScope则可以禁止在async()调用中显式使用接收器,而该调用是从外部函数推断出来的。

现在运行更高版本的代码,fetchAndSumλ(urls)我们已经实现了并发执行,类似于在JavaScriptC#中观察到的并发执行。

您可以在这里找到这些示例的源代码:https : //github.com/javasync/async-await-idioms/tree/master/kotlin

结论 可以编写用于编写异步程序的以前的技术,从而使其难以编写,调试和维护。例如,在回调习语中管理lambda可能很乏味,并且容易导致广为人知的“回调地狱”。

幸运的是,今天许多编程语言提供了替代技术,使您几乎可以像创建同步方法一样轻松地创建异步方法。编译器完成了开发人员过去所做的艰巨工作,并且您的应用程序保留了类似于同步代码的逻辑结构。

但是,您不能忽略幕后发生的事情,否则您可能会对应用程序所取得的进展感到措手不及。

用于非阻塞IO操作(例如网络访问)的异步编程的目的之一。通过使用异步技术,我们可以让应用程序继续执行其他任务,直到潜在的阻塞任务完成为止。但是,要利用这种行为,我们必须以不依赖异步任务完成的方式编写其他非阻塞任务。否则,我们可能会强制执行顺序执行,这会阻止不同任务同时执行。

在本文中,我们以不同的编程语言展示了诸如使用for语句而不是收集管道之类的简单决策如何影响应用程序的整体进度。我们刻意开始使用for语句来解决问题,并避免使用lambda。通过这种方法,我们展示了我们如何陷入将一些解耦操作(即fetch url)按顺序链接在一起的情况,从而限制了函数的整体进度fetchAndSum。

最后,避免回调与避免lambda不同,并且使用不同的异步习惯用法不仅是编程范例之间的决定。充分利用您的大多数编程语言构造,不要忽略背后的实际情况。


原文链接:http://codingdict.com