From e61d8b559532109e359ddd467784244914aa984e Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Wed, 11 Mar 2015 13:59:50 +0800 Subject: [PATCH] bridge options... --- apps/emqttd/src/emqttd_bridge.erl | 74 ++++++++++++++++++++++----- apps/emqttd/src/emqttd_bridge_sup.erl | 29 +++++++---- 2 files changed, 79 insertions(+), 24 deletions(-) diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index ba2736090..d80979516 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -33,38 +33,73 @@ -include("emqttd_packet.hrl"). %% API Function Exports --export([start_link/2]). +-export([start_link/3]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(PING_INTERVAL, 1000). +-define(PING_DOWN_INTERVAL, 1000). --record(state, {node, local_topic, status = up}). +-record(state, {node, subtopic, + qos, + topic_suffix = <<>>, + topic_prefix = <<>>, + max_queue_len = 0, + ping_down_interval = ?PING_DOWN_INTERVAL, + status = up}). + +-type option() :: {max_queue_len, pos_integer()} | + {qos, mqtt_qos()} | + {topic_suffix, binary()} | + {topic_prefix, binary()} | + {ping_down_interval, pos_integer()}. + +-export_type([option/0]). %%%============================================================================= %%% API %%%============================================================================= -start_link(Node, LocalTopic) -> - gen_server:start_link(?MODULE, [Node, LocalTopic], []). +%%------------------------------------------------------------------------------ +%% @doc +%% Start a bridge. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}. +start_link(Node, SubTopic, Options) -> + gen_server:start_link(?MODULE, [Node, SubTopic, Options], []). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Node, LocalTopic]) -> +init([Node, SubTopic, Options]) -> process_flag(trap_exit, true), case net_kernel:connect_node(Node) of true -> true = erlang:monitor_node(Node, true), - emqttd_pubsub:subscribe({LocalTopic, ?QOS_0}, self()), - {ok, #state{node = Node, local_topic = LocalTopic}}; + State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}), + emqttd_pubsub:subscribe({SubTopic, ?QOS_0}, self()), + {ok, State}; false -> {stop, {cannot_connect, Node}} end. +parse_opts([], State) -> + State; +parse_opts([{qos, Qos} | Opts], State) -> + parse_opts(Opts, State#state{qos = Qos}); +parse_opts([{topic_suffix, Suffix} | Opts], State) -> + parse_opts(Opts, State#state{topic_suffix= Suffix}); +parse_opts([{topic_prefix, Prefix} | Opts], State) -> + parse_opts(Opts, State#state{topic_prefix = Prefix}); +parse_opts([{max_queue_len, Len} | Opts], State) -> + parse_opts(Opts, State#state{max_queue_len = Len}); +parse_opts([{ping_down_interval, Interval} | Opts], State) -> + parse_opts(Opts, State#state{ping_down_interval = Interval*1000}). + handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -76,12 +111,12 @@ handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down} {noreply, State}; handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) -> - rpc:cast(Node, emqttd_router, route, [Msg]), + rpc:cast(Node, emqttd_router, route, [transform(Msg, State)]), {noreply, State}; -handle_info({nodedown, Node}, State = #state{node = Node}) -> +handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> lager:warning("Bridge Node Down: ~p", [Node]), - erlang:send_after(?PING_INTERVAL, self(), ping_down_node), + erlang:send_after(Interval, self(), ping_down_node), {noreply, State#state{status = down}}; handle_info({nodeup, Node}, State = #state{node = Node}) -> @@ -95,14 +130,14 @@ handle_info({nodeup, Node}, State = #state{node = Node}) -> {noreply, State#state{status = down}} end; -handle_info(ping_down_node, State = #state{node = Node}) -> +handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> Self = self(), spawn_link(fun() -> case net_kernel:connect_node(Node) of true -> %%TODO: this is not right... fixme later Self ! {nodeup, Node}; false -> - erlang:send_after(?PING_INTERVAL, Self, ping_down_node) + erlang:send_after(Interval, Self, ping_down_node) end end), {noreply, State}; @@ -124,3 +159,16 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +%TODO: qos is not right... +transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos, + topic_prefix = Prefix, + topic_suffix = Suffix}) -> + Msg1 = + if + Qos =:= undefined -> Msg; + true -> Msg#mqtt_message{qos = Qos} + end, + Msg1#mqtt_message{topic = <>}. + + + diff --git a/apps/emqttd/src/emqttd_bridge_sup.erl b/apps/emqttd/src/emqttd_bridge_sup.erl index ecdddc5e4..542cabd42 100644 --- a/apps/emqttd/src/emqttd_bridge_sup.erl +++ b/apps/emqttd/src/emqttd_bridge_sup.erl @@ -30,7 +30,9 @@ -behavior(supervisor). --export([start_link/0, start_bridge/2, stop_bridge/2]). +-export([start_link/0, + start_bridge/2, start_bridge/3, + stop_bridge/2]). -export([init/1]). @@ -54,8 +56,14 @@ start_link() -> %% @end %%------------------------------------------------------------------------------ -spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}. -start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) -> - supervisor:start_child(?MODULE, bridge_spec(Node, LocalTopic)). +start_bridge(Node, SubTopic) when is_atom(Node) and is_binary(SubTopic) -> + start_bridge(Node, SubTopic, []). + +-spec start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}. +start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) -> + {ok, Env} = application:get_env(emqttd, bridge), + Options1 = emqttd_opts:merge(Env, Options), + supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)). %%------------------------------------------------------------------------------ %% @doc @@ -64,8 +72,8 @@ start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) -> %% @end %%------------------------------------------------------------------------------ -spec stop_bridge(atom(), binary()) -> {ok, pid()} | ok. -stop_bridge(Node, LocalTopic) -> - ChildId = bridge_id(Node, LocalTopic), +stop_bridge(Node, SubTopic) -> + ChildId = bridge_id(Node, SubTopic), case supervisor:terminate_child(ChildId) of ok -> supervisor:delete_child(?MODULE, ChildId); @@ -80,12 +88,11 @@ stop_bridge(Node, LocalTopic) -> init([]) -> {ok, {{one_for_one, 10, 100}, []}}. -bridge_id(Node, LocalTopic) -> - {bridge, Node, LocalTopic}. +bridge_id(Node, SubTopic) -> + {bridge, Node, SubTopic}. -bridge_spec(Node, LocalTopic) -> - ChildId = bridge_id(Node, LocalTopic), - {ChildId, {emqttd_bridge, start_link, [Node, LocalTopic]}, +bridge_spec(Node, SubTopic, Options) -> + ChildId = bridge_id(Node, SubTopic), + {ChildId, {emqttd_bridge, start_link, [Node, SubTopic, Options]}, transient, 10000, worker, [emqttd_bridge]}. -