From 6e64686f77f00fa8e8de2aa2e993c28a836471f0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 12 Aug 2016 16:49:50 +0800 Subject: [PATCH 1/7] pass test cases first --- src/emqttd_cli.erl | 65 ++++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 94c063c54..e338e09a6 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -217,41 +217,45 @@ topics(_) -> {"topics show ", "Show a topic"}]). subscriptions(["list"]) -> - lists:foreach(fun({Sub, Topic, Opts}) when is_pid(Sub) -> - ?PRINT("~p -> ~s: ~p~n", [Sub, Topic, Opts]); - ({Sub, Topic, Opts}) -> - ?PRINT("~s -> ~s: ~p~n", [Sub, Topic, Opts]) - end, emqttd:subscriptions()); + lists:foreach(fun(Subscription) -> + print(subscription, Subscription) + end, []); %%emqttd:subscriptions()); subscriptions(["show", ClientId]) -> - case mnesia:dirty_read(mqtt_subscription, bin(ClientId)) of + case ets:lookup(mqtt_subscription, bin(ClientId)) of [] -> ?PRINT_MSG("Not Found.~n"); - Records -> print(Records) + Records -> [print(subscription, Subscription) || Subscription <- Records] end; -subscriptions(["add", ClientId, Topic, QoS]) -> - Add = fun(IntQos) -> - Subscription = #mqtt_subscription{subid = bin(ClientId), - topic = bin(Topic), - qos = IntQos}, - case emqttd_backend:add_subscription(Subscription) of - ok -> - ?PRINT_MSG("ok~n"); - {error, already_existed} -> - ?PRINT_MSG("Error: already existed~n"); - {error, Reason} -> - ?PRINT("Error: ~p~n", [Reason]) - end - end, - if_valid_qos(QoS, Add); +%% +%% subscriptions(["add", ClientId, Topic, QoS]) -> +%% Add = fun(IntQos) -> +%% Subscription = #mqtt_subscription{subid = bin(ClientId), +%% topic = bin(Topic), +%% qos = IntQos}, +%% case emqttd_backend:add_subscription(Subscription) of +%% ok -> +%% ?PRINT_MSG("ok~n"); +%% {error, already_existed} -> +%% ?PRINT_MSG("Error: already existed~n"); +%% {error, Reason} -> +%% ?PRINT("Error: ~p~n", [Reason]) +%% end +%% end, +%% if_valid_qos(QoS, Add); +%% -subscriptions(["del", ClientId]) -> - Ok = emqttd_backend:del_subscriptions(bin(ClientId)), - ?PRINT("~p~n", [Ok]); +%% +%% subscriptions(["del", ClientId]) -> +%% Ok = emqttd_backend:del_subscriptions(bin(ClientId)), +%% ?PRINT("~p~n", [Ok]); +%% -subscriptions(["del", ClientId, Topic]) -> - Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)), - ?PRINT("~p~n", [Ok]); +%% +%% subscriptions(["del", ClientId, Topic]) -> +%% Ok = emqttd_backend:del_subscription(bin(ClientId), bin(Topic)), +%% ?PRINT("~p~n", [Ok]); +%% subscriptions(_) -> ?USAGE([{"subscriptions list", "List all subscriptions"}, @@ -526,6 +530,11 @@ print({ClientId, _ClientPid, CleanSess, SessInfo}) -> "created_at=~w)~n", [ClientId, CleanSess | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]). +print(subscription, {Sub, Topic, Opts}) when is_pid(Sub) -> + ?PRINT("~p -> ~s: ~p~n", [Sub, Topic, Opts]); +print(subscription, {Sub, Topic, Opts}) -> + ?PRINT("~s -> ~s: ~p~n", [Sub, Topic, Opts]). + format(created_at, Val) -> emqttd_time:now_to_secs(Val); From 0a967df15ae2672407deb6b79d0ac253bef29322 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 12 Aug 2016 22:23:29 +0800 Subject: [PATCH 2/7] do_subscribe_/4, do_unsubscribe_/3 --- src/emqttd_pubsub.erl | 12 ++++-------- src/emqttd_server.erl | 22 +++++++++++----------- src/emqttd_session.erl | 24 +++++++++++++++--------- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index eb4722bb8..036b345aa 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -162,18 +162,14 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- add_subscriber_(Topic, Subscriber) -> - case ets:member(mqtt_subscriber, Topic) of - false -> emqttd_router:add_route(Topic, node()); - true -> ok - end, + (not ets:member(mqtt_subscriber, Topic)) + andalso emqttd_router:add_route(Topic), ets:insert(mqtt_subscriber, {Topic, Subscriber}). del_subscriber_(Topic, Subscriber) -> ets:delete_object(mqtt_subscriber, {Topic, Subscriber}), - case ets:member(mqtt_subscriber, Topic) of - false -> emqttd_router:del_route(Topic, node()); - true -> ok - end. + (not ets:member(mqtt_subscriber, Topic)) + andalso emqttd_router:del_route(Topic). setstats(State) -> emqttd_stats:setstats('subscribers/count', 'subscribers/max', diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index 345b181f5..e9b2d0eb5 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -172,13 +172,13 @@ init([Pool, Id, Env]) -> {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}. handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> - case subscribe_(Topic, Subscriber, Options, State) of + case do_subscribe_(Topic, Subscriber, Options, State) of {ok, NewState} -> {reply, ok, setstats(NewState)}; {error, Error} -> {reply, {error, Error}, State} end; handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> - case unsubscribe_(Topic, Subscriber, State) of + case do_unsubscribe_(Topic, Subscriber, State) of {ok, NewState} -> {reply, ok, setstats(NewState), hibernate}; {error, Error} -> {reply, {error, Error}, State} end; @@ -198,13 +198,13 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({subscribe, Topic, Subscriber, Options}, State) -> - case subscribe_(Topic, Subscriber, Options, State) of + case do_subscribe_(Topic, Subscriber, Options, State) of {ok, NewState} -> {noreply, setstats(NewState)}; {error, _Error} -> {noreply, State} end; handle_cast({unsubscribe, Topic, Subscriber}, State) -> - case unsubscribe_(Topic, Subscriber, State) of + case do_unsubscribe_(Topic, Subscriber, State) of {ok, NewState} -> {noreply, setstats(NewState), hibernate}; {error, _Error} -> {noreply, State} end; @@ -233,7 +233,7 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%-------------------------------------------------------------------- -subscribe_(Topic, Subscriber, Options, State) -> +do_subscribe_(Topic, Subscriber, Options, State) -> case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of [] -> emqttd_pubsub:async_subscribe(Topic, Subscriber), @@ -244,7 +244,12 @@ subscribe_(Topic, Subscriber, Options, State) -> {error, {already_subscribed, Topic}} end. -unsubscribe_(Topic, Subscriber, State) -> +monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> + State#state{submon = PMon:monitor(SubPid)}; +monitor_subpid(_SubPid, State) -> + State. + +do_unsubscribe_(Topic, Subscriber, State) -> case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of [_] -> emqttd_pubsub:async_unsubscribe(Topic, Subscriber), @@ -258,11 +263,6 @@ unsubscribe_(Topic, Subscriber, State) -> {error, {subscription_not_found, Topic}} end. -monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> - State#state{submon = PMon:monitor(SubPid)}; -monitor_subpid(_SubPid, State) -> - State. - demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> State#state{submon = PMon:demonitor(SubPid)}; demonitor_subpid(_SubPid, State) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 2035ce3f6..9345071dc 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -284,14 +284,18 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> - +handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> + %% TODO: Ugly... + TopicTable0 = lists:map(fun({T, Q}) -> + {T1, Opts} = emqttd_topic:strip(T), + {T1, [{qos, Q} | Opts]} + end, RawTopicTable), case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of {ok, TopicTable} -> ?LOG(info, "Subscribe ~p", [TopicTable], Session), Subscriptions1 = lists:foldl( - fun({Topic, Qos}, SubDict) -> + fun({Topic, Opts = [{qos, Qos}|_]}, SubDict) -> case dict:find(Topic, SubDict) of {ok, Qos} -> ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), @@ -301,7 +305,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), dict:store(Topic, Qos, SubDict); error -> - emqttd:subscribe(Topic, ClientId, [{qos, Qos}]), + emqttd:subscribe(Topic, ClientId, Opts), %%TODO: the design is ugly... %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter, @@ -319,9 +323,11 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = hibernate(Session) end; -handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> - +handle_cast({unsubscribe, RawTopics}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> + Topics0 = lists:map(fun(Topic) -> + {T, _Opts} = emqttd_topic:strip(Topic), T + end, RawTopics), case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of {ok, Topics} -> ?LOG(info, "unsubscribe ~p", [Topics], Session), @@ -329,7 +335,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, fun(Topic, SubDict) -> case dict:find(Topic, SubDict) of {ok, _Qos} -> - emqttd:unsubscribe(ClientId, Topic), + emqttd:unsubscribe(Topic, ClientId), dict:erase(Topic, SubDict); error -> SubDict From 1266b4e86011ac42f78bb44d69dae15f5acdac03 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 12 Aug 2016 22:39:13 +0800 Subject: [PATCH 3/7] ack qos --- src/emqttd_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 9345071dc..55eb08b11 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -315,7 +315,7 @@ handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id dict:store(Topic, Qos, SubDict) end end, Subscriptions, TopicTable), - AckFun([Qos || {_, Qos} <- TopicTable]), + AckFun([Qos || {_, Qos} <- RawTopicTable]), emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable), hibernate(Session#session{subscriptions = Subscriptions1}); {stop, TopicTable} -> From bc5cfb4b36479c11f77fa3af9b2068b48908cc4a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 13 Aug 2016 11:02:39 +0800 Subject: [PATCH 4/7] bridge sup --- src/emqttd.erl | 11 +++++++++ src/emqttd_app.erl | 2 +- src/emqttd_bridge_sup.erl | 47 +++++++-------------------------------- src/emqttd_pubsub_sup.erl | 10 +-------- 4 files changed, 21 insertions(+), 49 deletions(-) diff --git a/src/emqttd.erl b/src/emqttd.erl index 521251efc..252d77d78 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -35,6 +35,9 @@ %% Hooks API -export([hook/4, hook/3, unhook/2, run_hooks/3]). +%% Adapter +-export([adapter/1]). + %% Debug API -export([dump/0]). @@ -157,6 +160,14 @@ unhook(Hook, Function) -> run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). +%%-------------------------------------------------------------------- +%% Adapter +%%-------------------------------------------------------------------- + +adapter(server) -> env(pubsub_server, emqttd_server); +adapter(pubsub) -> env(pubsub_adapter, emqttd_pubsub); +adapter(bridge) -> env(bridge_adapter, emqttd_bridge). + %%-------------------------------------------------------------------- %% Debug %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 81640c884..011bd8fe5 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -91,7 +91,7 @@ start_servers(Sup) -> {"emqttd broker", emqttd_broker}, {"emqttd alarm", emqttd_alarm}, {"emqttd mod supervisor", emqttd_mod_sup}, - {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, + {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup_sup}}, {"emqttd access control", emqttd_access_control}, {"emqttd system monitor", {supervisor, emqttd_sysmon_sup}}], [start_server(Sup, Server) || Server <- Servers]. diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index dca66a8b6..e808ee72a 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -18,56 +18,25 @@ -behavior(supervisor). --export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]). +-export([start_link/3]). -export([init/1]). --define(BRIDGE_ID(Node, Topic), {bridge, Node, Topic}). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- %% @doc Start bridge supervisor -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc List all bridges --spec(bridges() -> [{tuple(), pid()}]). -bridges() -> - [{{Node, Topic}, Pid} || {?BRIDGE_ID(Node, Topic), Pid, worker, _} - <- supervisor:which_children(?MODULE)]. - -%% @doc Start a bridge --spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}). -start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> - start_bridge(Node, Topic, []). - --spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). -start_bridge(Node, _Topic, _Options) when Node =:= node() -> - {error, bridge_to_self}; -start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> - Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options), - supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). - -%% @doc Stop a bridge --spec(stop_bridge(atom(), binary()) -> {ok, pid()} | ok). -stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> - ChildId = ?BRIDGE_ID(Node, Topic), - case supervisor:terminate_child(?MODULE, ChildId) of - ok -> supervisor:delete_child(?MODULE, ChildId); - Error -> Error - end. +-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). +start_link(Node, Topic, Options) -> + supervisor:start_link(?MODULE, [Node, Topic, Options]). %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- -init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. - -bridge_spec(Node, Topic, Options) -> - ChildId = ?BRIDGE_ID(Node, Topic), - {ChildId, {emqttd_bridge, start_link, [Node, Topic, Options]}, - transient, 10000, worker, [emqttd_bridge]}. +init([Node, Topic, Options]) -> + {ok, {{one_for_all, 10, 100}, + [{bridge, {emqttd_bridge, start_link, [Node, Topic, Options]}, + transient, 10000, worker, [emqttd_bridge]}]}}. diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 58143cc83..f87f87d48 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -57,17 +57,9 @@ pool_size(Env) -> pool_sup(Name, Env) -> Pool = list_to_atom(atom_to_list(Name) ++ "_pool"), - MFA = {adapter(Name), start_link, [Env]}, + MFA = {emqttd:adapter(Name), start_link, [Env]}, emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]). -%%-------------------------------------------------------------------- -%% Adapter -%%-------------------------------------------------------------------- - -adapter(server) -> - emqttd:env(pubsub_server, emqttd_server); -adapter(pubsub) -> - emqttd:env(pubsub_adapter, emqttd_pubsub). %%-------------------------------------------------------------------- %% Create PubSub Tables From 8ca3430e390eff0aae6c939c057dac50fe189fff Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 13 Aug 2016 11:03:14 +0800 Subject: [PATCH 5/7] improve the design of bridge --- src/emqttd_bridge_sup_sup.erl | 76 +++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 src/emqttd_bridge_sup_sup.erl diff --git a/src/emqttd_bridge_sup_sup.erl b/src/emqttd_bridge_sup_sup.erl new file mode 100644 index 000000000..475149afe --- /dev/null +++ b/src/emqttd_bridge_sup_sup.erl @@ -0,0 +1,76 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_bridge_sup_sup). + +-behavior(supervisor). + +-export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]). + +-export([init/1]). + +-define(CHILD_ID(Node, Topic), {bridge_sup, Node, Topic}). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +%% @doc List all bridges +-spec(bridges() -> [{node(), binary(), pid()}]). +bridges() -> + [{Node, Topic, Pid} || {?CHILD_ID(Node, Topic), Pid, supervisor, _} + <- supervisor:which_children(?MODULE)]. + +%% @doc Start a bridge +-spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}). +start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> + start_bridge(Node, Topic, []). + +-spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). +start_bridge(Node, _Topic, _Options) when Node =:= node() -> + {error, bridge_to_self}; +start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> + Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options), + supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). + +%% @doc Stop a bridge +-spec(stop_bridge(atom(), binary()) -> {ok, pid()} | ok). +stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> + ChildId = ?CHILD_ID(Node, Topic), + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error + end. + +%%-------------------------------------------------------------------- +%% Supervisor callbacks +%%-------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 10, 100}, []}}. + +bridge_spec(Node, Topic, Options) -> + SupMod = sup_mod(emqttd:adapter(bridge)), + {?CHILD_ID(Node, Topic), + {SupMod, start_link, [Node, Topic, Options]}, + permanent, infinity, supervisor, [SupMod]}. + +sup_mod(Adaper) -> + list_to_atom(atom_to_list(Adaper) ++ "_sup"). + From d8aa7d9cfa798d18f80585e4f130be004c2cd975 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 13 Aug 2016 11:24:06 +0800 Subject: [PATCH 6/7] emqttd_bridge_sup_sup --- src/emqttd_cli.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index e338e09a6..6c77ddcd5 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -310,7 +310,7 @@ plugins(_) -> bridges(["list"]) -> foreach(fun({{Node, Topic}, _Pid}) -> ?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node]) - end, emqttd_bridge_sup:bridges()); + end, emqttd_bridge_sup_sup:bridges()); bridges(["options"]) -> ?PRINT_MSG("Options:~n"), @@ -322,20 +322,20 @@ bridges(["options"]) -> ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); bridges(["start", SNode, Topic]) -> - case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + case emqttd_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of {ok, _} -> ?PRINT_MSG("bridge is started.~n"); {error, Error} -> ?PRINT("error: ~p~n", [Error]) end; bridges(["start", SNode, Topic, OptStr]) -> Opts = parse_opts(bridge, OptStr), - case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of + case emqttd_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of {ok, _} -> ?PRINT_MSG("bridge is started.~n"); {error, Error} -> ?PRINT("error: ~p~n", [Error]) end; bridges(["stop", SNode, Topic]) -> - case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + case emqttd_bridge_sup_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of ok -> ?PRINT_MSG("bridge is stopped.~n"); {error, Error} -> ?PRINT("error: ~p~n", [Error]) end; From f166143cd07e11ed1a7c597f313f0b8772e56e7d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 13 Aug 2016 12:21:18 +0800 Subject: [PATCH 7/7] Don't register name for pool_sup --- src/emqttd_pool_sup.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqttd_pool_sup.erl b/src/emqttd_pool_sup.erl index 0f16df67c..b47199cb0 100644 --- a/src/emqttd_pool_sup.erl +++ b/src/emqttd_pool_sup.erl @@ -41,10 +41,10 @@ start_link(Pool, Type, MFA) -> -spec(start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}). start_link(Pool, Type, Size, MFA) -> - supervisor:start_link({local, sup_name(Pool)}, ?MODULE, [Pool, Type, Size, MFA]). + supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]). -sup_name(Pool) when is_atom(Pool) -> - list_to_atom(atom_to_list(Pool) ++ "_pool_sup"). +%% sup_name(Pool) when is_atom(Pool) -> +%% list_to_atom(atom_to_list(Pool) ++ "_pool_sup"). init([Pool, Type, Size, {M, F, Args}]) -> ensure_pool(Pool, Type, [{size, Size}]),