chore(mgmt): callback query function with table name param

This commit is contained in:
JianBo He 2021-08-31 12:13:24 +08:00
parent 3f0ef7efa8
commit 0a7a14f4cd
4 changed files with 162 additions and 153 deletions

View File

@ -22,13 +22,13 @@
%% first_next query APIs
-export([ params2qs/2
, node_query/4
, cluster_query/3
, node_query/5
, cluster_query/4
, traverse_table/5
, select_table/5
]).
-export([do_query/5]).
-export([do_query/6]).
paginate(Tables, Params, RowFun) ->
Qh = query_handle(Tables),
@ -78,14 +78,14 @@ limit(Params) ->
%% Node Query
%%--------------------------------------------------------------------
node_query(Node, Params, {Tab, QsSchema}, QueryFun) ->
node_query(Node, Params, Tab, QsSchema, QueryFun) ->
{CodCnt, Qs} = params2qs(Params, QsSchema),
Limit = b2i(limit(Params)),
Page = b2i(page(Params)),
Start = if Page > 1 -> (Page-1) * Limit;
true -> 0
end,
{_, Rows} = do_query(Node, Qs, QueryFun, Start, Limit+1),
{_, Rows} = do_query(Node, Tab, Qs, QueryFun, Start, Limit+1),
Meta = #{page => Page, limit => Limit},
NMeta = case CodCnt =:= 0 of
true -> Meta#{count => count(Tab)};
@ -94,10 +94,11 @@ node_query(Node, Params, {Tab, QsSchema}, QueryFun) ->
#{meta => NMeta, data => lists:sublist(Rows, Limit)}.
%% @private
do_query(Node, Qs, {M,F}, Start, Limit) when Node =:= node() ->
M:F(Qs, Start, Limit);
do_query(Node, Qs, QueryFun, Start, Limit) ->
rpc_call(Node, ?MODULE, do_query, [Node, Qs, QueryFun, Start, Limit], 50000).
do_query(Node, Tab, Qs, {M,F}, Start, Limit) when Node =:= node() ->
M:F(Tab, Qs, Start, Limit);
do_query(Node, Tab, Qs, QueryFun, Start, Limit) ->
rpc_call(Node, ?MODULE, do_query,
[Node, Tab, Qs, QueryFun, Start, Limit], 50000).
%% @private
rpc_call(Node, M, F, A, T) ->
@ -110,7 +111,7 @@ rpc_call(Node, M, F, A, T) ->
%% Cluster Query
%%--------------------------------------------------------------------
cluster_query(Params, {Tab, QsSchema}, QueryFun) ->
cluster_query(Params, Tab, QsSchema, QueryFun) ->
{CodCnt, Qs} = params2qs(Params, QsSchema),
Limit = b2i(limit(Params)),
Page = b2i(page(Params)),
@ -118,7 +119,7 @@ cluster_query(Params, {Tab, QsSchema}, QueryFun) ->
true -> 0
end,
Nodes = ekka_mnesia:running_nodes(),
Rows = do_cluster_query(Nodes, Qs, QueryFun, Start, Limit+1, []),
Rows = do_cluster_query(Nodes, Tab, Qs, QueryFun, Start, Limit+1, []),
Meta = #{page => Page, limit => Limit},
NMeta = case CodCnt =:= 0 of
true -> Meta#{count => count(Tab, Nodes)};
@ -127,13 +128,13 @@ cluster_query(Params, {Tab, QsSchema}, QueryFun) ->
#{meta => NMeta, data => lists:sublist(Rows, Limit)}.
%% @private
do_cluster_query([], _, _, _, _, Acc) ->
do_cluster_query([], _, _, _, _, _, Acc) ->
lists:append(lists:reverse(Acc));
do_cluster_query([Node|Nodes], Qs, QueryFun, Start, Limit, Acc) ->
{NStart, Rows} = do_query(Node, Qs, QueryFun, Start, Limit),
do_cluster_query([Node|Nodes], Tab, Qs, QueryFun, Start, Limit, Acc) ->
{NStart, Rows} = do_query(Node, Tab, Qs, QueryFun, Start, Limit),
case Limit - length(Rows) of
Rest when Rest > 0 ->
do_cluster_query(Nodes, Qs, QueryFun, NStart, Limit, [Rows|Acc]);
do_cluster_query(Nodes, Tab, Qs, QueryFun, NStart, Limit, [Rows|Acc]);
0 ->
lists:append(lists:reverse([Rows|Acc]))
end.

