refactor(emqx_gateway): Reuse emqx_gateway_cm APIs
This commit is contained in:
parent
706d9a899a
commit
925654978e
|
@ -304,10 +304,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _},
|
||||||
%% format funcs
|
%% format funcs
|
||||||
|
|
||||||
format_channel_info({_, Infos, Stats} = R) ->
|
format_channel_info({_, Infos, Stats} = R) ->
|
||||||
|
Node = maps:get(node, Infos, node()),
|
||||||
ClientInfo = maps:get(clientinfo, Infos, #{}),
|
ClientInfo = maps:get(clientinfo, Infos, #{}),
|
||||||
ConnInfo = maps:get(conninfo, Infos, #{}),
|
ConnInfo = maps:get(conninfo, Infos, #{}),
|
||||||
SessInfo = maps:get(session, Infos, #{}),
|
SessInfo = maps:get(session, Infos, #{}),
|
||||||
FetchX = [ {node, ClientInfo, node()}
|
FetchX = [ {node, ClientInfo, Node}
|
||||||
, {clientid, ClientInfo}
|
, {clientid, ClientInfo}
|
||||||
, {username, ClientInfo}
|
, {username, ClientInfo}
|
||||||
, {proto_name, ConnInfo}
|
, {proto_name, ConnInfo}
|
||||||
|
|
|
@ -36,6 +36,7 @@
|
||||||
, register_channel/4
|
, register_channel/4
|
||||||
, unregister_channel/2
|
, unregister_channel/2
|
||||||
, insert_channel_info/4
|
, insert_channel_info/4
|
||||||
|
, lookup_by_clientid/2
|
||||||
, set_chan_info/3
|
, set_chan_info/3
|
||||||
, set_chan_info/4
|
, set_chan_info/4
|
||||||
, get_chan_info/2
|
, get_chan_info/2
|
||||||
|
@ -64,7 +65,8 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% RPC targets
|
%% RPC targets
|
||||||
-export([ do_get_chan_info/3
|
-export([ do_lookup_by_clientid/2
|
||||||
|
, do_get_chan_info/3
|
||||||
, do_set_chan_info/4
|
, do_set_chan_info/4
|
||||||
, do_get_chan_stats/3
|
, do_get_chan_stats/3
|
||||||
, do_set_chan_stats/4
|
, do_set_chan_stats/4
|
||||||
|
@ -159,11 +161,19 @@ get_chan_info(GwName, ClientId) ->
|
||||||
get_chan_info(GwName, ClientId, ChanPid)
|
get_chan_info(GwName, ClientId, ChanPid)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) ->
|
||||||
|
[pid()].
|
||||||
|
do_lookup_by_clientid(GwName, ClientId) ->
|
||||||
|
ChanTab = emqx_gateway_cm:tabname(chan, GwName),
|
||||||
|
[Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)].
|
||||||
|
|
||||||
-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid())
|
-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid())
|
||||||
-> emqx_types:infos() | undefined.
|
-> emqx_types:infos() | undefined.
|
||||||
do_get_chan_info(GwName, ClientId, ChanPid) ->
|
do_get_chan_info(GwName, ClientId, ChanPid) ->
|
||||||
Chan = {ClientId, ChanPid},
|
Chan = {ClientId, ChanPid},
|
||||||
try ets:lookup_element(tabname(info, GwName), Chan, 2)
|
try
|
||||||
|
Info = ets:lookup_element(tabname(info, GwName), Chan, 2),
|
||||||
|
Info#{node => node()}
|
||||||
catch
|
catch
|
||||||
error:badarg -> undefined
|
error:badarg -> undefined
|
||||||
end.
|
end.
|
||||||
|
@ -173,6 +183,13 @@ do_get_chan_info(GwName, ClientId, ChanPid) ->
|
||||||
get_chan_info(GwName, ClientId, ChanPid) ->
|
get_chan_info(GwName, ClientId, ChanPid) ->
|
||||||
wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)).
|
wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)).
|
||||||
|
|
||||||
|
-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) ->
|
||||||
|
[pid()].
|
||||||
|
lookup_by_clientid(GwName, ClientId) ->
|
||||||
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
Pids = emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId),
|
||||||
|
lists:append([Pid || {ok, Pid} <- Pids]).
|
||||||
|
|
||||||
%% @doc Update infos of the channel.
|
%% @doc Update infos of the channel.
|
||||||
-spec set_chan_info(gateway_name(),
|
-spec set_chan_info(gateway_name(),
|
||||||
emqx_types:clientid(),
|
emqx_types:clientid(),
|
||||||
|
|
|
@ -47,9 +47,7 @@
|
||||||
|
|
||||||
%% Mgmt APIs - clients
|
%% Mgmt APIs - clients
|
||||||
-export([ lookup_client/3
|
-export([ lookup_client/3
|
||||||
, lookup_client/4
|
|
||||||
, kickout_client/2
|
, kickout_client/2
|
||||||
, kickout_client/3
|
|
||||||
, list_client_subscriptions/2
|
, list_client_subscriptions/2
|
||||||
, client_subscribe/4
|
, client_subscribe/4
|
||||||
, client_unsubscribe/3
|
, client_unsubscribe/3
|
||||||
|
@ -231,41 +229,26 @@ confexp({error, Reason}) -> error(Reason).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec lookup_client(gateway_name(),
|
-spec lookup_client(gateway_name(),
|
||||||
emqx_types:clientid(), {atom(), atom()}) -> list().
|
emqx_types:clientid(), {module(), atom()}) -> list().
|
||||||
lookup_client(GwName, ClientId, FormatFun) ->
|
lookup_client(GwName, ClientId, {M, F}) ->
|
||||||
lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
|
[begin
|
||||||
|| Node <- mria_mnesia:running_nodes()]).
|
Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid),
|
||||||
|
Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid),
|
||||||
lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() ->
|
M:F({{ClientId, Pid}, Info, Stats})
|
||||||
ChanTab = emqx_gateway_cm:tabname(chan, GwName),
|
end
|
||||||
InfoTab = emqx_gateway_cm:tabname(info, GwName),
|
|| Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)].
|
||||||
|
|
||||||
lists:append(lists:map(
|
|
||||||
fun(Key) ->
|
|
||||||
lists:map(fun M:F/1, ets:lookup(InfoTab, Key))
|
|
||||||
end, ets:lookup(ChanTab, ClientId)));
|
|
||||||
|
|
||||||
lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
|
|
||||||
rpc_call(Node, lookup_client,
|
|
||||||
[Node, GwName, {clientid, ClientId}, FormatFun]).
|
|
||||||
|
|
||||||
-spec kickout_client(gateway_name(), emqx_types:clientid())
|
-spec kickout_client(gateway_name(), emqx_types:clientid())
|
||||||
-> {error, any()}
|
-> {error, any()}
|
||||||
| ok.
|
| ok.
|
||||||
kickout_client(GwName, ClientId) ->
|
kickout_client(GwName, ClientId) ->
|
||||||
Results = [kickout_client(Node, GwName, ClientId)
|
Results = [emqx_gateway_cm:kick_session(GwName, ClientId, Pid)
|
||||||
|| Node <- mria_mnesia:running_nodes()],
|
|| Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)],
|
||||||
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
case Results =:= [] orelse lists:any(fun(Item) -> Item =:= ok end, Results) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> lists:last(Results)
|
false -> lists:last(Results)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
kickout_client(Node, GwName, ClientId) when Node =:= node() ->
|
|
||||||
emqx_gateway_cm:kick_session(GwName, ClientId);
|
|
||||||
|
|
||||||
kickout_client(Node, GwName, ClientId) ->
|
|
||||||
rpc_call(Node, kickout_client, [Node, GwName, ClientId]).
|
|
||||||
|
|
||||||
-spec list_client_subscriptions(gateway_name(), emqx_types:clientid())
|
-spec list_client_subscriptions(gateway_name(), emqx_types:clientid())
|
||||||
-> {error, any()}
|
-> {error, any()}
|
||||||
| {ok, list()}.
|
| {ok, list()}.
|
||||||
|
@ -459,9 +442,3 @@ to_list(B) when is_binary(B) ->
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
|
|
||||||
rpc_call(Node, Fun, Args) ->
|
|
||||||
case rpc:call(Node, ?MODULE, Fun, Args) of
|
|
||||||
{badrpc, Reason} -> {error, Reason};
|
|
||||||
Res -> Res
|
|
||||||
end.
|
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
, discard_session/3
|
, discard_session/3
|
||||||
, kick_session/3
|
, kick_session/3
|
||||||
, get_chann_conn_mod/3
|
, get_chann_conn_mod/3
|
||||||
|
, lookup_by_clientid/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
@ -34,6 +35,11 @@
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
"5.0.0".
|
"5.0.0".
|
||||||
|
|
||||||
|
-spec lookup_by_clientid([node()], emqx_gateway_cm:gateway_name(), emqx_types:clientid()) ->
|
||||||
|
emqx_rpc:multicall_return([pid()]).
|
||||||
|
lookup_by_clientid(Nodes, GwName, ClientId) ->
|
||||||
|
erpc:multicall(Nodes, emqx_gateway_cm, do_lookup_by_clientid, [GwName, ClientId]).
|
||||||
|
|
||||||
-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
|
-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
|
||||||
-> emqx_types:infos() | undefined | {badrpc, _}.
|
-> emqx_types:infos() | undefined | {badrpc, _}.
|
||||||
get_chan_info(GwName, ClientId, ChanPid) ->
|
get_chan_info(GwName, ClientId, ChanPid) ->
|
||||||
|
|
|
@ -125,7 +125,7 @@ t_get_set_chan_info_stats(_) ->
|
||||||
#{clientinfo => clientinfo(), conninfo => conninfo()}, []),
|
#{clientinfo => clientinfo(), conninfo => conninfo()}, []),
|
||||||
|
|
||||||
%% Info: get/set
|
%% Info: get/set
|
||||||
NInfo = #{newinfo => true},
|
NInfo = #{newinfo => true, node => node()},
|
||||||
emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
|
emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
NInfo,
|
NInfo,
|
||||||
|
|
Loading…
Reference in New Issue