LUA教程非抢占式多线程-38

对非抢占式多线程来说,不管什么时候只要有一个线程调用一个阻塞操作(blocking operation),整个程序在阻塞操作完成之前都将停止。对大部分应用程序而言,只是无法忍受的,这使得很多程序员离协同而去。下面我们将看到这个问题可以被有趣的解决。

看一个多线程的例子:我们想通过http协议从远程主机上下在一些文件。我们使用Diego Nehab开发的LuaSocket库来完成。我们先看下在一个文件的实现,大概步骤是打开一个到远程主机的连接,发送下载文件的请求,开始下载文件,下载完毕后关闭连接。
第一,加载LuaSocket库

require "luasocket"

第二,定义远程主机和需要下载的文件名

host = "www.w3.org"
file = "/TR/REC-html32.html"

第三,打开一个TCP连接到远程主机的80端口(http服务的标准端口)

c = assert(socket.connect(host, 80))

上面这句返回一个连接对象,我们可以使用这个连接对象请求发送文件

c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")

receive函数返回他送接收到的数据加上一个表示操作状态的字符串。当主机断开连接时,我们退出循环。
第四,关闭连接

c:close()

现在我们知道了如何下载一个文件,下面我们来看看如何下载多个文件。一种方法是我们在一个时刻只下载一个文件,这种顺序下载的方式必须等前一个文件下载完成后一个文件才能开始下载。实际上是,当我们发送一个请求之后有很多时间是在等待数据的到达,也就是说大部分时间浪费在调用receive上。如果同时可以下载多个文件,效率将会有很大提高。当一个连接没有数据到达时,可以从另一个连接读取数据。很显然,协同为这种同时下载提供了很方便的支持,我们为每一个下载任务创建一个线程,当一个线程没有数据到达时,他将控制权交给一个分配器,由分配器唤起另外的线程读取数据。
使用协同机制重写上面的代码,在一个函数内:

function download (host, file)
    local c = assert(socket.connect(host, 80))
    local count = 0      -- counts number of bytes read
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
    while true do
       local s, status = receive©
       count = count + string.len(s)
       if status == "closed" then break end
    end
    c:close()
    print(file, count)
end

由于我们不关心文件的内容,上面的代码只是计算文件的大小而不是将文件内容输出。(当有多个线程下载多个文件时,输出会混杂在一起),在新的函数代码中,我们使用receive从远程连接接收数据,在顺序接收数据的方式下代码如下:

function receive (connection)
    return connection:receive(2^10)
end

在同步接受数据的方式下,函数接收数据时不能被阻塞,而是在没有数据可取时yield,代码如下:

function receive (connection)
    connection:timeout(0)    -- do not block
    local s, status = connection:receive(2^10)
    if status == "timeout" then
       coroutine.yield(connection)
    end
    return s, status
end

调用函数timeout(0)使得对连接的任何操作都不会阻塞。当操作返回的状态为timeout时意味着操作未完成就返回了。在这种情况下,线程yield。非false的数值作为yield的参数告诉分配器线程仍在执行它的任务。(后面我们将看到分配器需要timeout连接的情况),注意:即使在timeout模式下,连接依然返回他接受到直到timeout为止,因此receive会一直返回s给她的调用者。
下面的函数保证每一个下载运行在自己独立的线程内:

threads = {}      -- list of all live threads
function get (host, file)
    -- create coroutine
    local co = coroutine.create(function ()
       download(host, file)
    end)
    -- insert it in the list
    table.insert(threads, co)
end

代码中table中为分配器保存了所有活动的线程。
分配器代码是很简单的,它是一个循环,逐个调用每一个线程。并且从线程列表中移除已经完成任务的线程。当没有线程可以运行时退出循环。

function dispatcher ()
    while true do
       local n = table.getn(threads)
       if n == 0 then break end    -- no more threads to run
       for i=1,n do
           local status, res = coroutine.resume(threads[i])
           if not res then   -- thread finished its task?
              table.remove(threads, i)
              break
           end
       end
    end
end

最后,在主程序中创建需要的线程调用分配器,例如:从W3C站点上下载4个文件:

host = "www.w3c.org"
get(host, "/TR/html401/html40.txt")
get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host, "/TR/REC-html32.html")
get(host,
    "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatcher()      -- main loop

使用协同方式下,我的机器花了6s下载完这几个文件;顺序方式下用了15s,大概2倍的时间。

尽管效率提高了,但距离理想的实现还相差甚远,当至少有一个线程有数据可读取的时候,这段代码可以很好的运行。否则,分配器将进入忙等待状态,从一个线程到另一个线程不停的循环判断是否有数据可获取。结果协同实现的代码比顺序读取将花费30倍的CPU时间。
为了避免这种情况出现,我们可以使用LuaSocket库中的select函数。当程序在一组socket中不断的循环等待状态改变时,它可以使程序被阻塞。我们只需要修改分配器,使用select函数修改后的代码如下:

 

在内层的循环分配器收集连接表中timeout地连接,注意:receive将连接传递给yield,因此resume返回他们。当所有的连接都timeout分配器调用select等待任一连接状态的改变。最终的实现效率和上一个协同实现的方式相当,另外,他不会发生忙等待,比起顺序实现的方式消耗CPU的时间仅仅多一点点。


发布日期:

所属分类: 易语言 标签: