Merge pull request #9253 from zmstone/1027-sync-v43-to-v44
1027 sync v43 to v44
This commit is contained in:
commit
02122469bc
|
@ -41,12 +41,18 @@ check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Params)
|
|||
ok;
|
||||
check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path := Path}}) ->
|
||||
ClientInfo1 = ClientInfo#{access => access(PubSub), topic => Topic},
|
||||
Username = maps:get(username, ClientInfo1, undefined),
|
||||
case check_acl_request(ACLParams, ClientInfo1) of
|
||||
{ok, 200, <<"ignore">>} -> ok;
|
||||
{ok, 200, _Body} -> {stop, allow};
|
||||
{ok, _Code, _Body} -> {stop, deny};
|
||||
{error, Error} ->
|
||||
?LOG(error, "Request ACL path ~s, error: ~p", [Path, Error]),
|
||||
{ok, 200, _Body} -> {stop, allow};
|
||||
{ok, Code, _Body} ->
|
||||
?LOG(error, "Deny ~s to topic ~ts, username: ~ts, http response code: ~p",
|
||||
[PubSub, Topic, Username, Code]),
|
||||
{stop, deny};
|
||||
{error, Error} ->
|
||||
?LOG(error, "Deny ~s to topic ~ts, username: ~ts, due to request "
|
||||
"http server failure, path: ~p, error: ~0p",
|
||||
[PubSub, Topic, Username, Path, Error]),
|
||||
ok
|
||||
end.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_http,
|
||||
[{description, "EMQ X Authentication/ACL with HTTP API"},
|
||||
{vsn, "4.3.8"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.9"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_http_sup]},
|
||||
{applications, [kernel,stdlib,ehttpc]},
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.7",
|
||||
[{"4.3.8",
|
||||
[{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||
|
@ -32,7 +35,10 @@
|
|||
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.7",
|
||||
[{"4.3.8",
|
||||
[{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
|
||||
check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path},
|
||||
super := SuperParams}) ->
|
||||
Username = maps:get(username, ClientInfo, undefined),
|
||||
case authenticate(AuthParms, ClientInfo) of
|
||||
{ok, 200, <<"ignore">>} ->
|
||||
ok;
|
||||
|
@ -46,12 +47,15 @@ check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path},
|
|||
anonymous => false,
|
||||
mountpoint => mountpoint(Body, ClientInfo)}};
|
||||
{ok, Code, _Body} ->
|
||||
?LOG(error, "Deny connection from path: ~s, response http code: ~p",
|
||||
[Path, Code]),
|
||||
?LOG(error, "Deny connection from path: ~s, username: ~ts, http "
|
||||
"response code: ~p",
|
||||
[Path, Username, Code]),
|
||||
{stop, AuthResult#{auth_result => http_to_connack_error(Code),
|
||||
anonymous => false}};
|
||||
{error, Error} ->
|
||||
?LOG(error, "Request auth path: ~s, error: ~p", [Path, Error]),
|
||||
?LOG(error, "Deny connection from path: ~s, username: ~ts, due to "
|
||||
"request http-server failed: ~0p",
|
||||
[Path, Username, Error]),
|
||||
%%FIXME later: server_unavailable is not right.
|
||||
{stop, AuthResult#{auth_result => server_unavailable,
|
||||
anonymous => false}}
|
||||
|
|
|
@ -113,11 +113,20 @@ string_to_number(_) ->
|
|||
%% Verify Claims
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
verify_acl(ClientInfo, #{<<"sub">> := SubTopics}, subscribe, Topic) when is_list(SubTopics) ->
|
||||
verify_acl(ClientInfo, SubTopics, Topic);
|
||||
verify_acl(ClientInfo, #{<<"pub">> := PubTopics}, publish, Topic) when is_list(PubTopics) ->
|
||||
verify_acl(ClientInfo, PubTopics, Topic);
|
||||
verify_acl(_ClientInfo, _Acl, _PubSub, _Topic) -> {stop, deny}.
|
||||
verify_acl(ClientInfo, Acl, PubSub, Topic) ->
|
||||
Key = case PubSub of
|
||||
subscribe -> <<"sub">>;
|
||||
publish -> <<"pub">>
|
||||
end,
|
||||
Rules0 = lists:map(
|
||||
fun(K) ->
|
||||
case maps:get(K, Acl, undefined) of
|
||||
R when is_list(R) -> R;
|
||||
_ -> []
|
||||
end
|
||||
end, [<<"all">>, Key]),
|
||||
Rules = lists:append(Rules0),
|
||||
verify_acl(ClientInfo, Rules, Topic).
|
||||
|
||||
verify_acl(_ClientInfo, [], _Topic) -> {stop, deny};
|
||||
verify_acl(ClientInfo, [AclTopic | AclTopics], Topic) ->
|
||||
|
|
|
@ -297,7 +297,8 @@ t_check_jwt_acl(_Config) ->
|
|||
{username, <<"plain">>},
|
||||
{sub, value},
|
||||
{acl, [{sub, [<<"a/b">>]},
|
||||
{pub, [<<"c/d">>]}]},
|
||||
{pub, [<<"c/d">>]},
|
||||
{all, [<<"all">>]}]},
|
||||
{exp, erlang:system_time(seconds) + 10}],
|
||||
<<"HS256">>,
|
||||
<<"emqxsecret">>),
|
||||
|
@ -329,6 +330,19 @@ t_check_jwt_acl(_Config) ->
|
|||
after 100 -> ok
|
||||
end,
|
||||
|
||||
%% can pub/sub to all rules
|
||||
?assertMatch(
|
||||
{ok, #{}, [0]},
|
||||
emqtt:subscribe(C, <<"all">>, 0)),
|
||||
|
||||
?assertMatch(
|
||||
ok,
|
||||
emqtt:publish(C, <<"all">>, <<"hi">>, 0)),
|
||||
receive
|
||||
{publish, #{topic := <<"all">>}} -> ok
|
||||
after 2000 ->
|
||||
?assert(false, "Publish to `all` should be allowed")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
t_check_jwt_acl_no_recs(init, _Config) ->
|
||||
|
|
|
@ -56,19 +56,7 @@ start(Module, Config) ->
|
|||
{ok, Conn} ->
|
||||
{ok, Conn};
|
||||
{error, Reason} ->
|
||||
Config1 = obfuscate(Config),
|
||||
?LOG(error, "Failed to connect with module=~p\n"
|
||||
"config=~p\nreason:~p", [Module, Config1, Reason]),
|
||||
?LOG_SENSITIVE(error, "Failed to connect with module=~p\n"
|
||||
"config=~p\nreason:~p", [Module, Config, Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
obfuscate(Map) ->
|
||||
maps:fold(fun(K, V, Acc) ->
|
||||
case is_sensitive(K) of
|
||||
true -> [{K, '***'} | Acc];
|
||||
false -> [{K, V} | Acc]
|
||||
end
|
||||
end, [], Map).
|
||||
|
||||
is_sensitive(password) -> true;
|
||||
is_sensitive(_) -> false.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_bridge_mqtt,
|
||||
[{description, "EMQ X Bridge to MQTT Broker"},
|
||||
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.7"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,replayq,emqtt]},
|
||||
|
|
|
@ -1,29 +1,43 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{<<"4\\.3\\.[4-5]">>,
|
||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
[{"4.3.6",
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[4-5]">>,
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.3",
|
||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[1-2]">>,
|
||||
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.0",
|
||||
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{<<"4\\.3\\.[4-5]">>,
|
||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
[{"4.3.6",
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[4-5]">>,
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.3",
|
||||
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[1-2]">>,
|
||||
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.0",
|
||||
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]}.
|
||||
|
|
|
@ -421,7 +421,7 @@ start_resource(ResId, PoolName, Options) ->
|
|||
on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
|
||||
start_resource(ResId, PoolName, Options);
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
|
||||
?LOG_SENSITIVE(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
|
||||
on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
|
||||
error({{?RESOURCE_TYPE_MQTT, ResId}, create_failed})
|
||||
end.
|
||||
|
|
|
@ -73,7 +73,7 @@
|
|||
export(_Bindings, _Params) ->
|
||||
case emqx_mgmt_data_backup:export() of
|
||||
{ok, File = #{filename := Filename}} ->
|
||||
minirest:return({ok, File#{filename => list_to_binary(filename:basename(Filename))}});
|
||||
minirest:return({ok, File#{filename => unicode:characters_to_binary(filename:basename(Filename))}});
|
||||
Return -> minirest:return(Return)
|
||||
end.
|
||||
|
||||
|
@ -162,7 +162,7 @@ import_content(Content) ->
|
|||
tmp_filename() ->
|
||||
Seconds = erlang:system_time(second),
|
||||
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
|
||||
list_to_binary(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])).
|
||||
iolist_to_binary(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])).
|
||||
|
||||
filename_decode(Filename) ->
|
||||
uri_string:percent_decode(Filename).
|
||||
|
|
|
@ -174,26 +174,26 @@ export_confs() ->
|
|||
{lists:map(fun({_, Key, Confs}) ->
|
||||
case Key of
|
||||
{_Zone, Name} ->
|
||||
[{zone, list_to_binary(Name)},
|
||||
[{zone, iolist_to_binary(Name)},
|
||||
{confs, confs_to_binary(Confs)}];
|
||||
{_Listener, Type, Name} ->
|
||||
[{type, list_to_binary(Type)},
|
||||
{name, list_to_binary(Name)},
|
||||
[{type, iolist_to_binary(Type)},
|
||||
{name, iolist_to_binary(Name)},
|
||||
{confs, confs_to_binary(Confs)}];
|
||||
Name ->
|
||||
[{name, list_to_binary(Name)},
|
||||
[{name, iolist_to_binary(Name)},
|
||||
{confs, confs_to_binary(Confs)}]
|
||||
end
|
||||
end, ets:tab2list(emqx_conf_b)),
|
||||
lists:map(fun({_, {_Listener, Type, Name}, Status}) ->
|
||||
[{type, list_to_binary(Type)},
|
||||
{name, list_to_binary(Name)},
|
||||
[{type, iolist_to_binary(Type)},
|
||||
{name, iolist_to_binary(Name)},
|
||||
{status, Status}]
|
||||
end, ets:tab2list(emqx_listeners_state))}
|
||||
end.
|
||||
|
||||
confs_to_binary(Confs) ->
|
||||
[{list_to_binary(Key), list_to_binary(Val)} || {Key, Val} <-Confs].
|
||||
[{iolist_to_binary(Key), iolist_to_binary(Val)} || {Key, Val} <-Confs].
|
||||
|
||||
-endif.
|
||||
|
||||
|
@ -290,7 +290,7 @@ compatible_version(#{<<"id">> := ID,
|
|||
<<"request_timeout">> := RequestTimeout,
|
||||
<<"url">> := URL}} = Resource, Acc) ->
|
||||
CovertFun = fun(Int) ->
|
||||
list_to_binary(integer_to_list(Int) ++ "s")
|
||||
iolist_to_binary(integer_to_list(Int) ++ "s")
|
||||
end,
|
||||
Cfg = make_new_config(#{<<"pool_size">> => PoolSize,
|
||||
<<"connect_timeout">> => CovertFun(ConnectTimeout),
|
||||
|
@ -626,11 +626,17 @@ to_version(Version) when is_binary(Version) ->
|
|||
to_version(Version) when is_list(Version) ->
|
||||
Version.
|
||||
|
||||
%% TODO: do not allow abs file path here.
|
||||
%% i.e. Filename0 should be a relative path only
|
||||
%% or the path prefix is in an white-list
|
||||
upload_backup_file(Filename0, Bin) ->
|
||||
case ensure_file_name(Filename0) of
|
||||
%% ensure it's a binary, so filenmae:join will always return binary
|
||||
Filename1 = to_unicode_bin(Filename0),
|
||||
case ensure_file_name(Filename1) of
|
||||
{ok, Filename} ->
|
||||
case check_json(Bin) of
|
||||
{ok, _} ->
|
||||
ok = filelib:ensure_dir(Filename),
|
||||
logger:info("write backup file ~p", [Filename]),
|
||||
file:write_file(Filename, Bin);
|
||||
{error, Reason} ->
|
||||
|
@ -646,8 +652,8 @@ list_backup_file() ->
|
|||
case file:read_file_info(File) of
|
||||
{ok, #file_info{size = Size, ctime = CTime = {{Y, M, D}, {H, MM, S}}}} ->
|
||||
Seconds = calendar:datetime_to_gregorian_seconds(CTime),
|
||||
BaseFilename = to_binary(filename:basename(File)),
|
||||
CreatedAt = to_binary(io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S])),
|
||||
BaseFilename = to_unicode_bin(filename:basename(File)),
|
||||
CreatedAt = to_unicode_bin(io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S])),
|
||||
Info = {
|
||||
Seconds,
|
||||
[{filename, BaseFilename},
|
||||
|
@ -664,25 +670,22 @@ list_backup_file() ->
|
|||
lists:filtermap(Filter, backup_files()).
|
||||
|
||||
backup_files() ->
|
||||
backup_files(backup_dir()) ++ backup_files(backup_dir_old_version()).
|
||||
backup_files(backup_dir()) ++
|
||||
backup_files(backup_dir_old_version()).
|
||||
|
||||
backup_files(Dir) ->
|
||||
{ok, FilesAll} = file:list_dir_all(Dir),
|
||||
Files = lists:filtermap(fun legal_filename/1, FilesAll),
|
||||
[filename:join([Dir, File]) || File <- Files].
|
||||
[filename:join([Dir, to_unicode_bin(File)]) || File <- Files].
|
||||
|
||||
look_up_file(Filename) when is_binary(Filename) ->
|
||||
look_up_file(binary_to_list(Filename));
|
||||
look_up_file(Filename) ->
|
||||
DefOnNotFound = fun(_Filename) -> {error, not_found} end,
|
||||
do_look_up_file(Filename, DefOnNotFound).
|
||||
|
||||
do_look_up_file(Filename, OnNotFound) when is_binary(Filename) ->
|
||||
do_look_up_file(binary_to_list(Filename), OnNotFound);
|
||||
do_look_up_file(Filename, OnNotFound) ->
|
||||
Filter =
|
||||
fun(MaybeFile) ->
|
||||
filename:basename(MaybeFile) == Filename
|
||||
filename:basename(MaybeFile) =:= Filename
|
||||
end,
|
||||
case lists:filter(Filter, backup_files()) of
|
||||
[] ->
|
||||
|
@ -696,7 +699,7 @@ read_backup_file(Filename0) ->
|
|||
{ok, Filename} ->
|
||||
case file:read_file(Filename) of
|
||||
{ok, Bin} ->
|
||||
{ok, #{filename => to_binary(Filename0),
|
||||
{ok, #{filename => to_unicode_bin(Filename0),
|
||||
file => Bin}};
|
||||
{error, Reason} ->
|
||||
logger:error("read file ~p failed ~p", [Filename, Reason]),
|
||||
|
@ -739,9 +742,9 @@ delete_all_backup_file() ->
|
|||
|
||||
export() ->
|
||||
Seconds = erlang:system_time(second),
|
||||
Data = do_export_data() ++ [{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}],
|
||||
Data = do_export_data() ++ [{date, iolist_to_binary(emqx_mgmt_util:strftime(Seconds))}],
|
||||
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
|
||||
BaseFilename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
|
||||
BaseFilename = to_unicode_bin(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])),
|
||||
{ok, Filename} = ensure_file_name(BaseFilename),
|
||||
case file:write_file(Filename, emqx_json:encode(Data)) of
|
||||
ok ->
|
||||
|
@ -750,7 +753,7 @@ export() ->
|
|||
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]),
|
||||
{ok, #{filename => Filename,
|
||||
size => Size,
|
||||
created_at => list_to_binary(CreatedAt),
|
||||
created_at => iolist_to_binary(CreatedAt),
|
||||
node => node()
|
||||
}};
|
||||
Error -> Error
|
||||
|
@ -760,7 +763,7 @@ export() ->
|
|||
|
||||
do_export_data() ->
|
||||
Version = string:sub_string(emqx_sys:version(), 1, 3),
|
||||
[{version, erlang:list_to_binary(Version)},
|
||||
[{version, iolist_to_binary(Version)},
|
||||
{rules, export_rules()},
|
||||
{resources, export_resources()},
|
||||
{blacklist, export_blacklist()},
|
||||
|
@ -829,6 +832,8 @@ import(Filename, OverridesJson) ->
|
|||
-endif.
|
||||
|
||||
-spec(check_import_json(binary() | string()) -> {ok, map()} | {error, term()}).
|
||||
check_import_json(Filename) when is_list(Filename) ->
|
||||
check_import_json(to_unicode_bin(Filename));
|
||||
check_import_json(Filename) ->
|
||||
OnNotFound =
|
||||
fun(F) ->
|
||||
|
@ -1005,5 +1010,5 @@ get_old_type() ->
|
|||
set_old_type(Type) ->
|
||||
application:set_env(emqx_auth_mnesia, as, Type).
|
||||
|
||||
to_binary(Bin) when is_binary(Bin) -> Bin;
|
||||
to_binary(Str) when is_list(Str) -> list_to_binary(Str).
|
||||
to_unicode_bin(Bin) when is_binary(Bin) -> Bin;
|
||||
to_unicode_bin(Str) when is_list(Str) -> unicode:characters_to_binary(Str).
|
||||
|
|
|
@ -28,13 +28,17 @@ all() ->
|
|||
emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Cfg) ->
|
||||
ekka_mnesia:start(),
|
||||
ok = emqx_dashboard_admin:mnesia(boot),
|
||||
application:load(emqx_modules),
|
||||
application:load(emqx_bridge_mqtt),
|
||||
emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
|
||||
application:ensure_all_started(emqx_dashboard),
|
||||
Cfg.
|
||||
|
||||
end_per_suite(Cfg) ->
|
||||
emqx_mgmt_data_backup:delete_all_backup_file(),
|
||||
application:stop(emqx_dashboard),
|
||||
emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]),
|
||||
Cfg.
|
||||
|
||||
|
|
|
@ -28,14 +28,18 @@ all() ->
|
|||
emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Cfg) ->
|
||||
ekka_mnesia:start(),
|
||||
ok = emqx_dashboard_admin:mnesia(boot),
|
||||
application:load(emqx_modules),
|
||||
application:load(emqx_web_hook),
|
||||
emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
|
||||
application:ensure_all_started(emqx_dashboard),
|
||||
ok = emqx_rule_registry:mnesia(boot),
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
Cfg.
|
||||
|
||||
end_per_suite(Cfg) ->
|
||||
application:stop(emqx_dashboard),
|
||||
emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]),
|
||||
Cfg.
|
||||
|
||||
|
@ -46,11 +50,8 @@ remove_resource(Id) ->
|
|||
emqx_rule_registry:remove_resource(Id),
|
||||
emqx_rule_registry:remove_resource_params(Id).
|
||||
|
||||
import(FilePath0, Version) ->
|
||||
Filename = filename:basename(FilePath0),
|
||||
FilePath = filename:join([get_data_path(), FilePath0]),
|
||||
{ok, Bin} = file:read_file(FilePath),
|
||||
ok = emqx_mgmt_data_backup:upload_backup_file(Filename, Bin),
|
||||
import_and_check(Filename, Version) ->
|
||||
{ok, #{code := 0}} = emqx_mgmt_api_data:import(#{}, [{<<"filename">>, Filename}]),
|
||||
lists:foreach(fun(#resource{id = Id, config = Config} = _Resource) ->
|
||||
case Id of
|
||||
<<"webhook">> ->
|
||||
|
@ -61,34 +62,51 @@ import(FilePath0, Version) ->
|
|||
end
|
||||
end, emqx_rule_registry:get_resources()).
|
||||
|
||||
upload_import_export_list_download(NameVsnTable) ->
|
||||
lists:foreach(fun({Filename0, Vsn}) ->
|
||||
Filename = unicode:characters_to_binary(Filename0),
|
||||
FullPath = filename:join([get_data_path(), Filename]),
|
||||
ct:pal("testing upload_import_export_list_download for file: ~ts, version: ~p", [FullPath, Vsn]),
|
||||
%% upload
|
||||
{ok, FileCnt} = file:read_file(FullPath),
|
||||
{ok, #{code := 0}} = emqx_mgmt_api_data:upload(#{},
|
||||
[{<<"filename">>, Filename}, {<<"file">>, FileCnt}]),
|
||||
%% import
|
||||
ok = import_and_check(Filename, Vsn),
|
||||
%% export
|
||||
{ok, #{data := #{created_at := CAt, filename := FName, size := Size}}}
|
||||
= emqx_mgmt_api_data:export(#{}, []),
|
||||
?assert(true, is_binary(CAt)),
|
||||
?assert(true, is_binary(FName)),
|
||||
?assert(true, is_integer(Size)),
|
||||
%% list exported files
|
||||
lists:foreach(fun({Seconds, Content}) ->
|
||||
?assert(true, is_integer(Seconds)),
|
||||
?assert(true, is_binary(proplists:get_value(filename, Content))),
|
||||
?assert(true, is_binary(proplists:get_value(created_at, Content))),
|
||||
?assert(true, is_integer(proplists:get_value(size, Content)))
|
||||
end, emqx_mgmt_api_data:get_list_exported()),
|
||||
%% download
|
||||
?assertMatch({ok, #{filename := FName}},
|
||||
emqx_mgmt_api_data:download(#{filename => FName}, []))
|
||||
end, NameVsnTable).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cases
|
||||
%%--------------------------------------------------------------------
|
||||
-ifdef(EMQX_ENTERPRISE).
|
||||
|
||||
t_importee4010(_) ->
|
||||
import("ee4010.json", ee4010),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_importee410(_) ->
|
||||
import("ee410.json", ee410),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_importee411(_) ->
|
||||
import("ee411.json", ee411),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_importee420(_) ->
|
||||
import("ee420.json", ee420),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_importee425(_) ->
|
||||
import("ee425.json", ee425),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_importee430(_) ->
|
||||
import("ee430.json", ee430),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
t_upload_import_export_list_download(_) ->
|
||||
NameVsnTable = [
|
||||
{"ee4010.json", ee4010},
|
||||
{"ee410.json", ee410},
|
||||
{"ee411.json", ee411},
|
||||
{"ee420.json", ee420},
|
||||
{"ee425.json", ee425},
|
||||
{"ee430.json", ee430},
|
||||
{"ee430-中文.json", ee430}
|
||||
],
|
||||
upload_import_export_list_download(NameVsnTable).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% handle_config
|
||||
|
@ -134,29 +152,17 @@ handle_config(_, _) -> ok.
|
|||
|
||||
-ifndef(EMQX_ENTERPRISE).
|
||||
|
||||
t_import422(_) ->
|
||||
import("422.json", 422),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_import423(_) ->
|
||||
import("423.json", 423),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_import425(_) ->
|
||||
import("425.json", 425),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_import430(_) ->
|
||||
import("430.json", 430),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_import409(_) ->
|
||||
import("409.json", 409),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
|
||||
t_import415(_) ->
|
||||
import("415.json", 415),
|
||||
{ok, _} = emqx_mgmt_data_backup:export().
|
||||
t_upload_import_export_list_download(_) ->
|
||||
NameVsnTable = [
|
||||
{"422.json", 422},
|
||||
{"423.json", 423},
|
||||
{"425.json", 425},
|
||||
{"430.json", 430},
|
||||
{"430-中文.json", 430},
|
||||
{"409.json", 409},
|
||||
{"415.json", 415}
|
||||
],
|
||||
upload_import_export_list_download(NameVsnTable).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% handle_config
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
{
|
||||
"version": "4.3",
|
||||
"rules": [],
|
||||
"resources": [
|
||||
{
|
||||
"id": "webhook",
|
||||
"type": "web_hook",
|
||||
"config": {
|
||||
"cacertfile": {
|
||||
"filename": "",
|
||||
"file": ""
|
||||
},
|
||||
"certfile": {
|
||||
"filename": "",
|
||||
"file": ""
|
||||
},
|
||||
"keyfile": {
|
||||
"filename": "",
|
||||
"file": ""
|
||||
},
|
||||
"connect_timeout": "5s",
|
||||
"pool_size": 8,
|
||||
"request_timeout": "5s",
|
||||
"url": "http://www.emqx.io",
|
||||
"verify": false
|
||||
},
|
||||
"created_at": 1616581851001,
|
||||
"description": "webhook"
|
||||
}
|
||||
],
|
||||
"blacklist": [],
|
||||
"apps": [
|
||||
{
|
||||
"id": "admin",
|
||||
"secret": "public",
|
||||
"name": "Default",
|
||||
"desc": "Application user",
|
||||
"status": true,
|
||||
"expired": "undefined"
|
||||
}
|
||||
],
|
||||
"users": [
|
||||
{
|
||||
"username": "admin",
|
||||
"password": "q8v7hISIMz+iKn/ZuAaogvAxKbA=",
|
||||
"tags": "administrator"
|
||||
}
|
||||
],
|
||||
"auth_mnesia": [],
|
||||
"acl_mnesia": [],
|
||||
"date": "2021-03-24 18:31:21"
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
{
|
||||
"version": "4.3",
|
||||
"rules": [],
|
||||
"resources": [
|
||||
{
|
||||
"id": "webhook",
|
||||
"type": "web_hook",
|
||||
"config": {
|
||||
"cacertfile": {
|
||||
"filename": "",
|
||||
"file": ""
|
||||
},
|
||||
"certfile": {
|
||||
"filename": "",
|
||||
"file": ""
|
||||
},
|
||||
"connect_timeout": "5s",
|
||||
"keyfile": {
|
||||
"filename": "",
|
||||
"file": ""
|
||||
},
|
||||
"pool_size": 8,
|
||||
"request_timeout": "5s",
|
||||
"url": "http://www.emqx.io",
|
||||
"verify": false
|
||||
},
|
||||
"created_at": 1618304340172,
|
||||
"description": "webhook"
|
||||
}
|
||||
],
|
||||
"blacklist": [],
|
||||
"apps": [
|
||||
{
|
||||
"id": "admin",
|
||||
"secret": "public",
|
||||
"name": "Default",
|
||||
"desc": "Application user",
|
||||
"status": true,
|
||||
"expired": "undefined"
|
||||
}
|
||||
],
|
||||
"users": [
|
||||
{
|
||||
"username": "admin",
|
||||
"password": "qq8hg9pOkmYiHqzi3+bcUaK2CGA=",
|
||||
"tags": "administrator"
|
||||
}
|
||||
],
|
||||
"auth_mnesia": [],
|
||||
"acl_mnesia": [],
|
||||
"modules": [
|
||||
{
|
||||
"id": "module:aabeddbf",
|
||||
"type": "recon",
|
||||
"config": {},
|
||||
"enabled": true,
|
||||
"created_at": 1618304311061,
|
||||
"description": ""
|
||||
},
|
||||
{
|
||||
"id": "module:cbe6d976",
|
||||
"type": "internal_acl",
|
||||
"config": {
|
||||
"acl_rule_file": "etc/acl.conf"
|
||||
},
|
||||
"enabled": true,
|
||||
"created_at": 1618304311061,
|
||||
"description": ""
|
||||
},
|
||||
{
|
||||
"id": "module:46375e06",
|
||||
"type": "retainer",
|
||||
"config": {
|
||||
"storage_type": "ram",
|
||||
"max_retained_messages": 0,
|
||||
"max_payload_size": "1MB",
|
||||
"expiry_interval": 0
|
||||
},
|
||||
"enabled": true,
|
||||
"created_at": 1618304311061,
|
||||
"description": ""
|
||||
},
|
||||
{
|
||||
"id": "module:091eb7c3",
|
||||
"type": "presence",
|
||||
"config": {
|
||||
"qos": 0
|
||||
},
|
||||
"enabled": true,
|
||||
"created_at": 1618304311061,
|
||||
"description": ""
|
||||
}
|
||||
],
|
||||
"schemas": [],
|
||||
"configs": [],
|
||||
"listeners_state": [],
|
||||
"date": "2021-04-13 17:59:52"
|
||||
}
|
|
@ -176,10 +176,12 @@
|
|||
end
|
||||
end()).
|
||||
|
||||
-define(RPC_TIMEOUT, 30000).
|
||||
|
||||
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
|
||||
|
||||
-define(CLUSTER_CALL(Func, Args, ResParttern),
|
||||
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of
|
||||
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of
|
||||
{ResL, []} ->
|
||||
case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
|
||||
[] -> ResL;
|
||||
|
@ -192,6 +194,37 @@
|
|||
throw({Func, {failed_on_nodes, BadNodes}})
|
||||
end end()).
|
||||
|
||||
%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined
|
||||
-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs),
|
||||
fun() ->
|
||||
RNodes = ekka_mnesia:running_nodes(),
|
||||
ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT),
|
||||
Res = lists:zip(RNodes, ResL),
|
||||
BadRes = lists:filtermap(fun
|
||||
({_Node, {ok, ResParttern}}) ->
|
||||
false;
|
||||
({Node, {error, {exception, undef, _}}}) ->
|
||||
try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of
|
||||
ResParttern ->
|
||||
false;
|
||||
OtherRes ->
|
||||
{true, #{rpc_type => call, func => FallbackFunc,
|
||||
result => OtherRes, node => Node}}
|
||||
catch
|
||||
Err:Reason ->
|
||||
{true, #{rpc_type => call, func => FallbackFunc,
|
||||
exception => {Err, Reason}, node => Node}}
|
||||
end;
|
||||
({Node, OtherRes}) ->
|
||||
{true, #{rpc_type => multicall, func => FallbackFunc,
|
||||
result => OtherRes, node => Node}}
|
||||
end, Res),
|
||||
case BadRes of
|
||||
[] -> Res;
|
||||
_ -> throw(BadRes)
|
||||
end
|
||||
end()).
|
||||
|
||||
%% Tables
|
||||
-define(RULE_TAB, emqx_rule).
|
||||
-define(ACTION_TAB, emqx_rule_action).
|
||||
|
|
|
@ -2,19 +2,29 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.4.10",
|
||||
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.9",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.8",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
|
@ -22,7 +32,10 @@
|
|||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.4\\.[6-7]">>,
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
@ -32,7 +45,10 @@
|
|||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.5",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -43,7 +59,10 @@
|
|||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.4",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
@ -54,7 +73,9 @@
|
|||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.3",
|
||||
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -68,7 +89,9 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.2",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -83,7 +106,9 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -98,7 +123,9 @@
|
|||
{add_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -114,19 +141,29 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.10",
|
||||
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.9",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.8",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
|
@ -134,7 +171,10 @@
|
|||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.4\\.[6-7]">>,
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
@ -144,7 +184,10 @@
|
|||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.5",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -155,7 +198,10 @@
|
|||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.4",
|
||||
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
|
@ -166,7 +212,9 @@
|
|||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.3",
|
||||
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -180,7 +228,9 @@
|
|||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.2",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
|
@ -195,7 +245,9 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
@ -210,7 +262,9 @@
|
|||
{delete_module,emqx_rule_date},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
, refresh_resources/0
|
||||
, refresh_resource/1
|
||||
, refresh_rule/1
|
||||
, refresh_rules/0
|
||||
, refresh_rules_when_boot/0
|
||||
, refresh_actions/1
|
||||
, refresh_actions/2
|
||||
, refresh_resource_status/0
|
||||
|
@ -47,6 +47,7 @@
|
|||
]).
|
||||
|
||||
-export([ init_resource/4
|
||||
, init_resource_with_retrier/4
|
||||
, init_action/4
|
||||
, clear_resource/4
|
||||
, clear_rule/1
|
||||
|
@ -80,6 +81,19 @@
|
|||
|
||||
-define(T_RETRY, 60000).
|
||||
|
||||
%% redefine this macro to confine the appup scope
|
||||
-undef(RAISE).
|
||||
-define(RAISE(_EXP_, _ERROR_CONTEXT_),
|
||||
fun() ->
|
||||
try (_EXP_)
|
||||
catch
|
||||
throw : Reason ->
|
||||
throw({_ERROR_CONTEXT_, Reason});
|
||||
_EXCLASS_:_EXCPTION_:_ST_ ->
|
||||
throw({_ERROR_CONTEXT_, {_EXCPTION_, _EXCPTION_, _ST_}})
|
||||
end
|
||||
end()).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Load resource/action providers from all available applications
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -255,15 +269,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
|
|||
created_at = erlang:system_time(millisecond)
|
||||
},
|
||||
ok = emqx_rule_registry:add_resource(Resource),
|
||||
InitArgs = [M, F, ResId, Config],
|
||||
case Retry of
|
||||
with_retry ->
|
||||
%% Note that we will return OK in case of resource creation failure,
|
||||
%% A timer is started to re-start the resource later.
|
||||
_ = (catch (?CLUSTER_CALL(init_resource, [M, F, ResId, Config]))),
|
||||
_ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok,
|
||||
init_resource, InitArgs)
|
||||
catch throw : Reason ->
|
||||
?LOG(error, "create_resource failed: ~0p", [Reason])
|
||||
end,
|
||||
{ok, Resource};
|
||||
no_retry ->
|
||||
try
|
||||
_ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
|
||||
_ = ?CLUSTER_CALL(init_resource, InitArgs),
|
||||
{ok, Resource}
|
||||
catch throw : Reason ->
|
||||
{error, Reason}
|
||||
|
@ -290,7 +309,7 @@ check_and_update_resource(Id, NewParams) ->
|
|||
do_check_and_update_resource(#{id => Id, config => Conifg, type => Type,
|
||||
description => Descr})
|
||||
catch Error:Reason:ST ->
|
||||
?LOG(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]),
|
||||
?LOG_SENSITIVE(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]),
|
||||
{error, Reason}
|
||||
end;
|
||||
_Other ->
|
||||
|
@ -327,7 +346,7 @@ start_resource(ResId) ->
|
|||
{ok, #resource_type{on_create = {Mod, Create}}}
|
||||
= emqx_rule_registry:find_resource_type(ResType),
|
||||
try
|
||||
init_resource(Mod, Create, ResId, Config),
|
||||
init_resource_with_retrier(Mod, Create, ResId, Config),
|
||||
refresh_actions_of_a_resource(ResId)
|
||||
catch
|
||||
throw:Reason -> {error, Reason}
|
||||
|
@ -358,7 +377,7 @@ test_resource(#{type := Type} = Params) ->
|
|||
{error, Reason}
|
||||
end
|
||||
catch E:R:S ->
|
||||
?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
|
||||
?LOG_SENSITIVE(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
|
||||
{error, R}
|
||||
after
|
||||
_ = ?CLUSTER_CALL(ensure_resource_deleted, [ResId]),
|
||||
|
@ -476,20 +495,22 @@ refresh_resource(Type) when is_atom(Type) ->
|
|||
emqx_rule_registry:get_resources_by_type(Type));
|
||||
|
||||
refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
|
||||
try
|
||||
{ok, #resource_type{on_create = {M, F}}} =
|
||||
emqx_rule_registry:find_resource_type(Type),
|
||||
ok = emqx_rule_engine:init_resource(M, F, ResId, Config)
|
||||
catch _:_ ->
|
||||
emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY)
|
||||
end.
|
||||
{ok, #resource_type{on_create = {M, F}}} =
|
||||
emqx_rule_registry:find_resource_type(Type),
|
||||
ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config).
|
||||
|
||||
-spec(refresh_rules() -> ok).
|
||||
refresh_rules() ->
|
||||
-spec(refresh_rules_when_boot() -> ok).
|
||||
refresh_rules_when_boot() ->
|
||||
lists:foreach(fun
|
||||
(#rule{enabled = true} = Rule) ->
|
||||
try refresh_rule(Rule)
|
||||
catch _:_ ->
|
||||
%% We set the enable = false when rule init failed to avoid bad rules running
|
||||
%% without actions created properly.
|
||||
%% The init failure might be caused by a disconnected resource, in this case the
|
||||
%% actions can not be created, so the rules won't work.
|
||||
%% After the user fixed the problem he can enable it manually,
|
||||
%% doing so will also recreate the actions.
|
||||
emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup})
|
||||
end;
|
||||
(_) -> ok
|
||||
|
@ -648,18 +669,29 @@ action_instance_id(ActionName) ->
|
|||
iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]).
|
||||
|
||||
init_resource(Module, OnCreate, ResId, Config) ->
|
||||
Params = ?RAISE(Module:OnCreate(ResId, Config),
|
||||
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
|
||||
Params = ?RAISE(Module:OnCreate(ResId, Config), {Module, OnCreate}),
|
||||
ResParams = #resource_params{id = ResId,
|
||||
params = Params,
|
||||
status = #{is_alive => true}},
|
||||
emqx_rule_registry:add_resource_params(ResParams).
|
||||
|
||||
init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
|
||||
try
|
||||
Params = Module:OnCreate(ResId, Config),
|
||||
ResParams = #resource_params{id = ResId,
|
||||
params = Params,
|
||||
status = #{is_alive => true}},
|
||||
emqx_rule_registry:add_resource_params(ResParams)
|
||||
catch Class:Reason:ST ->
|
||||
Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY),
|
||||
emqx_rule_monitor:ensure_resource_retrier(ResId, Interval),
|
||||
erlang:raise(Class, {init_resource, Reason}, ST)
|
||||
end.
|
||||
|
||||
init_action(Module, OnCreate, ActionInstId, Params) ->
|
||||
ok = emqx_rule_metrics:create_metrics(ActionInstId),
|
||||
case ?RAISE(Module:OnCreate(ActionInstId, Params),
|
||||
{{init_action_failure, node()},
|
||||
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}) of
|
||||
{init_action_failure, node(), Module, OnCreate}) of
|
||||
{Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2
|
||||
ok = emqx_rule_registry:add_action_instance_params(
|
||||
#action_instance_params{id = ActionInstId, params = NewParams, apply = Apply});
|
||||
|
@ -683,7 +715,7 @@ clear_resource(Module, Destroy, ResId, Type) ->
|
|||
case emqx_rule_registry:find_resource_params(ResId) of
|
||||
{ok, #resource_params{params = Params}} ->
|
||||
?RAISE(Module:Destroy(ResId, Params),
|
||||
{{destroy_resource_failure, node()}, {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}),
|
||||
{destroy_resource_failure, node(), Module, Destroy}),
|
||||
ok = emqx_rule_registry:remove_resource_params(ResId);
|
||||
not_found ->
|
||||
ok
|
||||
|
@ -711,8 +743,8 @@ clear_action(Module, Destroy, ActionInstId) ->
|
|||
emqx_rule_metrics:clear_metrics(ActionInstId),
|
||||
case emqx_rule_registry:get_action_instance_params(ActionInstId) of
|
||||
{ok, #action_instance_params{params = Params}} ->
|
||||
?RAISE(Module:Destroy(ActionInstId, Params),{{destroy_action_failure, node()},
|
||||
{{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}),
|
||||
?RAISE(Module:Destroy(ActionInstId, Params),
|
||||
{destroy_action_failure, node(), Module, Destroy}),
|
||||
ok = emqx_rule_registry:remove_action_instance_params(ActionInstId);
|
||||
not_found ->
|
||||
ok
|
||||
|
|
|
@ -28,8 +28,7 @@ start(_Type, _Args) ->
|
|||
{ok, Sup} = emqx_rule_engine_sup:start_link(),
|
||||
_ = emqx_rule_engine_sup:start_locker(),
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
ok = emqx_rule_engine:refresh_resources(),
|
||||
ok = emqx_rule_engine:refresh_rules(),
|
||||
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
||||
ok = emqx_rule_engine_cli:load(),
|
||||
{ok, Sup}.
|
||||
|
||||
|
|
|
@ -853,5 +853,9 @@ printable_maps(Headers) ->
|
|||
value => Value
|
||||
} || {Key, Value} <- V0]
|
||||
};
|
||||
(K, V0, AccIn) -> AccIn#{K => V0}
|
||||
(_K, V, AccIn) when is_tuple(V) ->
|
||||
%% internal header
|
||||
AccIn;
|
||||
(K, V, AccIn) ->
|
||||
AccIn#{K => V}
|
||||
end, #{}, Headers).
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
-export([ start_link/0
|
||||
, stop/0
|
||||
, async_refresh_resources_rules/0
|
||||
, ensure_resource_retrier/2
|
||||
, retry_loop/3
|
||||
]).
|
||||
|
@ -45,12 +46,22 @@ init([]) ->
|
|||
_ = erlang:process_flag(trap_exit, true),
|
||||
{ok, #{retryers => #{}}}.
|
||||
|
||||
async_refresh_resources_rules() ->
|
||||
gen_server:cast(?MODULE, async_refresh).
|
||||
|
||||
ensure_resource_retrier(ResId, Interval) ->
|
||||
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
|
||||
|
||||
handle_call(_Msg, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) ->
|
||||
%% the refresh task is already in progress, we discard the duplication
|
||||
{noreply, State};
|
||||
handle_cast(async_refresh, State) ->
|
||||
Pid = spawn_link(fun do_async_refresh/0),
|
||||
{noreply, State#{boot_refresh_pid => Pid}};
|
||||
|
||||
handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
|
||||
Objects = maps:get(Tag, State, #{}),
|
||||
NewState = case maps:find(Obj, Objects) of
|
||||
|
@ -65,7 +76,13 @@ handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
|
|||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
||||
handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) ->
|
||||
{noreply, State#{boot_refresh_pid => undefined}};
|
||||
handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) ->
|
||||
%% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'.
|
||||
%% Instead we rely on the user to trigger a manual retry for the resources, and then enable
|
||||
%% the rules after resources are connected.
|
||||
case maps:take(Pid, Retryers) of
|
||||
{{Tag, Obj}, Retryers2} ->
|
||||
Objects = maps:get(Tag, State, #{}),
|
||||
|
@ -117,6 +134,12 @@ retry_loop(resource, ResId, Interval) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
do_async_refresh() ->
|
||||
%% NOTE: the order matters.
|
||||
%% We should always refresh the resources first and then the rules.
|
||||
ok = emqx_rule_engine:refresh_resources(),
|
||||
ok = emqx_rule_engine:refresh_rules_when_boot().
|
||||
|
||||
refresh_and_enable_rules_of_resource(ResId) ->
|
||||
lists:foreach(
|
||||
fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) ->
|
||||
|
|
|
@ -22,10 +22,16 @@ t_mod_hook_fun(_) ->
|
|||
t_printable_maps(_) ->
|
||||
Headers = #{peerhost => {127,0,0,1},
|
||||
peername => {{127,0,0,1}, 9980},
|
||||
sockname => {{127,0,0,1}, 1883}
|
||||
sockname => {{127,0,0,1}, 1883},
|
||||
redispatch_to => {<<"group">>, <<"sub/topic/+">>},
|
||||
shared_dispatch_ack => {self(), ref}
|
||||
},
|
||||
Converted = emqx_rule_events:printable_maps(Headers),
|
||||
?assertMatch(
|
||||
#{peerhost := <<"127.0.0.1">>,
|
||||
peername := <<"127.0.0.1:9980">>,
|
||||
sockname := <<"127.0.0.1:1883">>
|
||||
}, emqx_rule_events:printable_maps(Headers)).
|
||||
}, Converted),
|
||||
?assertNot(maps:is_key(redispatch_to, Converted)),
|
||||
?assertNot(maps:is_key(shared_dispatch_ack, Converted)),
|
||||
ok.
|
||||
|
|
|
@ -35,6 +35,7 @@ suite() ->
|
|||
groups() ->
|
||||
[{resource, [sequence],
|
||||
[ t_restart_resource
|
||||
, t_refresh_resources_rules
|
||||
]}
|
||||
].
|
||||
|
||||
|
@ -47,24 +48,53 @@ end_per_suite(_Config) ->
|
|||
ok.
|
||||
|
||||
init_per_testcase(t_restart_resource, Config) ->
|
||||
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100),
|
||||
Opts = [public, named_table, set, {read_concurrency, true}],
|
||||
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
|
||||
ets:new(t_restart_resource, [named_table, public]),
|
||||
ets:insert(t_restart_resource, {failed_count, 0}),
|
||||
ets:insert(t_restart_resource, {succ_count, 0}),
|
||||
common_init_per_testcase(),
|
||||
Config;
|
||||
init_per_testcase(t_refresh_resources_rules, Config) ->
|
||||
meck:unload(),
|
||||
ets:new(t_refresh_resources_rules, [named_table, public]),
|
||||
ok = meck:new(emqx_rule_engine, [no_link, passthrough]),
|
||||
meck:expect(emqx_rule_engine, refresh_resources, fun() ->
|
||||
timer:sleep(500),
|
||||
ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}),
|
||||
ok
|
||||
end),
|
||||
meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() ->
|
||||
timer:sleep(500),
|
||||
ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}),
|
||||
ok
|
||||
end),
|
||||
common_init_per_testcase(),
|
||||
Config;
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
common_init_per_testcase(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(t_restart_resource, Config) ->
|
||||
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000),
|
||||
ets:delete(t_restart_resource),
|
||||
common_end_per_testcases(),
|
||||
Config;
|
||||
end_per_testcase(t_refresh_resources_rules, Config) ->
|
||||
meck:unload(),
|
||||
common_end_per_testcases(),
|
||||
Config;
|
||||
end_per_testcase(_, Config) ->
|
||||
common_end_per_testcases(),
|
||||
Config.
|
||||
|
||||
common_init_per_testcase() ->
|
||||
{ok, _} = emqx_rule_monitor:start_link().
|
||||
common_end_per_testcases() ->
|
||||
emqx_rule_monitor:stop().
|
||||
|
||||
t_restart_resource(_) ->
|
||||
{ok, _} = emqx_rule_monitor:start_link(),
|
||||
ok = emqx_rule_registry:register_resource_types(
|
||||
[#resource_type{
|
||||
name = test_res_1,
|
||||
|
@ -79,11 +109,12 @@ t_restart_resource(_) ->
|
|||
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
|
||||
#{type => test_res_1,
|
||||
config => #{},
|
||||
restart_interval => 100,
|
||||
description => <<"debug resource">>}),
|
||||
[{_, 1}] = ets:lookup(t_restart_resource, failed_count),
|
||||
[{_, 0}] = ets:lookup(t_restart_resource, succ_count),
|
||||
?assertMatch([{_, 0}], ets:lookup(t_restart_resource, succ_count)),
|
||||
?assertMatch([{_, N}] when N == 1 orelse N == 2 orelse N == 3,
|
||||
ets:lookup(t_restart_resource, failed_count)),
|
||||
ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]),
|
||||
emqx_rule_monitor:ensure_resource_retrier(ResId, 100),
|
||||
timer:sleep(1000),
|
||||
[{_, 5}] = ets:lookup(t_restart_resource, failed_count),
|
||||
[{_, 1}] = ets:lookup(t_restart_resource, succ_count),
|
||||
|
@ -91,9 +122,21 @@ t_restart_resource(_) ->
|
|||
?assertEqual(0, map_size(Pids)),
|
||||
ok = emqx_rule_engine:unload_providers(),
|
||||
emqx_rule_registry:remove_resource(ResId),
|
||||
emqx_rule_monitor:stop(),
|
||||
ok.
|
||||
|
||||
t_refresh_resources_rules(_) ->
|
||||
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
||||
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
||||
%% there should be only one refresh handler at the same time
|
||||
?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))),
|
||||
timer:sleep(1200),
|
||||
?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)),
|
||||
?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)),
|
||||
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
||||
timer:sleep(1200),
|
||||
?assertEqual([{refresh_resources, 2}], ets:lookup(t_refresh_resources_rules, refresh_resources)),
|
||||
?assertEqual([{refresh_rules, 2}], ets:lookup(t_refresh_resources_rules, refresh_rules)).
|
||||
|
||||
on_resource_create(Id, _) ->
|
||||
case ets:lookup(t_restart_resource, failed_count) of
|
||||
[{_, 5}] ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_web_hook,
|
||||
[{description, "EMQ X WebHook Plugin"},
|
||||
{vsn, "4.3.14"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.15"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_web_hook_sup]},
|
||||
{applications, [kernel,stdlib,ehttpc]},
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{<<"4\\.3\\.[0-7]">>,
|
||||
[{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[0-7]">>,
|
||||
[{apply,{application,stop,[emqx_web_hook]}},
|
||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||
|
@ -24,7 +25,8 @@
|
|||
{<<"4\\.3\\.1[2-3]">>,
|
||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{<<"4\\.3\\.[0-7]">>,
|
||||
[{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[0-7]">>,
|
||||
[{apply,{application,stop,[emqx_web_hook]}},
|
||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -384,7 +384,7 @@ test_http_connect(Conf) ->
|
|||
false
|
||||
catch
|
||||
Err:Reason:ST ->
|
||||
?LOG(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]),
|
||||
?LOG_SENSITIVE(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]),
|
||||
false
|
||||
end.
|
||||
l2b(L) when is_list(L) -> iolist_to_binary(L);
|
||||
|
|
|
@ -1,14 +1,26 @@
|
|||
### Enhancements
|
||||
# v4.3.22
|
||||
|
||||
## Enhancements
|
||||
|
||||
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
|
||||
This is to avoid slowing down the boot if some resources spend long time establishing the connection.
|
||||
|
||||
- Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124).
|
||||
This is to make the ACL deny logging for subscription behave the same as for publish.
|
||||
|
||||
### Bug fixes
|
||||
- JWT ACL claim supports `all` action to imply the rules applie to both `pub` and `sub` [#9044](https://github.com/emqx/emqx/pull/9044).
|
||||
|
||||
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places. [#9185](https://github.com/emqx/emqx/pull/9185)
|
||||
- Added a log censor to avoid logging sensitive data [#9189](https://github.com/emqx/emqx/pull/9189).
|
||||
If the data to be logged is a map or key-value list which contains sensitive key words such as `password`, the value is obfuscated as `******`.
|
||||
|
||||
## Bug fixes
|
||||
|
||||
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
|
||||
|
||||
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185).
|
||||
This is to avoid displaying floats like `0.30000000000000004` on the dashboard.
|
||||
|
||||
- Fix the issue that emqx prints too many error logs when connecting to mongodb but auth failed. [#9184](https://github.com/emqx/emqx/pull/9184)
|
||||
- Fix the issue that emqx prints too many error logs when connecting to mongodb but auth failed [#9184](https://github.com/emqx/emqx/pull/9184).
|
||||
|
||||
- Fix that after receiving publish in `idle mode` the emqx-sn gateway may panic [#9024](https://github.com/emqx/emqx/pull/9024).
|
||||
|
||||
|
@ -18,3 +30,8 @@
|
|||
|
||||
- Fixed the response status code for the `/status` endpoint [#9210](https://github.com/emqx/emqx/pull/9210).
|
||||
Before the fix, it always returned `200` even if the EMQX application was not running. Now it returns `503` in that case.
|
||||
|
||||
- Fix message delivery related event encoding [#9226](https://github.com/emqx/emqx/pull/9226)
|
||||
For rule-engine's input events like `$events/message_delivered`, and `$events/message_dropped`,
|
||||
if the message was delivered to a shared-subscription, the encoding (to JSON) of the event will fail.
|
||||
Affected versions: `v4.3.21`, `v4.4.10`, `e4.3.16` and `e4.4.10`.
|
||||
|
|
|
@ -1,14 +1,26 @@
|
|||
### 增强
|
||||
# v4.3.22
|
||||
|
||||
## 增强
|
||||
|
||||
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
|
||||
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。
|
||||
|
||||
- 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。
|
||||
该行为的改变主要是为了跟发布失败时的行为保持一致。
|
||||
|
||||
### 修复
|
||||
- 基于 JWT 的 ACL 支持 `all` 动作,指定同时适用于 `pub` 和 `sub` 两个动作的规则列表 [#9044](https://github.com/emqx/emqx/pull/9044)。
|
||||
|
||||
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)
|
||||
- 增强包含敏感数据的日志的安全性 [#9189](https://github.com/emqx/emqx/pull/9189)。
|
||||
如果日志中包含敏感关键词,例如 `password`,那么关联的数据回被模糊化处理,替换成 `******`。
|
||||
|
||||
## 修复
|
||||
|
||||
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
|
||||
|
||||
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。
|
||||
避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。
|
||||
|
||||
- 修复在尝试连接 MongoDB 数据库过程中,如果认证失败会不停打印错误日志的问题。[#9184](https://github.com/emqx/emqx/pull/9184)
|
||||
- 修复在尝试连接 MongoDB 数据库过程中,如果认证失败会不停打印错误日志的问题 [#9184](https://github.com/emqx/emqx/pull/9184)。
|
||||
|
||||
- 修复 emqx-sn 插件在“空闲”状态下收到消息发布请求时可能崩溃的情况 [#9024](https://github.com/emqx/emqx/pull/9024)。
|
||||
|
||||
|
@ -18,3 +30,8 @@
|
|||
|
||||
- 修正了 `/status` API 的响应状态代码 [#9210](https://github.com/emqx/emqx/pull/9210)。
|
||||
在修复之前,它总是返回 `200`,即使 EMQX 应用程序没有运行。 现在它在这种情况下返回 `503`。
|
||||
|
||||
- 修复规则引擎的消息事件编码失败 [#9226](https://github.com/emqx/emqx/pull/9226)。
|
||||
带消息的规则引擎事件,例如 `$events/message_delivered` 和 `$events/message_dropped`,
|
||||
如果消息事件是共享订阅产生的,在编码(到 JSON 格式)过程中会失败。
|
||||
影响到的版本:`v4.3.21`, `v4.4.10`, `e4.3.16` 和 `e4.4.10`。
|
||||
|
|
|
@ -48,3 +48,13 @@
|
|||
line => ?LINE}))
|
||||
end).
|
||||
|
||||
%% Copy-paste to avoid changing the old macro which may cause beam md5 changes in a lot of modules
|
||||
%% i.e. hot-upgrade hell
|
||||
-define(LOG_SENSITIVE(Level, Format, Args),
|
||||
begin
|
||||
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), emqx_misc:redact(Args)} end,
|
||||
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY},
|
||||
line => ?LINE,
|
||||
is_sensitive => true
|
||||
}))
|
||||
end).
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.4.10",
|
||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
|
@ -278,7 +279,8 @@
|
|||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.10",
|
||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -217,6 +217,7 @@ json_key(Term) ->
|
|||
throw({badkey, Term})
|
||||
end.
|
||||
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("proper/include/proper.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
|
|
@ -66,6 +66,8 @@
|
|||
nolink_apply/2
|
||||
]).
|
||||
|
||||
-export([redact/1]).
|
||||
|
||||
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
|
||||
-define(DEFAULT_PMAP_TIMEOUT, 5000).
|
||||
|
||||
|
@ -468,6 +470,67 @@ maybe_mute_rpc_log(Node) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
is_sensitive_key(token) -> true;
|
||||
is_sensitive_key("token") -> true;
|
||||
is_sensitive_key(<<"token">>) -> true;
|
||||
is_sensitive_key(password) -> true;
|
||||
is_sensitive_key("password") -> true;
|
||||
is_sensitive_key(<<"password">>) -> true;
|
||||
is_sensitive_key(secret) -> true;
|
||||
is_sensitive_key("secret") -> true;
|
||||
is_sensitive_key(<<"secret">>) -> true;
|
||||
is_sensitive_key(passcode) -> true;
|
||||
is_sensitive_key("passcode") -> true;
|
||||
is_sensitive_key(<<"passcode">>) -> true;
|
||||
is_sensitive_key(passphrase) -> true;
|
||||
is_sensitive_key("passphrase") -> true;
|
||||
is_sensitive_key(<<"passphrase">>) -> true;
|
||||
is_sensitive_key(key) -> true;
|
||||
is_sensitive_key("key") -> true;
|
||||
is_sensitive_key(<<"key">>) -> true;
|
||||
is_sensitive_key(aws_secret_access_key) -> true;
|
||||
is_sensitive_key("aws_secret_access_key") -> true;
|
||||
is_sensitive_key(<<"aws_secret_access_key">>) -> true;
|
||||
is_sensitive_key(secret_key) -> true;
|
||||
is_sensitive_key("secret_key") -> true;
|
||||
is_sensitive_key(<<"secret_key">>) -> true;
|
||||
is_sensitive_key(bind_password) -> true;
|
||||
is_sensitive_key("bind_password") -> true;
|
||||
is_sensitive_key(<<"bind_password">>) -> true;
|
||||
is_sensitive_key(_) -> false.
|
||||
|
||||
redact(L) when is_list(L) ->
|
||||
lists:map(fun redact/1, L);
|
||||
redact(M) when is_map(M) ->
|
||||
maps:map(fun(K, V) ->
|
||||
redact(K, V)
|
||||
end, M);
|
||||
redact({Key, Value}) ->
|
||||
case is_sensitive_key(Key) of
|
||||
true ->
|
||||
{Key, redact_v(Value)};
|
||||
false ->
|
||||
{redact(Key), redact(Value)}
|
||||
end;
|
||||
redact(T) when is_tuple(T) ->
|
||||
Elements = erlang:tuple_to_list(T),
|
||||
Redact = redact(Elements),
|
||||
erlang:list_to_tuple(Redact);
|
||||
redact(Any) ->
|
||||
Any.
|
||||
|
||||
redact(K, V) ->
|
||||
case is_sensitive_key(K) of
|
||||
true ->
|
||||
redact_v(V);
|
||||
false ->
|
||||
redact(V)
|
||||
end.
|
||||
|
||||
-define(REDACT_VAL, "******").
|
||||
redact_v(V) when is_binary(V) -> <<?REDACT_VAL>>;
|
||||
redact_v(_V) -> ?REDACT_VAL.
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
|
@ -500,4 +563,53 @@ is_sane_id_test() ->
|
|||
?assertMatch({error, _}, is_sane_id(list_to_binary(Bad))),
|
||||
ok.
|
||||
|
||||
redact_test_() ->
|
||||
Case = fun(Type, KeyT) ->
|
||||
Key =
|
||||
case Type of
|
||||
atom -> KeyT;
|
||||
string -> erlang:atom_to_list(KeyT);
|
||||
binary -> erlang:atom_to_binary(KeyT)
|
||||
end,
|
||||
|
||||
?assert(is_sensitive_key(Key)),
|
||||
|
||||
%% direct
|
||||
?assertEqual({Key, ?REDACT_VAL}, redact({Key, foo})),
|
||||
?assertEqual(#{Key => ?REDACT_VAL}, redact(#{Key => foo})),
|
||||
?assertEqual({Key, Key, Key}, redact({Key, Key, Key})),
|
||||
?assertEqual({[{Key, ?REDACT_VAL}], bar}, redact({[{Key, foo}], bar})),
|
||||
|
||||
%% 1 level nested
|
||||
?assertEqual([{Key, ?REDACT_VAL}], redact([{Key, foo}])),
|
||||
?assertEqual([#{Key => ?REDACT_VAL}], redact([#{Key => foo}])),
|
||||
|
||||
%% 2 level nested
|
||||
?assertEqual(#{opts => [{Key, ?REDACT_VAL}]}, redact(#{opts => [{Key, foo}]})),
|
||||
?assertEqual(#{opts => #{Key => ?REDACT_VAL}}, redact(#{opts => #{Key => foo}})),
|
||||
?assertEqual({opts, [{Key, ?REDACT_VAL}]}, redact({opts, [{Key, foo}]})),
|
||||
|
||||
%% 3 level nested
|
||||
?assertEqual([#{opts => [{Key, ?REDACT_VAL}]}], redact([#{opts => [{Key, foo}]}])),
|
||||
?assertEqual([{opts, [{Key, ?REDACT_VAL}]}], redact([{opts, [{Key, foo}]}])),
|
||||
?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}]))
|
||||
end,
|
||||
|
||||
Types = [atom, string, binary],
|
||||
Keys = [
|
||||
token,
|
||||
password,
|
||||
secret,
|
||||
passcode,
|
||||
passphrase,
|
||||
key,
|
||||
aws_secret_access_key,
|
||||
secret_key,
|
||||
bind_password
|
||||
],
|
||||
[{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types].
|
||||
|
||||
case_name(Type, Key) ->
|
||||
lists:concat([Type, "-", Key]).
|
||||
|
||||
-endif.
|
||||
|
|
Loading…
Reference in New Issue