diff --git a/src/emqttd.erl b/src/emqttd.erl index 521251efc..252d77d78 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -35,6 +35,9 @@ %% Hooks API -export([hook/4, hook/3, unhook/2, run_hooks/3]). +%% Adapter +-export([adapter/1]). + %% Debug API -export([dump/0]). @@ -157,6 +160,14 @@ unhook(Hook, Function) -> run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). +%%-------------------------------------------------------------------- +%% Adapter +%%-------------------------------------------------------------------- + +adapter(server) -> env(pubsub_server, emqttd_server); +adapter(pubsub) -> env(pubsub_adapter, emqttd_pubsub); +adapter(bridge) -> env(bridge_adapter, emqttd_bridge). + %%-------------------------------------------------------------------- %% Debug %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 81640c884..011bd8fe5 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -91,7 +91,7 @@ start_servers(Sup) -> {"emqttd broker", emqttd_broker}, {"emqttd alarm", emqttd_alarm}, {"emqttd mod supervisor", emqttd_mod_sup}, - {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, + {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup_sup}}, {"emqttd access control", emqttd_access_control}, {"emqttd system monitor", {supervisor, emqttd_sysmon_sup}}], [start_server(Sup, Server) || Server <- Servers]. diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index dca66a8b6..e808ee72a 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -18,56 +18,25 @@ -behavior(supervisor). --export([start_link/0, bridges/0, start_bridge/2, start_bridge/3, stop_bridge/2]). +-export([start_link/3]). -export([init/1]). --define(BRIDGE_ID(Node, Topic), {bridge, Node, Topic}). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- %% @doc Start bridge supervisor -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc List all bridges --spec(bridges() -> [{tuple(), pid()}]). -bridges() -> - [{{Node, Topic}, Pid} || {?BRIDGE_ID(Node, Topic), Pid, worker, _} - <- supervisor:which_children(?MODULE)]. - -%% @doc Start a bridge --spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}). -start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> - start_bridge(Node, Topic, []). - --spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). -start_bridge(Node, _Topic, _Options) when Node =:= node() -> - {error, bridge_to_self}; -start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> - Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options), - supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). - -%% @doc Stop a bridge --spec(stop_bridge(atom(), binary()) -> {ok, pid()} | ok). -stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> - ChildId = ?BRIDGE_ID(Node, Topic), - case supervisor:terminate_child(?MODULE, ChildId) of - ok -> supervisor:delete_child(?MODULE, ChildId); - Error -> Error - end. +-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). +start_link(Node, Topic, Options) -> + supervisor:start_link(?MODULE, [Node, Topic, Options]). %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- -init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. - -bridge_spec(Node, Topic, Options) -> - ChildId = ?BRIDGE_ID(Node, Topic), - {ChildId, {emqttd_bridge, start_link, [Node, Topic, Options]}, - transient, 10000, worker, [emqttd_bridge]}. +init([Node, Topic, Options]) -> + {ok, {{one_for_all, 10, 100}, + [{bridge, {emqttd_bridge, start_link, [Node, Topic, Options]}, + transient, 10000, worker, [emqttd_bridge]}]}}. diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 58143cc83..f87f87d48 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -57,17 +57,9 @@ pool_size(Env) -> pool_sup(Name, Env) -> Pool = list_to_atom(atom_to_list(Name) ++ "_pool"), - MFA = {adapter(Name), start_link, [Env]}, + MFA = {emqttd:adapter(Name), start_link, [Env]}, emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]). -%%-------------------------------------------------------------------- -%% Adapter -%%-------------------------------------------------------------------- - -adapter(server) -> - emqttd:env(pubsub_server, emqttd_server); -adapter(pubsub) -> - emqttd:env(pubsub_adapter, emqttd_pubsub). %%-------------------------------------------------------------------- %% Create PubSub Tables