From 5b3a61b7999fcd6da75c02a1d4068e1f7da6520b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 25 Jul 2019 09:25:45 +0800 Subject: [PATCH] Merge the connection and session tabs into channel tab --- src/emqx_cm.erl | 179 ++++++++++++------------------------------------ 1 file changed, 43 insertions(+), 136 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index e756a37e5..b1e3982fe 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -32,13 +32,13 @@ , unregister_channel/2 ]). --export([ get_conn_attrs/1 - , get_conn_attrs/2 +-export([ get_chan_attrs/1 + , get_chan_attrs/2 , set_chan_attrs/2 ]). --export([ get_conn_stats/1 - , get_conn_stats/2 +-export([ get_chan_stats/1 + , get_chan_stats/2 , set_chan_stats/2 ]). @@ -47,16 +47,6 @@ , resume_session/1 ]). --export([ get_session_attrs/1 - , get_session_attrs/2 - , set_session_attrs/2 - ]). - --export([ get_session_stats/1 - , get_session_stats/2 - , set_session_stats/2 - ]). - -export([ lookup_channels/1 , lookup_channels/2 ]). @@ -73,29 +63,19 @@ %% Internal export -export([stats_fun/0]). --export_type([attrs/0, stats/0]). - -type(chan_pid() :: pid()). --opaque(attrs() :: #{atom() => term()}). - --opaque(stats() :: #{atom() => integer()}). - %% Tables for channel management. -define(CHAN_TAB, emqx_channel). +-define(CHAN_P_TAB, emqx_channel_p). +-define(CHAN_ATTRS_TAB, emqx_channel_attrs). +-define(CHAN_STATS_TAB, emqx_channel_stats). --define(CONN_TAB, emqx_connection). - --define(SESSION_TAB, emqx_session). - --define(SESSION_P_TAB, emqx_session_p). - -%% Chan stats -define(CHAN_STATS, [{?CHAN_TAB, 'channels.count', 'channels.max'}, - {?CONN_TAB, 'connections.count', 'connections.max'}, - {?SESSION_TAB, 'sessions.count', 'sessions.max'}, - {?SESSION_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'} + {?CHAN_TAB, 'connections.count', 'connections.max'}, + {?CHAN_TAB, 'sessions.count', 'sessions.max'}, + {?CHAN_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'} ]). %% Batch drain @@ -118,6 +98,7 @@ start_link() -> register_channel(ClientId) when is_binary(ClientId) -> register_channel(ClientId, self()). +%% @doc Register a channel with pid. -spec(register_channel(emqx_types:client_id(), chan_pid()) -> ok). register_channel(ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, @@ -139,66 +120,51 @@ unregister_channel(ClientId, ChanPid) -> %% @private do_unregister_channel(Chan) -> ok = emqx_cm_registry:unregister_channel(Chan), - true = ets:delete_object(?SESSION_P_TAB, Chan), - true = ets:delete(?SESSION_TAB, Chan), - true = ets:delete(?CONN_TAB, Chan), + true = ets:delete_object(?CHAN_P_TAB, Chan), + true = ets:delete(?CHAN_ATTRS_TAB, Chan), + true = ets:delete(?CHAN_STATS_TAB, Chan), ets:delete_object(?CHAN_TAB, Chan). -%% @doc Get conn attrs. --spec(get_conn_attrs(emqx_types:client_id()) -> maybe(attrs())). -get_conn_attrs(ClientId) -> - with_channel(ClientId, fun(ChanPid) -> - get_conn_attrs(ClientId, ChanPid) - end). +%% @doc Get attrs of a channel. +-spec(get_chan_attrs(emqx_types:client_id()) -> maybe(emqx_types:attrs())). +get_chan_attrs(ClientId) -> + with_channel(ClientId, fun(ChanPid) -> get_chan_attrs(ClientId, ChanPid) end). --spec(get_conn_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())). -get_conn_attrs(ClientId, ChanPid) when node(ChanPid) == node() -> +-spec(get_chan_attrs(emqx_types:client_id(), chan_pid()) -> maybe(emqx_types:attrs())). +get_chan_attrs(ClientId, ChanPid) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, - try ets:lookup_element(?CONN_TAB, Chan, 2) of - Attrs -> Attrs - catch - error:badarg -> undefined - end; -get_conn_attrs(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_conn_attrs, [ClientId, ChanPid]). + emqx_tables:lookup_value(?CHAN_ATTRS_TAB, Chan); +get_chan_attrs(ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_chan_attrs, [ClientId, ChanPid]). -%% @doc Set conn attrs. --spec(set_chan_attrs(emqx_types:client_id(), attrs()) -> ok). +%% @doc Set attrs of a channel. +-spec(set_chan_attrs(emqx_types:client_id(), emqx_types:attrs()) -> ok). set_chan_attrs(ClientId, Attrs) when is_binary(ClientId) -> Chan = {ClientId, self()}, - case ets:update_element(?CONN_TAB, Chan, {2, Attrs}) of - true -> ok; - false -> true = ets:insert(?CONN_TAB, {Chan, Attrs, #{}}), - ok - end. + true = ets:insert(?CHAN_ATTRS_TAB, {Chan, Attrs}), + ok. -%% @doc Get conn stats. --spec(get_conn_stats(emqx_types:client_id()) -> maybe(stats())). -get_conn_stats(ClientId) -> - with_channel(ClientId, fun(ChanPid) -> - get_conn_stats(ClientId, ChanPid) - end). +%% @doc Get channel's stats. +-spec(get_chan_stats(emqx_types:client_id()) -> maybe(emqx_types:stats())). +get_chan_stats(ClientId) -> + with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end). --spec(get_conn_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())). -get_conn_stats(ClientId, ChanPid) when node(ChanPid) == node() -> +-spec(get_chan_stats(emqx_types:client_id(), chan_pid()) -> maybe(emqx_types:stats())). +get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, - try ets:lookup_element(?CONN_TAB, Chan, 3) of - Stats -> Stats - catch - error:badarg -> undefined - end; -get_conn_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_conn_stats, [ClientId, ChanPid]). + emqx_tables:lookup_value(?CHAN_STATS_TAB, Chan); +get_chan_stats(ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]). -%% @doc Set conn stats. --spec(set_chan_stats(emqx_types:client_id(), stats()) -> ok). +%% @doc Set channel's stats. +-spec(set_chan_stats(emqx_types:client_id(), emqx_types:stats()) -> ok). set_chan_stats(ClientId, Stats) when is_binary(ClientId) -> set_chan_stats(ClientId, self(), Stats). --spec(set_chan_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok). +-spec(set_chan_stats(emqx_types:client_id(), chan_pid(), emqx_types:stats()) -> ok). set_chan_stats(ClientId, ChanPid, Stats) -> Chan = {ClientId, ChanPid}, - _ = ets:update_element(?CONN_TAB, Chan, {3, Stats}), + true = ets:insert(?CHAN_STATS_TAB, {Chan, Stats}), ok. %% @doc Open a session. @@ -257,69 +223,10 @@ discard_session(ClientId) when is_binary(ClientId) -> end, ChanPids) end. -%% @doc Get session attrs. --spec(get_session_attrs(emqx_types:client_id()) -> attrs()). -get_session_attrs(ClientId) -> - with_channel(ClientId, fun(ChanPid) -> - get_session_attrs(ClientId, ChanPid) - end). - --spec(get_session_attrs(emqx_types:client_id(), chan_pid()) -> maybe(attrs())). -get_session_attrs(ClientId, ChanPid) when node(ChanPid) == node() -> - Chan = {ClientId, ChanPid}, - try ets:lookup_element(?SESSION_TAB, Chan, 2) of - Attrs -> Attrs - catch - error:badarg -> undefined - end; -get_session_attrs(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_session_attrs, [ClientId, ChanPid]). - -%% @doc Set session attrs. --spec(set_session_attrs(emqx_types:client_id(), attrs()) -> ok). -set_session_attrs(ClientId, Attrs) when is_binary(ClientId) -> - Chan = {ClientId, self()}, - case ets:update_element(?SESSION_TAB, Chan, {2, Attrs}) of - true -> ok; - false -> - true = ets:insert(?SESSION_TAB, {Chan, Attrs, #{}}), - is_clean_start(Attrs) orelse ets:insert(?SESSION_P_TAB, Chan), - ok - end. - %% @doc Is clean start? is_clean_start(#{clean_start := false}) -> false; is_clean_start(_Attrs) -> true. -%% @doc Get session stats. --spec(get_session_stats(emqx_types:client_id()) -> stats()). -get_session_stats(ClientId) -> - with_channel(ClientId, fun(ChanPid) -> - get_session_stats(ClientId, ChanPid) - end). - --spec(get_session_stats(emqx_types:client_id(), chan_pid()) -> maybe(stats())). -get_session_stats(ClientId, ChanPid) when node(ChanPid) == node() -> - Chan = {ClientId, ChanPid}, - try ets:lookup_element(?SESSION_TAB, Chan, 3) of - Stats -> Stats - catch - error:badarg -> undefined - end; -get_session_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_session_stats, [ClientId, ChanPid]). - -%% @doc Set session stats. --spec(set_session_stats(emqx_types:client_id(), stats()) -> ok). -set_session_stats(ClientId, Stats) when is_binary(ClientId) -> - set_session_stats(ClientId, self(), Stats). - --spec(set_session_stats(emqx_types:client_id(), chan_pid(), stats()) -> ok). -set_session_stats(ClientId, ChanPid, Stats) -> - Chan = {ClientId, ChanPid}, - _ = ets:update_element(?SESSION_TAB, Chan, {3, Stats}), - ok. - with_channel(ClientId, Fun) -> case lookup_channels(ClientId) of [] -> undefined; @@ -362,9 +269,9 @@ cast(Msg) -> gen_server:cast(?CM, Msg). init([]) -> TabOpts = [public, {write_concurrency, true}], ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), - ok = emqx_tables:new(?CONN_TAB, [set, compressed | TabOpts]), - ok = emqx_tables:new(?SESSION_TAB, [set, compressed | TabOpts]), - ok = emqx_tables:new(?SESSION_P_TAB, [bag | TabOpts]), + ok = emqx_tables:new(?CHAN_P_TAB, [bag | TabOpts]), + ok = emqx_tables:new(?CHAN_ATTRS_TAB, [set, compressed | TabOpts]), + ok = emqx_tables:new(?CHAN_STATS_TAB, [set | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), {ok, #{chan_pmon => emqx_pmon:new()}}.