fix(delayed): return correct node name

This commit is contained in:
JianBo He 2022-11-18 13:28:59 +08:00 committed by Zaiming (Stone) Shi
parent 0c1412315c
commit b9c5a5f822
4 changed files with 79 additions and 18 deletions

View File

@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% setup
%%--------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(),
Config.
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite().
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_cluster_query(_Config) ->
ok.

View File

@ -31,6 +31,7 @@ end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite(). emqx_mgmt_api_test_util:end_suite().
t_nodes_api(_) -> t_nodes_api(_) ->
Node = atom_to_binary(node(), utf8),
Topic = <<"test_topic">>, Topic = <<"test_topic">>,
{ok, Client} = emqtt:start_link(#{ {ok, Client} = emqtt:start_link(#{
username => <<"routes_username">>, clientid => <<"routes_cid">> username => <<"routes_username">>, clientid => <<"routes_cid">>
@ -49,11 +50,30 @@ t_nodes_api(_) ->
Data = maps:get(<<"data">>, RoutesData), Data = maps:get(<<"data">>, RoutesData),
Route = erlang:hd(Data), Route = erlang:hd(Data),
?assertEqual(Topic, maps:get(<<"topic">>, Route)), ?assertEqual(Topic, maps:get(<<"topic">>, Route)),
?assertEqual(atom_to_binary(node(), utf8), maps:get(<<"node">>, Route)), ?assertEqual(Node, maps:get(<<"node">>, Route)),
%% exact match
Topic2 = <<"test_topic_2">>,
{ok, _, _} = emqtt:subscribe(Client, Topic2),
QS = uri_string:compose_query([
{"topic", Topic2},
{"node", atom_to_list(node())}
]),
Headers = emqx_mgmt_api_test_util:auth_header_(),
{ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
MatchData = emqx_json:decode(MatchResponse, [return_maps]),
?assertMatch(
#{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
maps:get(<<"meta">>, MatchData)
),
?assertMatch(
[#{<<"topic">> := Topic2, <<"node">> := Node}],
maps:get(<<"data">>, MatchData)
),
%% get topics/:topic %% get topics/:topic
RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]), RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]),
{ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath), {ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath),
RouteData = emqx_json:decode(RouteResponse, [return_maps]), RouteData = emqx_json:decode(RouteResponse, [return_maps]),
?assertEqual(Topic, maps:get(<<"topic">>, RouteData)), ?assertEqual(Topic, maps:get(<<"topic">>, RouteData)),
?assertEqual(atom_to_binary(node(), utf8), maps:get(<<"node">>, RouteData)). ?assertEqual(Node, maps:get(<<"node">>, RouteData)).

View File

@ -59,15 +59,17 @@
cluster_list/1 cluster_list/1
]). ]).
%% internal exports %% exports for query
-export([qs2ms/2]). -export([
qs2ms/2,
format_delayed/1,
format_delayed/2
]).
-export([ -export([
post_config_update/5 post_config_update/5
]). ]).
-export([format_delayed/1]).
%% exported for `emqx_telemetry' %% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]). -export([get_basic_usage_info/0]).
@ -168,13 +170,12 @@ list(Params) ->
emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN).
cluster_list(Params) -> cluster_list(Params) ->
%% FIXME: why cluster_query???
emqx_mgmt_api:cluster_query( emqx_mgmt_api:cluster_query(
?TAB, ?TAB,
Params, Params,
[], [],
fun ?MODULE:qs2ms/2, fun ?MODULE:qs2ms/2,
fun ?MODULE:format_delayed/1 fun ?MODULE:format_delayed/2
). ).
-spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}. -spec qs2ms(atom(), {list(), list()}) -> {ets:match_spec(), fun() | undefined}.
@ -182,9 +183,13 @@ qs2ms(_Table, {_Qs, _Fuzzy}) ->
{[{'$1', [], ['$1']}], undefined}. {[{'$1', [], ['$1']}], undefined}.
format_delayed(Delayed) -> format_delayed(Delayed) ->
format_delayed(Delayed, false). format_delayed(node(), Delayed).
format_delayed(WhichNode, Delayed) ->
format_delayed(WhichNode, Delayed, false).
format_delayed( format_delayed(
WhichNode,
#delayed_message{ #delayed_message{
key = {ExpectTimeStamp, Id}, key = {ExpectTimeStamp, Id},
delayed = Delayed, delayed = Delayed,
@ -204,7 +209,7 @@ format_delayed(
RemainingTime = ExpectTimeStamp - ?NOW, RemainingTime = ExpectTimeStamp - ?NOW,
Result = #{ Result = #{
msgid => emqx_guid:to_hexstr(Id), msgid => emqx_guid:to_hexstr(Id),
node => node(), node => WhichNode,
publish_at => PublishTime, publish_at => PublishTime,
delayed_interval => Delayed, delayed_interval => Delayed,
delayed_remaining => RemainingTime div 1000, delayed_remaining => RemainingTime div 1000,
@ -231,7 +236,7 @@ get_delayed_message(Id) ->
{error, not_found}; {error, not_found};
Rows -> Rows ->
Message = hd(Rows), Message = hd(Rows),
{ok, format_delayed(Message, true)} {ok, format_delayed(node(), Message, true)}
end. end.
get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Node, Id) when Node =:= node() ->

View File

@ -574,13 +574,7 @@ generate_match_spec([Qs | Rest], N, {MtchHead, Conds}) ->
generate_match_spec(Rest, N + 1, {NMtchHead, NConds}). generate_match_spec(Rest, N + 1, {NMtchHead, NConds}).
put_conds({_, Op, V}, Holder, Conds) -> put_conds({_, Op, V}, Holder, Conds) ->
[{Op, Holder, V} | Conds]; [{Op, Holder, V} | Conds].
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
[
{Op2, Holder, V2},
{Op1, Holder, V1}
| Conds
].
ms(enable, X) -> ms(enable, X) ->
#{enable => X}. #{enable => X}.