This commit is contained in:
Feng 2016-08-27 15:50:49 +08:00
commit d61d2605ad
4 changed files with 48 additions and 9 deletions

View File

@ -87,7 +87,13 @@ init([OriginConn, MqttEnv]) ->
end, end,
ConnName = esockd_net:format(PeerName), ConnName = esockd_net:format(PeerName),
Self = self(), Self = self(),
SendFun = fun(Data) ->
%%TODO: Send packet...
SendFun = fun(Packet) ->
Data = emqttd_serializer:serialize(Packet),
%%TODO: How to Log???
?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}),
emqttd_metrics:inc('bytes/sent', size(Data)),
try Connection:async_send(Data) of try Connection:async_send(Data) of
true -> ok true -> ok
catch catch

View File

@ -236,8 +236,8 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
State = #proto_state{client_id = ClientId, State = #proto_state{client_id = ClientId,
username = Username, username = Username,
session = Session}) -> session = Session}) ->
Msg = emqttd_message:from_packet(Username, ClientId, Packet), Msg = emqttd_message:from_packet(Username, ClientId, Packet),
case emqttd_session:publish(Session, Msg) of case emqttd_session:publish(Session, Msg) of
ok -> ok ->
@ -256,10 +256,7 @@ send(Packet, State = #proto_state{sendfun = SendFun})
when is_record(Packet, mqtt_packet) -> when is_record(Packet, mqtt_packet) ->
trace(send, Packet, State), trace(send, Packet, State),
emqttd_metrics:sent(Packet), emqttd_metrics:sent(Packet),
Data = emqttd_serializer:serialize(Packet), SendFun(Packet),
?LOG(debug, "SEND ~p", [Data], State),
emqttd_metrics:inc('bytes/sent', size(Data)),
SendFun(Data),
{ok, State}. {ok, State}.
trace(recv, Packet, ProtoState) -> trace(recv, Packet, ProtoState) ->

View File

@ -66,7 +66,12 @@ init([MqttEnv, WsPid, Req, ReplyChannel]) ->
{ok, Peername} = Req:get(peername), {ok, Peername} = Req:get(peername),
Headers = mochiweb_headers:to_list( Headers = mochiweb_headers:to_list(
mochiweb_request:get(headers, Req)), mochiweb_request:get(headers, Req)),
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, %% SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
SendFun = fun(Packet) ->
Data = emqttd_serializer:serialize(Packet),
emqttd_metrics:inc('bytes/sent', size(Data)),
ReplyChannel({binary, Data})
end,
ProtoState = emqttd_protocol:init(Peername, SendFun, ProtoState = emqttd_protocol:init(Peername, SendFun,
[{ws_initial_headers, Headers} | MqttEnv]), [{ws_initial_headers, Headers} | MqttEnv]),
{ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer), {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer),

View File

@ -22,6 +22,8 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
all() -> all() ->
[{group, protocol}, [{group, protocol},
{group, pubsub}, {group, pubsub},
@ -62,7 +64,10 @@ groups() ->
[dispatch_retained_messages]}, [dispatch_retained_messages]},
{backend, [sequence], {backend, [sequence],
[]}, []},
{http, [sequence], [request_status]}, {http, [sequence],
[request_status,
request_publish
]},
{cli, [sequence], {cli, [sequence],
[ctl_register_cmd, [ctl_register_cmd,
cli_status, cli_status,
@ -330,6 +335,32 @@ request_status(_) ->
httpc:request(get, {Url, []}, [], []), httpc:request(get, {Url, []}, [], []),
?assertEqual(binary_to_list(Status), Return). ?assertEqual(binary_to_list(Status), Return).
request_publish(_) ->
ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]),
Params = "qos=1&retain=0&topic=a/b/c&message=hello",
?assert(connect_emqttd_publish_(post, "mqtt/publish", Params, auth_header_("", ""))),
?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
emqttd:unsubscribe(<<"a/b/c">>).
connect_emqttd_publish_(Method, Api, Params, Auth) ->
Url = "http://127.0.0.1:8083/" ++ Api,
case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of
{error, socket_closed_remotely} ->
false;
{ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } ->
true;
{ok, {{"HTTP/1.1", 400, _}, _, []}} ->
false;
{ok, {{"HTTP/1.1", 404, _}, _, []}} ->
false
end.
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
%% cli group %% cli group
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------