diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index a89063870..7368b442d 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -41,7 +41,8 @@ delete_all_deactivated_alarms/0, get_alarms/0, get_alarms/1, - format/1 + format/1, + format/2 ]). %% gen_server callbacks @@ -169,12 +170,15 @@ get_alarms(activated) -> get_alarms(deactivated) -> gen_server:call(?MODULE, {get_alarms, deactivated}). -format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> +format(Alarm) -> + format(node(), Alarm). + +format(Node, #activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> Now = erlang:system_time(microsecond), %% mnesia db stored microsecond for high frequency alarm %% format for dashboard using millisecond #{ - node => node(), + node => Node, name => Name, message => Message, %% to millisecond @@ -182,7 +186,7 @@ format(#activated_alarm{name = Name, message = Message, activate_at = At, detail activate_at => to_rfc3339(At), details => Details }; -format(#deactivated_alarm{ +format(Node, #deactivated_alarm{ name = Name, message = Message, activate_at = At, @@ -190,7 +194,7 @@ format(#deactivated_alarm{ deactivate_at = DAt }) -> #{ - node => node(), + node => Node, name => Name, message => Message, %% to millisecond diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index cb3fad7dc..1fcc00dc9 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -262,7 +262,14 @@ lookup_user(UserID, #{user_group := UserGroup}) -> list_users(QueryString, #{user_group := UserGroup}) -> NQueryString = QueryString#{<<"user_group">> => UserGroup}, - emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN). + emqx_mgmt_api:node_query( + node(), + NQueryString, + ?TAB, + ?AUTHN_QSCHEMA, + ?QUERY_FUN, + fun ?MODULE:format_user_info/1 + ). %%-------------------------------------------------------------------- %% Query Functions @@ -273,8 +280,7 @@ query(Tab, {QString, []}, Continuation, Limit) -> Tab, Ms, Continuation, - Limit, - fun format_user_info/1 + Limit ); query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> Ms = ms_from_qstring(QString), @@ -283,8 +289,7 @@ query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> Tab, {Ms, FuzzyFilterFun}, Continuation, - Limit, - fun format_user_info/1 + Limit ). %%-------------------------------------------------------------------- diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index 7276ad428..f84bfc195 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -288,7 +288,14 @@ lookup_user(UserID, #{user_group := UserGroup}) -> list_users(QueryString, #{user_group := UserGroup}) -> NQueryString = QueryString#{<<"user_group">> => UserGroup}, - emqx_mgmt_api:node_query(node(), NQueryString, ?TAB, ?AUTHN_QSCHEMA, ?QUERY_FUN). + emqx_mgmt_api:node_query( + node(), + NQueryString, + ?TAB, + ?AUTHN_QSCHEMA, + ?QUERY_FUN, + fun ?MODULE:format_user_info/1 + ). %%-------------------------------------------------------------------- %% Query Functions @@ -299,8 +306,7 @@ query(Tab, {QString, []}, Continuation, Limit) -> Tab, Ms, Continuation, - Limit, - fun format_user_info/1 + Limit ); query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> Ms = ms_from_qstring(QString), @@ -309,8 +315,7 @@ query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> Tab, {Ms, FuzzyFilterFun}, Continuation, - Limit, - fun format_user_info/1 + Limit ). %%-------------------------------------------------------------------- diff --git a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl index 609974e98..8edd62e21 100644 --- a/apps/emqx_authz/src/emqx_authz_api_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_api_mnesia.erl @@ -408,7 +408,8 @@ users(get, #{query_string := QueryString}) -> QueryString, ?ACL_TABLE, ?ACL_USERNAME_QSCHEMA, - ?QUERY_USERNAME_FUN + ?QUERY_USERNAME_FUN, + fun ?MODULE:format_result/1 ) of {error, page_limit_invalid} -> @@ -443,7 +444,8 @@ clients(get, #{query_string := QueryString}) -> QueryString, ?ACL_TABLE, ?ACL_CLIENTID_QSCHEMA, - ?QUERY_CLIENTID_FUN + ?QUERY_CLIENTID_FUN, + fun ?MODULE:format_result/1 ) of {error, page_limit_invalid} -> @@ -582,8 +584,7 @@ query_username(Tab, {_QString, []}, Continuation, Limit) -> Tab, Ms, Continuation, - Limit, - fun format_result/1 + Limit ); query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> Ms = emqx_authz_mnesia:list_username_rules(), @@ -592,8 +593,7 @@ query_username(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> Tab, {Ms, FuzzyFilterFun}, Continuation, - Limit, - fun format_result/1 + Limit ). query_clientid(Tab, {_QString, []}, Continuation, Limit) -> @@ -602,8 +602,7 @@ query_clientid(Tab, {_QString, []}, Continuation, Limit) -> Tab, Ms, Continuation, - Limit, - fun format_result/1 + Limit ); query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> Ms = emqx_authz_mnesia:list_clientid_rules(), @@ -612,8 +611,7 @@ query_clientid(Tab, {_QString, FuzzyQString}, Continuation, Limit) -> Tab, {Ms, FuzzyFilterFun}, Continuation, - Limit, - fun format_result/1 + Limit ). %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 5f6cc25b6..f0fbcf968 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -56,7 +56,8 @@ %% internal exports (for client query) -export([ query/4, - format_channel_info/1 + format_channel_info/1, + format_channel_info/2 ]). -define(TAGS, [<<"Gateway Clients">>]). @@ -112,7 +113,8 @@ clients(get, #{ QString, TabName, ?CLIENT_QSCHEMA, - ?QUERY_FUN + ?QUERY_FUN, + fun ?MODULE:format_channel_info/2 ); Node0 -> case emqx_misc:safe_to_existing_atom(Node0) of @@ -123,7 +125,8 @@ clients(get, #{ QStringWithoutNode, TabName, ?CLIENT_QSCHEMA, - ?QUERY_FUN + ?QUERY_FUN, + fun ?MODULE:format_channel_info/2 ); {error, _} -> {error, Node0, {badrpc, <<"invalid node">>}} @@ -272,8 +275,7 @@ query(Tab, {Qs, []}, Continuation, Limit) -> Tab, Ms, Continuation, - Limit, - fun format_channel_info/1 + Limit ); query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> Ms = qs2ms(Qs), @@ -282,8 +284,7 @@ query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> Tab, {Ms, FuzzyFilterFun}, Continuation, - Limit, - fun format_channel_info/1 + Limit ). qs2ms(Qs) -> @@ -363,8 +364,11 @@ run_fuzzy_filter( %%-------------------------------------------------------------------- %% format funcs -format_channel_info({_, Infos, Stats} = R) -> - Node = maps:get(node, Infos, node()), +format_channel_info(ChannInfo) -> + format_channel_info(node(), ChannInfo). + +format_channel_info(WhichNode, {_, Infos, Stats} = R) -> + Node = maps:get(node, Infos, WhichNode), ClientInfo = maps:get(clientinfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}), SessInfo = maps:get(session, Infos, #{}), diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 342228b19..cc0a81864 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -29,9 +29,9 @@ %% first_next query APIs -export([ - node_query/5, - cluster_query/4, - select_table_with_count/5, + node_query/6, + cluster_query/5, + select_table_with_count/4, b2i/1 ]). @@ -117,30 +117,24 @@ limit(Params) when is_map(Params) -> limit(Params) -> proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()). -init_meta(Params) -> - Limit = b2i(limit(Params)), - Page = b2i(page(Params)), - #{ - page => Page, - limit => Limit, - count => 0 - }. - %%-------------------------------------------------------------------- %% Node Query %%-------------------------------------------------------------------- -node_query(Node, QString, Tab, QSchema, QueryFun) -> - {_CodCnt, NQString} = parse_qstring(QString, QSchema), - page_limit_check_query( - init_meta(QString), - {fun do_node_query/5, [Node, Tab, NQString, QueryFun, init_meta(QString)]} - ). +node_query(Node, QString, Tab, QSchema, QueryFun, FmtFun) -> + case parse_pager_params(QString) of + false -> + {error, page_limit_invalid}; + Meta -> + {_CodCnt, NQString} = parse_qstring(QString, QSchema), + ResultAcc = #{cursor => 0, count => 0, rows => []}, + NResultAcc = do_node_query( + Node, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc + ), + format_query_result(FmtFun, Meta, NResultAcc) + end. %% @private -do_node_query(Node, Tab, QString, QueryFun, Meta) -> - do_node_query(Node, Tab, QString, QueryFun, _Continuation = ?FRESH_SELECT, Meta, _Results = []). - do_node_query( Node, Tab, @@ -148,45 +142,44 @@ do_node_query( QueryFun, Continuation, Meta = #{limit := Limit}, - Results + ResultAcc ) -> case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of {error, {badrpc, R}} -> {error, Node, {badrpc, R}}; - {Len, Rows, ?FRESH_SELECT} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - #{meta => NMeta, data => NResults}; - {Len, Rows, NContinuation} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_node_query(Node, Tab, QString, QueryFun, NContinuation, NMeta, NResults) + {Rows, ?FRESH_SELECT} -> + {_, NResultAcc} = accumulate_query_rows(Node, Rows, ResultAcc, Meta), + NResultAcc; + {Rows, NContinuation} -> + case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of + {enough, NResultAcc} -> + NResultAcc; + {more, NResultAcc} -> + do_node_query(Node, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc) + end end. %%-------------------------------------------------------------------- %% Cluster Query %%-------------------------------------------------------------------- -cluster_query(QString, Tab, QSchema, QueryFun) -> - {_CodCnt, NQString} = parse_qstring(QString, QSchema), - Nodes = mria_mnesia:running_nodes(), - page_limit_check_query( - init_meta(QString), - {fun do_cluster_query/5, [Nodes, Tab, NQString, QueryFun, init_meta(QString)]} - ). +cluster_query(QString, Tab, QSchema, QueryFun, FmtFun) -> + case parse_pager_params(QString) of + false -> + {error, page_limit_invalid}; + Meta -> + {_CodCnt, NQString} = parse_qstring(QString, QSchema), + Nodes = mria_mnesia:running_nodes(), + ResultAcc = #{cursor => 0, count => 0, rows => []}, + NResultAcc = do_cluster_query( + Nodes, Tab, NQString, QueryFun, ?FRESH_SELECT, Meta, ResultAcc + ), + format_query_result(FmtFun, Meta, NResultAcc) + end. %% @private -do_cluster_query(Nodes, Tab, QString, QueryFun, Meta) -> - do_cluster_query( - Nodes, - Tab, - QString, - QueryFun, - _Continuation = ?FRESH_SELECT, - Meta, - _Results = [] - ). - -do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, Meta, Results) -> - #{meta => Meta, data => Results}; +do_cluster_query([], _Tab, _QString, _QueryFun, _Continuation, _Meta, ResultAcc) -> + ResultAcc; do_cluster_query( [Node | Tail] = Nodes, Tab, @@ -194,17 +187,27 @@ do_cluster_query( QueryFun, Continuation, Meta = #{limit := Limit}, - Results + ResultAcc ) -> case do_query(Node, Tab, QString, QueryFun, Continuation, Limit) of {error, {badrpc, R}} -> - {error, Node, {bar_rpc, R}}; - {Len, Rows, ?FRESH_SELECT} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_cluster_query(Tail, Tab, QString, QueryFun, ?FRESH_SELECT, NMeta, NResults); - {Len, Rows, NContinuation} -> - {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), - do_cluster_query(Nodes, Tab, QString, QueryFun, NContinuation, NMeta, NResults) + {error, Node, {badrpc, R}}; + {Rows, NContinuation} -> + case accumulate_query_rows(Node, Rows, ResultAcc, Meta) of + {enough, NResultAcc} -> + NResultAcc; + {more, NResultAcc} -> + case NContinuation of + ?FRESH_SELECT -> + do_cluster_query( + Tail, Tab, QString, QueryFun, ?FRESH_SELECT, Meta, NResultAcc + ); + _ -> + do_cluster_query( + Nodes, Tab, QString, QueryFun, NContinuation, Meta, NResultAcc + ) + end + end end. %%-------------------------------------------------------------------- @@ -228,60 +231,76 @@ do_query(Node, Tab, QString, QueryFun, Continuation, Limit) -> Ret -> Ret end. -sub_query_result(Len, Rows, Limit, Results, Meta) -> - {Flag, NMeta} = judge_page_with_counting(Len, Meta), - NResults = - case Flag of - more -> - []; - cutrows -> - {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), - ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), - lists:sublist(lists:append(Results, ThisRows), SubStart, Limit); - enough -> - lists:sublist(lists:append(Results, Rows), 1, Limit) - end, - {NMeta, NResults}. +%% ResultAcc :: #{count := integer(), +%% cursor := integer(), +%% rows := [{node(), Rows :: list()}] +%% } +accumulate_query_rows( + Node, + Rows, + ResultAcc = #{cursor := Cursor, count := Count, rows := RowsAcc}, + _Meta = #{page := Page, limit := Limit} +) -> + PageStart = (Page - 1) * Limit + 1, + PageEnd = Page * Limit, + Len = length(Rows), + case Cursor + Len of + NCursor when NCursor < PageStart -> + {more, ResultAcc#{cursor => NCursor}}; + NCursor when NCursor < PageEnd -> + {more, ResultAcc#{ + cursor => NCursor, + count => Count + length(Rows), + rows => [{Node, Rows} | RowsAcc] + }}; + NCursor when NCursor >= PageEnd -> + SubRows = lists:sublist(Rows, Limit - Count), + {enough, ResultAcc#{ + cursor => NCursor, + count => Count + length(SubRows), + rows => [{Node, SubRows} | RowsAcc] + }} + end. %%-------------------------------------------------------------------- %% Table Select %%-------------------------------------------------------------------- -select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit, FmtFun) when +select_table_with_count(Tab, {Ms, FuzzyFilterFun}, ?FRESH_SELECT, Limit) when is_function(FuzzyFilterFun) andalso Limit > 0 -> case ets:select(Tab, Ms, Limit) of '$end_of_table' -> - {0, [], ?FRESH_SELECT}; + {[], ?FRESH_SELECT}; {RawResult, NContinuation} -> Rows = FuzzyFilterFun(RawResult), - {length(Rows), lists:map(FmtFun, Rows), NContinuation} + {Rows, NContinuation} end; -select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit, FmtFun) when +select_table_with_count(_Tab, {Ms, FuzzyFilterFun}, Continuation, _Limit) when is_function(FuzzyFilterFun) -> case ets:select(ets:repair_continuation(Continuation, Ms)) of '$end_of_table' -> - {0, [], ?FRESH_SELECT}; + {[], ?FRESH_SELECT}; {RawResult, NContinuation} -> Rows = FuzzyFilterFun(RawResult), - {length(Rows), lists:map(FmtFun, Rows), NContinuation} + {Rows, NContinuation} end; -select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit, FmtFun) when +select_table_with_count(Tab, Ms, ?FRESH_SELECT, Limit) when Limit > 0 -> case ets:select(Tab, Ms, Limit) of '$end_of_table' -> - {0, [], ?FRESH_SELECT}; + {[], ?FRESH_SELECT}; {RawResult, NContinuation} -> - {length(RawResult), lists:map(FmtFun, RawResult), NContinuation} + {RawResult, NContinuation} end; -select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) -> +select_table_with_count(_Tab, Ms, Continuation, _Limit) -> case ets:select(ets:repair_continuation(Continuation, Ms)) of '$end_of_table' -> - {0, [], ?FRESH_SELECT}; + {[], ?FRESH_SELECT}; {RawResult, NContinuation} -> - {length(RawResult), lists:map(FmtFun, RawResult), NContinuation} + {RawResult, NContinuation} end. %%-------------------------------------------------------------------- @@ -379,40 +398,38 @@ is_fuzzy_key(<<"match_", _/binary>>) -> is_fuzzy_key(_) -> false. -page_start(1, _) -> 1; -page_start(Page, Limit) -> (Page - 1) * Limit + 1. +format_query_result(_FmtFun, _Meta, Error = {error, _Node, _Reason}) -> + Error; +format_query_result( + FmtFun, Meta, _ResultAcc = #{count := _Count, cursor := Cursor, rows := RowsAcc} +) -> + #{ + meta => Meta#{count => Cursor}, + data => lists:flatten( + lists:foldr( + fun({Node, Rows}, Acc) -> + [lists:map(fun(Row) -> exec_format_fun(FmtFun, Node, Row) end, Rows) | Acc] + end, + [], + RowsAcc + ) + ) + }. -judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) -> - PageStart = page_start(Page, Limit), - PageEnd = Page * Limit, - case Count + Len of - NCount when NCount < PageStart -> - {more, Meta#{count => NCount}}; - NCount when NCount < PageEnd -> - {cutrows, Meta#{count => NCount}}; - NCount when NCount >= PageEnd -> - {enough, Meta#{count => NCount}} +exec_format_fun(FmtFun, Node, Row) -> + case erlang:fun_info(FmtFun, arity) of + {arity, 1} -> FmtFun(Row); + {arity, 2} -> FmtFun(Node, Row) end. -rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) -> - PageStart = page_start(Page, Limit), - case (Count - Len) < PageStart of +parse_pager_params(Params) -> + Page = b2i(page(Params)), + Limit = b2i(limit(Params)), + case Page > 0 andalso Limit > 0 of true -> - NeedNowNum = Count - PageStart + 1, - SubStart = Len - NeedNowNum + 1, - {SubStart, NeedNowNum}; + #{page => Page, limit => Limit, count => 0}; false -> - {_SubStart = 1, _NeedNowNum = Len} - end. - -page_limit_check_query(Meta, {F, A}) -> - case Meta of - #{page := Page, limit := Limit} when - Page < 1; Limit < 1 - -> - {error, page_limit_invalid}; - _ -> - erlang:apply(F, A) + false end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index 36845e4e7..d574ad4ee 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -24,7 +24,7 @@ -export([api_spec/0, paths/0, schema/1, fields/1]). --export([alarms/2]). +-export([alarms/2, format_alarm/2]). -define(TAGS, [<<"Alarms">>]). @@ -112,7 +112,15 @@ alarms(get, #{query_string := QString}) -> true -> ?ACTIVATED_ALARM; false -> ?DEACTIVATED_ALARM end, - case emqx_mgmt_api:cluster_query(QString, Table, [], {?MODULE, query}) of + case + emqx_mgmt_api:cluster_query( + QString, + Table, + [], + {?MODULE, query}, + fun ?MODULE:format_alarm/2 + ) + of {error, page_limit_invalid} -> {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; {error, Node, {badrpc, R}} -> @@ -130,9 +138,7 @@ alarms(delete, _Params) -> query(Table, _QsSpec, Continuation, Limit) -> Ms = [{'$1', [], ['$1']}], - emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_alarm/1). + emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit). -format_alarm(Alarms) when is_list(Alarms) -> - [emqx_alarm:format(Alarm) || Alarm <- Alarms]; -format_alarm(Alarm) -> - emqx_alarm:format(Alarm). +format_alarm(WhichNode, Alarm) -> + emqx_alarm:format(WhichNode, Alarm). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index f4fe0387f..56730fe3e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -47,7 +47,8 @@ -export([ query/4, - format_channel_info/1 + format_channel_info/1, + format_channel_info/2 ]). %% for batch operation @@ -645,7 +646,8 @@ list_clients(QString) -> QString, ?CLIENT_QTAB, ?CLIENT_QSCHEMA, - ?QUERY_FUN + ?QUERY_FUN, + fun ?MODULE:format_channel_info/2 ); Node0 -> case emqx_misc:safe_to_existing_atom(Node0) of @@ -656,7 +658,8 @@ list_clients(QString) -> QStringWithoutNode, ?CLIENT_QTAB, ?CLIENT_QSCHEMA, - ?QUERY_FUN + ?QUERY_FUN, + fun ?MODULE:format_channel_info/2 ); {error, _} -> {error, Node0, {badrpc, <<"invalid node">>}} @@ -789,8 +792,7 @@ query(Tab, {QString, []}, Continuation, Limit) -> Tab, Ms, Continuation, - Limit, - fun format_channel_info/1 + Limit ); query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> Ms = qs2ms(QString), @@ -799,8 +801,7 @@ query(Tab, {QString, FuzzyQString}, Continuation, Limit) -> Tab, {Ms, FuzzyFilterFun}, Continuation, - Limit, - fun format_channel_info/1 + Limit ). %%-------------------------------------------------------------------- @@ -876,12 +877,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | %%-------------------------------------------------------------------- %% format funcs -format_channel_info({_, ClientInfo0, ClientStats}) -> - Node = - case ClientInfo0 of - #{node := N} -> N; - _ -> node() - end, +format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) -> + format_channel_info(node(), ChannInfo). + +format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) -> + Node = maps:get(node, ClientInfo0, WhichNode), ClientInfo1 = emqx_map_lib:deep_remove([conninfo, clientid], ClientInfo0), ClientInfo2 = emqx_map_lib:deep_remove([conninfo, username], ClientInfo1), StatsMap = maps:without( diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 03b833e84..eb0f83cc0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -33,7 +33,7 @@ -export([ query/4, - format/1 + format/2 ]). -define(SUBS_QTABLE, emqx_suboption). @@ -142,7 +142,8 @@ subscriptions(get, #{query_string := QString}) -> QString, ?SUBS_QTABLE, ?SUBS_QSCHEMA, - ?QUERY_FUN + ?QUERY_FUN, + fun ?MODULE:format/2 ); Node0 -> case emqx_misc:safe_to_existing_atom(Node0) of @@ -152,7 +153,8 @@ subscriptions(get, #{query_string := QString}) -> QString, ?SUBS_QTABLE, ?SUBS_QSCHEMA, - ?QUERY_FUN + ?QUERY_FUN, + fun ?MODULE:format/2 ); {error, _} -> {error, Node0, {badrpc, <<"invalid node">>}} @@ -168,16 +170,12 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. -format(Items) when is_list(Items) -> - [format(Item) || Item <- Items]; -format({{Subscriber, Topic}, Options}) -> - format({Subscriber, Topic, Options}); -format({_Subscriber, Topic, Options}) -> +format(WhichNode, {{_Subscriber, Topic}, Options}) -> maps:merge( #{ topic => get_topic(Topic, Options), clientid => maps:get(subid, Options), - node => node() + node => WhichNode }, maps:with([qos, nl, rap, rh], Options) ). @@ -199,8 +197,7 @@ query(Tab, {Qs, []}, Continuation, Limit) -> Tab, Ms, Continuation, - Limit, - fun format/1 + Limit ); query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> Ms = qs2ms(Qs), @@ -209,8 +206,7 @@ query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> Tab, {Ms, FuzzyFilterFun}, Continuation, - Limit, - fun format/1 + Limit ). fuzzy_filter_fun(Fuzzy) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index c357855b2..3cc2168e7 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -109,7 +109,12 @@ topic(get, #{bindings := Bindings}) -> do_list(Params) -> case emqx_mgmt_api:node_query( - node(), Params, emqx_route, ?TOPICS_QUERY_SCHEMA, {?MODULE, query} + node(), + Params, + emqx_route, + ?TOPICS_QUERY_SCHEMA, + {?MODULE, query}, + fun ?MODULE:format/1 ) of {error, page_limit_invalid} -> diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index f511d74d9..44e7a85e3 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -166,11 +166,14 @@ list(Params) -> emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). cluster_list(Params) -> - emqx_mgmt_api:cluster_query(Params, ?TAB, [], {?MODULE, cluster_query}). + %% FIXME: why cluster_query??? + emqx_mgmt_api:cluster_query( + Params, ?TAB, [], {?MODULE, cluster_query}, fun ?MODULE:format_delayed/1 + ). cluster_query(Table, _QsSpec, Continuation, Limit) -> Ms = [{'$1', [], ['$1']}], - emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_delayed/1). + emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit). format_delayed(Delayed) -> format_delayed(Delayed, false). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 597ee838f..2157b108c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -277,7 +277,8 @@ param_path_id() -> QueryString, ?RULE_TAB, ?RULE_QS_SCHEMA, - {?MODULE, query} + {?MODULE, query}, + fun ?MODULE:format_rule_resp/1 ) of {error, page_limit_invalid} -> @@ -556,7 +557,7 @@ query(Tab, {Qs, Fuzzy}, Start, Limit) -> Ms = qs2ms(), FuzzyFun = fuzzy_match_fun(Qs, Ms, Fuzzy), emqx_mgmt_api:select_table_with_count( - Tab, {Ms, FuzzyFun}, Start, Limit, fun format_rule_resp/1 + Tab, {Ms, FuzzyFun}, Start, Limit ). %% rule is not a record, so everything is fuzzy filter.