Lua中的协程即协同程序(coroutine)。协程与传统的线程(thread)差不多,也是一条执行序列,拥有独立的栈、局部变量和指令指针,同时又与其它协程共享全局变量和其它大部分东西。二者的区别在于:一个具有多个线程的程序在多核的情况下可以同时运行几个线程,而协程却需要彼此协作地运行,即一个具有多个协程的程序在任意时刻只能运行一个协程,并且正在运行的协程只会在其显式地要求挂起(suspend)时,它的执行才会暂停。
一、协程基础
Lua将所有关于协程的函数放置在一个名为coroutine的table中。
1、创建协程
函数create用于创建新的协程,其只有一个参数,就是协程过程函数,这点同线程函数,该函数的代码就是协程所需执行的内容。
create会返回一个thread类型的值,用以表示新的协程,通常create的参数是一个匿名函数,例如:
co = coroutine.create(function() print("hi")) end)
print(co) -->thread: 0x8071d98
一个协程可以处于4种不同的状态:挂起(suspended)、运行(running)、死亡(dead)和正常(normal)。当创建一个协程时,其处于挂起状态,即协程不会在创建它时自动执行其内容,可以通过函数status来检查协程的状态:
print(coroutine.status(co)) -->suspended
2、运行协程
函数coroutine.resume用于启动或再次启动一个协程的执行,并将其状态由挂起改为运行:
coroutine.resume(co) -->hi
协程函数运行结束,那么协程也就退出了,协程也就处于死亡状态了,上例中,协程函数仅仅是打印hi后就结束了:
print(coroutine.status(co)) -->dead
3、协程yield
协程真正强大之处在于函数yield的使用上,该函数可以让一个运行中的协程挂起,而之后可以再恢复它的运行:
co = coroutine.create(function()
for i=1,10 do
print("co", i)
coroutine.yield()
end
end)
现在当唤醒这个协程时,它就会开始执行,直到第一个yield:
coroutine.resume(co) -->co 1
如果此时检查其状态,会发现协程处于挂起状态,因此,可以再次恢复其运行:
print(coroutine.status(co)) -->suspended
从协程的角度看,所有在它挂起时发生的活动都发生在yield调用中。当恢复协程执行时,对于yield的调用才最终返回。然后协程继续它的执行,直到下一个yield调用或执行结束:
coroutine.resume(co) -->co 2
coroutine.resume(co) -->co 3
...
coroutine.resume(co) -->co 10
coroutine.resume(co) --什么都不打印
在最后一次调用resume时,协程函数已经执行完毕并已经返回了,因此,这时协程处于死亡状态。如果试图再次恢复它的执行,resume将返回false及一条错误消息:
print(coroutine.resume(co))
-->false cannot resume dead coroutine
请注意,resume是在保护模式中运行的,因此,如果在一个协程执行中发生任何错误,Lua不会显示错误消息,而将执行权返回给resume调用。
当一个协程A唤醒另一个协程B时,协程A就处于一个特殊状态,既不是挂起状态(无法继续A的执行),也不是运行状态(是B在运行)。所以将这时的状态称为“正常”状态。
Lua协程还可以通过一对resume-yield来交换数据。在第一次调用resume时,并没有对应的yield在等待它,因此,所有传递给resume的额外参数都将视为协程函数的参数:
co = coroutine.create(function(a, b, c)
print("co", a, b, c)
end)
coroutine.resume(co, 1, 2, 3) -->co 1 2 3
在resume调用的返回值中,第一个值为true则表示没有错误,而后面所有的值都是对应yield传入的参数:
co = coroutine.create(function(a, b)
coroutine.yield(a+b, a-b)
end)
print(coroutine.resume(co, 20, 10)) -->true 30 10
与此对应的是,yield返回的额外值就是对应的resume传入的参数:
co = coroutine.create(function()
print("co", coroutine.yield())
end)
coroutine.resume(co, 20, 10)) -->co 20 10
最后,当一个协程结束时,它的主函数所返回的值都将作为对应resume的返回值:
co = coroutine.create(function()
return 6, 7
end)
print(coroutine.resume(co)) -->true 6 7
Lua的协程是一种“非对称的协程(asymmetric coroutine)”即Lua提供两个函数来控制协程的执行,一个用于挂起执行yield,另一个用于恢复执行resume,而像python提供的是一种“对称的协程(symmetric coroutine)”即用一个函数来完成协程执行权的挂起和恢复。非对称协程也称为semi-coroutine即受限的协程也就是只有协程主函数才能调用类似于yield这样的函数来挂起执行权。当然在Lua协程函数中每次改变执行权时连续地调用一次yield和一次resume即可实现出对称协程。
二、管道与过滤器
1、生产者/消费者
协程的经典示例是“生产者-消费者”问题,生产者-消费者用C语言和socket如何来实现在此就不说了,下面介绍如何用协程来解决此问题:
--消费者
function receive()
local status, value = coroutine.resume(producer) --消费
return value
end
--生产者
function send(x)
coroutine.yield(x) --挂起
end
--协程
producer = coroutine.create(
function()
while true do
local x = io.read() --产生新值
send(x)
end
end)
在这种设计中,程序通过调用消费者来启动,当消费者需要一个新值时,它唤醒生产者。生产者返回一个新值后停止运行,并等待消费者的再次唤醒。将这种设计称为“消费者驱动(consumer-driven)”。
2、过滤器
对前述生产者-消费者设计进行扩展,就能实现“过滤器(filter)”。过滤器是一种位于生产者和消费者之间的处理功能,可用于对数据的一些变换。过滤器既是一个消费者又是一个生产者,它唤醒一个生产者促使其产生新值,然后又将变换后的值传递给消费者。下面举例说明协程在过滤器中的应用,该过滤器在每行起始处插入一个行号:
function receive(prod)
local status, value = coroutine.resume(prod)
return value
end
function send(x)
coroutine.yield(x)
end
function producer()
return coroutine.create(function()
while true do
local x = io.read() --产生新值
send(x)
end
end)
end
function filter(prod)
return coroutine.create(function()
for line=1, math.huge do
local x = receive(prod) --获取新值
x = string.format("%5d %s", line, x)
send(x) --将新值发送给消费者
end
end)
end
function consumer(prod)
while true do
local x = receive(prod) --获取新值
io,write(x, "\n") --消费新值
end
end
接下来创建运行代码就非常简单了,只需要将这些函数串联起来,然后启动消费者:
p = producer()
f = filter(p)
consumer(f)
或者,更简单地写为:
consumer(filter(producer()))
在pipe中每项任务都在各自独立的进程中运行,而协程中每项任务都在各自独立的协程中运行。pipe在writer(消费者)与reader(生产者)之间提供一个缓冲器,因此,它们的运行速度允许存在一定差异。值得注意的是:在pipe中进程间的切换代价很高,而在协程中,切换代价则小得多,大致相当于一次函数调用的代价,因此,writer和reader可以彼此协作地运行。
三、以协程实现迭代器
将循环迭代器视为“生产者-消费者”模式的一个特例,一个迭代器会产生一些内容,而循环体则会消费这些内容。下面编写一个迭代器使其可以遍历某个数组的所有排列组合形式,以此来说明协程实现迭代器。
用一个递归函数来产生所有的排列组合,即只要将每个数组元素都依次放到最后一个位置,然后递归地生成其余元素的排列,代码如下:
function permgen(a, n)
n = n or #a --默认n为a的大小
if n <= 1 then --还需要改变吗
printResult(a) --用于打印排列组合
else
for i=1, n do
--将第i个元素放到数组末尾
a[n], a[i] = a[i], a[n]
--生成其余元素的排列
permgen(a, n-1)
--恢复第i个元素
a[n], a[i] = a[i], a[n]
end
end
end
function printResult(a)
for i=1, #a do
io.write(a[i], " ")
end
io.write("\n")
end
permgen({1, 2, 3, 4})
-->2 3 4 1
-->3 2 4 1
...
-->2 1 3 4
-->1 2 3 4
前面这些还只是用普通递归实现了此功能,如果要用协程来实现,还需对此进行改进:
--将permgen中的printResult改为yield
function pergman(a, n)
n = n or #a
if n <= 1 then
coroutine.yield(a)
else
--定义一个工厂函数用于将生成函数放到一个协程中运行,并创建迭代器函数,迭代器只是简单地唤醒协程,让其产生下一种排列:
function permutations(a)
local co = coroutine.create(function() permgen(a) end)
return function() --迭代器
local code, res = coroutine.resume(co)
return res
end
end
有了上面的函数,在for语句中遍历一个数组的所有排列就非常简单了:
for p in permutations(“a”, "b", "c") do
printResult(p)
end
-->b c a
-->c b a
...
-->b a c
-->a b c
permutations函数将一条唤醒协程的调用包装在一个函数中,这在Lua中是一种很常见的模式,因此,提供了coroutine.wrap函数来完成这个功能。类似于create,wrap创建了一个新的协程,但不同的是,wrap并不返回协程本身,而是返回一个函数。每当调用这个函数,即可唤醒一次协程。但这个函数与resume的不同之处在于:它不会返回错误代码,当遇到错误时,它会引发错误。如果使用wrap,可以这么写permutations:
function permutations(a)
return coroutine.wrap(function() permgen(a) end)
end
通常,coroutine.wrap比coroutine.create更易于使用。它提供了一个对于协程编程实际所需的功能,即一个可以唤醒协程的函数。但也缺乏灵活性,无法检查wrap所创建的协程状态,此外,也无法检测出运行时的错误。
四、非抢先式的多线程
协程提供了一种协作式的多线程,每个协程都等于是一个线程。一对yield-resume可以将执行权在不同线程之间切换,但是协程与多线程的区别在于:协程是非抢先式的,即当一个协程运行时,无法从外部停止协程,只有当协程显示地要求挂起时(调用yield)才会停止。
下面以协程来实现一个多线程并行下载文件的程序,本程序应用LuaSocket库通过HTTP从下载《HTML 3.2参考规范》:
--下载文件并计算文件大小
function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0 --记录接收到的字节数
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then break end
end
c:close()
print(file, count)
end
由于对远程文件的内容不感兴趣,所以不需要将文件内容写到标准输出中,只需要计算并打印出文件大小即可。
--非阻塞接收数据
function receive(connection)
connection:settimeout(0) --使receive调用不会阻塞
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial, status
end
对于settimeout(0)的调用可使以后所有对此连接进行的操作不会阻塞,若一个操作返回的status为“timeout(超时)”,就表示该操作在返回时还未完成。此时,线程就会挂起执行,而以非假的参数来调用yield,可以告诉调度程序线程仍在执行任务中。
threads = {} --用于记录所有正在运行的线程
function get(host file)
--创建协程
local co = coroutine.create(function()
download(host, file)
end)
--将其插入记录表中
table.insert(threads, co)
end
table threads为调度程序保存着所有正在运行的线程。函数get确保每个下载任务都在一个独立的线程执行。
function dispatch()
local i = 1
local connections = {}
while true do
if threads[i] == nil then --还有线程吗?
if threads[1] == nil then break end
i = 1 --重新开始循环
connections = {}
end
local status, res = coroutine.resume(threads[i])
if not res then --线程是否已经完成了任务?
table.remove(threads, i)
else --超时
i = i + 1
connections[#connections + 1] = res
if #connections == #threads then --所有线程都阻塞了吗?
socket.select(connections)
end
end
end
end
调度函数主要是一个循环,遍历所有线程,逐个唤醒它们的执行,并且当线程完成任务时,将该线程从列表中删除。在所有线程都完成运行后,停止循环。调度函数将所有超时的连接收集到一个名为connections的table中。receive函数会将超时的连接通过yield传递,也就是resume会返回它们,如果所有的连接都超时了,调度函数就调用select来等待这些连接的状态变化发生。
最后,主程序需要创建所有的线程,并调用调度函数,比如:若要下载W3C站点上的4个文件,主程序如下:
require "socket"
host = ""
get(host, "/TR/html401/html140.txt")
get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host, "/TR/REC-html32.html")
get(host, "TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatch() --主循环
通过协程下载文件比顺序下载要快很多,具体取决于硬件设备。
阅读(7863) | 评论(7) | 转发(0) |