View File

@ -22,8 +22,10 @@
-export([alarms/2]).
-export([ query_activated/3
, query_deactivated/3]).
%% internal export (for query)
-export([ query/4
]).
%% notice: from emqx_alarms
-define(ACTIVATED_ALARM, emqx_activated_alarm).
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
@ -71,14 +73,12 @@ alarms_api() ->
%%%==============================================================================================
%% parameters trans
alarms(get, #{query_string := Qs}) ->
{Table, Function} =
Table =
case maps:get(<<"activated">>, Qs, <<"true">>) of
<<"true">> ->
{?ACTIVATED_ALARM, query_activated};
<<"false">> ->
{?DEACTIVATED_ALARM, query_deactivated}
<<"true">> -> ?ACTIVATED_ALARM;
<<"false">> -> ?DEACTIVATED_ALARM
end,
Response = emqx_mgmt_api:cluster_query(Qs, {Table, []}, {?MODULE, Function}),
Response = emqx_mgmt_api:cluster_query(Qs, Table, [], {?MODULE, query}),
{200, Response};
alarms(delete, _Params) ->
@ -87,13 +87,8 @@ alarms(delete, _Params) ->
%%%==============================================================================================
%% internal
query_activated(_, Start, Limit) ->
query(?ACTIVATED_ALARM, Start, Limit).
query_deactivated(_, Start, Limit) ->
query(?DEACTIVATED_ALARM, Start, Limit).
query(Table, Start, Limit) ->
query(Table, _QsSpec, Start, Limit) ->
Ms = [{'$1',[],['$1']}],
emqx_mgmt_api:select_table(Table, Ms, Start, Limit, fun format_alarm/1).

View File

@ -35,7 +35,7 @@
, unsubscribe/2
, subscribe_batch/2]).
-export([ query/3
-export([ query/4
, format_channel_info/1]).
%% for batch operation
@ -420,14 +420,17 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
%% api apply
list(Params) ->
{Tab, QuerySchema} = ?CLIENT_QS_SCHEMA,
case maps:get(<<"node">>, Params, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun),
Response = emqx_mgmt_api:cluster_query(Params, Tab,
QuerySchema, ?query_fun),
{200, Response};
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Params),
Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode, ?CLIENT_QS_SCHEMA, ?query_fun),
Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode,
Tab, QuerySchema, ?query_fun),
{200, Response}
end.
@ -492,8 +495,123 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
ArgList = [[ClientID, Topic, Qos]|| #{topic := Topic, qos := Qos} <- Topics],
emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList).
%%%==============================================================================================
%%--------------------------------------------------------------------
%% internal function
do_subscribe(ClientID, Topic0, Qos) ->
{Topic, Opts} = emqx_topic:parse(Topic0),
TopicTable = [{Topic, Opts#{qos => Qos}}],
case emqx_mgmt:subscribe(ClientID, TopicTable) of
{error, Reason} ->
{error, Reason};
{subscribe, Subscriptions} ->
case proplists:is_defined(Topic, Subscriptions) of
true ->
ok;
false ->
{error, unknow_error}
end
end.
do_unsubscribe(ClientID, Topic) ->
case emqx_mgmt:unsubscribe(ClientID, Topic) of
{error, Reason} ->
{error, Reason};
Res ->
Res
end.
%%--------------------------------------------------------------------
%% Query Functions
query(Tab, {Qs, []}, Start, Limit) ->
Ms = qs2ms(Qs),
emqx_mgmt_api:select_table(Tab, Ms, Start, Limit,
fun format_channel_info/1);
query(Tab, {Qs, Fuzzy}, Start, Limit) ->
Ms = qs2ms(Qs),
MatchFun = match_fun(Ms, Fuzzy),
emqx_mgmt_api:traverse_table(Tab, MatchFun, Start, Limit,
fun format_channel_info/1).
%%--------------------------------------------------------------------
%% QueryString to Match Spec
-spec qs2ms(list()) -> ets:match_spec().
qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
[{{'$1', MtchHead, '_'}, Conds, ['$_']}].
qs2ms([], _, {MtchHead, Conds}) ->
{MtchHead, lists:reverse(Conds)};
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
qs2ms(Rest, N, {NMtchHead, Conds});
qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
NConds = put_conds(Qs, Holder, Conds),
qs2ms(Rest, N+1, {NMtchHead, NConds}).
put_conds({_, Op, V}, Holder, Conds) ->
[{Op, Holder, V} | Conds];
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
[{Op2, Holder, V2},
{Op1, Holder, V1} | Conds].
ms(clientid, X) ->
#{clientinfo => #{clientid => X}};
ms(username, X) ->
#{clientinfo => #{username => X}};
ms(zone, X) ->
#{clientinfo => #{zone => X}};
ms(ip_address, X) ->
#{clientinfo => #{peerhost => X}};
ms(conn_state, X) ->
#{conn_state => X};
ms(clean_start, X) ->
#{conninfo => #{clean_start => X}};
ms(proto_name, X) ->
#{conninfo => #{proto_name => X}};
ms(proto_ver, X) ->
#{conninfo => #{proto_ver => X}};
ms(connected_at, X) ->
#{conninfo => #{connected_at => X}};
ms(created_at, X) ->
#{session => #{created_at => X}}.
%%--------------------------------------------------------------------
%% Match funcs
match_fun(Ms, Fuzzy) ->
MsC = ets:match_spec_compile(Ms),
REFuzzy = lists:map(fun({K, like, S}) ->
{ok, RE} = re:compile(S),
{K, like, RE}
end, Fuzzy),
fun(Rows) ->
case ets:match_spec_run(Rows, MsC) of
[] -> [];
Ls ->
lists:filter(fun(E) ->
run_fuzzy_match(E, REFuzzy)
end, Ls)
end
end.
run_fuzzy_match(_, []) ->
true;
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, "") of
undefined -> "";
V -> V
end,
re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy).
%%--------------------------------------------------------------------
%% format funcs
format_channel_info({_, ClientInfo, ClientStats}) ->
Fun =
fun
@ -546,116 +664,8 @@ peer_to_binary(Addr) ->
list_to_binary(inet:ntoa(Addr)).
format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
#{
access => PubSub,
topic => Topic,
result => AuthzResult,
updated_time => Timestamp
}.
do_subscribe(ClientID, Topic0, Qos) ->
{Topic, Opts} = emqx_topic:parse(Topic0),
TopicTable = [{Topic, Opts#{qos => Qos}}],
case emqx_mgmt:subscribe(ClientID, TopicTable) of
{error, Reason} ->
{error, Reason};
{subscribe, Subscriptions} ->
case proplists:is_defined(Topic, Subscriptions) of
true ->
ok;
false ->
{error, unknow_error}
end
end.
do_unsubscribe(ClientID, Topic) ->
case emqx_mgmt:unsubscribe(ClientID, Topic) of
{error, Reason} ->
{error, Reason};
Res ->
Res
end.
%%%==============================================================================================
%% Query Functions
query({Qs, []}, Start, Limit) ->
Ms = qs2ms(Qs),
emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1);
query({Qs, Fuzzy}, Start, Limit) ->
Ms = qs2ms(Qs),
MatchFun = match_fun(Ms, Fuzzy),
emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1).
%%%==============================================================================================
%% QueryString to Match Spec
-spec qs2ms(list()) -> ets:match_spec().
qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
[{{'$1', MtchHead, '_'}, Conds, ['$_']}].
qs2ms([], _, {MtchHead, Conds}) ->
{MtchHead, lists:reverse(Conds)};
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
qs2ms(Rest, N, {NMtchHead, Conds});
qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
NConds = put_conds(Qs, Holder, Conds),
qs2ms(Rest, N+1, {NMtchHead, NConds}).
put_conds({_, Op, V}, Holder, Conds) ->
[{Op, Holder, V} | Conds];
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
[{Op2, Holder, V2},
{Op1, Holder, V1} | Conds].
ms(clientid, X) ->
#{clientinfo => #{clientid => X}};
ms(username, X) ->
#{clientinfo => #{username => X}};
ms(zone, X) ->
#{clientinfo => #{zone => X}};
ms(ip_address, X) ->
#{clientinfo => #{peerhost => X}};
ms(conn_state, X) ->
#{conn_state => X};
ms(clean_start, X) ->
#{conninfo => #{clean_start => X}};
ms(proto_name, X) ->
#{conninfo => #{proto_name => X}};
ms(proto_ver, X) ->
#{conninfo => #{proto_ver => X}};
ms(connected_at, X) ->
#{conninfo => #{connected_at => X}};
ms(created_at, X) ->
#{session => #{created_at => X}}.
%%%==============================================================================================
%% Match funcs
match_fun(Ms, Fuzzy) ->
MsC = ets:match_spec_compile(Ms),
REFuzzy = lists:map(fun({K, like, S}) ->
{ok, RE} = re:compile(S),
{K, like, RE}
end, Fuzzy),
fun(Rows) ->
case ets:match_spec_run(Rows, MsC) of
[] -> [];
Ls ->
lists:filter(fun(E) ->
run_fuzzy_match(E, REFuzzy)
end, Ls)
end
end.
run_fuzzy_match(_, []) ->
true;
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, "") of
undefined -> "";
V -> V
end,
re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy).
#{ access => PubSub,
topic => Topic,
result => AuthzResult,
updated_time => Timestamp
}.

