(Second part)
by Thierry « Shaman » Mallard
In the previous article, we saw two of ErlyStage's main agents: Thor and Mercury. Thor gives life to agents such as Mercury, which is responsible for TCP connection management. We are now going to see how the clients and Mercury exchange data.
We must be able to process incoming data from the TCP connection handled by Mercury. The idea here is to get some data, split it into text lines, and apply functions to them, with the cunning idea of letting every function return the next function that should be called for the next line.
This makes a finite state machine where the state changes with the incoming data, and functions are used to move from one state to another. Hmm... at least, that is how I see it, but I don't have much theoretical knowledge of programming, so I may be very wrong ;-)
The getdata function, which handles incoming data, looks something like this:
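```erlang
%%
%% getdata: retrieve data from a socket
%%
getdata( CSocket, B ) ->
    { ok, Rest } = ?MODULE:dispatch( CSocket, B ),
    case gen_tcp:recv( CSocket, 0 ) of
        %% New data: append it to what could not be processed yet, and loop
        { ok, Bs } -> ?MODULE:getdata( CSocket, Rest ++ Bs ) ;
        %% Connection closed: dispatch whatever is left one last time
        { error, closed } -> ?MODULE:dispatch( CSocket, Rest )
    end.
```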
Well, the actual code is somewhat messier than that, but we'll stick with this version for now. This function receives data (gen_tcp:recv) and uses the dispatch function to process it. The dispatcher returns the data it could not process yet (Rest), which is then joined with the newly received data (Bs). The function then loops back.
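To see why Rest matters, here is a small offline sketch that is not part of ErlyStage: a list of fake chunks stands in for successive gen_tcp:recv/2 results, and a line cut across two chunks gets stitched back together by the Rest ++ Bs step. The module and function names (line_buffer, feed/3, run/2) are made up for the example, and string:split/2 (OTP 20 or later) replaces the string:str/substr calls used in the article.

```erlang
-module(line_buffer).
-export([feed/3, run/2]).

%% Split Buffer into complete lines. Whatever follows the last Delim is
%% returned as Rest, to be joined with the next chunk (the Rest ++ Bs step).
feed(Buffer, Delim, Acc) ->
    case string:split(Buffer, Delim) of
        [Line, Rest] -> feed(Rest, Delim, [Line | Acc]);
        [Incomplete] -> {lists:reverse(Acc), Incomplete}
    end.

%% Hypothetical stand-in for the receive loop: Chunks plays the role of
%% successive gen_tcp:recv/2 results.
run(Chunks, Delim) ->
    run(Chunks, Delim, "", []).

run([], _Delim, Rest, Lines) ->
    {Lines, Rest};
run([Chunk | More], Delim, Rest, Lines) ->
    {NewLines, NewRest} = feed(Rest ++ Chunk, Delim, []),
    run(More, Delim, NewRest, Lines ++ NewLines).
```

For example, line_buffer:run(["hel", "lo\nwor", "ld\npar"], "\n") returns {["hello", "world"], "par"}: "par" simply waits for the rest of its line.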
Now, let's have a look at the dispatcher. We want it to split the data into lines and process them one by one with a dynamic function. That means that every time a function is called to process a line, it should return a pointer to another (or the same) function, which will process the next line.
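ErlyStage stores the current function as a tuple, as the dispatcher code shows. As a side note, Erlang funs can express the same "each function returns the next one" idea directly. Here is a minimal, hypothetical two-state sketch; the fun_fsm module and its HELLO line are invented for the illustration:

```erlang
-module(fun_fsm).
-export([run/1]).

%% A processor is a fun taking one line and returning {Output, NextProcessor}.
%% First state: wait for a "HELLO" line, ignoring anything else.
waiting() ->
    fun("HELLO") -> {greeted, echoing()};
       (_Other)  -> {ignored, waiting()}
    end.

%% Second state: echo every line and stay in the same state.
echoing() ->
    fun(Line) -> {{echo, Line}, echoing()} end.

%% Apply the current processor to each line, threading the one it returns.
run(Lines) ->
    run(Lines, waiting(), []).

run([], _Proc, Acc) ->
    lists:reverse(Acc);
run([Line | Rest], Proc, Acc) ->
    {Out, Next} = Proc(Line),
    run(Rest, Next, [Out | Acc]).
```

fun_fsm:run(["hi", "HELLO", "a"]) returns [ignored, greeted, {echo, "a"}]. One plausible reason to prefer the module/function tuples used in ErlyStage is hot code loading: a call through apply/3 always reaches the currently loaded version of a module, while a fun keeps a reference to the code version that created it.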
Here is how the dispatcher is implemented:
```erlang
%%
%% The dispatcher: we parse the current data into lines and then we
%% process each line through the current
%% Module:Fun(Id, CSocket, Line, Arg), which should return a possibly
%% new processor
%%
dispatch( PegasusAgent, CSocket, ProcModule, ProcFun, ProcId, ProcArg, B ) ->
    Pos = string:str( B, ?DELIMITER ),
    DelSize = string:len( ?DELIMITER ),
    case Pos of
        0 -> { ok, ProcModule, ProcFun, ProcId, ProcArg, B } ;
        _ ->
            Line = string:substr( B, 1, Pos - 1 ),
            %% Everything after the delimiter is kept for the next cycle
            Rest = string:substr( B, Pos + DelSize ),
            %% Apply the processor on the Line
            %% notice the processor can return a different one (NewProc...)
            %% this way, we make something like a Finite State Machine
            { ok, NewProcModule, NewProcFun, NewProcId, NewProcArg } =
                apply( ProcModule, ProcFun, [ ProcId, CSocket, Line, ProcArg ] ),
            %% And we loop back.
            ?MODULE:dispatch( PegasusAgent, CSocket, NewProcModule, NewProcFun,
                              NewProcId, NewProcArg, Rest )
    end.
```
Don't look too closely at the PegasusAgent argument; it will be covered in a following article. Here, we parse the data contained in B (for Binary) by looking for a delimiter. If there is a delimiter (the _ case), we split the data into a Line (the actual data we are going to process) and a Rest, which will be processed on the next cycle.
For new Erlang coders, remember (or learn) that "_" is an anonymous variable, which says that we don't care about its value. Here, case Pos of matches either 0 or anything other than 0. This could have been written with an if expression, but in Erlang I often prefer case expressions.
If there is no delimiter in B, we just hand the data back to the caller, which concatenates it with the newly received data.
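So we have a Line to process. To do that, we apply the current «processor» to it: a module, a function, an identifier (the current state) and an argument, called as Module:Function(Id, CSocket, Line, Arg). The function should return another (or the same) processor, as a tuple { ok, Module, Function, Id, Arg }, which will be used for the next line...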
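The same mechanism can be tried offline, without a socket. In this hypothetical sketch, count/4 follows the article's calling convention Fun(Id, CSocket, Line, Arg) and returns the next processor as { ok, Module, Fun, Id, Arg }. The atom nosocket is passed where CSocket would be, and the lines are assumed to be already split:

```erlang
-module(mfa_dispatch).
-export([run/1, count/4]).

%% Hypothetical processor: count lines until "END", then switch to the
%% done state. It must be exported so that apply/3 can call it.
count(counting, _CSocket, "END", N) -> {ok, ?MODULE, count, done, N};
count(counting, _CSocket, _Line, N) -> {ok, ?MODULE, count, counting, N + 1};
count(done, _CSocket, _Line, N)     -> {ok, ?MODULE, count, done, N}.

%% Offline stand-in for the dispatcher loop: apply the current processor
%% to each line and carry the returned processor on to the next line.
run(Lines) ->
    run(Lines, {?MODULE, count, counting, 0}).

run([], {_Module, _Fun, Id, Arg}) ->
    {Id, Arg};
run([Line | Rest], {Module, Fun, Id, Arg}) ->
    {ok, NewModule, NewFun, NewId, NewArg} =
        apply(Module, Fun, [Id, nosocket, Line, Arg]),
    run(Rest, {NewModule, NewFun, NewId, NewArg}).
```

mfa_dispatch:run(["a", "b", "END", "c"]) returns {done, 2}: the processor counts two lines, switches to the done state on END, then ignores the rest.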
This way, we may implement quite simply a protocol. And guess what, we are just about to do that... ;-)
This protocol, Atlas, was designed by Stephanus Du Toit a few years ago. The main objectives were:
This transparency allows Atlas to transport data using different codecs (coder/decoder).
These codecs should be available in every Atlas implementation. The default, mandatory ones are the XML and Packed codecs.
The negotiation goes like this:
| Client | Direction | Server | Comment |
|---|---|---|---|
| (TCP connection) | ---> | | |
| | <--- | ATLAS server erly-stage | |
| ATLAS client XXXXXX | ---> | | |
| ICAN XML | ---> | | The client now says which codecs it can understand, for example XML... |
| ICAN Packed | ---> | | ...and Packed |
| (empty line) | ---> | | The list ends with an empty line |
| | <--- | IWILL Packed | The server answers with the codec to use, for example Packed |
| | <--- | (empty line) | |
| (empty line) | ---> | | The client confirms with an empty line |
| | <--- | (empty line) | The server does the same |
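The server side of this exchange can be modelled as a pure function, which makes the sequence of states easy to check without opening a socket. This is only a sketch loosely mirroring the negociate clauses described in the rest of this article: each step returns the lines it would send instead of calling gen_tcp:send/2, the greeting hard-codes the server name from the table, Packed is forced as in the article, and the module name atlas_negotiation_sketch is made up:

```erlang
-module(atlas_negotiation_sketch).
-export([server_replies/1]).

%% step(State, ClientLine, Codecs) -> {LinesToSend, NextState, NextCodecs}
%% Like the article's getname and getfilter clauses, those states accept any line.
step(getname, _Line, Codecs) ->
    {[], getcodecs, Codecs};
step(getcodecs, "", Codecs) ->
    %% End of the ICAN list: announce the (forced) codec, then an empty line
    {["IWILL Packed", ""], getfilter, Codecs};
step(getcodecs, "ICAN " ++ Codec, Codecs) ->
    {[], getcodecs, Codecs ++ [Codec]};
step(getfilter, _Line, Codecs) ->
    %% No filters yet: just answer with an empty line
    {[""], dialog, Codecs}.
%% The dialog state is not modelled: extra lines would raise function_clause.

%% Returns {LinesSentByServer, CodecsOfferedByClient} for a client transcript.
server_replies(ClientLines) ->
    %% The greeting is sent as soon as the connection is accepted
    Greeting = "ATLAS server erly-stage",
    {Sent, _State, Codecs} =
        lists:foldl(fun(Line, {Acc, State, Offered}) ->
                            {Out, Next, NewOffered} = step(State, Line, Offered),
                            {Acc ++ Out, Next, NewOffered}
                    end,
                    {[Greeting], getname, []},
                    ClientLines),
    {Sent, Codecs}.
```

Feeding it the client side of the table, ["ATLAS client test", "ICAN XML", "ICAN Packed", "", ""], gives back the greeting, "IWILL Packed" and two empty lines, along with ["XML", "Packed"] as the codecs the client offered.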
OK, so the first state, when a TCP connection handled by Mercury arrives, is to send back the greeting message. This is done by setting the first processor function to atlas:negociate(init, CSocket, _, _).
```erlang
negociate( init, CSocket, _, _ ) ->
    Message = [ "ATLAS ", "server ", ?ATLAS_SERVER_NAME, ?DELIMITER ],
    ok = gen_tcp:send( CSocket, Message ),
    %% Next step is the «getname» part...
    { ok, ?MODULE, negociate, getname, [] } ;
```
Here we send the welcome message, and return a pointer (a tuple more precisely) to the function called negociate in the same module (the ?MODULE macro), with getname as the first argument.
The Mercury dispatcher we saw earlier will apply this function to the next line it receives. We also saw that the next step is to receive an «ATLAS client xxxx» message:
```erlang
%% The syntax is: "ATLAS client " + name of the client (and optionally a
%% version number) + ?DELIMITER
negociate( getname, _CSocket, _Line, _ ) ->
    %% We don't care about the client name here...
    %% Next step is to get the client possibilities
    { ok, ?MODULE, negociate, getcodecs, [] } ;
```
The same scheme is used here. The next function is ?MODULE:negociate(getcodecs, ...), which collects the list of codecs the client can use. The end of the list is marked by an empty line, which we detect with pattern matching:
```erlang
%% When the client sends an empty line, there are no more codecs to check
negociate( getcodecs, CSocket, "", CurrentCodecs ) ->
    io:format( "[Atlas] NEGOCIATION / CLIENT CODECS DONE ~n" ),
    Codec = answer( codec, CSocket, CurrentCodecs ),
    %% We now switch to the filter negotiation, such as it is
    { ok, ?MODULE, negociate, getfilter, Codec } ;
%% When the client sends "ICAN Xxx", it means it can use codec Xxx:
%% we append it to the list accumulated in CurrentCodecs
negociate( getcodecs, _CSocket, "ICAN " ++ Codec, CurrentCodecs ) ->
    %% We have a client possibility
    io:format( "[Atlas] NEGOCIATION / CLIENT CODEC : ~s ~n", [ Codec ] ),
    { ok, ?MODULE, negociate, getcodecs, CurrentCodecs ++ [ Codec ] } ;
```
For each «ICAN ...» line from the client, we loop back by returning the very same processor, with the new codec appended to CurrentCodecs. When the client is done and sends an empty line, we jump to ?MODULE:negociate(getfilter, ...), with Codec being determined by the answer function below:
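```erlang
answer( codec, CSocket, CurrentCodecs ) ->
    io:format( "[Atlas] Client Available codecs : ~p ~n", [ CurrentCodecs ] ),
    %% Announce the codec, then send an empty line to end the answer
    ok = gen_tcp:send( CSocket, "IWILL Packed" ++ ?DELIMITER ++ ?DELIMITER ),
    %% ?CODEC is the codec module later used by dialog/4 (Codec:decode/1)
    ?CODEC ;
```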
We cheat for the moment, forcing the Packed codec to be used, as it is currently the only codec implemented in Erlang.
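Once more codecs exist, the server will have to actually choose one. A hypothetical helper for that, picking the first codec in a server-side preference list that the client announced with ICAN, could look like the sketch below. Neither the preference order nor what to send when nothing matches is specified in this article; those are assumptions:

```erlang
-module(codec_choice).
-export([choose/2]).

%% Hypothetical replacement for the hard-coded "IWILL Packed": keep the
%% server's preference order, filtered by what the client announced.
%% Returns {ok, Codec} or {error, no_common_codec}.
choose(ServerPrefs, ClientCodecs) ->
    case [C || C <- ServerPrefs, lists:member(C, ClientCodecs)] of
        [Codec | _] -> {ok, Codec};
        []          -> {error, no_common_codec}
    end.
```

With ServerPrefs = ["Packed", "XML"] and a client offering ["XML", "Packed"], choose/2 returns {ok, "Packed"}, which matches the answer the article currently forces.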
In the same way, filters aren't implemented yet, so the getfilter step is somewhat... err... straightforward ;-)
```erlang
negociate( getfilter, CSocket, _Line, Codec ) ->
    answer( filter, CSocket, [] ),
    { ok, ?MODULE, dialog, msg, Codec }.

answer( filter, CSocket, _CurrentFilters ) ->
    %% No filters yet: just answer with an empty line
    ok = gen_tcp:send( CSocket, ?DELIMITER ).
```
OK. We now have an Atlas implementation that can negotiate the Packed codec, without filters. But hey, that's a start. The next processor is simply ?MODULE:dialog(msg, ...):
```erlang
dialog( msg, _CSocket, Line, Codec ) ->
    { [], [ X ] } = Codec:decode( Line ),
    %% Now we pass the message to pegasus
    global:send( pegasus_agent, X ),
    { ok, ?MODULE, dialog, msg, Codec }.
```
Remember that this function will be called by the Mercury dispatcher. It receives a line of data (Line) and knows which Codec to use, since the codec was negotiated earlier. The function just has to call Codec:decode(Line) to get the actual data in Erlang form (X). For now, it just sends it to the Pegasus agent that Thor created earlier.
The line of code I find really amazing is the simple Codec:decode(Line). It lets us call into an Erlang module whose name is not known when the code is written, only at run time. The same principle will be used later with Pegasus, to dynamically route messages to the right processing module.
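Here is a tiny, self-contained illustration of that dynamic call. plain_codec is a hypothetical codec that treats each line as a single message and only mimics the { [], [X] } shape matched in dialog/4 (it is not the Packed codec); the codec module is picked at run time from its name with list_to_existing_atom/1:

```erlang
-module(plain_codec).
-export([decode/1, decode_with/2]).

%% Hypothetical codec: each line is one message, kept as a string.
%% The {[], [Message]} shape only mirrors what dialog/4 pattern-matches.
decode(Line) ->
    {[], [Line]}.

%% The codec module is only known at run time, from its name as a string.
%% list_to_existing_atom/1 refuses names that are not already atoms.
decode_with(CodecName, Line) ->
    Codec = list_to_existing_atom(CodecName),
    Codec:decode(Line).
```

plain_codec:decode_with("plain_codec", "hello") returns {[], ["hello"]}. Using list_to_existing_atom/1 rather than list_to_atom/1 avoids creating new atoms from network input, since atoms are never garbage collected.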
Atlas is now ready to exchange information using a codec. We'll see in the next article how to implement the Packed codec.
In the meantime, happy coding!
Thanks to Mickaël Rémond (a.k.a. Stormy) for proofreading this article :-)