bridge options...
This commit is contained in:
parent
2f9e6f980b
commit
e61d8b5595
|
@ -33,38 +33,73 @@
|
|||
-include("emqttd_packet.hrl").
|
||||
|
||||
%% API Function Exports
|
||||
-export([start_link/2]).
|
||||
-export([start_link/3]).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-define(PING_INTERVAL, 1000).
|
||||
-define(PING_DOWN_INTERVAL, 1000).
|
||||
|
||||
-record(state, {node, local_topic, status = up}).
|
||||
-record(state, {node, subtopic,
|
||||
qos,
|
||||
topic_suffix = <<>>,
|
||||
topic_prefix = <<>>,
|
||||
max_queue_len = 0,
|
||||
ping_down_interval = ?PING_DOWN_INTERVAL,
|
||||
status = up}).
|
||||
|
||||
-type option() :: {max_queue_len, pos_integer()} |
|
||||
{qos, mqtt_qos()} |
|
||||
{topic_suffix, binary()} |
|
||||
{topic_prefix, binary()} |
|
||||
{ping_down_interval, pos_integer()}.
|
||||
|
||||
-export_type([option/0]).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% API
|
||||
%%%=============================================================================
|
||||
|
||||
start_link(Node, LocalTopic) ->
|
||||
gen_server:start_link(?MODULE, [Node, LocalTopic], []).
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Start a bridge.
|
||||
%%
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
-spec start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}.
|
||||
start_link(Node, SubTopic, Options) ->
|
||||
gen_server:start_link(?MODULE, [Node, SubTopic, Options], []).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
init([Node, LocalTopic]) ->
|
||||
init([Node, SubTopic, Options]) ->
|
||||
process_flag(trap_exit, true),
|
||||
case net_kernel:connect_node(Node) of
|
||||
true ->
|
||||
true = erlang:monitor_node(Node, true),
|
||||
emqttd_pubsub:subscribe({LocalTopic, ?QOS_0}, self()),
|
||||
{ok, #state{node = Node, local_topic = LocalTopic}};
|
||||
State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}),
|
||||
emqttd_pubsub:subscribe({SubTopic, ?QOS_0}, self()),
|
||||
{ok, State};
|
||||
false ->
|
||||
{stop, {cannot_connect, Node}}
|
||||
end.
|
||||
|
||||
parse_opts([], State) ->
|
||||
State;
|
||||
parse_opts([{qos, Qos} | Opts], State) ->
|
||||
parse_opts(Opts, State#state{qos = Qos});
|
||||
parse_opts([{topic_suffix, Suffix} | Opts], State) ->
|
||||
parse_opts(Opts, State#state{topic_suffix= Suffix});
|
||||
parse_opts([{topic_prefix, Prefix} | Opts], State) ->
|
||||
parse_opts(Opts, State#state{topic_prefix = Prefix});
|
||||
parse_opts([{max_queue_len, Len} | Opts], State) ->
|
||||
parse_opts(Opts, State#state{max_queue_len = Len});
|
||||
parse_opts([{ping_down_interval, Interval} | Opts], State) ->
|
||||
parse_opts(Opts, State#state{ping_down_interval = Interval*1000}).
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
|
@ -76,12 +111,12 @@ handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down}
|
|||
{noreply, State};
|
||||
|
||||
handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) ->
|
||||
rpc:cast(Node, emqttd_router, route, [Msg]),
|
||||
rpc:cast(Node, emqttd_router, route, [transform(Msg, State)]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({nodedown, Node}, State = #state{node = Node}) ->
|
||||
handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
|
||||
lager:warning("Bridge Node Down: ~p", [Node]),
|
||||
erlang:send_after(?PING_INTERVAL, self(), ping_down_node),
|
||||
erlang:send_after(Interval, self(), ping_down_node),
|
||||
{noreply, State#state{status = down}};
|
||||
|
||||
handle_info({nodeup, Node}, State = #state{node = Node}) ->
|
||||
|
@ -95,14 +130,14 @@ handle_info({nodeup, Node}, State = #state{node = Node}) ->
|
|||
{noreply, State#state{status = down}}
|
||||
end;
|
||||
|
||||
handle_info(ping_down_node, State = #state{node = Node}) ->
|
||||
handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) ->
|
||||
Self = self(),
|
||||
spawn_link(fun() ->
|
||||
case net_kernel:connect_node(Node) of
|
||||
true -> %%TODO: this is not right... fixme later
|
||||
Self ! {nodeup, Node};
|
||||
false ->
|
||||
erlang:send_after(?PING_INTERVAL, Self, ping_down_node)
|
||||
erlang:send_after(Interval, Self, ping_down_node)
|
||||
end
|
||||
end),
|
||||
{noreply, State};
|
||||
|
@ -124,3 +159,16 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%=============================================================================
|
||||
|
||||
%TODO: qos is not right...
|
||||
transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos,
|
||||
topic_prefix = Prefix,
|
||||
topic_suffix = Suffix}) ->
|
||||
Msg1 =
|
||||
if
|
||||
Qos =:= undefined -> Msg;
|
||||
true -> Msg#mqtt_message{qos = Qos}
|
||||
end,
|
||||
Msg1#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -30,7 +30,9 @@
|
|||
|
||||
-behavior(supervisor).
|
||||
|
||||
-export([start_link/0, start_bridge/2, stop_bridge/2]).
|
||||
-export([start_link/0,
|
||||
start_bridge/2, start_bridge/3,
|
||||
stop_bridge/2]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
|
@ -54,8 +56,14 @@ start_link() ->
|
|||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
-spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}.
|
||||
start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) ->
|
||||
supervisor:start_child(?MODULE, bridge_spec(Node, LocalTopic)).
|
||||
start_bridge(Node, SubTopic) when is_atom(Node) and is_binary(SubTopic) ->
|
||||
start_bridge(Node, SubTopic, []).
|
||||
|
||||
-spec start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}.
|
||||
start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) ->
|
||||
{ok, Env} = application:get_env(emqttd, bridge),
|
||||
Options1 = emqttd_opts:merge(Env, Options),
|
||||
supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -64,8 +72,8 @@ start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) ->
|
|||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
-spec stop_bridge(atom(), binary()) -> {ok, pid()} | ok.
|
||||
stop_bridge(Node, LocalTopic) ->
|
||||
ChildId = bridge_id(Node, LocalTopic),
|
||||
stop_bridge(Node, SubTopic) ->
|
||||
ChildId = bridge_id(Node, SubTopic),
|
||||
case supervisor:terminate_child(ChildId) of
|
||||
ok ->
|
||||
supervisor:delete_child(?MODULE, ChildId);
|
||||
|
@ -80,12 +88,11 @@ stop_bridge(Node, LocalTopic) ->
|
|||
init([]) ->
|
||||
{ok, {{one_for_one, 10, 100}, []}}.
|
||||
|
||||
bridge_id(Node, LocalTopic) ->
|
||||
{bridge, Node, LocalTopic}.
|
||||
bridge_id(Node, SubTopic) ->
|
||||
{bridge, Node, SubTopic}.
|
||||
|
||||
bridge_spec(Node, LocalTopic) ->
|
||||
ChildId = bridge_id(Node, LocalTopic),
|
||||
{ChildId, {emqttd_bridge, start_link, [Node, LocalTopic]},
|
||||
bridge_spec(Node, SubTopic, Options) ->
|
||||
ChildId = bridge_id(Node, SubTopic),
|
||||
{ChildId, {emqttd_bridge, start_link, [Node, SubTopic, Options]},
|
||||
transient, 10000, worker, [emqttd_bridge]}.
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue