Merge the connection and session tabs into channel tab

This commit is contained in:
Feng Lee 2019-07-25 09:25:45 +08:00
parent a0a2375810
commit 5b3a61b799
1 changed files with 43 additions and 136 deletions

View File

@ -32,13 +32,13 @@
, unregister_channel/2
]).
-export([ get_conn_attrs/1
, get_conn_attrs/2
-export([ get_chan_attrs/1
, get_chan_attrs/2
, set_chan_attrs/2
]).
-export([ get_conn_stats/1
, get_conn_stats/2
-export([ get_chan_stats/1
, get_chan_stats/2
, set_chan_stats/2
]).
@ -47,16 +47,6 @@
, resume_session/1
]).
-export([ get_session_attrs/1
, get_session_attrs/2
, set_session_attrs/2
]).
-export([ get_session_stats/1
, get_session_stats/2
, set_session_stats/2
]).
-export([ lookup_channels/1
, lookup_channels/2
]).
@ -73,29 +63,19 @@
%% Internal export
-export([stats_fun/0]).
-export_type([attrs/0, stats/0]).
-type(chan_pid() :: pid()).
-opaque(attrs() :: #{atom() => term()}).
-opaque(stats() :: #{atom() => integer()}).
%% Tables for channel management.
-define(CHAN_TAB, emqx_channel).
-define(CHAN_P_TAB, emqx_channel_p).
-define(CHAN_ATTRS_TAB, emqx_channel_attrs).
-define(CHAN_STATS_TAB, emqx_channel_stats).
-define(CONN_TAB, emqx_connection).
-define(SESSION_TAB, emqx_session).
-define(SESSION_P_TAB, emqx_session_p).
%% Chan stats
-define(CHAN_STATS,
[{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CONN_TAB, 'connections.count', 'connections.max'},
{?SESSION_TAB, 'sessions.count', 'sessions.max'},
{?SESSION_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'}
{?CHAN_TAB, 'connections.count', 'connections.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'}
]).
%% Batch drain
@ -118,6 +98,7 @@ start_link() ->
register_channel(ClientId) when is_binary(ClientId) ->
register_channel(ClientId, self()).
%% @doc Register a channel with pid.
-spec(register_channel(emqx_types:client_id(), chan_pid()) -> ok).
register_channel(ClientId, ChanPid) ->
Chan = {ClientId, ChanPid},
@ -139,66 +120,51 @@ unregister_channel(ClientId, ChanPid) ->
%% @private
do_unregister_channel(Chan) ->
ok = emqx_cm_registry:unregister_channel(Chan),
true = ets:delete_object(?SESSION_P_TAB, Chan),
true = ets:delete(?SESSION_TAB, Chan),
true = ets:delete(?CONN_TAB, Chan),
true = ets:delete_object(?CHAN_P_TAB, Chan),
true = ets:delete(?CHAN_ATTRS_TAB, Chan),
true = ets:delete(?CHAN_STATS_TAB, Chan),
ets:delete_object(?CHAN_TAB, Chan).
%% @doc Get conn attrs.
-spec(get_conn_attrs(emqx_types:client_id()) -> maybe(attrs())).
get_conn_attrs(ClientId) ->
with_channel(ClientId, fun(ChanPid) ->
get_conn_attrs(ClientId, ChanPid)
end).
%% @doc Get attrs of a channel.
-spec(get_chan_attrs(emqx_types:client_id()) -> maybe(emqx_types:attrs())).
get_chan_attrs(ClientId) ->
with_channel(ClientId, fun(ChanPid) -> get_chan_attrs(ClientId, ChanPid) end).
-spec(get_conn_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())).
get_conn_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
-spec(get_chan_attrs(emqx_types:client_id(), chan_pid()) -> maybe(emqx_types:attrs())).
get_chan_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(?CONN_TAB, Chan, 2) of
Attrs -> Attrs
catch
error:badarg -> undefined
end;
get_conn_attrs(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_conn_attrs, [ClientId, ChanPid]).
emqx_tables:lookup_value(?CHAN_ATTRS_TAB, Chan);
get_chan_attrs(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_attrs, [ClientId, ChanPid]).
%% @doc Set conn attrs.
-spec(set_chan_attrs(emqx_types:client_id(), attrs()) -> ok).
%% @doc Set attrs of a channel.
-spec(set_chan_attrs(emqx_types:client_id(), emqx_types:attrs()) -> ok).
set_chan_attrs(ClientId, Attrs) when is_binary(ClientId) ->
Chan = {ClientId, self()},
case ets:update_element(?CONN_TAB, Chan, {2, Attrs}) of
true -> ok;
false -> true = ets:insert(?CONN_TAB, {Chan, Attrs, #{}}),
ok
end.
true = ets:insert(?CHAN_ATTRS_TAB, {Chan, Attrs}),
ok.
%% @doc Get conn stats.
-spec(get_conn_stats(emqx_types:client_id()) -> maybe(stats())).
get_conn_stats(ClientId) ->
with_channel(ClientId, fun(ChanPid) ->
get_conn_stats(ClientId, ChanPid)
end).
%% @doc Get channel's stats.
-spec(get_chan_stats(emqx_types:client_id()) -> maybe(emqx_types:stats())).
get_chan_stats(ClientId) ->
with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end).
-spec(get_conn_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())).
get_conn_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
-spec(get_chan_stats(emqx_types:client_id(), chan_pid()) -> maybe(emqx_types:stats())).
get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(?CONN_TAB, Chan, 3) of
Stats -> Stats
catch
error:badarg -> undefined
end;
get_conn_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_conn_stats, [ClientId, ChanPid]).
emqx_tables:lookup_value(?CHAN_STATS_TAB, Chan);
get_chan_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
%% @doc Set conn stats.
-spec(set_chan_stats(emqx_types:client_id(), stats()) -> ok).
%% @doc Set channel's stats.
-spec(set_chan_stats(emqx_types:client_id(), emqx_types:stats()) -> ok).
set_chan_stats(ClientId, Stats) when is_binary(ClientId) ->
set_chan_stats(ClientId, self(), Stats).
-spec(set_chan_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok).
-spec(set_chan_stats(emqx_types:client_id(), chan_pid(), emqx_types:stats()) -> ok).
set_chan_stats(ClientId, ChanPid, Stats) ->
Chan = {ClientId, ChanPid},
_ = ets:update_element(?CONN_TAB, Chan, {3, Stats}),
true = ets:insert(?CHAN_STATS_TAB, {Chan, Stats}),
ok.
%% @doc Open a session.
@ -257,69 +223,10 @@ discard_session(ClientId) when is_binary(ClientId) ->
end, ChanPids)
end.
%% @doc Get session attrs.
-spec(get_session_attrs(emqx_types:client_id()) -> attrs()).
get_session_attrs(ClientId) ->
with_channel(ClientId, fun(ChanPid) ->
get_session_attrs(ClientId, ChanPid)
end).
-spec(get_session_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())).
get_session_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(?SESSION_TAB, Chan, 2) of
Attrs -> Attrs
catch
error:badarg -> undefined
end;
get_session_attrs(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_session_attrs, [ClientId, ChanPid]).
%% @doc Set session attrs.
-spec(set_session_attrs(emqx_types:client_id(), attrs()) -> ok).
set_session_attrs(ClientId, Attrs) when is_binary(ClientId) ->
Chan = {ClientId, self()},
case ets:update_element(?SESSION_TAB, Chan, {2, Attrs}) of
true -> ok;
false ->
true = ets:insert(?SESSION_TAB, {Chan, Attrs, #{}}),
is_clean_start(Attrs) orelse ets:insert(?SESSION_P_TAB, Chan),
ok
end.
%% @doc Is clean start?
is_clean_start(#{clean_start := false}) -> false;
is_clean_start(_Attrs) -> true.
%% @doc Get session stats.
-spec(get_session_stats(emqx_types:client_id()) -> stats()).
get_session_stats(ClientId) ->
with_channel(ClientId, fun(ChanPid) ->
get_session_stats(ClientId, ChanPid)
end).
-spec(get_session_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())).
get_session_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try ets:lookup_element(?SESSION_TAB, Chan, 3) of
Stats -> Stats
catch
error:badarg -> undefined
end;
get_session_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_session_stats, [ClientId, ChanPid]).
%% @doc Set session stats.
-spec(set_session_stats(emqx_types:client_id(), stats()) -> ok).
set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
set_session_stats(ClientId, self(), Stats).
-spec(set_session_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok).
set_session_stats(ClientId, ChanPid, Stats) ->
Chan = {ClientId, ChanPid},
_ = ets:update_element(?SESSION_TAB, Chan, {3, Stats}),
ok.
with_channel(ClientId, Fun) ->
case lookup_channels(ClientId) of
[] -> undefined;
@ -362,9 +269,9 @@ cast(Msg) -> gen_server:cast(?CM, Msg).
init([]) ->
TabOpts = [public, {write_concurrency, true}],
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(?CONN_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?SESSION_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?SESSION_P_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_P_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_ATTRS_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?CHAN_STATS_TAB, [set | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #{chan_pmon => emqx_pmon:new()}}.