From e97c3a5c8c7dcd74d1b13749862c758f7c348852 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 13:44:09 +0800 Subject: [PATCH 1/6] update frame suite for latest emqx_frame --- test/emqx_frame_SUITE.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 60ed52e46..19bbb1e8c 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -330,15 +330,16 @@ serialize_parse_subscribe(_) -> %% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}]) Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, TopicOpts = #{ nl => 0 , rap => 0, rc => 0, - rh => 0, subid => 0 , qos => 2 }, + rh => 0, qos => 2 }, TopicFilters = [{<<"TopicA">>, TopicOpts}], Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ct:log("Bin: ~p, Packet: ~p ~n", [Packet, parse(Bin)]), ?assertEqual({ok, Packet, <<>>}, parse(Bin)). serialize_parse_subscribe_v5(_) -> - TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0, subid => 0}}, - {<<"TopicQos1">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0, subid => 0}}], + TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}}, + {<<"TopicQos1">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}}], Packet = ?SUBSCRIBE_PACKET(3, #{'Subscription-Identifier' => 16#FFFFFFF}, TopicFilters), ?assertEqual({ok, Packet, <<>>}, From db76177228fb07083b74881c7b00c420abf0bb05 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 13:48:54 +0800 Subject: [PATCH 2/6] add router clean function before each case --- test/emqx_router_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index b9d40810a..196b1678e 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -39,6 +39,7 @@ end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). init_per_testcase(_TestCase, Config) -> + clear_tables(), Config. end_per_testcase(_TestCase, _Config) -> From b0ed953708817fc09c286aea0d1c5eab992fffd8 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 13:56:55 +0800 Subject: [PATCH 3/6] fix emqx_session:unsubscribe bug --- src/emqx_session.erl | 175 ++++++++++++++++++++++--------------------- 1 file changed, 90 insertions(+), 85 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index ab9096f23..0ac1cf59d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -62,87 +62,87 @@ -import(emqx_zone, [get_env/2, get_env/3]). -record(state, { - %% Clean Start Flag - clean_start = false :: boolean(), + %% Clean Start Flag + clean_start = false :: boolean(), - %% Client Binding: local | remote - binding = local :: local | remote, + %% Client Binding: local | remote + binding = local :: local | remote, - %% ClientId: Identifier of Session - client_id :: binary(), + %% ClientId: Identifier of Session + client_id :: binary(), - %% Username - username :: binary() | undefined, + %% Username + username :: binary() | undefined, - %% Connection pid binding with session - conn_pid :: pid(), + %% Connection pid binding with session + conn_pid :: pid(), - %% Old Connection Pid that has been kickout - old_conn_pid :: pid(), + %% Old Connection Pid that has been kickout + old_conn_pid :: pid(), - %% Next packet id of the session - next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), + %% Next packet id of the session + next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), - %% Max subscriptions - max_subscriptions :: non_neg_integer(), + %% Max subscriptions + max_subscriptions :: non_neg_integer(), - %% Client’s Subscriptions. - subscriptions :: map(), + %% Client’s Subscriptions. + subscriptions :: map(), - %% Upgrade QoS? - upgrade_qos = false :: boolean(), + %% Upgrade QoS? + upgrade_qos = false :: boolean(), - %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. - inflight :: emqx_inflight:inflight(), + %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. + inflight :: emqx_inflight:inflight(), - %% Max Inflight Size. DEPRECATED: Get from inflight - %% max_inflight = 32 :: non_neg_integer(), + %% Max Inflight Size. DEPRECATED: Get from inflight + %% max_inflight = 32 :: non_neg_integer(), - %% Retry interval for redelivering QoS1/2 messages - retry_interval = 20000 :: timeout(), + %% Retry interval for redelivering QoS1/2 messages + retry_interval = 20000 :: timeout(), - %% Retry Timer - retry_timer :: reference() | undefined, + %% Retry Timer + retry_timer :: reference() | undefined, - %% All QoS1, QoS2 messages published to when client is disconnected. - %% QoS 1 and QoS 2 messages pending transmission to the Client. - %% - %% Optionally, QoS 0 messages pending transmission to the Client. - mqueue :: emqx_mqueue:mqueue(), + %% All QoS1, QoS2 messages published to when client is disconnected. + %% QoS 1 and QoS 2 messages pending transmission to the Client. + %% + %% Optionally, QoS 0 messages pending transmission to the Client. + mqueue :: emqx_mqueue:mqueue(), - %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. - awaiting_rel :: map(), + %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. + awaiting_rel :: map(), - %% Max Packets Awaiting PUBREL - max_awaiting_rel = 100 :: non_neg_integer(), + %% Max Packets Awaiting PUBREL + max_awaiting_rel = 100 :: non_neg_integer(), - %% Awaiting PUBREL Timeout - await_rel_timeout = 20000 :: timeout(), + %% Awaiting PUBREL Timeout + await_rel_timeout = 20000 :: timeout(), - %% Awaiting PUBREL Timer - await_rel_timer :: reference() | undefined, + %% Awaiting PUBREL Timer + await_rel_timer :: reference() | undefined, - %% Session Expiry Interval - expiry_interval = 7200000 :: timeout(), + %% Session Expiry Interval + expiry_interval = 7200000 :: timeout(), - %% Expired Timer - expiry_timer :: reference() | undefined, + %% Expired Timer + expiry_timer :: reference() | undefined, - %% Enable Stats - enable_stats :: boolean(), + %% Enable Stats + enable_stats :: boolean(), - %% Stats timer - stats_timer :: reference() | undefined, + %% Stats timer + stats_timer :: reference() | undefined, - %% TODO: - deliver_stats = 0, + %% TODO: + deliver_stats = 0, - %% TODO: - enqueue_stats = 0, + %% TODO: + enqueue_stats = 0, - %% Created at - created_at :: erlang:timestamp() - }). + %% Created at + created_at :: erlang:timestamp() + }). -define(TIMEOUT, 60000). @@ -284,7 +284,12 @@ pubcomp(SPid, PacketId, ReasonCode) -> -spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> - unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)). + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) -> + emqx_topic:parse(RawTopic) + end, RawTopicFilters), + unsubscribe(SPid, undefined, #{}, TopicFilters). -spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). @@ -424,20 +429,20 @@ handle_call(Req, _From, State) -> handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> - {[QoS|RcAcc], case maps:find(Topic, SubMap) of - {ok, SubOpts} -> - SubMap; - {ok, _SubOpts} -> - emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), - maps:put(Topic, SubOpts, SubMap); - error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), - maps:put(Topic, SubOpts, SubMap) - end} - end, {[], Subscriptions}, TopicFilters), + lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> + {[QoS|RcAcc], case maps:find(Topic, SubMap) of + {ok, SubOpts} -> + SubMap; + {ok, _SubOpts} -> + emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap); + error -> + emqx_broker:subscribe(Topic, ClientId, SubOpts), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap) + end} + end, {[], Subscriptions}, TopicFilters), suback(FromPid, PacketId, ReasonCodes), {noreply, State#state{subscriptions = Subscriptions1}}; @@ -445,16 +450,16 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> - case maps:find(Topic, SubMap) of - {ok, SubOpts} -> - ok = emqx_broker:unsubscribe(Topic, ClientId), - emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), - {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; - error -> - {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} - end - end, {[], Subscriptions}, TopicFilters), + lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> + case maps:find(Topic, SubMap) of + {ok, SubOpts} -> + ok = emqx_broker:unsubscribe(Topic, ClientId), + emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), + {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; + error -> + {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} + end + end, {[], Subscriptions}, TopicFilters), unsuback(From, PacketId, ReasonCodes), {noreply, State#state{subscriptions = Subscriptions1}}; @@ -524,7 +529,7 @@ handle_cast(Msg, State) -> %% Batch dispatch handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> {noreply, lists:foldl(fun(Msg, NewState) -> - element(2, handle_info({dispatch, Topic, Msg}, NewState)) + element(2, handle_info({dispatch, Topic, Msg}, NewState)) end, State, Msgs)}; %% Dispatch message @@ -684,7 +689,7 @@ sortfun(inflight) -> sortfun(awaiting_rel) -> fun({_, #message{timestamp = Ts1}}, {_, #message{timestamp = Ts2}}) -> - Ts1 < Ts2 + Ts1 < Ts2 end. %%------------------------------------------------------------------------------ @@ -726,7 +731,7 @@ dispatch(Msg = #message{qos = ?QOS0}, State) -> inc_stats(deliver, State); dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight}) - when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> + when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); @@ -824,7 +829,7 @@ dequeue2(State = #state{mqueue = Q}) -> %% Ensure timers ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) -> - State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; + State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; ensure_await_rel_timer(State) -> State. From 8a5519cafad355f740bc3c4698e0c4335ab7d303 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 30 Aug 2018 14:29:07 +0800 Subject: [PATCH 4/6] attrs for ws_connection --- src/emqx_ws_connection.erl | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 026d160b3..ed1532565 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -17,7 +17,7 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([info/1]). +-export([info/1, attrs/1]). -export([stats/1]). -export([kick/1]). -export([session/1]). @@ -53,9 +53,14 @@ %% API %%------------------------------------------------------------------------------ +%% for debug info(WSPid) -> call(WSPid, info). +%% for dashboard +attrs(CPid) when is_pid(CPid) -> + call(CPid, attrs). + stats(WSPid) -> call(WSPid, stats). @@ -170,6 +175,15 @@ websocket_info({call, From, info}, State = #state{peername = Peername, gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])), {ok, State}; +websocket_info({call, From, attrs}, State = #state{peername = Peername, + sockname = Sockname, + proto_state = ProtoState}) -> + SockAttrs = [{peername, Peername}, + {sockname, Sockname}], + ProtoAttrs = emqx_protocol:attrs(ProtoState), + gen_server:reply(From, lists:usort(lists:append(SockAttrs, ProtoAttrs))), + {ok, State}; + websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) -> Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]), gen_server:reply(From, Stats), @@ -262,4 +276,3 @@ shutdown(Reason, State) -> wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. - From c89f53f14dbd621178a010c91b10ab1fd736e8b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Thu, 30 Aug 2018 14:43:14 +0800 Subject: [PATCH 5/6] Add session, mountpoint test suites --- Makefile | 4 +-- test/emqx_mock_client.erl | 25 ++++++++++++--- test/emqx_mountpoint_SUITE.erl | 36 +++++++++++++++++++++ test/emqx_session_SUITE.erl | 57 ++++++++++++++++++++++++++++++++++ test/emqx_sm_SUITE.erl | 2 +- 5 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 test/emqx_mountpoint_SUITE.erl create mode 100644 test/emqx_session_SUITE.erl diff --git a/Makefile b/Makefile index 80a3dafbd..ce0a282fa 100644 --- a/Makefile +++ b/Makefile @@ -35,10 +35,10 @@ EUNIT_OPTS = verbose # CT_SUITES = emqx_stats ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \ +CT_SUITES = emqx emqx_session emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone + emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone emqx_mountpoint CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 164b6d4ea..0536b8bac 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,13 +16,15 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, {clean_start, client_id, client_pid}). +-define(TAB, messages). + start_link(ClientId) -> gen_server:start_link(?MODULE, [ClientId], []). @@ -35,13 +37,25 @@ close_session(ClientPid, SessPid) -> stop(CPid) -> gen_server:call(CPid, stop). +get_last_message() -> + [{last_message, Msg}] = ets:lookup(?TAB, last_message), + Msg. + init([ClientId]) -> - {ok, #state{clean_start = true, client_id = ClientId}}. + Result = lists:member(?TAB, ets:all()), + if Result == false -> + ets:new(?TAB, [set, named_table]); + true -> ok + end, + {ok, + #state{clean_start = true, + client_id = ClientId} + }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> Attrs = #{ zone => Zone, client_id => ClientId, - conn_pid => ClientPid, + conn_pid => ClientPid, clean_start => true, username => undefined, conn_props => undefined @@ -49,7 +63,7 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> {ok, SessPid} = emqx_sm:open_session(Attrs), {reply, {ok, SessPid}, State#state{ clean_start = true, - client_id = ClientId, + client_id = ClientId, client_pid = ClientPid }}; @@ -66,6 +80,9 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info({_, Msg}, State) -> + ets:insert(?TAB, {last_message, Msg}), + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. diff --git a/test/emqx_mountpoint_SUITE.erl b/test/emqx_mountpoint_SUITE.erl new file mode 100644 index 000000000..61d8d3652 --- /dev/null +++ b/test/emqx_mountpoint_SUITE.erl @@ -0,0 +1,36 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_mountpoint_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> [t_mount_unmount, t_replvar]. + +t_mount_unmount(_) -> + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + Msg2 = emqx_mountpoint:mount(<<"mount">>, Msg), + ?assertEqual(<<"mounttopic">>, Msg2#message.topic), + TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS2}}], + TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS2}}]), + Msg = emqx_mountpoint:unmount(<<"mount">>, Msg2). + +t_replvar(_) -> + <<"mount/test/clientid">> = emqx_mountpoint:replvar(<<"mount/%u/%c">>, #{client_id => <<"clientid">>, username => <<"test">>}). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl new file mode 100644 index 000000000..f2da10b74 --- /dev/null +++ b/test/emqx_session_SUITE.erl @@ -0,0 +1,57 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_session_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). + +all() -> [t_session_all]. + +t_session_all(_) -> + emqx_ct_broker_helpers:run_setup_steps(), + ClientId = <<"ClientId">>, + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + Message1 = emqx_message:make(<<"ClientId">>, 2, <<"topic">>, <<"hello">>), + emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 2}}]), + emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 1}}]), + timer:sleep(200), + [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}), + emqx_session:publish(SPid, 1, Message1), + timer:sleep(200), + {publish, 1, _} = emqx_mock_client:get_last_message(), + emqx_session:puback(SPid, 2), + emqx_session:puback(SPid, 3, reasoncode), + emqx_session:pubrec(SPid, 4), + emqx_session:pubrec(SPid, 5, reasoncode), + emqx_session:pubrel(SPid, 6, reasoncode), + emqx_session:pubcomp(SPid, 7, reasoncode), + timer:sleep(200), + 2 = emqx_metrics:val('packets/puback/missed'), + 2 = emqx_metrics:val('packets/pubrec/missed'), + 1 = emqx_metrics:val('packets/pubrel/missed'), + 1 = emqx_metrics:val('packets/pubcomp/missed'), + Attrs = emqx_session:attrs(SPid), + Info = emqx_session:info(SPid), + Stats = emqx_session:stats(SPid), + ClientId = proplists:get_value(client_id, Attrs), + ClientId = proplists:get_value(client_id, Info), + 1 = proplists:get_value(subscriptions_count, Stats), + emqx_session:unsubscribe(SPid, [<<"topic">>]), + timer:sleep(200), + [] = emqx:subscriptions({SPid, <<"clientId">>}), + emqx_mock_client:close_session(ConnPid, SPid). diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 5bc096cd8..24999f2a0 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -26,7 +26,7 @@ t_open_close_session(_) -> {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid, zone => internal, username => <<"zhou">>, conn_props => #{}}, - {ok, _SPid} = emqx_sm:open_session(Attrs), + {ok, SPid} = emqx_sm:open_session(Attrs), [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), SPid = emqx_sm:lookup_session_pid(<<"client">>), {ok, NewConnPid} = emqx_mock_client:start_link(<<"client">>), From 518ba9ace9c3f5c267ddab8a00a0c232cf4652d4 Mon Sep 17 00:00:00 2001 From: RockyJin Date: Thu, 30 Aug 2018 15:29:14 +0800 Subject: [PATCH 6/6] readme file for v3.0 --- README.md | 130 +++++++++++++++++++----------------------------------- 1 file changed, 45 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index eceeee5de..1e80dae6b 100644 --- a/README.md +++ b/README.md @@ -1,65 +1,29 @@ +# *EMQ X* - MQTT Broker -# *EMQ X* - EMQ X Broker -[![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) +*EMQ X* broker is fully a open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. -*EMQ* (Erlang MQTT Broker) is a distributed, massively scalable, highly extensible MQTT message broker written in Erlang/OTP. +Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket, STOMP and SockJS. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. -*EMQ* is fully open source and licensed under the Apache Version 2.0. *EMQ* implements both MQTT V3.1 and V3.1.1 protocol specifications, and supports MQTT-SN, CoAP, WebSocket, STOMP and SockJS at the same time. -*EMQ* provides a scalable, reliable, enterprise-grade MQTT message Hub for IoT, M2M, Smart Hardware and Mobile Messaging Applications. +- For full list of new features, please read *EMQ X* broker 3.0 [release notes](https://github.com/emqtt/emqttd/releases/). +- For more information, please visit [EMQ X homepage](http://emqtt.io). -The 1.0 release of the EMQ broker has scaled to 1.3 million concurrent MQTT connections on a 12 Core, 32G CentOS server. -Please visit [emqtt.io](http://emqtt.io) for more service. Follow us on Twitter: [@emqtt](https://twitter.com/emqtt) - -## Features - -* Full MQTT V3.1/V3.1.1 support -* QoS0, QoS1, QoS2 Publish/Subscribe -* Session Management and Offline Messages -* Retained Message -* Last Will Message -* TCP/SSL Connection -* MQTT Over WebSocket(SSL) -* HTTP Publish API -* MQTT-SN Protocol -* STOMP protocol -* STOMP over SockJS -* $SYS/# Topics -* ClientID Authentication -* IpAddress Authentication -* Username and Password Authentication -* Access control based on IpAddress, ClientID, Username -* JWT Authentication -* LDAP Authentication/ACL -* HTTP Authentication/ACL -* MySQL Authentication/ACL -* Redis Authentication/ACL -* PostgreSQL Authentication/ACL -* MongoDB Authentication/ACL -* Cluster brokers on several nodes -* Bridge brokers locally or remotely -* mosquitto, RSMB bridge -* Extensible architecture with Hooks and Plugins -* Passed eclipse paho interoperability tests -* Local Subscription -* Shared Subscription -* Proxy Protocol V1/2 -* Lua Hook and Web Hook -* LWM2M Prototol Support ## Installation The *EMQ* broker is cross-platform, which can be deployed on Linux, Unix, Mac, Windows and even Raspberry Pi. -Download the binary package for your platform from http://emqtt.io/downloads. +Download the binary package for your platform from [here](http://emqtt.io/downloads). + +-[Single Node Install](http://emqtt.io/docs/v2/install.html) +-[Multi Node Install](http://emqtt.io/docs/v2/cluster.html) -Documentation on [emqtt.io/docs/v2/](http://emqtt.io/docs/v2/install.html), [docs.emqtt.com](http://docs.emqtt.com/en/latest/install.html) for installation and configuration guide. ## Build From Source -The *EMQ* broker requires Erlang/OTP R19+ to build since 2.1 release. +The *EMQ* broker requires Erlang/OTP R21+ to build since 3.0 release. ``` git clone https://github.com/emqtt/emq-relx.git @@ -67,55 +31,51 @@ git clone https://github.com/emqtt/emq-relx.git cd emq-relx && make cd _rel/emqttd && ./bin/emqttd console + ``` -## Plugins +## Quick Start -The *EMQ* broker is highly extensible, with many hooks and plugins for customizing the authentication/ACL and integrating with other systems: + # Start emqttd + ./bin/emqttd start + + # Check Status + ./bin/emqttd_ctl status + + # Stop emqttd + ./bin/emqttd stop -Plugin | Description ------------------------------------------------------------------------|-------------------------------------- -[emq_plugin_template](https://github.com/emqtt/emq_plugin_template) | Plugin template and demo -[emq_dashboard](https://github.com/emqtt/emq_dashboard) | Web Dashboard -[emq_retainer](https://github.com/emqtt/emq-retainer) | Store MQTT Retained Messages -[emq_modules](https://github.com/emqtt/emq-modules) | Presence, Subscription and Rewrite Modules -[emq_auth_username](https://github.com/emqtt/emq_auth_username) | Username/Password Authentication Plugin -[emq_auth_clientid](https://github.com/emqtt/emq_auth_clientid) | ClientId Authentication Plugin -[emq_auth_mysql](https://github.com/emqtt/emq_auth_mysql) | MySQL Authentication/ACL Plugin -[emq_auth_pgsql](https://github.com/emqtt/emq_auth_pgsql) | PostgreSQL Authentication/ACL Plugin -[emq_auth_redis](https://github.com/emqtt/emq_auth_redis) | Redis Authentication/ACL Plugin -[emq_auth_mongo](https://github.com/emqtt/emq_auth_mongo) | MongoDB Authentication/ACL Plugin -[emq_auth_http](https://github.com/emqtt/emq_auth_http) | Authentication/ACL by HTTP API -[emq_auth_ldap](https://github.com/emqtt/emq_auth_ldap) | LDAP Authentication Plugin -[emq_auth_jwt](https://github.com/emqtt/emq-auth-jwt) | JWT Authentication Plugin -[emq_web_hook](https://github.com/emqtt/emq-web-hook) | Web Hook Plugin -[emq_lua_hook](https://github.com/emqtt/emq-lua-hook) | Lua Hook Plugin -[emq_sn](https://github.com/emqtt/emq_sn) | MQTT-SN Protocol Plugin -[emq_coap](https://github.com/emqtt/emq_coap) | CoAP Protocol Plugin -[emq_stomp](https://github.com/emqtt/emq_stomp) | Stomp Protocol Plugin -[emq_lwm2m](https://github.com/emqx/emqx-lwm2m) | LWM2M Prototol Plugin -[emq_recon](https://github.com/emqtt/emq_recon) | Recon Plugin -[emq_reloader](https://github.com/emqtt/emq_reloader) | Reloader Plugin -[emq_sockjs](https://github.com/emqtt/emq_sockjs) | SockJS(Stomp) Plugin + To view the dashboard after running, use your browser to open: http://localhost:18083 -## Supports -* Twitter: [@emqtt](https://twitter.com/emqtt) -* Homepage: http://emqtt.io -* Downloads: http://emqtt.io/downloads -* Documentation: http://emqtt.io/docs/v2/ -* Forum: https://groups.google.com/d/forum/emqtt -* Mailing List: -* Issues: https://github.com/emqtt/emqttd/issues -* QQ Group: 12222225 +## Roadmap -## Test Servers +The [EMQX roadmap uses Github milestones](https://github.com/emqtt/emqttd/milestones) to track the progress of the project. -The **q.emqtt.com** hosts a public Four-Node *EMQ* cluster on [QingCloud](https://qingcloud.com): +## Community, discussion, contribution, and support + +You can reach the EMQ community and developers via the following channels: +- [EMQX Slack](http://emqx.slack.com) + -[#emqx-users](https://emqx.slack.com/messages/CBUF2TTB8/) + -[#emqx-devs](https://emqx.slack.com/messages/CBSL57DUH/) +- [Mailing Lists]() +- [Twitter](https://twitter.com/emqtt) +- [Forum](https://groups.google.com/d/forum/emqtt) +- [Blog](https://medium.com/@emqtt) + +Please submit any bugs, issues, and feature requests to [emqtt/emqttd](//github.com/emqtt/emqttd/issues). -![qing_cluster](http://emqtt.io/static/img/public_cluster.png) ## License +Copyright (c) 2014-2018 [EMQ X Tech, LLC](http://emqtt.io) + +Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License at + +[http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0) + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. + + -Apache License Version 2.0