View File

@ -29,7 +29,7 @@
-export([subscriptions/2]).
-export([ query/3
-export([ query/4
, format/1
]).
@ -111,11 +111,14 @@ subscriptions(get, #{query_string := Params}) ->
list(Params).
list(Params) ->
{Tab, QuerySchema} = ?SUBS_QS_SCHEMA,
case maps:get(<<"node">>, Params, undefined) of
undefined ->
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)};
{200, emqx_mgmt_api:cluster_query(Params, Tab,
QuerySchema, ?query_fun)};
Node ->
{200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, ?SUBS_QS_SCHEMA, ?query_fun)}
{200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params,
Tab, QuerySchema, ?query_fun)}
end.
format(Items) when is_list(Items) ->
@ -145,14 +148,14 @@ format({_Subscriber, Topic, Options}) ->
%% Query Function
%%--------------------------------------------------------------------
query({Qs, []}, Start, Limit) ->
query(Tab, {Qs, []}, Start, Limit) ->
Ms = qs2ms(Qs),
emqx_mgmt_api:select_table(emqx_suboption, Ms, Start, Limit, fun format/1);
emqx_mgmt_api:select_table(Tab, Ms, Start, Limit, fun format/1);
query({Qs, Fuzzy}, Start, Limit) ->
query(Tab, {Qs, Fuzzy}, Start, Limit) ->
Ms = qs2ms(Qs),
MatchFun = match_fun(Ms, Fuzzy),
emqx_mgmt_api:traverse_table(emqx_suboption, MatchFun, Start, Limit, fun format/1).
emqx_mgmt_api:traverse_table(Tab, MatchFun, Start, Limit, fun format/1).
match_fun(Ms, Fuzzy) ->
MsC = ets:match_spec_compile(Ms),