fix options, stats
This commit is contained in:
parent
e88875d2af
commit
b4298f2b05
|
@ -35,7 +35,7 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/1,
|
-export([start_link/0,
|
||||||
auth/2, % authentication
|
auth/2, % authentication
|
||||||
check_acl/3, % acl check
|
check_acl/3, % acl check
|
||||||
reload_acl/0, % reload acl
|
reload_acl/0, % reload acl
|
||||||
|
@ -58,9 +58,9 @@
|
||||||
%% @doc Start access control server
|
%% @doc Start access control server
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}.
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
start_link(AcOpts) ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [AcOpts], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Authenticate MQTT Client
|
%% @doc Authenticate MQTT Client
|
||||||
|
@ -151,7 +151,8 @@ stop() ->
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([AcOpts]) ->
|
init([]) ->
|
||||||
|
{ok, AcOpts} = application:get_env(access_control),
|
||||||
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
|
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
|
||||||
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}),
|
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}),
|
||||||
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}),
|
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}),
|
||||||
|
|
|
@ -67,24 +67,19 @@ print_vsn() ->
|
||||||
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
|
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
|
||||||
|
|
||||||
start_servers(Sup) ->
|
start_servers(Sup) ->
|
||||||
{ok, PubSubOpts} = application:get_env(pubsub),
|
Servers = [{"emqttd event", emqttd_event},
|
||||||
{ok, BrokerOpts} = application:get_env(broker),
|
|
||||||
{ok, MetricOpts} = application:get_env(metrics),
|
|
||||||
{ok, AccessOpts} = application:get_env(access_control),
|
|
||||||
Servers = [
|
|
||||||
{"emqttd event", emqttd_event},
|
|
||||||
{"emqttd trace", emqttd_trace},
|
{"emqttd trace", emqttd_trace},
|
||||||
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
||||||
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
||||||
{"emqttd session manager", emqttd_sm},
|
{"emqttd session manager", emqttd_sm},
|
||||||
{"emqttd session supervisor", {supervisor, emqttd_session_sup}},
|
{"emqttd session supervisor", {supervisor, emqttd_session_sup}},
|
||||||
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
|
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
|
||||||
%{"emqttd router", emqttd_router},
|
%{"emqttd router", emqttd_router},
|
||||||
{"emqttd broker", emqttd_broker, BrokerOpts},
|
{"emqttd broker", emqttd_broker},
|
||||||
{"emqttd stats", emqttd_stats},
|
{"emqttd stats", emqttd_stats},
|
||||||
{"emqttd metrics", emqttd_metrics, MetricOpts},
|
{"emqttd metrics", emqttd_metrics},
|
||||||
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
|
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
|
||||||
{"emqttd access control", emqttd_access_control, AccessOpts},
|
{"emqttd access control", emqttd_access_control},
|
||||||
{"emqttd system monitor", emqttd_sysmon}],
|
{"emqttd system monitor", emqttd_sysmon}],
|
||||||
[start_server(Sup, Server) || Server <- Servers].
|
[start_server(Sup, Server) || Server <- Servers].
|
||||||
|
|
||||||
|
|
|
@ -37,9 +37,13 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/1]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([version/0, uptime/0, datetime/0, sysdescr/0]).
|
%% Event API
|
||||||
|
-export([subscribe/1, notify/2]).
|
||||||
|
|
||||||
|
%% Broker API
|
||||||
|
-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
@ -47,7 +51,7 @@
|
||||||
|
|
||||||
-define(BROKER_TAB, mqtt_broker).
|
-define(BROKER_TAB, mqtt_broker).
|
||||||
|
|
||||||
-record(state, {started_at, sys_interval, tick_timer}).
|
-record(state, {started_at, sys_interval, tick_tref}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -57,9 +61,33 @@
|
||||||
%% @doc Start emqttd broker
|
%% @doc Start emqttd broker
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
start_link(Options) ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Subscribe broker event
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec subscribe(EventType :: any()) -> ok.
|
||||||
|
subscribe(EventType) ->
|
||||||
|
gproc:reg({p, l, {broker, EventType}}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Notify broker event
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec notify(EventType :: any(), Event :: any()) -> ok.
|
||||||
|
notify(EventType, Event) ->
|
||||||
|
Key = {broker, EventType},
|
||||||
|
gproc:send({p, l, Key}, {self(), Key, Event}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Get broker env
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
env(Name) ->
|
||||||
|
proplists:get_value(Name, application:get_env(emqttd, mqtt_broker, [])).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Get broker version
|
%% @doc Get broker version
|
||||||
|
@ -100,19 +128,16 @@ datetime() ->
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([Options]) ->
|
init([]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?BROKER_TAB, [set, public, named_table]),
|
||||||
[ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics],
|
|
||||||
% Create $SYS Topics
|
% Create $SYS Topics
|
||||||
[ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
|
[ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS],
|
||||||
SysInterval = proplists:get_value(sys_interval, Options, 60),
|
% Tick
|
||||||
State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
|
SysInterval = env(sys_interval),
|
||||||
Delay = if
|
{ok, TRef} = timer:send_interval(timer:seconds(SysInterval), tick),
|
||||||
SysInterval == 0 -> 0;
|
State = #state{started_at = os:timestamp(), sys_interval = SysInterval, tick_tref = TRef},
|
||||||
true -> random:uniform(SysInterval)
|
{ok, State, hibernate}.
|
||||||
end,
|
|
||||||
{ok, tick(Delay, State), hibernate}.
|
|
||||||
|
|
||||||
handle_call(uptime, _From, State) ->
|
handle_call(uptime, _From, State) ->
|
||||||
{reply, uptime(State), State};
|
{reply, uptime(State), State};
|
||||||
|
@ -124,13 +149,11 @@ handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(tick, State) ->
|
handle_info(tick, State) ->
|
||||||
retain(systop(version), list_to_binary(version())),
|
retain(version, list_to_binary(version())),
|
||||||
retain(systop(sysdescr), list_to_binary(sysdescr())),
|
retain(sysdescr, list_to_binary(sysdescr())),
|
||||||
publish(systop(uptime), list_to_binary(uptime(State))),
|
publish(uptime, list_to_binary(uptime(State))),
|
||||||
publish(systop(datetime), list_to_binary(datetime())),
|
publish(datetime, list_to_binary(datetime())),
|
||||||
[publish(systop(Stat), i2b(Val))
|
{noreply, State, hibernate};
|
||||||
|| {Stat, Val} <- ets:tab2list(?BROKER_TAB)],
|
|
||||||
{noreply, tick(State), hibernate};
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -145,21 +168,22 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
systop(Name) when is_atom(Name) ->
|
create_topic(Topic) ->
|
||||||
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
|
emqttd_pubsub:create(emqtt_topic:systop(Topic)).
|
||||||
|
|
||||||
create(Topic) ->
|
|
||||||
emqttd_pubsub:create(Topic).
|
|
||||||
|
|
||||||
retain(Topic, Payload) when is_binary(Payload) ->
|
retain(Topic, Payload) when is_binary(Payload) ->
|
||||||
emqttd_pubsub:publish(broker, #mqtt_message{retain = true,
|
publish(#mqtt_message{retain = true,
|
||||||
topic = Topic,
|
topic = emqtt_topic:systop(Topic),
|
||||||
payload = Payload}).
|
payload = Payload}).
|
||||||
|
|
||||||
publish(Topic, Payload) when is_binary(Payload) ->
|
publish(Topic, Payload) when is_binary(Payload) ->
|
||||||
emqttd_pubsub:publish(broker, #mqtt_message{topic = Topic,
|
publish( #mqtt_message{topic = emqtt_topic:systop(Topic),
|
||||||
payload = Payload}).
|
payload = Payload}).
|
||||||
|
|
||||||
|
publish(Msg) ->
|
||||||
|
emqttd_pubsub:publish(broker, Msg).
|
||||||
|
|
||||||
|
|
||||||
uptime(#state{started_at = Ts}) ->
|
uptime(#state{started_at = Ts}) ->
|
||||||
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
|
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
|
||||||
lists:flatten(uptime(seconds, Secs)).
|
lists:flatten(uptime(seconds, Secs)).
|
||||||
|
@ -179,14 +203,3 @@ uptime(hours, H) ->
|
||||||
uptime(days, D) ->
|
uptime(days, D) ->
|
||||||
[integer_to_list(D), " days,"].
|
[integer_to_list(D), " days,"].
|
||||||
|
|
||||||
tick(State = #state{sys_interval = SysInterval}) ->
|
|
||||||
tick(SysInterval, State).
|
|
||||||
|
|
||||||
tick(0, State) ->
|
|
||||||
State;
|
|
||||||
tick(Delay, State) ->
|
|
||||||
State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}.
|
|
||||||
|
|
||||||
i2b(I) when is_integer(I) ->
|
|
||||||
list_to_binary(integer_to_list(I)).
|
|
||||||
|
|
||||||
|
|
|
@ -94,7 +94,7 @@ unregister(ClientId) when is_binary(ClientId) ->
|
||||||
|
|
||||||
init([Id, TabId]) ->
|
init([Id, TabId]) ->
|
||||||
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
|
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
|
||||||
StatsFun = emqttd_broker:statsfun('clients/count', 'clients/max'),
|
StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
|
||||||
{ok, #state{tab = TabId, statsfun = StatsFun}}.
|
{ok, #state{tab = TabId, statsfun = StatsFun}}.
|
||||||
|
|
||||||
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
|
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
|
||||||
|
|
|
@ -124,7 +124,7 @@ broker([]) ->
|
||||||
[?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs].
|
[?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs].
|
||||||
|
|
||||||
stats([]) ->
|
stats([]) ->
|
||||||
[?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()].
|
[?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()].
|
||||||
|
|
||||||
metrics([]) ->
|
metrics([]) ->
|
||||||
[?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()].
|
[?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()].
|
||||||
|
|
|
@ -50,7 +50,7 @@
|
||||||
|
|
||||||
-define(METRIC_TAB, mqtt_metric).
|
-define(METRIC_TAB, mqtt_metric).
|
||||||
|
|
||||||
-record(state, {tick}).
|
-record(state, {tick_tref}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -60,7 +60,7 @@
|
||||||
%% @doc Start metrics server
|
%% @doc Start metrics server
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
@ -157,34 +157,33 @@ key(counter, Metric) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
{ok, BrokerOpts} = application:get_env(mqtt_broker),
|
|
||||||
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
|
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
|
||||||
% Create metrics table
|
% Create metrics table
|
||||||
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
|
||||||
% Init metrics
|
% Init metrics
|
||||||
[new_metric(Metric) || Metric <- Metrics],
|
[create_metric(Metric) || Metric <- Metrics],
|
||||||
% $SYS Topics for metrics
|
% $SYS Topics for metrics
|
||||||
[ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
|
[ok = create_topic(Topic) || {_, Topic} <- Metrics],
|
||||||
% Tick to publish stats
|
% Tick to publish metrics
|
||||||
Tick = emqttd_tick:new(proplists:get_value(sys_interval, BrokerOpts, 60)),
|
{ok, TRef} = timer:send_interval(timer:seconds(emqttd_broker:env(sys_interval)), tick),
|
||||||
{ok, #state{tick = Tick}, hibernate}.
|
{ok, #state{tick_tref = TRef}, hibernate}.
|
||||||
|
|
||||||
handle_call(_Req, _From, State) ->
|
handle_call(_Req, _From, State) ->
|
||||||
{reply, {error, badreq}, State}.
|
{reply, error, State}.
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(tick, State = #state{tick = Tick}) ->
|
handle_info(tick, State) ->
|
||||||
% publish metric message
|
% publish metric message
|
||||||
[publish(Metric, Val) || {Metric, Val} <- all()],
|
[publish(Metric, Val) || {Metric, Val} <- all()],
|
||||||
{noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, #state{tick_tref = TRef}) ->
|
||||||
ok.
|
timer:cancel(TRef), ok.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -194,14 +193,17 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
publish(Metric, Val) ->
|
publish(Metric, Val) ->
|
||||||
emqttd_pubsub:publish(metrics, #mqtt_message{
|
emqttd_pubsub:publish(metrics, #mqtt_message{topic = emqtt_topic:systop(Metric),
|
||||||
topic = emqtt_topic:systop(Metric),
|
payload = emqttd_util:integer_to_binary(Val)}).
|
||||||
payload = emqttd_utils:integer_to_binary(Val)}).
|
|
||||||
|
|
||||||
new_metric({gauge, Name}) ->
|
create_metric({gauge, Name}) ->
|
||||||
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
|
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
|
||||||
|
|
||||||
new_metric({counter, Name}) ->
|
create_metric({counter, Name}) ->
|
||||||
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
|
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
|
||||||
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
|
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
|
||||||
|
|
||||||
|
create_topic(Topic) ->
|
||||||
|
emqttd_pubsub:create(emqtt_topic:systop(Topic)).
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -86,7 +86,7 @@ init_tables() ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
create_tables() ->
|
create_tables() ->
|
||||||
emqttd_utils:apply_module_attributes(boot_mnesia).
|
emqttd_util:apply_module_attributes(boot_mnesia).
|
||||||
|
|
||||||
create_table(Table, Attrs) ->
|
create_table(Table, Attrs) ->
|
||||||
case mnesia:create_table(Table, Attrs) of
|
case mnesia:create_table(Table, Attrs) of
|
||||||
|
@ -103,7 +103,7 @@ create_table(Table, Attrs) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
copy_tables() ->
|
copy_tables() ->
|
||||||
emqttd_utils:apply_module_attributes(copy_mnesia).
|
emqttd_util:apply_module_attributes(copy_mnesia).
|
||||||
|
|
||||||
copy_table(Table) ->
|
copy_table(Table) ->
|
||||||
case mnesia:add_table_copy(Table, node(), ram_copies) of
|
case mnesia:add_table_copy(Table, node(), ram_copies) of
|
||||||
|
|
|
@ -355,10 +355,10 @@ setstats(all) ->
|
||||||
setstats(topics),
|
setstats(topics),
|
||||||
setstats(subscribers);
|
setstats(subscribers);
|
||||||
setstats(topics) ->
|
setstats(topics) ->
|
||||||
emqttd_broker:setstat('topics/count',
|
emqttd_stats:setstat('topics/count',
|
||||||
mnesia:table_info(topic, size));
|
mnesia:table_info(topic, size));
|
||||||
setstats(subscribers) ->
|
setstats(subscribers) ->
|
||||||
emqttd_broker:setstats('subscribers/count',
|
emqttd_stats:setstats('subscribers/count',
|
||||||
'subscribers/max',
|
'subscribers/max',
|
||||||
mnesia:table_info(subscriber, size)).
|
mnesia:table_info(subscriber, size)).
|
||||||
|
|
||||||
|
|
|
@ -33,15 +33,16 @@
|
||||||
-behaviour(supervisor).
|
-behaviour(supervisor).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/1]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
%% Supervisor callbacks
|
%% Supervisor callbacks
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
|
||||||
start_link(Opts) ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init([Opts]) ->
|
init([]) ->
|
||||||
|
{ok, Opts} = application:get_env(mqtt_pubsub),
|
||||||
Schedulers = erlang:system_info(schedulers),
|
Schedulers = erlang:system_info(schedulers),
|
||||||
PoolSize = proplists:get_value(pool_size, Opts, Schedulers),
|
PoolSize = proplists:get_value(pool_size, Opts, Schedulers),
|
||||||
gproc_pool:new(pubsub, hash, [{size, PoolSize}]),
|
gproc_pool:new(pubsub, hash, [{size, PoolSize}]),
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
-export([store/2]).
|
-export([store/2]).
|
||||||
|
|
||||||
%% Start gen_server
|
%% Start gen_server
|
||||||
-export([start_link/3]).
|
-export([start_link/2]).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
@ -301,7 +301,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{
|
||||||
end,
|
end,
|
||||||
|
|
||||||
%% cancel timeout timer
|
%% cancel timeout timer
|
||||||
emqttd_utils:cancel_timer(ETimer),
|
emqttd_util:cancel_timer(ETimer),
|
||||||
|
|
||||||
%% redelivery PUBREL
|
%% redelivery PUBREL
|
||||||
lists:foreach(fun(PacketId) ->
|
lists:foreach(fun(PacketId) ->
|
||||||
|
@ -410,7 +410,7 @@ next_msg_id(State = #session_state{message_id = MsgId}) ->
|
||||||
|
|
||||||
start_expire_timer(State = #session_state{expires = Expires,
|
start_expire_timer(State = #session_state{expires = Expires,
|
||||||
expire_timer = OldTimer}) ->
|
expire_timer = OldTimer}) ->
|
||||||
emqttd_utils:cancel_timer(OldTimer),
|
emqttd_util:cancel_timer(OldTimer),
|
||||||
Timer = erlang:send_after(Expires * 1000, self(), session_expired),
|
Timer = erlang:send_after(Expires * 1000, self(), session_expired),
|
||||||
State#session_state{expire_timer = Timer}.
|
State#session_state{expire_timer = Timer}.
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ destroy_session(ClientId) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
TabId = ets:new(?SESSION_TAB, [set, protected, named_table]),
|
TabId = ets:new(?SESSION_TAB, [set, protected, named_table]),
|
||||||
StatsFun = emqttd_broker:statsfun('sessions/count', 'sessions/max'),
|
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
||||||
{ok, #state{tabid = TabId, statsfun = StatsFun}}.
|
{ok, #state{tabid = TabId, statsfun = StatsFun}}.
|
||||||
|
|
||||||
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) ->
|
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) ->
|
||||||
|
|
|
@ -49,7 +49,7 @@
|
||||||
|
|
||||||
-define(STATS_TAB, mqtt_stats).
|
-define(STATS_TAB, mqtt_stats).
|
||||||
|
|
||||||
-record(state, {tick}).
|
-record(state, {tick_tref}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -122,15 +122,14 @@ setstats(Stat, MaxStat, Val) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
{ok, Options} = application:get_env(mqtt_broker),
|
|
||||||
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
|
||||||
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
||||||
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
|
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
|
||||||
% Create $SYS Topics
|
% Create $SYS Topics
|
||||||
[ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics],
|
[ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics],
|
||||||
% Tick to publish stats
|
% Tick to publish stats
|
||||||
Tick = emqttd_tick:new(proplists:get_value(sys_interval, Options, 60)),
|
{ok, TRef} = timer:send_interval(timer:seconds(emqttd_broker:env(sys_interval)), tick),
|
||||||
{ok, #state{tick = Tick}, hibernate}.
|
{ok, #state{tick_tref = TRef}, hibernate}.
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
{reply, error, State}.
|
{reply, error, State}.
|
||||||
|
@ -138,15 +137,15 @@ handle_call(_Request, _From, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(tick, State = #state{tick = Tick}) ->
|
handle_info(tick, State) ->
|
||||||
[publish(Stat, Val) || {Stat, Val} <- ets:tab2list(?STATS_TAB)],
|
[publish(Stat, Val) || {Stat, Val} <- ets:tab2list(?STATS_TAB)],
|
||||||
{noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, #state{tick_tref = TRef}) ->
|
||||||
ok.
|
timer:cancel(TRef), ok.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -155,8 +154,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
publish(Stat, Val) ->
|
publish(Stat, Val) ->
|
||||||
emqttd_pubsub:publish(stats, #mqtt_message{
|
emqttd_pubsub:publish(stats, #mqtt_message{topic = emqtt_topic:systop(Stat),
|
||||||
topic = emqtt_topic:systop(Stat),
|
payload = emqttd_util:integer_to_binary(Val)}).
|
||||||
payload = emqttd_utils:integer_to_binary(Val)}).
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_utils).
|
-module(emqttd_util).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
|
@ -77,13 +77,12 @@
|
||||||
{max_playload_size, 4096}
|
{max_playload_size, 4096}
|
||||||
]},
|
]},
|
||||||
%% PubSub
|
%% PubSub
|
||||||
{mqtt_pubsub, []},
|
{mqtt_pubsub, [
|
||||||
%% Metrics
|
%% default should be scheduler numbers
|
||||||
{metrics, [
|
%% {pool_size, 4}
|
||||||
{pub_interval, 60}
|
|
||||||
]},
|
]},
|
||||||
%% Bridge
|
%% Bridge
|
||||||
{bridge, [
|
{mqtt_bridge, [
|
||||||
{max_queue_len, 1000}, %NO effect now
|
{max_queue_len, 1000}, %NO effect now
|
||||||
{ping_down_interval, 1} %seconds
|
{ping_down_interval, 1} %seconds
|
||||||
]},
|
]},
|
||||||
|
|
Loading…
Reference in New Issue