pub_interval = 0

This commit is contained in:
Feng Lee 2015-03-11 00:15:27 +08:00
parent d315b5e22a
commit b466b8778b
13 changed files with 82 additions and 31 deletions

View File

@ -28,7 +28,7 @@
-author('feng@emqtt.io'). -author('feng@emqtt.io').
-export([start/0, open/1]). -export([start/0, open/1, is_running/1]).
-define(MQTT_SOCKOPTS, [ -define(MQTT_SOCKOPTS, [
binary, binary,
@ -69,4 +69,11 @@ open({http, Port, Options}) ->
MFArgs = {emqttd_http, handle, []}, MFArgs = {emqttd_http, handle, []},
mochiweb:start_http(Port, Options, MFArgs). mochiweb:start_http(Port, Options, MFArgs).
is_running(Node) ->
case rpc:call(Node, erlang, whereis, [emqttd]) of
{badrpc, _} -> false;
undefined -> false;
Pid when is_pid(Pid) -> true
end.

View File

@ -64,7 +64,7 @@ start(_StartType, _StartArgs) ->
ok = emqttd_mnesia:wait(), ok = emqttd_mnesia:wait(),
{ok, Listeners} = application:get_env(listen), {ok, Listeners} = application:get_env(listen),
emqttd:open(Listeners), emqttd:open(Listeners),
register(emqtt, self()), register(emqttd, self()),
print_vsn(), print_vsn(),
{ok, Sup}. {ok, Sup}.
@ -103,8 +103,9 @@ start_servers(Sup) ->
{"emqttd auth", emqttd_auth}, {"emqttd auth", emqttd_auth},
{"emqttd pubsub", emqttd_pubsub}, {"emqttd pubsub", emqttd_pubsub},
{"emqttd router", emqttd_router}, {"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker, BrokerOpts}, {"emqttd broker", emqttd_broker, BrokerOpts},
{"emqttd metrics", emqttd_metrics, MetricOpts}, {"emqttd metrics", emqttd_metrics, MetricOpts},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd monitor", emqttd_monitor} {"emqttd monitor", emqttd_monitor}
]). ]).

View File

@ -39,7 +39,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {node, local_topic, status = running}). -define(PING_INTERVAL, 1000).
-record(state, {node, local_topic, status = up}).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
@ -53,9 +55,15 @@ start_link(Node, LocalTopic) ->
%%%============================================================================= %%%=============================================================================
init([Node, LocalTopic]) -> init([Node, LocalTopic]) ->
emqttd_pubsub:subscribe({LocalTopic, ?QOS_0}, self()), process_flag(trap_exit, true),
%%TODO: monitor nodes... case net_kernel:connect_node(Node) of
{ok, #state{node = Node, local_topic = LocalTopic}}. true ->
true = erlang:monitor_node(Node, true),
emqttd_pubsub:subscribe({LocalTopic, ?QOS_0}, self()),
{ok, #state{node = Node, local_topic = LocalTopic}};
false ->
{stop, {cannot_connect, Node}}
end.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
@ -63,13 +71,43 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down}) ->
lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]),
{noreply, State};
handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) ->
rpc:cast(Node, emqttd_router, route, [Msg]),
{noreply, State};
handle_info({nodedown, Node}, State = #state{node = Node}) -> handle_info({nodedown, Node}, State = #state{node = Node}) ->
%%.... lager:warning("Bridge Node Down: ~p", [Node]),
erlang:send_after(?PING_INTERVAL, self(), ping_down_node),
{noreply, State#state{status = down}}; {noreply, State#state{status = down}};
handle_info({dispatch, {_From, Msg}}, State = #state{node = Node}) -> handle_info({nodeup, Node}, State = #state{node = Node}) ->
%%TODO: CAST %% TODO: Really fast??
rpc:call(Node, emqttd_router, route, [Msg]), case emqttd:is_running(Node) of
true ->
lager:warning("Bridge Node Up: ~p", [Node]),
{noreply, State#state{status = up}};
false ->
self() ! {nodedown, Node},
{noreply, State#state{status = down}}
end;
handle_info(ping_down_node, State = #state{node = Node}) ->
Self = self(),
spawn_link(fun() ->
case net_kernel:connect_node(Node) of
true -> %%TODO: this is not right... fixme later
Self ! {nodeup, Node};
false ->
erlang:send_after(?PING_INTERVAL, Self, ping_down_node)
end
end),
{noreply, State};
handle_info({'EXIT', _Pid, normal}, State) ->
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
@ -86,5 +124,3 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================

View File

@ -55,13 +55,7 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}. -spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}.
start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) -> start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) ->
%%TODO: mv this code to emqttd_bridge??? supervisor:start_child(?MODULE, bridge_spec(Node, LocalTopic)).
case net_kernel:connect_node(Node) of
true ->
supervisor:start_child(?MODULE, bridge_spec(Node, LocalTopic));
false ->
{error, {cannot_connect, Node}}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc

View File

@ -154,7 +154,11 @@ init([Options]) ->
[{atomic, _} = create(systop(Topic)) || Topic <- Topics], [{atomic, _} = create(systop(Topic)) || Topic <- Topics],
SysInterval = proplists:get_value(sys_interval, Options, 60), SysInterval = proplists:get_value(sys_interval, Options, 60),
State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
{ok, tick(random:uniform(SysInterval), State), hibernate}. Delay = if
SysInterval == 0 -> 0;
true -> random:uniform(SysInterval)
end,
{ok, tick(Delay, State), hibernate}.
handle_call(uptime, _From, State) -> handle_call(uptime, _From, State) ->
{reply, uptime(State), State}; {reply, uptime(State), State};
@ -224,9 +228,12 @@ uptime(days, D) ->
tick(State = #state{sys_interval = SysInterval}) -> tick(State = #state{sys_interval = SysInterval}) ->
tick(SysInterval, State). tick(SysInterval, State).
tick(0, State) ->
State;
tick(Delay, State) -> tick(Delay, State) ->
State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}. State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}.
i2b(I) when is_integer(I) -> i2b(I) when is_integer(I) ->
list_to_binary(integer_to_list(I)). list_to_binary(integer_to_list(I)).

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqtt client manager. %%% emqttd client manager.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------

View File

@ -56,7 +56,7 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
%% Start emqtt metrics. %% Start emqttd metrics.
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -174,7 +174,7 @@ key(counter, Metric) ->
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
init(Options) -> init([Options]) ->
random:seed(now()), random:seed(now()),
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% Create metrics table % Create metrics table
@ -184,7 +184,11 @@ init(Options) ->
% $SYS Topics for metrics % $SYS Topics for metrics
[{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], [{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
PubInterval = proplists:get_value(pub_interval, Options, 60), PubInterval = proplists:get_value(pub_interval, Options, 60),
{ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}. Delay = if
PubInterval == 0 -> 0;
true -> random:uniform(PubInterval)
end,
{ok, tick(Delay, #state{pub_interval = PubInterval}), hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}. {stop, {badreq, Req}, State}.
@ -226,6 +230,8 @@ new_metric({counter, Name}) ->
tick(State = #state{pub_interval = PubInterval}) -> tick(State = #state{pub_interval = PubInterval}) ->
tick(PubInterval, State). tick(PubInterval, State).
tick(0, State) ->
State;
tick(Delay, State) -> tick(Delay, State) ->
State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}. State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}.

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqtt plugin framework. %%% emqttd plugin framework.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqtt protocol. %%% emqttd protocol.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqtt core pubsub. %%% emqttd core pubsub.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqtt packet serialiser. %%% emqttd packet serialiser.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqtt server. retain messages??? %%% emqttd server. retain messages???
%%% TODO: redesign... %%% TODO: redesign...
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqtt session. %%% emqttd session.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------