In this interlude, we will see an implementation of a multithreading system on top of coroutines.
As we saw earlier, coroutines allow a kind of
collaborative multithreading.
Each coroutine is equivalent to a thread.
A pair yield–resume switches control from one thread to another.
However, unlike regular multithreading,
coroutines are non-preemptive.
While a coroutine is running,
we cannot stop it from the outside.
It suspends execution only when it explicitly requests to do so,
through a call to yield.
For several applications, this is not a problem,
quite the opposite.
Programming is much easier in the absence of preemption.
We do not need to be paranoid about synchronization bugs,
because all synchronization among threads
is explicit in the program.
We just need to ensure that
a coroutine yields only when it is outside a critical region.
However, with non-preemptive multithreading, whenever any thread calls a blocking operation, the whole program blocks until the operation completes. For many applications, this behavior is unacceptable, which leads many programmers to disregard coroutines as a real alternative to conventional multithreading. As we will see here, this problem has an interesting (and obvious, with hindsight) solution.
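To make the cooperative switching described above concrete, here is a minimal sketch with two coroutines that take turns. The names worker, a, b, and log are illustrative only. Control changes hands only at the explicit calls to yield, so the order of the log entries is fully determined by the program.

```lua
-- Two cooperative "threads": each runs until it calls coroutine.yield.
local log = {}

local function worker (name, steps)
  return coroutine.create(function ()
    for i = 1, steps do
      log[#log + 1] = name .. i
      coroutine.yield()        -- the only point where control can switch
    end
  end)
end

local a = worker("a", 2)
local b = worker("b", 2)

-- Alternate between the two coroutines until both are dead.
while coroutine.status(a) ~= "dead" or coroutine.status(b) ~= "dead" do
  if coroutine.status(a) ~= "dead" then coroutine.resume(a) end
  if coroutine.status(b) ~= "dead" then coroutine.resume(b) end
end

print(table.concat(log, " "))   --> a1 b1 a2 b2
```

Because neither coroutine can be interrupted between two yields, the update to log needs no locking.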
Let us assume a typical multithreading situation: we want to download several remote files through HTTP. To download several remote files, first we must learn how to download one remote file. In this example, we will use the LuaSocket library. To download a file, we must open a connection to its site, send a request for the file, receive the file (in blocks), and close the connection. In Lua, we can write this task as follows. First, we load the LuaSocket library:
local socket = require "socket"
Then, we define the host and the file we want to download. In this example, we will download the Lua 5.3 manual from the Lua site:
host = "www.lua.org"
file = "/manual/5.3/manual.html"
Then, we open a TCP connection to port 80 (the standard port for HTTP connections) of that site:
c = assert(socket.connect(host, 80))
This operation returns a connection object, which we use to send the file request:
local request = string.format(
  "GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
c:send(request)
Next, we read the file in blocks of 1 kB, writing each block to the standard output:
repeat
  local s, status, partial = c:receive(2^10)
  io.write(s or partial)
until status == "closed"
The method receive returns either a string with
what it read or nil in case of error;
in the latter case, it also returns an error code (status)
and what it read until the error (partial).
When the host closes the connection,
we print that remaining input and break the receive loop.
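The three return values of receive can be explored without a network. The following sketch uses a hypothetical stand-in, fake_connection, that is not part of LuaSocket; it only mimics the behavior described above, serving a string in fixed-size blocks and reporting "closed" together with the final partial block.

```lua
-- Hypothetical stand-in for a LuaSocket connection: it serves `data`
-- in blocks and reports "closed" together with the final partial block.
local function fake_connection (data)
  local pos = 1
  return {
    receive = function (self, n)
      local block = data:sub(pos, pos + n - 1)
      pos = pos + n
      if #block == n then
        return block                 -- a full block: no error
      end
      return nil, "closed", block    -- short read: error code plus partial data
    end
  }
end

local c = fake_connection("abcdefghij")
local out = {}
repeat
  local s, status, partial = c:receive(4)
  out[#out + 1] = s or partial       -- keep the partial block on error
until status == "closed"

print(table.concat(out, "|"))   --> abcd|efgh|ij
```

Without the "or partial", the last two bytes would be lost, because s is nil in the call that reports the closed connection.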
After downloading the file, we close the connection:
c:close()
Now that we know how to download one file,
let us return to the problem of downloading several files.
The trivial approach is to download one at a time.
However, this sequential approach,
where we start reading a file only after finishing the previous one,
is too slow.
When reading a remote file,
a program spends most of its time waiting for data to arrive.
More specifically, it spends most of its time blocked
in the call to receive.
So, the program could run much faster if
it downloaded all files concurrently.
Then, while a connection has no data available,
the program can read from another connection.
Clearly, coroutines offer a convenient way to structure these
simultaneous downloads.
We create a new thread for each download task.
When a thread has no data available,
it yields control to a simple dispatcher,
which invokes another thread.
To rewrite the program with coroutines, we first rewrite the previous download code as a function. The result is in Figure 26.1, “Function to download a Web page”.
Figure 26.1. Function to download a Web page
function download (host, file)
  local c = assert(socket.connect(host, 80))
  local count = 0    -- counts number of bytes read
  local request = string.format(
    "GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
  c:send(request)
  while true do
    local s, status = receive(c)
    count = count + #s
    if status == "closed" then break end
  end
  c:close()
  print(file, count)
end
Because we are not interested in the remote file contents, this function counts and prints the file size, instead of writing the file to the standard output. (With several threads reading several files, the output would interleave the contents of all files.)
In this new code, we use an auxiliary function (receive)
to receive data from the connection.
In the sequential approach, its code would be like this:
function receive (connection)
  local s, status, partial = connection:receive(2^10)
  return s or partial, status
end
For the concurrent implementation, this function must receive data without blocking. Instead, if there is not enough data available, it yields. The new code is like this:
function receive (connection)
  connection:settimeout(0)    -- do not block
  local s, status, partial = connection:receive(2^10)
  if status == "timeout" then
    coroutine.yield(connection)
  end
  return s or partial, status
end
The call to settimeout(0) makes any operation on the
connection a non-blocking operation.
When the resulting status is "timeout",
it means that the operation returned without completion.
In this case, the thread yields.
The non-false argument passed to yield
signals to the dispatcher that
the thread is still performing its task.
Note that, even in case of a timeout,
the connection returns what it read until the timeout,
which is in the variable partial.
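The interplay between the timeout and yield can be traced with a scripted connection. In this sketch, fake_connection is a hypothetical stand-in (not a LuaSocket object) whose receive replays a fixed list of results; the receive function itself is the one shown above.

```lua
-- Hypothetical stand-in for a non-blocking connection: each entry in
-- `script` is what one call to receive returns.
local function fake_connection (script)
  local step = 0
  return {
    settimeout = function () end,          -- accepted and ignored here
    receive = function ()
      step = step + 1
      local r = script[step]
      return r[1], r[2], r[3]
    end
  }
end

local function receive (connection)
  connection:settimeout(0)                 -- do not block
  local s, status, partial = connection:receive(2^10)
  if status == "timeout" then
    coroutine.yield(connection)            -- let other threads run
  end
  return s or partial, status
end

local conn = fake_connection{
  {nil, "timeout", "he"},                  -- partial data, then timeout
  {"llo"},                                 -- a complete block
  {nil, "closed", ""},                     -- host closed the connection
}

local yields, data = 0, ""
local co = coroutine.wrap(function ()
  repeat
    local s, status = receive(conn)
    data = data .. s
  until status == "closed"
end)

while co() do yields = yields + 1 end      -- resume until the task finishes
print(data, yields)   --> hello   1
```

The task yields once, on the timeout, and the partial data read before that timeout ("he") still reaches the caller after the coroutine is resumed.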
Figure 26.2, “The dispatcher” shows the dispatcher plus some auxiliary code.
Figure 26.2. The dispatcher
tasks = {}    -- list of all live tasks

function get (host, file)
  -- create coroutine for a task
  local co = coroutine.wrap(function ()
    download(host, file)
  end)
  -- insert it in the list
  table.insert(tasks, co)
end

function dispatch ()
  local i = 1
  while true do
    if tasks[i] == nil then    -- no other tasks?
      if tasks[1] == nil then    -- list is empty?
        break    -- break the loop
      end
      i = 1    -- else restart the loop
    end
    local res = tasks[i]()    -- run a task
    if not res then    -- task finished?
      table.remove(tasks, i)
    else
      i = i + 1    -- go to next task
    end
  end
end
The table tasks keeps a list of all live tasks
for the dispatcher.
The function get ensures that each download task
runs in an individual thread.
The dispatcher itself
is mainly a loop that goes through all tasks,
resuming them one by one.
It must also remove from the list
the tasks that have finished.
It stops the loop when there are no more tasks to run.
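The round-robin behavior of this dispatcher can be observed without any sockets. The sketch below reuses the dispatch loop with hypothetical tasks (add_task and order are illustrative names) that pretend to time out a fixed number of times before finishing.

```lua
local tasks = {}
local order = {}

-- Hypothetical task: pretends to time out `waits` times, then finishes.
local function add_task (name, waits)
  table.insert(tasks, coroutine.wrap(function ()
    for _ = 1, waits do
      coroutine.yield(true)      -- non-false: "still working"
    end
    order[#order + 1] = name     -- record completion order
  end))
end

local function dispatch ()
  local i = 1
  while true do
    if tasks[i] == nil then      -- no other tasks?
      if tasks[1] == nil then break end
      i = 1                      -- else restart the loop
    end
    local res = tasks[i]()       -- run a task
    if not res then              -- task finished?
      table.remove(tasks, i)
    else
      i = i + 1                  -- go to next task
    end
  end
end

add_task("slow", 3)
add_task("fast", 1)
add_task("medium", 2)
dispatch()
print(table.concat(order, " "))   --> fast medium slow
```

A finished task returns no value from its wrapped function, so res is nil and the dispatcher removes it; the index is not advanced in that case, because the next task has moved into the same position.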
Finally, the main program creates the tasks it needs and calls the dispatcher. To download some distributions from the Lua site, the main program could be like this:
get("www.lua.org", "/ftp/lua-5.3.2.tar.gz")
get("www.lua.org", "/ftp/lua-5.3.1.tar.gz")
get("www.lua.org", "/ftp/lua-5.3.0.tar.gz")
get("www.lua.org", "/ftp/lua-5.2.4.tar.gz")
get("www.lua.org", "/ftp/lua-5.2.3.tar.gz")
dispatch() -- main loop
The sequential implementation takes fifteen seconds to download these files, on my machine. This implementation with coroutines runs more than three times faster.
Despite the speedup, this last implementation is far from optimal. Everything goes fine while at least one thread has something to read. However, when no thread has data to read, the dispatcher does a busy wait, going from thread to thread only to check that they still have no data. As a result, this coroutine implementation uses three orders of magnitude more CPU than the sequential solution.
To avoid this behavior,
we can use the function select from LuaSocket:
it allows a program to block while waiting
for a status change in a group of sockets.
The changes in our implementation are small:
we have to change only the dispatcher,
as shown in Figure 26.3, “Dispatcher using select”.
Figure 26.3. Dispatcher using select
function dispatch ()
  local i = 1
  local timedout = {}
  while true do
    if tasks[i] == nil then    -- no other tasks?
      if tasks[1] == nil then    -- list is empty?
        break    -- break the loop
      end
      i = 1    -- else restart the loop
      timedout = {}
    end
    local res = tasks[i]()    -- run a task
    if not res then    -- task finished?
      table.remove(tasks, i)
    else    -- time out
      i = i + 1
      timedout[#timedout + 1] = res
      if #timedout == #tasks then    -- all tasks blocked?
        socket.select(timedout)    -- wait
      end
    end
  end
end
Along the loop,
this new dispatcher collects the timed-out connections
in the table timedout.
(Remember that receive passes such connections to yield,
thus resume returns them.)
If all connections time out,
the dispatcher calls select
to wait for any of these connections to change status.
This final implementation runs as fast as the previous
coroutine implementation.
Moreover, as it does not do busy waits,
it uses just as much CPU as the sequential implementation.
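To see when the dispatcher decides to wait, the following sketch passes the waiting function as a parameter and replaces it with a hypothetical stand-in, fake_select, which only records how many connections it was given. The tasks are also stand-ins that yield a placeholder instead of a real connection. In the real program, the call is socket.select(timedout), which blocks until one of the listed sockets is ready for reading.

```lua
local tasks = {}
local calls = {}          -- size of the list passed on each wait

-- Hypothetical stand-in for socket.select: records the call, never blocks.
local function fake_select (list)
  calls[#calls + 1] = #list
end

-- Hypothetical task: yields a placeholder "connection" `n` times.
local function add_task (name, n)
  table.insert(tasks, coroutine.wrap(function ()
    for _ = 1, n do coroutine.yield(name) end
  end))
end

local function dispatch (wait)
  local i = 1
  local timedout = {}
  while true do
    if tasks[i] == nil then            -- no other tasks?
      if tasks[1] == nil then break end
      i = 1                            -- else restart the loop
      timedout = {}
    end
    local res = tasks[i]()             -- run a task
    if not res then                    -- task finished?
      table.remove(tasks, i)
    else                               -- time out
      i = i + 1
      timedout[#timedout + 1] = res
      if #timedout == #tasks then      -- all tasks blocked?
        wait(timedout)
      end
    end
  end
end

add_task("a", 2)
add_task("b", 2)
dispatch(fake_select)
print(table.concat(calls, ","))   --> 2,2
```

The dispatcher waits once per round in which every live task timed out, each time passing the full list of blocked connections, instead of cycling through the tasks again.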