Merge pull request #241 from emqtt/issue#231

Issue#231
This commit is contained in:
Feng Lee 2015-08-13 17:25:52 +08:00
commit 91ff959770
3 changed files with 46 additions and 32 deletions

View File

@ -82,6 +82,10 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% MQTT Client %% MQTT Client
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-type header_key() :: atom() | binary() | string().
-type header_val() :: atom() | binary() | string() | integer().
-record(mqtt_client, { -record(mqtt_client, {
client_id :: binary() | undefined, client_id :: binary() | undefined,
client_pid :: pid(), client_pid :: pid(),
@ -91,6 +95,7 @@
proto_ver :: 3 | 4, proto_ver :: 3 | 4,
keepalive = 0, keepalive = 0,
will_topic :: undefined | binary(), will_topic :: undefined | binary(),
ws_initial_headers :: list({header_key(), header_val()}),
connected_at :: erlang:timestamp() connected_at :: erlang:timestamp()
}). }).

View File

@ -54,6 +54,7 @@
keepalive, keepalive,
max_clientid_len = ?MAX_CLIENTID_LEN, max_clientid_len = ?MAX_CLIENTID_LEN,
client_pid, client_pid,
ws_initial_headers, %% Headers from first HTTP request for websocket client
connected_at}). connected_at}).
-type proto_state() :: #proto_state{}. -type proto_state() :: #proto_state{}.
@ -65,20 +66,23 @@
init(Peername, SendFun, Opts) -> init(Peername, SendFun, Opts) ->
MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
#proto_state{peername = Peername, WsInitialHeaders = proplists:get_value(ws_initial_headers, Opts),
sendfun = SendFun, #proto_state{peername = Peername,
max_clientid_len = MaxLen, sendfun = SendFun,
client_pid = self()}. max_clientid_len = MaxLen,
client_pid = self(),
ws_initial_headers = WsInitialHeaders}.
info(#proto_state{client_id = ClientId, info(#proto_state{client_id = ClientId,
username = Username, username = Username,
peername = Peername, peername = Peername,
proto_ver = ProtoVer, proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
keepalive = KeepAlive, keepalive = KeepAlive,
clean_sess = CleanSess, clean_sess = CleanSess,
will_msg = WillMsg, ws_initial_headers = WsInitialHeaders,
connected_at = ConnectedAt}) -> will_msg = WillMsg,
connected_at = ConnectedAt}) ->
[{client_id, ClientId}, [{client_id, ClientId},
{username, Username}, {username, Username},
{peername, Peername}, {peername, Peername},
@ -86,34 +90,37 @@ info(#proto_state{client_id = ClientId,
{proto_name, ProtoName}, {proto_name, ProtoName},
{keepalive, KeepAlive}, {keepalive, KeepAlive},
{clean_sess, CleanSess}, {clean_sess, CleanSess},
{ws_initial_headers, WsInitialHeaders},
{will_msg, WillMsg}, {will_msg, WillMsg},
{connected_at, ConnectedAt}]. {connected_at, ConnectedAt}].
clientid(#proto_state{client_id = ClientId}) -> clientid(#proto_state{client_id = ClientId}) ->
ClientId. ClientId.
client(#proto_state{client_id = ClientId, client(#proto_state{client_id = ClientId,
peername = Peername, peername = Peername,
username = Username, username = Username,
clean_sess = CleanSess, clean_sess = CleanSess,
proto_ver = ProtoVer, proto_ver = ProtoVer,
keepalive = Keepalive, keepalive = Keepalive,
will_msg = WillMsg, will_msg = WillMsg,
client_pid = Pid, client_pid = Pid,
connected_at = Time}) -> ws_initial_headers = WsInitialHeaders,
connected_at = Time}) ->
WillTopic = if WillTopic = if
WillMsg =:= undefined -> undefined; WillMsg =:= undefined -> undefined;
true -> WillMsg#mqtt_message.topic true -> WillMsg#mqtt_message.topic
end, end,
#mqtt_client{client_id = ClientId, #mqtt_client{client_id = ClientId,
client_pid = Pid, client_pid = Pid,
username = Username, username = Username,
peername = Peername, peername = Peername,
clean_sess = CleanSess, clean_sess = CleanSess,
proto_ver = ProtoVer, proto_ver = ProtoVer,
keepalive = Keepalive, keepalive = Keepalive,
will_topic = WillTopic, will_topic = WillTopic,
connected_at = Time}. ws_initial_headers = WsInitialHeaders,
connected_at = Time}.
%% CONNECT Client requests a connection to a Server %% CONNECT Client requests a connection to a Server

View File

@ -104,7 +104,9 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, Peername} = Req:get(peername), {ok, Peername} = Req:get(peername),
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), Headers = mochiweb_request:get(headers, Req),
HeadersList = mochiweb_headers:to_list(Headers),
ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]),
{ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->