From 61a64ea0b0f629642080494b0652e18a527c1025 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 28 Jun 2016 16:44:52 +0800 Subject: [PATCH 1/3] send(Packet) --- src/emqttd_client.erl | 9 ++++++++- src/emqttd_protocol.erl | 9 +++------ src/emqttd_ws_client.erl | 6 +++++- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index f3169339a..89cc6a4a7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -80,13 +80,20 @@ init([OriginConn, MqttEnv]) -> end, ConnName = esockd_net:format(PeerName), Self = self(), - SendFun = fun(Data) -> + + %%TODO: Send packet... + SendFun = fun(Packet) -> + Data = emqttd_serializer:serialize(Packet), + %%TODO: How to Log??? + ?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}), + emqttd_metrics:inc('bytes/sent', size(Data)), try Connection:async_send(Data) of true -> ok catch error:Error -> Self ! {shutdown, Error} end end, + PktOpts = proplists:get_value(packet, MqttEnv), ParserFun = emqttd_parser:new(PktOpts), ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts), diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 7c8a87146..64594866e 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -236,8 +236,8 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + username = Username, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), case emqttd_session:publish(Session, Msg) of ok -> @@ -256,10 +256,7 @@ send(Packet, State = #proto_state{sendfun = SendFun}) when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), emqttd_metrics:sent(Packet), - Data = emqttd_serializer:serialize(Packet), - ?LOG(debug, "SEND ~p", [Data], State), - emqttd_metrics:inc('bytes/sent', size(Data)), - SendFun(Data), + SendFun(Packet), {ok, State}. trace(recv, Packet, ProtoState) -> diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 776dc4ce5..7466148eb 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -108,7 +108,11 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> %%issue#413: trap_exit is unnecessary %%process_flag(trap_exit, true), {ok, Peername} = Req:get(peername), - SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, + SendFun = fun(Packet) -> + Data = emqttd_serializer:serialize(Packet), + emqttd_metrics:inc('bytes/sent', size(Data)), + ReplyChannel({binary, Data}) + end, Headers = mochiweb_request:get(headers, Req), HeadersList = mochiweb_headers:to_list(Headers), ProtoState = emqttd_protocol:init(Peername, SendFun, From a318244be03f33b15a5bc94e8a0d3e4b005cbd75 Mon Sep 17 00:00:00 2001 From: huangdan Date: Mon, 22 Aug 2016 15:22:57 +0800 Subject: [PATCH 2/3] http publish --- test/emqttd_SUITE.erl | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index e8ecd2aea..69047c11c 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -22,6 +22,8 @@ -include_lib("eunit/include/eunit.hrl"). +-define(CONTENT_TYPE, "application/x-www-form-urlencoded"). + all() -> [{group, protocol}, {group, pubsub}, @@ -62,7 +64,10 @@ groups() -> [dispatch_retained_messages]}, {backend, [sequence], []}, - {http, [sequence], [request_status]}, + {http, [sequence], + [request_status, + request_publish + ]}, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -330,6 +335,32 @@ request_status(_) -> httpc:request(get, {Url, []}, [], []), ?assertEqual(binary_to_list(Status), Return). +request_publish(_) -> + ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), + Params = "qos=1&retain=0&topic=a/b/c&message=hello", + ?assert(connect_emqttd_publish_(post, "mqtt/publish", Params, auth_header_("", ""))), + ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), + emqttd:unsubscribe(<<"a/b/c">>). + +connect_emqttd_publish_(Method, Api, Params, Auth) -> + Url = "http://127.0.0.1:8083/" ++ Api, + case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of + {error, socket_closed_remotely} -> + false; + {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> + true; + {ok, {{"HTTP/1.1", 400, _}, _, []}} -> + false; + {ok, {{"HTTP/1.1", 404, _}, _, []}} -> + false + end. + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User,":",Pass])), + {"Authorization","Basic " ++ Encoded}. + + + %% cli group %%-------------------------------------------------------------------- From b0f082ebe073dfffcafa512be970c8c4984461b7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 23 Aug 2016 10:13:13 +0800 Subject: [PATCH 3/3] to_hexstr/1, from_hexstr/1 --- src/emqttd_guid.erl | 8 +++++++- test/emqttd_lib_SUITE.erl | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl index d9593d3a0..6dab23afe 100644 --- a/src/emqttd_guid.erl +++ b/src/emqttd_guid.erl @@ -29,7 +29,7 @@ %% @end -module(emqttd_guid). --export([gen/0, new/0, timestamp/1]). +-export([gen/0, new/0, timestamp/1, to_hexstr/1, from_hexstr/1]). -define(MAX_SEQ, 16#FFFF). @@ -120,3 +120,9 @@ npid() -> PidByte3:8, PidByte4:8>>, NPid. +to_hexstr(<>) -> + list_to_binary(integer_to_list(I, 16)). + +from_hexstr(S) -> + I = list_to_integer(binary_to_list(S), 16), <>. + diff --git a/test/emqttd_lib_SUITE.erl b/test/emqttd_lib_SUITE.erl index 1a3b1aef6..ec68294c6 100644 --- a/test/emqttd_lib_SUITE.erl +++ b/test/emqttd_lib_SUITE.erl @@ -16,6 +16,8 @@ -module(emqttd_lib_SUITE). +-include_lib("eunit/include/eunit.hrl"). + -compile(export_all). -define(SOCKOPTS, [ @@ -35,7 +37,7 @@ all() -> [{group, guid}, {group, opts}, {group, node}, {group, base62}]. groups() -> - [{guid, [], [guid_gen]}, + [{guid, [], [guid_gen, guid_hexstr]}, {opts, [], [opts_merge]}, {?PQ, [], [priority_queue_plen, priority_queue_out2]}, @@ -56,6 +58,10 @@ guid_gen(_) -> Ts2 = emqttd_guid:timestamp(emqttd_guid:gen()), true = Ts2 > Ts1. +guid_hexstr(_) -> + Guid = emqttd_guid:gen(), + ?assertEqual(Guid, emqttd_guid:from_hexstr(emqttd_guid:to_hexstr(Guid))). + %%-------------------------------------------------------------------- %% emqttd_opts %%--------------------------------------------------------------------