TAB -> TABLE
This commit is contained in:
parent
1a81a4cc68
commit
cb6967cd46
|
@ -98,7 +98,7 @@ stop() ->
|
||||||
gen_server:call(?MODULE, stop).
|
gen_server:call(?MODULE, stop).
|
||||||
|
|
||||||
init([AuthMods]) ->
|
init([AuthMods]) ->
|
||||||
ets:new(?AUTH_TABLE, [set, named_table, protected]),
|
ets:new(?AUTH_TABLE, [set, named_table, protected, {read_concurrency, true}]),
|
||||||
Modules = lists:map(
|
Modules = lists:map(
|
||||||
fun({Mod, Opts}) ->
|
fun({Mod, Opts}) ->
|
||||||
AuthMod = authmod(Mod),
|
AuthMod = authmod(Mod),
|
||||||
|
|
|
@ -34,7 +34,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(BROKER_TAB, mqtt_broker).
|
-define(BROKER_TABLE, mqtt_broker).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
|
@ -115,7 +115,7 @@ datetime() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec getstats() -> [{atom(), non_neg_integer()}].
|
-spec getstats() -> [{atom(), non_neg_integer()}].
|
||||||
getstats() ->
|
getstats() ->
|
||||||
ets:tab2list(?BROKER_TAB).
|
ets:tab2list(?BROKER_TABLE).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -125,7 +125,7 @@ getstats() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec getstat(atom()) -> non_neg_integer() | undefined.
|
-spec getstat(atom()) -> non_neg_integer() | undefined.
|
||||||
getstat(Name) ->
|
getstat(Name) ->
|
||||||
case ets:lookup(?BROKER_TAB, Name) of
|
case ets:lookup(?BROKER_TABLE, Name) of
|
||||||
[{Name, Val}] -> Val;
|
[{Name, Val}] -> Val;
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
@ -138,7 +138,7 @@ getstat(Name) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec setstat(atom(), pos_integer()) -> boolean().
|
-spec setstat(atom(), pos_integer()) -> boolean().
|
||||||
setstat(Name, Val) ->
|
setstat(Name, Val) ->
|
||||||
ets:insert(?BROKER_TAB, {Name, Val}).
|
ets:insert(?BROKER_TABLE, {Name, Val}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
|
@ -146,9 +146,9 @@ setstat(Name, Val) ->
|
||||||
|
|
||||||
init([Options]) ->
|
init([Options]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?BROKER_TABLE, [set, public, named_table, {write_concurrency, true}]),
|
||||||
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
||||||
[ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics],
|
[ets:insert(?BROKER_TABLE, {Topic, 0}) || Topic <- Topics],
|
||||||
% Create $SYS Topics
|
% Create $SYS Topics
|
||||||
[ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
|
[ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
|
||||||
[ok = create(systop(Topic)) || Topic <- Topics],
|
[ok = create(systop(Topic)) || Topic <- Topics],
|
||||||
|
@ -175,7 +175,7 @@ handle_info(tick, State) ->
|
||||||
publish(systop(uptime), list_to_binary(uptime(State))),
|
publish(systop(uptime), list_to_binary(uptime(State))),
|
||||||
publish(systop(datetime), list_to_binary(datetime())),
|
publish(systop(datetime), list_to_binary(datetime())),
|
||||||
[publish(systop(Stat), i2b(Val))
|
[publish(systop(Stat), i2b(Val))
|
||||||
|| {Stat, Val} <- ets:tab2list(?BROKER_TAB)],
|
|| {Stat, Val} <- ets:tab2list(?BROKER_TABLE)],
|
||||||
{noreply, tick(State), hibernate};
|
{noreply, tick(State), hibernate};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(CLIENT_TAB, mqtt_client).
|
-define(CLIENT_TABLE, mqtt_client).
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
@ -73,7 +73,7 @@ start_link() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec lookup(ClientId :: binary()) -> pid() | undefined.
|
-spec lookup(ClientId :: binary()) -> pid() | undefined.
|
||||||
lookup(ClientId) when is_binary(ClientId) ->
|
lookup(ClientId) when is_binary(ClientId) ->
|
||||||
case ets:lookup(?CLIENT_TAB, ClientId) of
|
case ets:lookup(?CLIENT_TABLE, ClientId) of
|
||||||
[{_, Pid, _}] -> Pid;
|
[{_, Pid, _}] -> Pid;
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
@ -113,11 +113,11 @@ getstats() ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ets:new(?CLIENT_TAB, [set, named_table, protected]),
|
ets:new(?CLIENT_TABLE, [set, named_table, protected]),
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
handle_call({register, ClientId, Pid}, _From, State) ->
|
handle_call({register, ClientId, Pid}, _From, State) ->
|
||||||
case ets:lookup(?CLIENT_TAB, ClientId) of
|
case ets:lookup(?CLIENT_TABLE, ClientId) of
|
||||||
[{_, Pid, _}] ->
|
[{_, Pid, _}] ->
|
||||||
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
|
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
|
||||||
ignore;
|
ignore;
|
||||||
|
@ -131,7 +131,7 @@ handle_call({register, ClientId, Pid}, _From, State) ->
|
||||||
{reply, ok, setstats(State)};
|
{reply, ok, setstats(State)};
|
||||||
|
|
||||||
handle_call(getstats, _From, State = #state{max = Max}) ->
|
handle_call(getstats, _From, State = #state{max = Max}) ->
|
||||||
Stats = [{'clients/count', ets:info(?CLIENT_TAB, size)},
|
Stats = [{'clients/count', ets:info(?CLIENT_TABLE, size)},
|
||||||
{'clients/max', Max}],
|
{'clients/max', Max}],
|
||||||
{reply, Stats, State};
|
{reply, Stats, State};
|
||||||
|
|
||||||
|
@ -139,10 +139,10 @@ handle_call(_Request, _From, State) ->
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
||||||
handle_cast({unregister, ClientId, Pid}, State) ->
|
handle_cast({unregister, ClientId, Pid}, State) ->
|
||||||
case ets:lookup(?CLIENT_TAB, ClientId) of
|
case ets:lookup(?CLIENT_TABLE, ClientId) of
|
||||||
[{_, Pid, MRef}] ->
|
[{_, Pid, MRef}] ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
ets:delete(?CLIENT_TAB, ClientId);
|
ets:delete(?CLIENT_TABLE, ClientId);
|
||||||
[_] ->
|
[_] ->
|
||||||
ignore;
|
ignore;
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -154,7 +154,7 @@ handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
||||||
ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
|
ets:match_delete(?CLIENT_TABLE, {'_', DownPid, MRef}),
|
||||||
{noreply, setstats(State)};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
@ -171,10 +171,10 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
insert(ClientId, Pid) ->
|
insert(ClientId, Pid) ->
|
||||||
ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}).
|
ets:insert(?CLIENT_TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}).
|
||||||
|
|
||||||
setstats(State = #state{max = Max}) ->
|
setstats(State = #state{max = Max}) ->
|
||||||
Count = ets:info(?CLIENT_TAB, size),
|
Count = ets:info(?CLIENT_TABLE, size),
|
||||||
emqttd_broker:setstat('clients/count', Count),
|
emqttd_broker:setstat('clients/count', Count),
|
||||||
if
|
if
|
||||||
Count > Max ->
|
Count > Max ->
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(METRIC_TAB, mqtt_broker_metric).
|
-define(METRIC_TABLE, mqtt_broker_metric).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
|
@ -81,7 +81,7 @@ all() ->
|
||||||
{ok, Count} -> maps:put(Metric, Count+Val, Map);
|
{ok, Count} -> maps:put(Metric, Count+Val, Map);
|
||||||
error -> maps:put(Metric, Val, Map)
|
error -> maps:put(Metric, Val, Map)
|
||||||
end
|
end
|
||||||
end, #{}, ?METRIC_TAB)).
|
end, #{}, ?METRIC_TABLE)).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -91,7 +91,7 @@ all() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec value(atom()) -> non_neg_integer().
|
-spec value(atom()) -> non_neg_integer().
|
||||||
value(Metric) ->
|
value(Metric) ->
|
||||||
lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
|
lists:sum(ets:select(?METRIC_TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -125,9 +125,9 @@ inc(Metric, Val) when is_atom(Metric) and is_integer(Val) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer().
|
-spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer().
|
||||||
inc(gauge, Metric, Val) ->
|
inc(gauge, Metric, Val) ->
|
||||||
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val});
|
ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, Val});
|
||||||
inc(counter, Metric, Val) ->
|
inc(counter, Metric, Val) ->
|
||||||
ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}).
|
ets:update_counter(?METRIC_TABLE, key(counter, Metric), {2, Val}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -147,7 +147,7 @@ dec(gauge, Metric) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec dec(gauge, atom(), pos_integer()) -> integer().
|
-spec dec(gauge, atom(), pos_integer()) -> integer().
|
||||||
dec(gauge, Metric, Val) ->
|
dec(gauge, Metric, Val) ->
|
||||||
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}).
|
ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, -Val}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -158,7 +158,7 @@ dec(gauge, Metric, Val) ->
|
||||||
set(Metric, Val) when is_atom(Metric) ->
|
set(Metric, Val) when is_atom(Metric) ->
|
||||||
set(gauge, Metric, Val).
|
set(gauge, Metric, Val).
|
||||||
set(gauge, Metric, Val) ->
|
set(gauge, Metric, Val) ->
|
||||||
ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}).
|
ets:insert(?METRIC_TABLE, {key(gauge, Metric), Val}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -180,7 +180,7 @@ init([Options]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
|
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
|
||||||
% Create metrics table
|
% Create metrics table
|
||||||
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?METRIC_TABLE, [set, public, named_table, {write_concurrency, true}]),
|
||||||
% Init metrics
|
% Init metrics
|
||||||
[new_metric(Metric) || Metric <- Metrics],
|
[new_metric(Metric) || Metric <- Metrics],
|
||||||
% $SYS Topics for metrics
|
% $SYS Topics for metrics
|
||||||
|
@ -223,11 +223,11 @@ publish(Topic, Payload) ->
|
||||||
emqttd_router:route(#mqtt_message{topic = Topic, payload = Payload}).
|
emqttd_router:route(#mqtt_message{topic = Topic, payload = Payload}).
|
||||||
|
|
||||||
new_metric({gauge, Name}) ->
|
new_metric({gauge, Name}) ->
|
||||||
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
|
ets:insert(?METRIC_TABLE, {{Name, 0}, 0});
|
||||||
|
|
||||||
new_metric({counter, Name}) ->
|
new_metric({counter, Name}) ->
|
||||||
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
|
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
|
||||||
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
|
[ets:insert(?METRIC_TABLE, {{Name, I}, 0}) || I <- Schedulers].
|
||||||
|
|
||||||
tick(State = #state{pub_interval = PubInterval}) ->
|
tick(State = #state{pub_interval = PubInterval}) ->
|
||||||
tick(PubInterval, State).
|
tick(PubInterval, State).
|
||||||
|
|
|
@ -44,7 +44,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(SESSION_TAB, mqtt_session).
|
-define(SESSION_TABLE, mqtt_session).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
@ -72,7 +72,7 @@ start_link() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec lookup_session(binary()) -> pid() | undefined.
|
-spec lookup_session(binary()) -> pid() | undefined.
|
||||||
lookup_session(ClientId) ->
|
lookup_session(ClientId) ->
|
||||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
case ets:lookup(?SESSION_TABLE, ClientId) of
|
||||||
[{_, SessPid, _}] -> SessPid;
|
[{_, SessPid, _}] -> SessPid;
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
@ -103,12 +103,12 @@ destroy_session(ClientId) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
ets:new(?SESSION_TAB, [set, protected, named_table]),
|
ets:new(?SESSION_TABLE, [set, protected, named_table]),
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
||||||
Reply =
|
Reply =
|
||||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
case ets:lookup(?SESSION_TABLE, ClientId) of
|
||||||
[{_, SessPid, _MRef}] ->
|
[{_, SessPid, _MRef}] ->
|
||||||
emqttd_session:resume(SessPid, ClientId, ClientPid),
|
emqttd_session:resume(SessPid, ClientId, ClientPid),
|
||||||
{ok, SessPid};
|
{ok, SessPid};
|
||||||
|
@ -116,7 +116,7 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
||||||
case emqttd_session_sup:start_session(ClientId, ClientPid) of
|
case emqttd_session_sup:start_session(ClientId, ClientPid) of
|
||||||
{ok, SessPid} ->
|
{ok, SessPid} ->
|
||||||
MRef = erlang:monitor(process, SessPid),
|
MRef = erlang:monitor(process, SessPid),
|
||||||
ets:insert(?SESSION_TAB, {ClientId, SessPid, MRef}),
|
ets:insert(?SESSION_TABLE, {ClientId, SessPid, MRef}),
|
||||||
{ok, SessPid};
|
{ok, SessPid};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
{error, Error}
|
{error, Error}
|
||||||
|
@ -125,11 +125,11 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
||||||
{reply, Reply, setstats(State)};
|
{reply, Reply, setstats(State)};
|
||||||
|
|
||||||
handle_call({destroy_session, ClientId}, _From, State) ->
|
handle_call({destroy_session, ClientId}, _From, State) ->
|
||||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
case ets:lookup(?SESSION_TABLE, ClientId) of
|
||||||
[{_, SessPid, MRef}] ->
|
[{_, SessPid, MRef}] ->
|
||||||
erlang:demonitor(MRef),
|
erlang:demonitor(MRef),
|
||||||
emqttd_session:destroy(SessPid, ClientId),
|
emqttd_session:destroy(SessPid, ClientId),
|
||||||
ets:delete(?SESSION_TAB, ClientId);
|
ets:delete(?SESSION_TABLE, ClientId);
|
||||||
[] ->
|
[] ->
|
||||||
ignore
|
ignore
|
||||||
end,
|
end,
|
||||||
|
@ -142,7 +142,7 @@ handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
||||||
ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}),
|
ets:match_delete(?SESSION_TABLE, {'_', DownPid, MRef}),
|
||||||
{noreply, setstats(State)};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
@ -159,7 +159,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
setstats(State = #state{max = Max}) ->
|
setstats(State = #state{max = Max}) ->
|
||||||
Count = ets:info(?SESSION_TAB, size),
|
Count = ets:info(?SESSION_TABLE, size),
|
||||||
emqttd_broker:setstat('sessions/count', Count),
|
emqttd_broker:setstat('sessions/count', Count),
|
||||||
if
|
if
|
||||||
Count > Max ->
|
Count > Max ->
|
||||||
|
|
Loading…
Reference in New Issue