From 62e4c749d5307e0443c520a44ac46b26cc159646 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 11 Dec 2015 00:43:32 +0800 Subject: [PATCH 01/11] 0.14.0 --- plugins/emqttd_dashboard | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index 5276a19f1..86bfd8907 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit 5276a19f1fb8b0868b02484e1e6974f12236c8fb +Subproject commit 86bfd890776f67b29d9df84bdf1b95ffe4702a5f From b172a78fcd50ab6093089ecbf55f10eca48451dc Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 16 Dec 2015 10:18:28 +0800 Subject: [PATCH 02/11] lookup/2, tune_qos --- src/emqttd_pubsub.erl | 34 ++++++++++++++++++++++++++++++---- src/emqttd_session.erl | 4 ++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index bb8032c79..308be6184 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -39,11 +39,13 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -%% API Exports +%% API Exports -export([start_link/4]). --export([create/2, subscribe/1, subscribe/2, - unsubscribe/1, unsubscribe/2, publish/1]). +-export([create/2, lookup/2, subscribe/1, subscribe/2, + unsubscribe/1, unsubscribe/2, publish/1, delete/2]). + +%% Subscriptions API %% Local node -export([match/1]). @@ -154,6 +156,30 @@ create(subscription, {SubId, Topic, Qos}) -> {aborted, Error} -> {error, Error} end. +%%------------------------------------------------------------------------------ +%% @doc Lookup Topic or Subscription. +%% @end +%%------------------------------------------------------------------------------ +-spec lookup(topic | subscription, binary()) -> list(). +lookup(topic, Topic) -> + mnesia:dirty_read(topic, Topic); + +lookup(subscription, ClientId) -> + mnesia:dirty_read(subscription, ClientId). + +%%------------------------------------------------------------------------------ +%% @doc Delete Topic or Subscription. +%% @end +%%------------------------------------------------------------------------------ +delete(topic, _Topic) -> + {error, unsupported}; + +delete(subscription, ClientId) when is_binary(ClientId) -> + mnesia:dirty_deleate({subscription, ClientId}); + +delete(subscription, {ClientId, Topic}) when is_binary(ClientId) -> + mnesia:async_dirty(fun remove_subscriptions/2, [ClientId, [Topic]]). + %%------------------------------------------------------------------------------ %% @doc Subscribe Topics %% @end @@ -363,7 +389,7 @@ remove_subscriptions(SubId, Topics) -> lists:foreach(fun(Topic) -> Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, Records = mnesia:match_object(subscription, Pattern, write), - [delete_subscription(Record) || Record <- Records] + lists:foreach(fun delete_subscription/1, Records) end, Topics). delete_subscription(Record) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index e2606ae5d..dde925ab0 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -483,7 +483,7 @@ handle_cast(Msg, State) -> %% Dispatch Message handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions}) when is_record(Msg, mqtt_message) -> - dispatch(fixqos(Topic, Msg, Subscriptions), Session); + dispatch(tune_qos(Topic, Msg, Subscriptions), Session); handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> @@ -603,7 +603,7 @@ dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end. -fixqos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) -> +tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) -> case dict:find(Topic, Subscriptions) of {ok, SubQos} when PubQos > SubQos -> Msg#mqtt_message{qos = SubQos}; From d3ee464789e3c3baa532388a09e470c6f0a47c1b Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 16 Dec 2015 10:22:37 +0800 Subject: [PATCH 03/11] topics, subscriptions --- src/emqttd_cli.erl | 138 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 108 insertions(+), 30 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index ff8e9806a..a7656f9a3 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -29,6 +29,8 @@ -include("emqttd_cli.hrl"). +-include("emqttd_protocol.hrl"). + -import(lists, [foreach/2]). -import(proplists, [get_value/2]). @@ -36,10 +38,8 @@ -export([load/0]). -export([status/1, broker/1, cluster/1, bridges/1, - clients/1, sessions/1, plugins/1, listeners/1, - vm/1, mnesia/1, trace/1]). - -%% TODO: topics, subscriptions... + clients/1, sessions/1, topics/1, subscriptions/1, + plugins/1, listeners/1, vm/1, mnesia/1, trace/1]). -define(PROC_INFOKEYS, [status, memory, @@ -49,6 +49,8 @@ stack_size, reductions]). +-define(MAX_LINES, 20000). + -define(APP, emqttd). load() -> @@ -70,10 +72,10 @@ status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), case lists:keysearch(?APP, 1, application:which_applications()) of - false -> - ?PRINT_MSG("emqttd is not running~n"); - {value, {?APP, _Desc, Vsn}} -> - ?PRINT("emqttd ~s is running~n", [Vsn]) + false -> + ?PRINT_MSG("emqttd is not running~n"); + {value, {?APP, _Desc, Vsn}} -> + ?PRINT("emqttd ~s is running~n", [Vsn]) end; status(_) -> ?PRINT_CMD("status", "query broker status"). @@ -101,12 +103,12 @@ broker(["metrics"]) -> broker(["pubsub"]) -> Pubsubs = supervisor:which_children(emqttd_pubsub_sup), foreach(fun({{_, Id}, Pid, _, _}) -> - ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), - ?PRINT("pubsub: ~w~n", [Id]), - foreach(fun({Key, Val}) -> - ?PRINT(" ~-18s: ~w~n", [Key, Val]) - end, ProcInfo) - end, lists:reverse(Pubsubs)); + ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), + ?PRINT("pubsub: ~w~n", [Id]), + foreach(fun({Key, Val}) -> + ?PRINT(" ~-18s: ~w~n", [Key, Val]) + end, ProcInfo) + end, lists:reverse(Pubsubs)); broker(_) -> ?USAGE([{"broker", "query broker version, uptime and description"}, @@ -123,7 +125,7 @@ cluster([]) -> ?PRINT("cluster nodes: ~p~n", [Nodes]); cluster(usage) -> - ?PRINT_CMD("cluster []", "cluster with node, query cluster info "); + ?PRINT_CMD("cluster []", "cluster with node, query cluster info"); cluster([SNode]) -> Node = emqttd_dist:parse_node(SNode), @@ -171,24 +173,22 @@ clients(["list"]) -> emqttd_mnesia:dump(ets, mqtt_client, fun print/1); clients(["show", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> ?PRINT_MSG("Not Found.~n"); - Client -> print(Client) - end; + if_client(ClientId, fun print/1); clients(["kick", ClientId]) -> - case emqttd_cm:lookup(list_to_binary(ClientId)) of - undefined -> - ?PRINT_MSG("Not Found.~n"); - #mqtt_client{client_pid = Pid} -> - emqttd_client:kick(Pid) - end; + if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end); clients(_) -> ?USAGE([{"clients list", "list all clients"}, {"clients show ", "show a client"}, {"clients kick ", "kick a client"}]). +if_client(ClientId, Fun) -> + case emqttd_cm:lookup(bin(ClientId)) of + undefined -> ?PRINT_MSG("Not Found.~n"); + Client -> Fun(Client) + end. + %%------------------------------------------------------------------------------ %% @doc Sessions Command %% @end @@ -203,7 +203,7 @@ sessions(["list", "transient"]) -> emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1); sessions(["show", ClientId]) -> - MP = {{list_to_binary(ClientId), '_'}, '_'}, + MP = {{bin(ClientId), '_'}, '_'}, case {ets:match_object(mqtt_transient_session, MP), ets:match_object(mqtt_persistent_session, MP)} of {[], []} -> @@ -220,6 +220,70 @@ sessions(_) -> {"sessions list transient", "list all transient sessions"}, {"sessions show ", "show a session"}]). +%%------------------------------------------------------------------------------ +%% @doc Topics Command +%% @end +%%------------------------------------------------------------------------------ +topics(["list"]) -> + Print = fun(Topic, Records) -> print(topic, Topic, Records) end, + if_could_print(topic, Print); + +topics(["show", Topic]) -> + print(topic, Topic, ets:lookup(topic, bin(Topic))); + +topics(_) -> + ?USAGE([{"topics list", "list all topics"}, + {"topics show ", "show a topic"}]). + +subscriptions(["list"]) -> + Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, + if_subscription(fun() -> if_could_print(subscription, Print) end); + +subscriptions(["show", ClientId]) -> + if_subscription(fun() -> + case emqttd_pubsub:lookup(subscription, ClientId) of + [] -> ?PRINT_MSG("Not Found.~n"); + Records -> print(subscription, ClientId, Records) + end + end); + +subscriptions(["add", ClientId, Topic, QoS]) -> + Create = fun(IntQos) -> emqttd_pubsub:create(subscription, {ClientId, Topic, IntQos}) end, + if_subscription(fun() -> if_valid_qos(QoS, Create) end); + +subscriptions(["del", ClientId, Topic]) -> + if_subscription(fun() -> + emqttd_pubsub:delete(subscription, {ClientId, Topic}) + end); + +subscriptions(_) -> + ?USAGE([{"subscriptions list", "list all subscriptions"}, + {"subscriptions show ", "show subscriptions of a client"}, + {"subscriptions add ", "add subscription"}, + {"subscriptions del ", "delete subscription"}]). + +if_subscription(Fun) -> + case ets:info(subscription, name) of + undefined -> ?PRINT_MSG("Error: subscription table not found!~n"); + _ -> Fun() + end. + +if_could_print(Tab, Fun) -> + case mnesia:table_info(Tab, size) of + Size when Size >= ?MAX_LINES -> + ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]); + _Size -> + Keys = mnesia:dirty_all_keys(Tab), + foreach(fun(Key) -> Fun(Key, ets:lookup(Tab, Key)) end, Keys) + end. + +if_valid_qos(QoS, Fun) -> + try list_to_integer(QoS) of + Int when ?IS_QOS(Int) -> Fun(Int) + catch _:_ -> + ?PRINT_MSG("QoS should be 0, 1, 2~n") + end. + plugins(["list"]) -> foreach(fun print/1, emqttd_plugins:list()); @@ -296,9 +360,9 @@ parse_opts(Cmd, OptStr) -> parse_opt(bridge, qos, Qos) -> {qos, list_to_integer(Qos)}; parse_opt(bridge, suffix, Suffix) -> - {topic_suffix, list_to_binary(Suffix)}; + {topic_suffix, bin(Suffix)}; parse_opt(bridge, prefix, Prefix) -> - {topic_prefix, list_to_binary(Prefix)}; + {topic_prefix, bin(Prefix)}; parse_opt(bridge, queue, Len) -> {max_queue_len, list_to_integer(Len)}; parse_opt(_Cmd, Opt, _Val) -> @@ -377,7 +441,7 @@ trace(_) -> {"trace topic off", "stop to trace Topic"}]). trace_on(Who, Name, LogFile) -> - case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of + case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of ok -> ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); {error, Error} -> @@ -385,7 +449,7 @@ trace_on(Who, Name, LogFile) -> end. trace_off(Who, Name) -> - case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of + case emqttd_trace:stop_trace({Who, bin(Name)}) of ok -> ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); {error, Error} -> @@ -423,6 +487,9 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, emqttd_net:format(Peername), emqttd_util:now_to_secs(ConnectedAt)]); +print(#mqtt_topic{topic = Topic, node = Node}) -> + ?PRINT("~s on ~s~n", [Topic, Node]); + print({{ClientId, _ClientPid}, SessInfo}) -> InfoKeys = [clean_sess, max_inflight, @@ -440,6 +507,14 @@ print({{ClientId, _ClientPid}, SessInfo}) -> "created_at=~w, subscriptions=~s)~n", [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). +print(topic, Topic, Records) -> + Nodes = [Node || #mqtt_topic{node = Node} <- Records], + ?PRINT("~s: on ~p~n", [Topic, Nodes]); + +print(subscription, ClientId, Subscriptions) -> + TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], + ?PRINT("~s: ~p~n", [ClientId, TopicTable]). + format(created_at, Val) -> emqttd_util:now_to_secs(Val); @@ -449,3 +524,6 @@ format(subscriptions, List) -> format(_, Val) -> Val. +bin(S) when is_list(S) -> list_to_binary(S); +bin(B) when is_binary(B) -> B. + From 7cd5367f3b125120c26b986f53a3cbd4d5dee4f8 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 16 Dec 2015 11:04:20 +0800 Subject: [PATCH 04/11] binary --- src/emqttd_pubsub.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 308be6184..d9a1787d7 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -150,7 +150,7 @@ create(topic, Topic) when is_binary(Topic) -> {aborted, Error} -> {error, Error} end; -create(subscription, {SubId, Topic, Qos}) -> +create(subscription, {SubId, Topic, Qos}) when is_binary(SubId) andalso is_binary(Topic) -> case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, Qos}]) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error} From e37b00a9e46919118f9f22f4b114735dbda61d01 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 16 Dec 2015 11:04:51 +0800 Subject: [PATCH 05/11] fix subscriptions cli --- src/emqttd_cli.erl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index a7656f9a3..8e6000b4d 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -241,19 +241,26 @@ subscriptions(["list"]) -> subscriptions(["show", ClientId]) -> if_subscription(fun() -> - case emqttd_pubsub:lookup(subscription, ClientId) of + case emqttd_pubsub:lookup(subscription, bin(ClientId)) of [] -> ?PRINT_MSG("Not Found.~n"); Records -> print(subscription, ClientId, Records) end end); subscriptions(["add", ClientId, Topic, QoS]) -> - Create = fun(IntQos) -> emqttd_pubsub:create(subscription, {ClientId, Topic, IntQos}) end, + Create = fun(IntQos) -> + Subscription = {bin(ClientId), bin(Topic), IntQos}, + case emqttd_pubsub:create(subscription, Subscription) of + ok -> ?PRINT_MSG("ok~n"); + {error, Error} -> ?PRINT("Error: ~p~n", [Error]) + end + end, if_subscription(fun() -> if_valid_qos(QoS, Create) end); subscriptions(["del", ClientId, Topic]) -> if_subscription(fun() -> - emqttd_pubsub:delete(subscription, {ClientId, Topic}) + Ok = emqttd_pubsub:delete(subscription, {bin(ClientId), bin(Topic)}), + ?PRINT("~p~n", [Ok]) end); subscriptions(_) -> @@ -279,7 +286,8 @@ if_could_print(Tab, Fun) -> if_valid_qos(QoS, Fun) -> try list_to_integer(QoS) of - Int when ?IS_QOS(Int) -> Fun(Int) + Int when ?IS_QOS(Int) -> Fun(Int); + _ -> ?PRINT_MSG("QoS should be 0, 1, 2~n") catch _:_ -> ?PRINT_MSG("QoS should be 0, 1, 2~n") end. From fe82fde717f75d2cbd4cabd2c62b2e81417ae3a0 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 17 Dec 2015 12:38:13 +0800 Subject: [PATCH 06/11] subscription --- ...utosub.erl => emqttd_mod_subscription.erl} | 52 ++++++++++++------- test/emqttd_mod_subscription_tests.erl | 18 +++++++ 2 files changed, 52 insertions(+), 18 deletions(-) rename src/{emqttd_mod_autosub.erl => emqttd_mod_subscription.erl} (56%) create mode 100644 test/emqttd_mod_subscription_tests.erl diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_subscription.erl similarity index 56% rename from src/emqttd_mod_autosub.erl rename to src/emqttd_mod_subscription.erl index dff147969..a4d786f26 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_subscription.erl @@ -19,11 +19,11 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc emqttd auto subscribe module. +%%% @doc Subscription from Broker Side %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- --module(emqttd_mod_autosub). +-module(emqttd_mod_subscription). -behaviour(emqttd_gen_mod). @@ -33,29 +33,45 @@ -export([load/1, client_connected/3, unload/1]). --record(state, {topics}). +-record(state, {topics, stored = false}). + +-ifdef(TEST). +-compile(export_all). +-endif. load(Opts) -> - Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], + Topics = [{bin(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], + State = #state{topics = Topics, stored = lists:member(stored, Opts)}, emqttd_broker:hook('client.connected', {?MODULE, client_connected}, - {?MODULE, client_connected, [Topics]}), - {ok, #state{topics = Topics}}. + {?MODULE, client_connected, [State]}), + {ok, State}. -client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, +client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid, - username = Username}, Topics) -> - F = fun(Topic) -> - Topic1 = emqttd_topic:feed_var(<<"$c">>, ClientId, Topic), - if - Username =:= undefined -> Topic1; - true -> emqttd_topic:feed_var(<<"$u">>, Username, Topic1) - end - end, - emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]); + username = Username}, + #state{topics = Topics, stored = Stored}) -> + Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, + TopicTable = with_stored(Stored, ClientId, [{Replace(Topic), Qos} || {Topic, Qos} <- Topics]), + emqttd_client:subscribe(ClientPid, TopicTable). -client_connected(_ConnAck, _Client, _Topics) -> - ignore. +with_stored(false, _ClientId, TopicTable) -> + TopicTable; +with_stored(true, ClientId, TopicTable) -> + Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end, + emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_pubsub:lookup(subscription, ClientId)], TopicTable). unload(_Opts) -> emqttd_broker:unhook('client.connected', {?MODULE, client_connected}). +rep(<<"$c">>, ClientId, Topic) -> + emqttd_topic:feed_var(<<"$c">>, ClientId, Topic); +rep(<<"$u">>, undefined, Topic) -> + Topic; +rep(<<"$u">>, Username, Topic) -> + emqttd_topic:feed_var(<<"$u">>, Username, Topic). + +bin(B) when is_binary(B) -> + B; +bin(S) when is_list(S) -> + list_to_binary(S). + diff --git a/test/emqttd_mod_subscription_tests.erl b/test/emqttd_mod_subscription_tests.erl new file mode 100644 index 000000000..4f1cffbef --- /dev/null +++ b/test/emqttd_mod_subscription_tests.erl @@ -0,0 +1,18 @@ +-module(emqttd_mod_subscription_tests). + +-include("emqttd.hrl"). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +-define(M, emqttd_mod_subscription). + +rep_test() -> + ?assertEqual(<<"topic/clientId">>, ?M:rep(<<"$c">>, <<"clientId">>, <<"topic/$c">>)), + ?assertEqual(<<"topic/username">>, ?M:rep(<<"$u">>, <<"username">>, <<"topic/$u">>)), + ?assertEqual(<<"topic/username/clientId">>, + ?M:rep(<<"$c">>, <<"clientId">>, + ?M:rep(<<"$u">>, <<"username">>, <<"topic/$u/$c">>))). + +-endif. From 714aa4dc9d4aa97437fdd4036ab73657b1776381 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 17 Dec 2015 12:38:48 +0800 Subject: [PATCH 07/11] subscription --- rel/files/emqttd.config.development | 16 ++++++++++++---- rel/files/emqttd.config.production | 14 +++++++++++--- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index 50339361c..76375d227 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -1,6 +1,6 @@ % -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- %% ex: ft=erlang ts=4 sw=4 et -[{kernel, +[{kernel, [{start_timer, true}, {start_pg2, true} ]}, @@ -175,14 +175,22 @@ {modules, [ %% Client presence management module. %% Publish messages when client connected or disconnected - {presence, [{qos, 0}]} + {presence, [{qos, 0}]}, %% Subscribe topics automatically when client connected - %% {autosub, [{"$Q/client/$c", 0}]} + {subscription, [ + %% Subscription from stored table + stored, + + %% $u will be replaced with username + {"$Q/username/$u", 1}, + + %% $c will be replaced with clientid + {"$Q/client/$c", 1} + ]} %% Rewrite rules %% {rewrite, [{file, "etc/rewrite.config"}]} - ]}, %% Plugins {plugins, [ diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index e602e60ba..ef18232b8 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -167,14 +167,22 @@ {modules, [ %% Client presence management module. %% Publish messages when client connected or disconnected - {presence, [{qos, 0}]} + %% {presence, [{qos, 0}]}, %% Subscribe topics automatically when client connected - %% {autosub, [{"$Q/client/$c", 0}]} + %% {subscription, [ + %% %% Subscription from stored table + %% stored, + %% + %% %% $u will be replaced with username + %% {"$Q/username/$u", 1}, + %% + %% %% $c will be replaced with clientid + %% {"$Q/client/$c", 1} + %% ]} %% Rewrite rules %% {rewrite, [{file, "etc/rewrite.config"}]} - ]}, %% Plugins {plugins, [ From 7ffa25c65572c7040e88ed41c64f920657896073 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 17 Dec 2015 14:12:35 +0800 Subject: [PATCH 08/11] destroy --- src/emqttd_router.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 3eed025b4..9f10f3b58 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -68,6 +68,12 @@ ensure_tab(Tab, Opts) -> ok end. +-ifdef(TEST). +destory() -> + ets:delete(route), + ets:delete(reverse_route). +-endif. + %%------------------------------------------------------------------------------ %% @doc Add Routes. %% @end From be390d42acdb1100b5a5e38d78d61ef452e76e37 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 17 Dec 2015 14:12:45 +0800 Subject: [PATCH 09/11] stop --- src/emqttd_stats.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 7135a2315..97d9b8027 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -31,7 +31,7 @@ -define(SERVER, ?MODULE). --export([start_link/0]). +-export([start_link/0, stop/0]). %% statistics API. -export([statsfun/1, statsfun/2, @@ -88,6 +88,9 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +stop() -> + gen_server:call(?SERVER, stop). + %%------------------------------------------------------------------------------ %% @doc Generate stats fun %% @end @@ -149,6 +152,9 @@ init([]) -> % Tick to publish stats {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; + handle_call(_Request, _From, State) -> {reply, error, State}. From f8c638d1130c40758ead7b0ea5f9de27d9c2f0c8 Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 17 Dec 2015 14:12:53 +0800 Subject: [PATCH 10/11] route_tests --- test/emqttd_router_tests.erl | 39 ++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 test/emqttd_router_tests.erl diff --git a/test/emqttd_router_tests.erl b/test/emqttd_router_tests.erl new file mode 100644 index 000000000..14cad08e3 --- /dev/null +++ b/test/emqttd_router_tests.erl @@ -0,0 +1,39 @@ + +-module(emqttd_router_tests). + +-include("emqttd.hrl"). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +-define(ROUTER, emqttd_router). + +route_test_() -> + {setup, + fun() -> ?ROUTER:init([]) end, + fun(_) -> ?ROUTER:destory() end, + [?_test(t_add_routes()), + ?_test(t_delete_routes()), + ?_test(t_has_route()), + ?_test(t_route()) + ]}. + +t_add_routes() -> + Pid = self(), + ok. + %?ROUTER:add_routes([<<"a">>, <<"b">>], Pid), + %?assertEqual([{<<"a">>, Pid}, {<<"b">>, Pid}], lists:sort(ets:tab2list(route))), + %?assertEqual([{Pid, <<"a">>}, {Pid, <<"b">>}], lists:sort(ets:tab2list(reverse_route))). + +t_delete_routes() -> + ok. + +t_has_route() -> + ok. + +t_route() -> + ok. + +-endif. + From d0d5090bde2aa73706713b57775e7167a6dcadab Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 17 Dec 2015 15:18:03 +0800 Subject: [PATCH 11/11] uncomment presence module --- rel/files/emqttd.config.production | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index ef18232b8..6176e7a47 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -167,7 +167,7 @@ {modules, [ %% Client presence management module. %% Publish messages when client connected or disconnected - %% {presence, [{qos, 0}]}, + {presence, [{qos, 0}]} %% Subscribe topics automatically when client connected %% {subscription, [