I've been hacking Erlang for close to a year now, and in the process of learning the language there have been quite a few surprises. Here I'd like to share some of these thoughts; this is the first of a series of posts I intend to write about it.
In Erlang, each process has only one! message queue
This boggled me for a while. If each process/actor has only one message queue, then how can it wait for multiple things in an efficient manner?
In my programming career, I've done many different things that involve concurrency or event-driven programming, and it usually boils down to some kind of event and/or select loop in which you want to wait for one of several things to happen, not just one. With Erlang a process has only one mailbox, so how can you wait for more than one?
The answer is that in Erlang, you essentially multiplex multiple conversations on the same message channel (i.e. the same mailbox). The key is that you include a "conversation id" in messages pertaining to a given interaction, and then match on multiple such messages using so-called selective receive. When you use selective receive, a process can wait for one of several message patterns to arrive. The patterns will typically be tuples containing the conversation id.
receive
    {ID1, Value1} -> ... ;  %% do something in conversation 1
    {ID2, Value2} -> ...    %% do something in conversation 2
after 100 ->
    ...                     %% time out after 0.1 s
end
Executing the receive operation will first scan all existing messages in the current process's mailbox to see if any of them match either of the tuples that contain ID1 or ID2 in the first position; if none do, it will sleep (for up to 100 ms) until more messages arrive and then try the matching on those new messages.
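To make the mailbox-scanning behaviour concrete, here is a tiny sketch (the conversation tags conv_a and conv_b are made up for illustration) that you can paste into an Erlang shell; the receive skips over the earlier, non-matching message:

%% send two messages to ourselves, then selectively receive the second one
self() ! {conv_a, 1},
self() ! {conv_b, 2},
receive
    {conv_b, Value} -> Value   %% returns 2; {conv_a, 1} stays in the mailbox
end.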
Having selective receive is extremely useful and important for managing multiple concurrent conversations in your system; as e.g. Ulf Wiger explains expertly in this presentation from QCon London 2010.
The server + rpc problem
One issue with this, however, is that scanning all messages does not scale well when the message queue is long and you're looking for a specific message. That is often the case when doing an "rpc-like" message exchange from the context of a "server-like" process that has many incoming requests.
Take this example of an rpc-like call to the process OtherPID, done by sending a message and receiving a reply; something like this:
send_receive_rpc(OtherPID, RPCMessage) ->
    %% create a unique "conversation id"
    Ref = make_ref(),
    %% send the message to OtherPID, including where to send the response
    %% and the "conversation id" to include in the response
    OtherPID ! {Ref, self(), RPCMessage},
    %% now wait for the tuple {Ref, Result} that corresponds to this call
    receive
        {Ref, RPCResult} ->
            RPCResult
    end.
rpc_server_loop(State) ->
    receive
        {Ref, Sender, RPCMessage} ->
            {RPCResult, NewState} = do_work(RPCMessage, State),
            Sender ! {Ref, RPCResult},
            rpc_server_loop(NewState)
    end.
In the above code, we create a unique value Ref that identifies this particular conversation, and then we use it to match the specific reply. OK, that's elegant, but is it efficient?
Upon entering the receive, in this case waiting for the RPC response, we first have to scan all existing messages in the process's mailbox to see if any of them match {Ref, _} (a lost cause, since the Ref is unique so there can be no such message), and only then do we go to sleep and wait for the reply.
So ... if the rpc client is a process that itself has a long message queue, then this is fundamentally flawed. How on earth did Erlang ever manage to get this far with this problem?
Luckily it has been "fixed" by a kind of peep-hole optimization in the Erlang compiler that recognizes receive statements which do not need to scan the existing message queue. The fix is in Erlang R14. The optimization works by a simple flow analysis that can (in some cases) determine that the receive does not need to match messages that were already in the message queue by the time erlang:make_ref/0 was called. As a result, the code is compiled into something like the pseudo-code below: recv_mark saves a pointer to the end of the message queue, and recv_set validates some preconditions and instructs the subsequent receive to start scanning from the saved point.
recv_mark(UID),                          %% remember the end of the message queue
Ref = make_ref(),
OtherPID ! {Ref, self(), RPCMessage},
recv_set(UID),                           %% start scanning from the saved point
receive
    {Ref, RPCResult} ->
        RPCResult
end.
This is a good reason to use gen_server:call, which does something like the above for you, rather than hand-crafting your own message sends: that function exhibits the exact code pattern that ensures the peep-hole optimization kicks in.
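For comparison, here is a minimal sketch of wrapping such a request/reply exchange in a gen_server; the module name my_rpc_server and the do_work/2 placeholder are made up for illustration.

%% a minimal sketch: module name and do_work/2 are hypothetical
-module(my_rpc_server).
-behaviour(gen_server).
-export([start_link/0, call/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% client side: gen_server:call sets up the monitor, the unique ref and the
%% selective receive in exactly the shape the compiler optimization recognizes
call(RPCMessage) ->
    gen_server:call(?MODULE, RPCMessage).

init([]) -> {ok, initial_state}.

%% server side: the reply is routed back to the caller's mailbox,
%% tagged with the ref that gen_server:call is waiting for
handle_call(RPCMessage, _From, State) ->
    {RPCResult, NewState} = do_work(RPCMessage, State),
    {reply, RPCResult, NewState}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.

do_work(Msg, State) -> {{echo, Msg}, State}.   %% placeholder "work"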
And you have to be careful: the optimization only works for make_ref/0 and monitor/2, and only when the created ref occurs in all clauses of a subsequent receive expression. For rpc:call, for instance, it doesn't work, because it uses erlang:spawn_monitor rather than one of the two functions mentioned above. [It could be made to work, but that needs another fix so the flow analysis can cover the ref inside the {Pid, Ref} tuple returned by spawn_monitor.]
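To illustrate that last point, here is a minimal sketch (the function name run_and_wait is made up; this is not the actual rpc:call implementation) of code where the ref is extracted from the tuple returned by spawn_monitor, which the current flow analysis does not follow:

%% the ref is bound via a pattern match on spawn_monitor's result, not by a
%% direct make_ref()/monitor() call, so the peep-hole optimization does not
%% kick in for the receive below
run_and_wait(Fun) ->
    {_Pid, Ref} = spawn_monitor(Fun),
    receive
        {'DOWN', Ref, process, _, Reason} ->
            Reason
    end.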
One obvious "real fix" for this would be to have multiple message queues (or channels) as Go has, combined with selective receive; then we could easily create a new unique queue just for the purpose of the RPC.
ChannelRef = create_channel(),
OtherPID ! {ChannelRef, RPCMessage},
from ChannelRef receive
    Reply -> Reply;
from OtherChannel receive
    ... -> other action
end
Not an "easy fix", because it would fundamentally change the language; but seems like a better choice should one choose to design a new language in this space. And it is sort of amazing that Erlang gets away with being so simple!; it's just kind of confusing (to me at least) the way incoming messages are mixed up with RPC replies in the same mailbox.
Another "fix" that might make sense would be to introduce a new language construct to do explicitly what the compiler does implicitly; sort of a stack-oriented way of pushing a new (empty) mailbox onto an implicit mbox queue.
push_mbox,
... send rpc request ...
receive
    ... match rpc reply ...
end
This would make the receive clause disregard messages that were already in the mailbox at the time push_mbox was called. The completion of the receive clause would then need to merge any new messages into the end of the previous mbox. This language construct would be fairly easy to add, because the instructions to handle it are already in the VM.
Stray Messages
Another "issue" with using selective receive is what happens if messages arrive that you do not understand. Erlang folklore says that you should have somewhere in your main loop, where you simply discard such unknown messages (and perhaps log them as errors).
But in the presence of multiple concurrent "conversations" multiplexed onto a single message queue, it can be very difficult to know when we have a safe point where only certain kinds of messages make sense.
In this context it would also make sense to use a model with multiple channels, because then it would at least be easier to establish a contract (kind of a protocol/interface for meaningful sequences of messages occurring on a channel).
Scala's actors - even though they have channels - do not seem to support this pattern of concurrently doing selective receive across a number of channels. That would be very cool.
Other RPC-goodies
Digging around the code you also come to realize how many cool things there are in the OTP frameworks that you don't normally think of. One such thing is that doing an RPC will actually make you monitor the callee. The code in there is something like this (warning: pseudocode!):
do_rpc_call(OtherPID, RPCMessage) ->
    %% the monitor ref doubles as the "conversation id"
    MonitorRef = erlang:monitor(process, OtherPID),
    OtherPID ! {MonitorRef, self(), RPCMessage},
    receive
        {'DOWN', MonitorRef, _, _, Info} ->
            {badrpc, Info};
        {MonitorRef, Result} ->
            erlang:demonitor(MonitorRef),
            {ok, Result}
    end.
The MonitorRef doubles as the "conversation id". What this code does is that while you're doing an RPC call, you also monitor the callee; so if OtherPID dies, you get a sensible reply. If OtherPID is on a remote node, the Erlang distribution's keep-alive messages may eventually deem the remote end dead and thus deliver the appropriate local 'DOWN' message. Way cool.
You could also do this the other way around: have the callee monitor the caller while it does some computation, canceling an expensive computation if the caller goes away. That would indeed make sense if the caller was an HTTP connection handler for an impatient and/or slow user!
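A minimal sketch of that reverse pattern (handle_request/3, expensive_computation/1 and the message tags are made up for illustration): the callee spawns a worker for the expensive computation, monitors the caller, and kills the worker if the caller disappears before the result is ready.

%% hypothetical callee-side helper
handle_request(CallerPid, Ref, Input) ->
    CallerMonitor = erlang:monitor(process, CallerPid),
    Self = self(),
    {Worker, WorkerMonitor} =
        spawn_monitor(fun() -> Self ! {result, Ref, expensive_computation(Input)} end),
    receive
        {result, Ref, Result} ->
            %% computation finished first: clean up and reply to the caller
            erlang:demonitor(WorkerMonitor, [flush]),
            erlang:demonitor(CallerMonitor, [flush]),
            CallerPid ! {Ref, Result};
        {'DOWN', CallerMonitor, process, CallerPid, _Reason} ->
            %% the caller went away: abandon the expensive work
            exit(Worker, kill),
            erlang:demonitor(WorkerMonitor, [flush])
    end.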
Who's waiting for who?
Another, related, issue with Erlang's mailbox mechanism is that it is impossible to know who is waiting for who. So even though you encode the dependencies between processes quite explicitly in your messages, there is no way for the system to know which process is responsible for producing the reply you're waiting for.
At the recent Java Language Summit, Doug Lea gave an excellent talk in which one of the points was that, to schedule light-weight concurrency effectively, it is very valuable to have a dependency graph in the scheduler, so that you know which processes are waiting for which other processes. The point is that overall ("global") work is reduced if you spend processing time finishing work that has already been started before you take on (or generate) more work.
In Erlang, the dependencies between the RPC client and the RPC server are entirely encoded in "unstructured" data that flows on the message channels, and there is no easy way for the scheduler to exploit this information.
Another potential value of such information would be that you could (at least partially) prevent priority inversion in the case where a high-priority rpc client sends a request to a lower-priority rpc server.
Perhaps some of that information could be captured the same way the current "fix" works in Erlang/OTP, by handling just the common framework code; and perhaps that would be enough to capture such dependencies in most cases. I don't know.
Conclusion?
For writing systems that handle multiple concurrent conversations with other agents/processes/services, Erlang's model is IMHO the only thing out there that works. If you disagree, you should watch Ulf Wiger's QCon presentation Death by Accidental Complexity one more time.
I believe systems that have these patterns are quite common.
But I still think we could do even better.
References
If you google "selective receive" you can find numerous references to these issues. Several different fixes have been suggested, and the problem seems to come up every now and then.
See for instance this post from the RabbitMQ community, or this one. The usual way to handle this seems to be to put a proxy process in front, which buffers the messages so that not too many of them end up in the "controller".
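A minimal sketch of that proxy idea (all names and message tags are made up for illustration): the proxy accumulates incoming messages and forwards the oldest one only when the controller asks for it, so the controller's own mailbox stays short.

%% start with: proxy_loop(ControllerPid, queue:new(), false)
%% the controller sends {next, self()} whenever it is ready for more work
proxy_loop(Controller, Queue, Waiting) ->
    receive
        {next, Controller} ->
            dispatch(Controller, Queue, true);
        Message ->
            dispatch(Controller, queue:in(Message, Queue), Waiting)
    end.

dispatch(Controller, Queue, true) ->
    case queue:out(Queue) of
        {{value, Oldest}, Rest} ->
            %% controller is waiting and we have something buffered: hand it over
            Controller ! {proxied, Oldest},
            proxy_loop(Controller, Rest, false);
        {empty, _} ->
            %% nothing buffered yet; remember that the controller is waiting
            proxy_loop(Controller, Queue, true)
    end;
dispatch(Controller, Queue, false) ->
    proxy_loop(Controller, Queue, false).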