Merge pull request #9385 from terry-xiaoyu/sync_v43_to_v44
Sync v43 to v44
This commit is contained in:
commit
0da764e67e
|
@ -95,6 +95,9 @@
|
|||
end
|
||||
end()).
|
||||
|
||||
-define(GET_RES_ALIVE_TIMEOUT, 60000).
|
||||
-define(PROBE_RES_PREFIX, "__probe__:").
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Load resource/action providers from all available applications
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -363,7 +366,7 @@ test_resource(#{type := Type} = Params) ->
|
|||
{ok, #resource_type{}} ->
|
||||
%% Resource will be deleted after test.
|
||||
%% Use random resource id, ensure test func will not delete the resource in used.
|
||||
ResId = resource_id(),
|
||||
ResId = probe_resource_id(),
|
||||
try
|
||||
case create_resource(maps:put(id, ResId, Params), no_retry) of
|
||||
{ok, _} ->
|
||||
|
@ -405,7 +408,7 @@ is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) ->
|
|||
{ok, #resource_type{on_status = {Mod, OnStatus}}}
|
||||
= emqx_rule_registry:find_resource_type(ResType),
|
||||
case rpc:multicall(Nodes,
|
||||
?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], 5000) of
|
||||
?MODULE, fetch_resource_status, [Mod, OnStatus, ResId], ?GET_RES_ALIVE_TIMEOUT) of
|
||||
{ResL, []} ->
|
||||
is_resource_alive_(ResL);
|
||||
{_, _Error} ->
|
||||
|
@ -420,7 +423,7 @@ is_resource_alive(Nodes, ResId, _Opts = #{fetch := true}) ->
|
|||
end;
|
||||
is_resource_alive(Nodes, ResId, _Opts = #{fetch := false}) ->
|
||||
try
|
||||
case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], 5000) of
|
||||
case rpc:multicall(Nodes, ?MODULE, get_resource_status, [ResId], ?GET_RES_ALIVE_TIMEOUT) of
|
||||
{ResL, []} ->
|
||||
is_resource_alive_(ResL);
|
||||
{_, _Errors} ->
|
||||
|
@ -532,10 +535,15 @@ refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) ->
|
|||
refresh_resource_status() ->
|
||||
lists:foreach(
|
||||
fun(#resource{id = ResId, type = ResType}) ->
|
||||
case emqx_rule_registry:find_resource_type(ResType) of
|
||||
{ok, #resource_type{on_status = {Mod, OnStatus}}} ->
|
||||
fetch_resource_status(Mod, OnStatus, ResId);
|
||||
_ -> ok
|
||||
case is_prober(ResId) of
|
||||
false ->
|
||||
case emqx_rule_registry:find_resource_type(ResType) of
|
||||
{ok, #resource_type{on_status = {Mod, OnStatus}}} ->
|
||||
fetch_resource_status(Mod, OnStatus, ResId);
|
||||
_ -> ok
|
||||
end;
|
||||
true ->
|
||||
ok
|
||||
end
|
||||
end, emqx_rule_registry:get_resources()).
|
||||
|
||||
|
@ -662,6 +670,9 @@ ignore_lib_apps(Apps) ->
|
|||
resource_id() ->
|
||||
gen_id("resource:", fun emqx_rule_registry:find_resource/1).
|
||||
|
||||
probe_resource_id() ->
|
||||
gen_id(?PROBE_RES_PREFIX, fun emqx_rule_registry:find_resource/1).
|
||||
|
||||
rule_id() ->
|
||||
gen_id("rule:", fun emqx_rule_registry:get_rule/1).
|
||||
|
||||
|
@ -811,4 +822,9 @@ find_type(ResId) ->
|
|||
{ok, Type}.
|
||||
|
||||
alarm_name_of_resource_down(Type, ResId) ->
|
||||
list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])).
|
||||
unicode:characters_to_binary(io_lib:format("resource/~ts/~ts/down", [Type, ResId])).
|
||||
|
||||
is_prober(<<?PROBE_RES_PREFIX, _/binary>>) ->
|
||||
true;
|
||||
is_prober(_ResId) ->
|
||||
false.
|
||||
|
|
|
@ -62,7 +62,8 @@ groups() ->
|
|||
t_create_rule,
|
||||
t_reset_metrics,
|
||||
t_reset_metrics_fallbacks,
|
||||
t_create_resource
|
||||
t_create_resource,
|
||||
t_clean_resource_alarms
|
||||
]},
|
||||
{actions, [],
|
||||
[t_inspect_action
|
||||
|
@ -309,21 +310,29 @@ t_create_resource(_Config) ->
|
|||
ok.
|
||||
|
||||
t_clean_resource_alarms(_Config) ->
|
||||
lists:foreach(fun(ResId) ->
|
||||
clean_resource_alarms(ResId)
|
||||
end, [<<"abc">>, <<"哈喽"/utf8>>]).
|
||||
|
||||
clean_resource_alarms(ResId) ->
|
||||
emqx_rule_registry:register_resource_types(
|
||||
[make_simple_debug_resource_type()]),
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
|
||||
#{type => built_in,
|
||||
#{id => ResId,
|
||||
type => built_in,
|
||||
config => #{},
|
||||
description => <<"debug resource">>}),
|
||||
?assert(true, is_binary(ResId)),
|
||||
Name = emqx_rule_engine:alarm_name_of_resource_down(ResId, built_in),
|
||||
_ = emqx_alarm:activate(Name, #{id => ResId, type => built_in}),
|
||||
AlarmExist = fun(#{name := AName}) -> AName == Name end,
|
||||
Len = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())),
|
||||
?assert(Len == 1),
|
||||
Len = length(lists:filter(AlarmExist, emqx_alarm:get_alarms(activated))),
|
||||
?assertEqual(1, Len),
|
||||
emqx_rule_engine:ensure_resource_deleted(ResId),
|
||||
emqx_alarm:deactivate(Name),
|
||||
LenAfterRemove = length(lists:filter(AlarmExist, emqx_alarm:get_alarms(activated))),
|
||||
?assertEqual(0, LenAfterRemove),
|
||||
ok = emqx_rule_engine:unload_providers(),
|
||||
emqx_rule_registry:remove_resource(ResId),
|
||||
LenAfterRemove = length(lists:filter(AlarmExist, emqx_alarm:get_alarms())),
|
||||
?assert(LenAfterRemove == 0),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -61,7 +61,7 @@
|
|||
, {getopt, "1.0.1"}
|
||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}}
|
||||
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
|
||||
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.14"}}}
|
||||
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.15"}}}
|
||||
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
|
||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
|
||||
]}.
|
||||
|
|
Loading…
Reference in New Issue