diff --git a/apps/emqx_auth_http/src/emqx_acl_http.erl b/apps/emqx_auth_http/src/emqx_acl_http.erl index 51bf9c303..4f9282a42 100644 --- a/apps/emqx_auth_http/src/emqx_acl_http.erl +++ b/apps/emqx_auth_http/src/emqx_acl_http.erl @@ -41,12 +41,18 @@ check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Params) ok; check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path := Path}}) -> ClientInfo1 = ClientInfo#{access => access(PubSub), topic => Topic}, + Username = maps:get(username, ClientInfo1, undefined), case check_acl_request(ACLParams, ClientInfo1) of {ok, 200, <<"ignore">>} -> ok; - {ok, 200, _Body} -> {stop, allow}; - {ok, _Code, _Body} -> {stop, deny}; - {error, Error} -> - ?LOG(error, "Request ACL path ~s, error: ~p", [Path, Error]), + {ok, 200, _Body} -> {stop, allow}; + {ok, Code, _Body} -> + ?LOG(error, "Deny ~s to topic ~ts, username: ~ts, http response code: ~p", + [PubSub, Topic, Username, Code]), + {stop, deny}; + {error, Error} -> + ?LOG(error, "Deny ~s to topic ~ts, username: ~ts, due to request " + "http server failure, path: ~p, error: ~0p", + [PubSub, Topic, Username, Path, Error]), ok end. diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index 87d087bae..d5b9edbc8 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_http, [{description, "EMQ X Authentication/ACL with HTTP API"}, - {vsn, "4.3.8"}, % strict semver, bump manually! + {vsn, "4.3.9"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_http_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_auth_http/src/emqx_auth_http.appup.src b/apps/emqx_auth_http/src/emqx_auth_http.appup.src index 01d756b9e..7fe5279c1 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.appup.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.appup.src @@ -1,7 +1,10 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.7", + [{"4.3.8", + [{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.7", [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, @@ -32,7 +35,10 @@ {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<".*">>,[]}], - [{"4.3.7", + [{"4.3.8", + [{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.7", [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, diff --git a/apps/emqx_auth_http/src/emqx_auth_http.erl b/apps/emqx_auth_http/src/emqx_auth_http.erl index 620750bd0..3e63ea597 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http.erl @@ -36,6 +36,7 @@ check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path}, super := SuperParams}) -> + Username = maps:get(username, ClientInfo, undefined), case authenticate(AuthParms, ClientInfo) of {ok, 200, <<"ignore">>} -> ok; @@ -46,12 +47,15 @@ check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path}, anonymous => false, mountpoint => mountpoint(Body, ClientInfo)}}; {ok, Code, _Body} -> - ?LOG(error, "Deny connection from path: ~s, response http code: ~p", - [Path, Code]), + ?LOG(error, "Deny connection from path: ~s, username: ~ts, http " + "response code: ~p", + [Path, Username, Code]), {stop, AuthResult#{auth_result => http_to_connack_error(Code), anonymous => false}}; {error, Error} -> - ?LOG(error, "Request auth path: ~s, error: ~p", [Path, Error]), + ?LOG(error, "Deny connection from path: ~s, username: ~ts, due to " + "request http-server failed: ~0p", + [Path, Username, Error]), %%FIXME later: server_unavailable is not right. {stop, AuthResult#{auth_result => server_unavailable, anonymous => false}} diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt.erl b/apps/emqx_auth_jwt/src/emqx_auth_jwt.erl index 1b57c3a83..eca90ab72 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt.erl @@ -113,11 +113,20 @@ string_to_number(_) -> %% Verify Claims %%-------------------------------------------------------------------- -verify_acl(ClientInfo, #{<<"sub">> := SubTopics}, subscribe, Topic) when is_list(SubTopics) -> - verify_acl(ClientInfo, SubTopics, Topic); -verify_acl(ClientInfo, #{<<"pub">> := PubTopics}, publish, Topic) when is_list(PubTopics) -> - verify_acl(ClientInfo, PubTopics, Topic); -verify_acl(_ClientInfo, _Acl, _PubSub, _Topic) -> {stop, deny}. +verify_acl(ClientInfo, Acl, PubSub, Topic) -> + Key = case PubSub of + subscribe -> <<"sub">>; + publish -> <<"pub">> + end, + Rules0 = lists:map( + fun(K) -> + case maps:get(K, Acl, undefined) of + R when is_list(R) -> R; + _ -> [] + end + end, [<<"all">>, Key]), + Rules = lists:append(Rules0), + verify_acl(ClientInfo, Rules, Topic). verify_acl(_ClientInfo, [], _Topic) -> {stop, deny}; verify_acl(ClientInfo, [AclTopic | AclTopics], Topic) -> diff --git a/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl index 62f753904..7452091bd 100644 --- a/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl +++ b/apps/emqx_auth_jwt/test/emqx_auth_jwt_SUITE.erl @@ -297,7 +297,8 @@ t_check_jwt_acl(_Config) -> {username, <<"plain">>}, {sub, value}, {acl, [{sub, [<<"a/b">>]}, - {pub, [<<"c/d">>]}]}, + {pub, [<<"c/d">>]}, + {all, [<<"all">>]}]}, {exp, erlang:system_time(seconds) + 10}], <<"HS256">>, <<"emqxsecret">>), @@ -329,6 +330,19 @@ t_check_jwt_acl(_Config) -> after 100 -> ok end, + %% can pub/sub to all rules + ?assertMatch( + {ok, #{}, [0]}, + emqtt:subscribe(C, <<"all">>, 0)), + + ?assertMatch( + ok, + emqtt:publish(C, <<"all">>, <<"hi">>, 0)), + receive + {publish, #{topic := <<"all">>}} -> ok + after 2000 -> + ?assert(false, "Publish to `all` should be allowed") + end, ok = emqtt:disconnect(C). t_check_jwt_acl_no_recs(init, _Config) -> diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl index 3482f33ee..f21718f60 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl @@ -56,19 +56,7 @@ start(Module, Config) -> {ok, Conn} -> {ok, Conn}; {error, Reason} -> - Config1 = obfuscate(Config), - ?LOG(error, "Failed to connect with module=~p\n" - "config=~p\nreason:~p", [Module, Config1, Reason]), + ?LOG_SENSITIVE(error, "Failed to connect with module=~p\n" + "config=~p\nreason:~p", [Module, Config, Reason]), {error, Reason} end. - -obfuscate(Map) -> - maps:fold(fun(K, V, Acc) -> - case is_sensitive(K) of - true -> [{K, '***'} | Acc]; - false -> [{K, V} | Acc] - end - end, [], Map). - -is_sensitive(password) -> true; -is_sensitive(_) -> false. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 578671f4e..a3bf1e914 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mqtt, [{description, "EMQ X Bridge to MQTT Broker"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.3.7"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,replayq,emqtt]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index a72a18658..a6ca580f1 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -1,29 +1,43 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{<<"4\\.3\\.[4-5]">>, - [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, + [{"4.3.6", + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[4-5]">>, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[1-2]">>, - [{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[4-5]">>, - [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, + [{"4.3.6", + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[4-5]">>, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[1-2]">>, - [{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, + [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]}, + {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index a2b88352e..1739bd47a 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -421,7 +421,7 @@ start_resource(ResId, PoolName, Options) -> on_resource_destroy(ResId, #{<<"pool">> => PoolName}), start_resource(ResId, PoolName, Options); {error, Reason} -> - ?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), + ?LOG_SENSITIVE(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), on_resource_destroy(ResId, #{<<"pool">> => PoolName}), error({{?RESOURCE_TYPE_MQTT, ResId}, create_failed}) end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_data.erl b/apps/emqx_management/src/emqx_mgmt_api_data.erl index 2c44fda53..808483a56 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_data.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_data.erl @@ -73,7 +73,7 @@ export(_Bindings, _Params) -> case emqx_mgmt_data_backup:export() of {ok, File = #{filename := Filename}} -> - minirest:return({ok, File#{filename => list_to_binary(filename:basename(Filename))}}); + minirest:return({ok, File#{filename => unicode:characters_to_binary(filename:basename(Filename))}}); Return -> minirest:return(Return) end. @@ -162,7 +162,7 @@ import_content(Content) -> tmp_filename() -> Seconds = erlang:system_time(second), {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), - list_to_binary(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])). + iolist_to_binary(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])). filename_decode(Filename) -> uri_string:percent_decode(Filename). diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index bf573a49a..1d1063fb0 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -174,26 +174,26 @@ export_confs() -> {lists:map(fun({_, Key, Confs}) -> case Key of {_Zone, Name} -> - [{zone, list_to_binary(Name)}, + [{zone, iolist_to_binary(Name)}, {confs, confs_to_binary(Confs)}]; {_Listener, Type, Name} -> - [{type, list_to_binary(Type)}, - {name, list_to_binary(Name)}, + [{type, iolist_to_binary(Type)}, + {name, iolist_to_binary(Name)}, {confs, confs_to_binary(Confs)}]; Name -> - [{name, list_to_binary(Name)}, + [{name, iolist_to_binary(Name)}, {confs, confs_to_binary(Confs)}] end end, ets:tab2list(emqx_conf_b)), lists:map(fun({_, {_Listener, Type, Name}, Status}) -> - [{type, list_to_binary(Type)}, - {name, list_to_binary(Name)}, + [{type, iolist_to_binary(Type)}, + {name, iolist_to_binary(Name)}, {status, Status}] end, ets:tab2list(emqx_listeners_state))} end. confs_to_binary(Confs) -> - [{list_to_binary(Key), list_to_binary(Val)} || {Key, Val} <-Confs]. + [{iolist_to_binary(Key), iolist_to_binary(Val)} || {Key, Val} <-Confs]. -endif. @@ -290,7 +290,7 @@ compatible_version(#{<<"id">> := ID, <<"request_timeout">> := RequestTimeout, <<"url">> := URL}} = Resource, Acc) -> CovertFun = fun(Int) -> - list_to_binary(integer_to_list(Int) ++ "s") + iolist_to_binary(integer_to_list(Int) ++ "s") end, Cfg = make_new_config(#{<<"pool_size">> => PoolSize, <<"connect_timeout">> => CovertFun(ConnectTimeout), @@ -626,11 +626,17 @@ to_version(Version) when is_binary(Version) -> to_version(Version) when is_list(Version) -> Version. +%% TODO: do not allow abs file path here. +%% i.e. Filename0 should be a relative path only +%% or the path prefix is in an white-list upload_backup_file(Filename0, Bin) -> - case ensure_file_name(Filename0) of + %% ensure it's a binary, so filenmae:join will always return binary + Filename1 = to_unicode_bin(Filename0), + case ensure_file_name(Filename1) of {ok, Filename} -> case check_json(Bin) of {ok, _} -> + ok = filelib:ensure_dir(Filename), logger:info("write backup file ~p", [Filename]), file:write_file(Filename, Bin); {error, Reason} -> @@ -646,8 +652,8 @@ list_backup_file() -> case file:read_file_info(File) of {ok, #file_info{size = Size, ctime = CTime = {{Y, M, D}, {H, MM, S}}}} -> Seconds = calendar:datetime_to_gregorian_seconds(CTime), - BaseFilename = to_binary(filename:basename(File)), - CreatedAt = to_binary(io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S])), + BaseFilename = to_unicode_bin(filename:basename(File)), + CreatedAt = to_unicode_bin(io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S])), Info = { Seconds, [{filename, BaseFilename}, @@ -664,25 +670,22 @@ list_backup_file() -> lists:filtermap(Filter, backup_files()). backup_files() -> - backup_files(backup_dir()) ++ backup_files(backup_dir_old_version()). + backup_files(backup_dir()) ++ + backup_files(backup_dir_old_version()). backup_files(Dir) -> {ok, FilesAll} = file:list_dir_all(Dir), Files = lists:filtermap(fun legal_filename/1, FilesAll), - [filename:join([Dir, File]) || File <- Files]. + [filename:join([Dir, to_unicode_bin(File)]) || File <- Files]. -look_up_file(Filename) when is_binary(Filename) -> - look_up_file(binary_to_list(Filename)); look_up_file(Filename) -> DefOnNotFound = fun(_Filename) -> {error, not_found} end, do_look_up_file(Filename, DefOnNotFound). -do_look_up_file(Filename, OnNotFound) when is_binary(Filename) -> - do_look_up_file(binary_to_list(Filename), OnNotFound); do_look_up_file(Filename, OnNotFound) -> Filter = fun(MaybeFile) -> - filename:basename(MaybeFile) == Filename + filename:basename(MaybeFile) =:= Filename end, case lists:filter(Filter, backup_files()) of [] -> @@ -696,7 +699,7 @@ read_backup_file(Filename0) -> {ok, Filename} -> case file:read_file(Filename) of {ok, Bin} -> - {ok, #{filename => to_binary(Filename0), + {ok, #{filename => to_unicode_bin(Filename0), file => Bin}}; {error, Reason} -> logger:error("read file ~p failed ~p", [Filename, Reason]), @@ -739,9 +742,9 @@ delete_all_backup_file() -> export() -> Seconds = erlang:system_time(second), - Data = do_export_data() ++ [{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}], + Data = do_export_data() ++ [{date, iolist_to_binary(emqx_mgmt_util:strftime(Seconds))}], {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), - BaseFilename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), + BaseFilename = to_unicode_bin(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])), {ok, Filename} = ensure_file_name(BaseFilename), case file:write_file(Filename, emqx_json:encode(Data)) of ok -> @@ -750,7 +753,7 @@ export() -> CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]), {ok, #{filename => Filename, size => Size, - created_at => list_to_binary(CreatedAt), + created_at => iolist_to_binary(CreatedAt), node => node() }}; Error -> Error @@ -760,7 +763,7 @@ export() -> do_export_data() -> Version = string:sub_string(emqx_sys:version(), 1, 3), - [{version, erlang:list_to_binary(Version)}, + [{version, iolist_to_binary(Version)}, {rules, export_rules()}, {resources, export_resources()}, {blacklist, export_blacklist()}, @@ -829,6 +832,8 @@ import(Filename, OverridesJson) -> -endif. -spec(check_import_json(binary() | string()) -> {ok, map()} | {error, term()}). +check_import_json(Filename) when is_list(Filename) -> + check_import_json(to_unicode_bin(Filename)); check_import_json(Filename) -> OnNotFound = fun(F) -> @@ -1005,5 +1010,5 @@ get_old_type() -> set_old_type(Type) -> application:set_env(emqx_auth_mnesia, as, Type). -to_binary(Bin) when is_binary(Bin) -> Bin; -to_binary(Str) when is_list(Str) -> list_to_binary(Str). +to_unicode_bin(Bin) when is_binary(Bin) -> Bin; +to_unicode_bin(Str) when is_list(Str) -> unicode:characters_to_binary(Str). diff --git a/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl b/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl index 681998012..c3921140a 100644 --- a/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl +++ b/apps/emqx_management/test/emqx_bridge_mqtt_data_export_import_SUITE.erl @@ -28,13 +28,17 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Cfg) -> + ekka_mnesia:start(), + ok = emqx_dashboard_admin:mnesia(boot), application:load(emqx_modules), application:load(emqx_bridge_mqtt), emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), + application:ensure_all_started(emqx_dashboard), Cfg. end_per_suite(Cfg) -> emqx_mgmt_data_backup:delete_all_backup_file(), + application:stop(emqx_dashboard), emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]), Cfg. diff --git a/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl index e289eec18..5caa795c7 100644 --- a/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl +++ b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE.erl @@ -28,14 +28,18 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Cfg) -> + ekka_mnesia:start(), + ok = emqx_dashboard_admin:mnesia(boot), application:load(emqx_modules), application:load(emqx_web_hook), emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), + application:ensure_all_started(emqx_dashboard), ok = emqx_rule_registry:mnesia(boot), ok = emqx_rule_engine:load_providers(), Cfg. end_per_suite(Cfg) -> + application:stop(emqx_dashboard), emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]), Cfg. @@ -46,11 +50,8 @@ remove_resource(Id) -> emqx_rule_registry:remove_resource(Id), emqx_rule_registry:remove_resource_params(Id). -import(FilePath0, Version) -> - Filename = filename:basename(FilePath0), - FilePath = filename:join([get_data_path(), FilePath0]), - {ok, Bin} = file:read_file(FilePath), - ok = emqx_mgmt_data_backup:upload_backup_file(Filename, Bin), +import_and_check(Filename, Version) -> + {ok, #{code := 0}} = emqx_mgmt_api_data:import(#{}, [{<<"filename">>, Filename}]), lists:foreach(fun(#resource{id = Id, config = Config} = _Resource) -> case Id of <<"webhook">> -> @@ -61,34 +62,51 @@ import(FilePath0, Version) -> end end, emqx_rule_registry:get_resources()). +upload_import_export_list_download(NameVsnTable) -> + lists:foreach(fun({Filename0, Vsn}) -> + Filename = unicode:characters_to_binary(Filename0), + FullPath = filename:join([get_data_path(), Filename]), + ct:pal("testing upload_import_export_list_download for file: ~ts, version: ~p", [FullPath, Vsn]), + %% upload + {ok, FileCnt} = file:read_file(FullPath), + {ok, #{code := 0}} = emqx_mgmt_api_data:upload(#{}, + [{<<"filename">>, Filename}, {<<"file">>, FileCnt}]), + %% import + ok = import_and_check(Filename, Vsn), + %% export + {ok, #{data := #{created_at := CAt, filename := FName, size := Size}}} + = emqx_mgmt_api_data:export(#{}, []), + ?assert(true, is_binary(CAt)), + ?assert(true, is_binary(FName)), + ?assert(true, is_integer(Size)), + %% list exported files + lists:foreach(fun({Seconds, Content}) -> + ?assert(true, is_integer(Seconds)), + ?assert(true, is_binary(proplists:get_value(filename, Content))), + ?assert(true, is_binary(proplists:get_value(created_at, Content))), + ?assert(true, is_integer(proplists:get_value(size, Content))) + end, emqx_mgmt_api_data:get_list_exported()), + %% download + ?assertMatch({ok, #{filename := FName}}, + emqx_mgmt_api_data:download(#{filename => FName}, [])) + end, NameVsnTable). + %%-------------------------------------------------------------------- %% Cases %%-------------------------------------------------------------------- -ifdef(EMQX_ENTERPRISE). -t_importee4010(_) -> - import("ee4010.json", ee4010), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_importee410(_) -> - import("ee410.json", ee410), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_importee411(_) -> - import("ee411.json", ee411), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_importee420(_) -> - import("ee420.json", ee420), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_importee425(_) -> - import("ee425.json", ee425), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_importee430(_) -> - import("ee430.json", ee430), - {ok, _} = emqx_mgmt_data_backup:export(). +t_upload_import_export_list_download(_) -> + NameVsnTable = [ + {"ee4010.json", ee4010}, + {"ee410.json", ee410}, + {"ee411.json", ee411}, + {"ee420.json", ee420}, + {"ee425.json", ee425}, + {"ee430.json", ee430}, + {"ee430-中文.json", ee430} + ], + upload_import_export_list_download(NameVsnTable). %%-------------------------------------------------------------------- %% handle_config @@ -134,29 +152,17 @@ handle_config(_, _) -> ok. -ifndef(EMQX_ENTERPRISE). -t_import422(_) -> - import("422.json", 422), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_import423(_) -> - import("423.json", 423), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_import425(_) -> - import("425.json", 425), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_import430(_) -> - import("430.json", 430), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_import409(_) -> - import("409.json", 409), - {ok, _} = emqx_mgmt_data_backup:export(). - -t_import415(_) -> - import("415.json", 415), - {ok, _} = emqx_mgmt_data_backup:export(). +t_upload_import_export_list_download(_) -> + NameVsnTable = [ + {"422.json", 422}, + {"423.json", 423}, + {"425.json", 425}, + {"430.json", 430}, + {"430-中文.json", 430}, + {"409.json", 409}, + {"415.json", 415} + ], + upload_import_export_list_download(NameVsnTable). %%-------------------------------------------------------------------- %% handle_config diff --git a/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE_data/430-中文.json b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE_data/430-中文.json new file mode 100644 index 000000000..9472152f9 --- /dev/null +++ b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE_data/430-中文.json @@ -0,0 +1,52 @@ +{ + "version": "4.3", + "rules": [], + "resources": [ + { + "id": "webhook", + "type": "web_hook", + "config": { + "cacertfile": { + "filename": "", + "file": "" + }, + "certfile": { + "filename": "", + "file": "" + }, + "keyfile": { + "filename": "", + "file": "" + }, + "connect_timeout": "5s", + "pool_size": 8, + "request_timeout": "5s", + "url": "http://www.emqx.io", + "verify": false + }, + "created_at": 1616581851001, + "description": "webhook" + } + ], + "blacklist": [], + "apps": [ + { + "id": "admin", + "secret": "public", + "name": "Default", + "desc": "Application user", + "status": true, + "expired": "undefined" + } + ], + "users": [ + { + "username": "admin", + "password": "q8v7hISIMz+iKn/ZuAaogvAxKbA=", + "tags": "administrator" + } + ], + "auth_mnesia": [], + "acl_mnesia": [], + "date": "2021-03-24 18:31:21" +} \ No newline at end of file diff --git a/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE_data/ee430-中文.json b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE_data/ee430-中文.json new file mode 100644 index 000000000..36986f24b --- /dev/null +++ b/apps/emqx_management/test/emqx_webhook_data_export_import_SUITE_data/ee430-中文.json @@ -0,0 +1,98 @@ +{ + "version": "4.3", + "rules": [], + "resources": [ + { + "id": "webhook", + "type": "web_hook", + "config": { + "cacertfile": { + "filename": "", + "file": "" + }, + "certfile": { + "filename": "", + "file": "" + }, + "connect_timeout": "5s", + "keyfile": { + "filename": "", + "file": "" + }, + "pool_size": 8, + "request_timeout": "5s", + "url": "http://www.emqx.io", + "verify": false + }, + "created_at": 1618304340172, + "description": "webhook" + } + ], + "blacklist": [], + "apps": [ + { + "id": "admin", + "secret": "public", + "name": "Default", + "desc": "Application user", + "status": true, + "expired": "undefined" + } + ], + "users": [ + { + "username": "admin", + "password": "qq8hg9pOkmYiHqzi3+bcUaK2CGA=", + "tags": "administrator" + } + ], + "auth_mnesia": [], + "acl_mnesia": [], + "modules": [ + { + "id": "module:aabeddbf", + "type": "recon", + "config": {}, + "enabled": true, + "created_at": 1618304311061, + "description": "" + }, + { + "id": "module:cbe6d976", + "type": "internal_acl", + "config": { + "acl_rule_file": "etc/acl.conf" + }, + "enabled": true, + "created_at": 1618304311061, + "description": "" + }, + { + "id": "module:46375e06", + "type": "retainer", + "config": { + "storage_type": "ram", + "max_retained_messages": 0, + "max_payload_size": "1MB", + "expiry_interval": 0 + }, + "enabled": true, + "created_at": 1618304311061, + "description": "" + }, + { + "id": "module:091eb7c3", + "type": "presence", + "config": { + "qos": 0 + }, + "enabled": true, + "created_at": 1618304311061, + "description": "" + } + ], + "schemas": [], + "configs": [], + "listeners_state": [], + "date": "2021-04-13 17:59:52" +} \ No newline at end of file diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index c6d8fb2b4..234e11c3f 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -176,10 +176,12 @@ end end()). +-define(RPC_TIMEOUT, 30000). + -define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). -define(CLUSTER_CALL(Func, Args, ResParttern), - fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of + fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of {ResL, []} -> case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of [] -> ResL; @@ -192,6 +194,37 @@ throw({Func, {failed_on_nodes, BadNodes}}) end end()). +%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined +-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs), + fun() -> + RNodes = ekka_mnesia:running_nodes(), + ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT), + Res = lists:zip(RNodes, ResL), + BadRes = lists:filtermap(fun + ({_Node, {ok, ResParttern}}) -> + false; + ({Node, {error, {exception, undef, _}}}) -> + try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of + ResParttern -> + false; + OtherRes -> + {true, #{rpc_type => call, func => FallbackFunc, + result => OtherRes, node => Node}} + catch + Err:Reason -> + {true, #{rpc_type => call, func => FallbackFunc, + exception => {Err, Reason}, node => Node}} + end; + ({Node, OtherRes}) -> + {true, #{rpc_type => multicall, func => FallbackFunc, + result => OtherRes, node => Node}} + end, Res), + case BadRes of + [] -> Res; + _ -> throw(BadRes) + end + end()). + %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 41a184e94..9f1f4d7bd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,19 +2,29 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.10", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.9", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -22,7 +32,10 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -32,7 +45,10 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -43,7 +59,10 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -54,7 +73,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -68,7 +89,9 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.2", - [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -83,7 +106,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.1", - [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -98,7 +123,9 @@ {add_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -114,19 +141,29 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.10", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.9", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -134,7 +171,10 @@ {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -144,7 +184,10 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -155,7 +198,10 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -166,7 +212,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -180,7 +228,9 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.2", - [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -195,7 +245,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.1", - [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -210,7 +262,9 @@ {delete_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index ac0ccaf4f..24dc58a97 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -24,7 +24,7 @@ , refresh_resources/0 , refresh_resource/1 , refresh_rule/1 - , refresh_rules/0 + , refresh_rules_when_boot/0 , refresh_actions/1 , refresh_actions/2 , refresh_resource_status/0 @@ -47,6 +47,7 @@ ]). -export([ init_resource/4 + , init_resource_with_retrier/4 , init_action/4 , clear_resource/4 , clear_rule/1 @@ -80,6 +81,19 @@ -define(T_RETRY, 60000). +%% redefine this macro to confine the appup scope +-undef(RAISE). +-define(RAISE(_EXP_, _ERROR_CONTEXT_), + fun() -> + try (_EXP_) + catch + throw : Reason -> + throw({_ERROR_CONTEXT_, Reason}); + _EXCLASS_:_EXCPTION_:_ST_ -> + throw({_ERROR_CONTEXT_, {_EXCPTION_, _EXCPTION_, _ST_}}) + end + end()). + %%------------------------------------------------------------------------------ %% Load resource/action providers from all available applications %%------------------------------------------------------------------------------ @@ -255,15 +269,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) -> created_at = erlang:system_time(millisecond) }, ok = emqx_rule_registry:add_resource(Resource), + InitArgs = [M, F, ResId, Config], case Retry of with_retry -> %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - _ = (catch (?CLUSTER_CALL(init_resource, [M, F, ResId, Config]))), + _ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok, + init_resource, InitArgs) + catch throw : Reason -> + ?LOG(error, "create_resource failed: ~0p", [Reason]) + end, {ok, Resource}; no_retry -> try - _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), + _ = ?CLUSTER_CALL(init_resource, InitArgs), {ok, Resource} catch throw : Reason -> {error, Reason} @@ -290,7 +309,7 @@ check_and_update_resource(Id, NewParams) -> do_check_and_update_resource(#{id => Id, config => Conifg, type => Type, description => Descr}) catch Error:Reason:ST -> - ?LOG(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]), + ?LOG_SENSITIVE(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]), {error, Reason} end; _Other -> @@ -327,7 +346,7 @@ start_resource(ResId) -> {ok, #resource_type{on_create = {Mod, Create}}} = emqx_rule_registry:find_resource_type(ResType), try - init_resource(Mod, Create, ResId, Config), + init_resource_with_retrier(Mod, Create, ResId, Config), refresh_actions_of_a_resource(ResId) catch throw:Reason -> {error, Reason} @@ -358,7 +377,7 @@ test_resource(#{type := Type} = Params) -> {error, Reason} end catch E:R:S -> - ?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]), + ?LOG_SENSITIVE(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]), {error, R} after _ = ?CLUSTER_CALL(ensure_resource_deleted, [ResId]), @@ -476,20 +495,22 @@ refresh_resource(Type) when is_atom(Type) -> emqx_rule_registry:get_resources_by_type(Type)); refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> - try - {ok, #resource_type{on_create = {M, F}}} = - emqx_rule_registry:find_resource_type(Type), - ok = emqx_rule_engine:init_resource(M, F, ResId, Config) - catch _:_ -> - emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY) - end. + {ok, #resource_type{on_create = {M, F}}} = + emqx_rule_registry:find_resource_type(Type), + ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config). --spec(refresh_rules() -> ok). -refresh_rules() -> +-spec(refresh_rules_when_boot() -> ok). +refresh_rules_when_boot() -> lists:foreach(fun (#rule{enabled = true} = Rule) -> try refresh_rule(Rule) catch _:_ -> + %% We set the enable = false when rule init failed to avoid bad rules running + %% without actions created properly. + %% The init failure might be caused by a disconnected resource, in this case the + %% actions can not be created, so the rules won't work. + %% After the user fixed the problem he can enable it manually, + %% doing so will also recreate the actions. emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) end; (_) -> ok @@ -648,18 +669,29 @@ action_instance_id(ActionName) -> iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]). init_resource(Module, OnCreate, ResId, Config) -> - Params = ?RAISE(Module:OnCreate(ResId, Config), - {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}), + Params = ?RAISE(Module:OnCreate(ResId, Config), {Module, OnCreate}), ResParams = #resource_params{id = ResId, params = Params, status = #{is_alive => true}}, emqx_rule_registry:add_resource_params(ResParams). +init_resource_with_retrier(Module, OnCreate, ResId, Config) -> + try + Params = Module:OnCreate(ResId, Config), + ResParams = #resource_params{id = ResId, + params = Params, + status = #{is_alive => true}}, + emqx_rule_registry:add_resource_params(ResParams) + catch Class:Reason:ST -> + Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY), + emqx_rule_monitor:ensure_resource_retrier(ResId, Interval), + erlang:raise(Class, {init_resource, Reason}, ST) + end. + init_action(Module, OnCreate, ActionInstId, Params) -> ok = emqx_rule_metrics:create_metrics(ActionInstId), case ?RAISE(Module:OnCreate(ActionInstId, Params), - {{init_action_failure, node()}, - {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}) of + {init_action_failure, node(), Module, OnCreate}) of {Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2 ok = emqx_rule_registry:add_action_instance_params( #action_instance_params{id = ActionInstId, params = NewParams, apply = Apply}); @@ -683,7 +715,7 @@ clear_resource(Module, Destroy, ResId, Type) -> case emqx_rule_registry:find_resource_params(ResId) of {ok, #resource_params{params = Params}} -> ?RAISE(Module:Destroy(ResId, Params), - {{destroy_resource_failure, node()}, {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}), + {destroy_resource_failure, node(), Module, Destroy}), ok = emqx_rule_registry:remove_resource_params(ResId); not_found -> ok @@ -711,8 +743,8 @@ clear_action(Module, Destroy, ActionInstId) -> emqx_rule_metrics:clear_metrics(ActionInstId), case emqx_rule_registry:get_action_instance_params(ActionInstId) of {ok, #action_instance_params{params = Params}} -> - ?RAISE(Module:Destroy(ActionInstId, Params),{{destroy_action_failure, node()}, - {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}), + ?RAISE(Module:Destroy(ActionInstId, Params), + {destroy_action_failure, node(), Module, Destroy}), ok = emqx_rule_registry:remove_action_instance_params(ActionInstId); not_found -> ok diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 202c21d1a..217d2bf2b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -28,8 +28,7 @@ start(_Type, _Args) -> {ok, Sup} = emqx_rule_engine_sup:start_link(), _ = emqx_rule_engine_sup:start_locker(), ok = emqx_rule_engine:load_providers(), - ok = emqx_rule_engine:refresh_resources(), - ok = emqx_rule_engine:refresh_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), ok = emqx_rule_engine_cli:load(), {ok, Sup}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index d829a3b14..bbdd36d7a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -853,5 +853,9 @@ printable_maps(Headers) -> value => Value } || {Key, Value} <- V0] }; - (K, V0, AccIn) -> AccIn#{K => V0} + (_K, V, AccIn) when is_tuple(V) -> + %% internal header + AccIn; + (K, V, AccIn) -> + AccIn#{K => V} end, #{}, Headers). diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl index afff14c40..8af8aa7ff 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -31,6 +31,7 @@ -export([ start_link/0 , stop/0 + , async_refresh_resources_rules/0 , ensure_resource_retrier/2 , retry_loop/3 ]). @@ -45,12 +46,22 @@ init([]) -> _ = erlang:process_flag(trap_exit, true), {ok, #{retryers => #{}}}. +async_refresh_resources_rules() -> + gen_server:cast(?MODULE, async_refresh). + ensure_resource_retrier(ResId, Interval) -> gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). handle_call(_Msg, _From, State) -> {reply, ok, State}. +handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) -> + %% the refresh task is already in progress, we discard the duplication + {noreply, State}; +handle_cast(async_refresh, State) -> + Pid = spawn_link(fun do_async_refresh/0), + {noreply, State#{boot_refresh_pid => Pid}}; + handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> Objects = maps:get(Tag, State, #{}), NewState = case maps:find(Obj, Objects) of @@ -65,7 +76,13 @@ handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> handle_cast(_Msg, State) -> {noreply, State}. + +handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) -> + {noreply, State#{boot_refresh_pid => undefined}}; handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> + %% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'. + %% Instead we rely on the user to trigger a manual retry for the resources, and then enable + %% the rules after resources are connected. case maps:take(Pid, Retryers) of {{Tag, Obj}, Retryers2} -> Objects = maps:get(Tag, State, #{}), @@ -117,6 +134,12 @@ retry_loop(resource, ResId, Interval) -> ok end. +do_async_refresh() -> + %% NOTE: the order matters. + %% We should always refresh the resources first and then the rules. + ok = emqx_rule_engine:refresh_resources(), + ok = emqx_rule_engine:refresh_rules_when_boot(). + refresh_and_enable_rules_of_resource(ResId) -> lists:foreach( fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl index f586d6256..6f2cccfea 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl @@ -22,10 +22,16 @@ t_mod_hook_fun(_) -> t_printable_maps(_) -> Headers = #{peerhost => {127,0,0,1}, peername => {{127,0,0,1}, 9980}, - sockname => {{127,0,0,1}, 1883} + sockname => {{127,0,0,1}, 1883}, + redispatch_to => {<<"group">>, <<"sub/topic/+">>}, + shared_dispatch_ack => {self(), ref} }, + Converted = emqx_rule_events:printable_maps(Headers), ?assertMatch( #{peerhost := <<"127.0.0.1">>, peername := <<"127.0.0.1:9980">>, sockname := <<"127.0.0.1:1883">> - }, emqx_rule_events:printable_maps(Headers)). + }, Converted), + ?assertNot(maps:is_key(redispatch_to, Converted)), + ?assertNot(maps:is_key(shared_dispatch_ack, Converted)), + ok. diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 121970342..6389c1a2e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -35,6 +35,7 @@ suite() -> groups() -> [{resource, [sequence], [ t_restart_resource + , t_refresh_resources_rules ]} ]. @@ -47,24 +48,53 @@ end_per_suite(_Config) -> ok. init_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100), Opts = [public, named_table, set, {read_concurrency, true}], _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), ets:new(t_restart_resource, [named_table, public]), ets:insert(t_restart_resource, {failed_count, 0}), ets:insert(t_restart_resource, {succ_count, 0}), + common_init_per_testcase(), + Config; +init_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + ets:new(t_refresh_resources_rules, [named_table, public]), + ok = meck:new(emqx_rule_engine, [no_link, passthrough]), + meck:expect(emqx_rule_engine, refresh_resources, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}), + ok + end), + meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() -> + timer:sleep(500), + ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}), + ok + end), + common_init_per_testcase(), Config; - init_per_testcase(_, Config) -> + common_init_per_testcase(), Config. end_per_testcase(t_restart_resource, Config) -> + persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000), ets:delete(t_restart_resource), + common_end_per_testcases(), + Config; +end_per_testcase(t_refresh_resources_rules, Config) -> + meck:unload(), + common_end_per_testcases(), Config; end_per_testcase(_, Config) -> + common_end_per_testcases(), Config. +common_init_per_testcase() -> + {ok, _} = emqx_rule_monitor:start_link(). +common_end_per_testcases() -> + emqx_rule_monitor:stop(). + t_restart_resource(_) -> - {ok, _} = emqx_rule_monitor:start_link(), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1, @@ -79,11 +109,12 @@ t_restart_resource(_) -> {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( #{type => test_res_1, config => #{}, + restart_interval => 100, description => <<"debug resource">>}), - [{_, 1}] = ets:lookup(t_restart_resource, failed_count), - [{_, 0}] = ets:lookup(t_restart_resource, succ_count), + ?assertMatch([{_, 0}], ets:lookup(t_restart_resource, succ_count)), + ?assertMatch([{_, N}] when N == 1 orelse N == 2 orelse N == 3, + ets:lookup(t_restart_resource, failed_count)), ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), - emqx_rule_monitor:ensure_resource_retrier(ResId, 100), timer:sleep(1000), [{_, 5}] = ets:lookup(t_restart_resource, failed_count), [{_, 1}] = ets:lookup(t_restart_resource, succ_count), @@ -91,9 +122,21 @@ t_restart_resource(_) -> ?assertEqual(0, map_size(Pids)), ok = emqx_rule_engine:unload_providers(), emqx_rule_registry:remove_resource(ResId), - emqx_rule_monitor:stop(), ok. +t_refresh_resources_rules(_) -> + ok = emqx_rule_monitor:async_refresh_resources_rules(), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + %% there should be only one refresh handler at the same time + ?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)), + ok = emqx_rule_monitor:async_refresh_resources_rules(), + timer:sleep(1200), + ?assertEqual([{refresh_resources, 2}], ets:lookup(t_refresh_resources_rules, refresh_resources)), + ?assertEqual([{refresh_rules, 2}], ets:lookup(t_refresh_resources_rules, refresh_rules)). + on_resource_create(Id, _) -> case ets:lookup(t_restart_resource, failed_count) of [{_, 5}] -> diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src index efe41b3bb..5726f5d89 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_web_hook, [{description, "EMQ X WebHook Plugin"}, - {vsn, "4.3.14"}, % strict semver, bump manually! + {vsn, "4.3.15"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_web_hook_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index a2f72f0e2..8b8058dba 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -1,7 +1,8 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{<<"4\\.3\\.[0-7]">>, + [{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[0-7]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -24,7 +25,8 @@ {<<"4\\.3\\.1[2-3]">>, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[0-7]">>, + [{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[0-7]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 29550e1e9..254a476aa 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -384,7 +384,7 @@ test_http_connect(Conf) -> false catch Err:Reason:ST -> - ?LOG(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]), + ?LOG_SENSITIVE(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]), false end. l2b(L) when is_list(L) -> iolist_to_binary(L); diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index db76d3423..4fc08477d 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -1,14 +1,26 @@ -### Enhancements +# v4.3.22 + +## Enhancements + +- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). + This is to avoid slowing down the boot if some resources spend long time establishing the connection. - Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124). This is to make the ACL deny logging for subscription behave the same as for publish. -### Bug fixes +- JWT ACL claim supports `all` action to imply the rules applie to both `pub` and `sub` [#9044](https://github.com/emqx/emqx/pull/9044). -- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places. [#9185](https://github.com/emqx/emqx/pull/9185) +- Added a log censor to avoid logging sensitive data [#9189](https://github.com/emqx/emqx/pull/9189). + If the data to be logged is a map or key-value list which contains sensitive key words such as `password`, the value is obfuscated as `******`. + +## Bug fixes + +- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224). + +- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185). This is to avoid displaying floats like `0.30000000000000004` on the dashboard. -- Fix the issue that emqx prints too many error logs when connecting to mongodb but auth failed. [#9184](https://github.com/emqx/emqx/pull/9184) +- Fix the issue that emqx prints too many error logs when connecting to mongodb but auth failed [#9184](https://github.com/emqx/emqx/pull/9184). - Fix that after receiving publish in `idle mode` the emqx-sn gateway may panic [#9024](https://github.com/emqx/emqx/pull/9024). @@ -16,5 +28,10 @@ - Restore old `emqx_auth_jwt` module API, so the hook callback functions registered in older version will not be invalidated after hot-upgrade [#9144](https://github.com/emqx/emqx/pull/9144). -- Fixed the response status code for the `/status` endpoint [#9210](https://github.com/emqx/emqx/pull/9210). +- Fixed the response status code for the `/status` endpoint [#9210](https://github.com/emqx/emqx/pull/9210). Before the fix, it always returned `200` even if the EMQX application was not running. Now it returns `503` in that case. + +- Fix message delivery related event encoding [#9226](https://github.com/emqx/emqx/pull/9226) + For rule-engine's input events like `$events/message_delivered`, and `$events/message_dropped`, + if the message was delivered to a shared-subscription, the encoding (to JSON) of the event will fail. + Affected versions: `v4.3.21`, `v4.4.10`, `e4.3.16` and `e4.4.10`. diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index e7d343db0..696cce5ca 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -1,14 +1,26 @@ -### 增强 +# v4.3.22 + +## 增强 + +- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 + 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。 - 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。 该行为的改变主要是为了跟发布失败时的行为保持一致。 -### 修复 +- 基于 JWT 的 ACL 支持 `all` 动作,指定同时适用于 `pub` 和 `sub` 两个动作的规则列表 [#9044](https://github.com/emqx/emqx/pull/9044)。 -- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185) +- 增强包含敏感数据的日志的安全性 [#9189](https://github.com/emqx/emqx/pull/9189)。 + 如果日志中包含敏感关键词,例如 `password`,那么关联的数据回被模糊化处理,替换成 `******`。 + +## 修复 + +- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。 + +- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。 避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。 -- 修复在尝试连接 MongoDB 数据库过程中,如果认证失败会不停打印错误日志的问题。[#9184](https://github.com/emqx/emqx/pull/9184) +- 修复在尝试连接 MongoDB 数据库过程中,如果认证失败会不停打印错误日志的问题 [#9184](https://github.com/emqx/emqx/pull/9184)。 - 修复 emqx-sn 插件在“空闲”状态下收到消息发布请求时可能崩溃的情况 [#9024](https://github.com/emqx/emqx/pull/9024)。 @@ -18,3 +30,8 @@ - 修正了 `/status` API 的响应状态代码 [#9210](https://github.com/emqx/emqx/pull/9210)。 在修复之前,它总是返回 `200`,即使 EMQX 应用程序没有运行。 现在它在这种情况下返回 `503`。 + +- 修复规则引擎的消息事件编码失败 [#9226](https://github.com/emqx/emqx/pull/9226)。 + 带消息的规则引擎事件,例如 `$events/message_delivered` 和 `$events/message_dropped`, + 如果消息事件是共享订阅产生的,在编码(到 JSON 格式)过程中会失败。 + 影响到的版本:`v4.3.21`, `v4.4.10`, `e4.3.16` 和 `e4.4.10`。 diff --git a/include/logger.hrl b/include/logger.hrl index fb64a37e5..eaf26c34a 100644 --- a/include/logger.hrl +++ b/include/logger.hrl @@ -48,3 +48,13 @@ line => ?LINE})) end). +%% Copy-paste to avoid changing the old macro which may cause beam md5 changes in a lot of modules +%% i.e. hot-upgrade hell +-define(LOG_SENSITIVE(Level, Format, Args), + begin + (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), emqx_misc:redact(Args)} end, + mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}, + line => ?LINE, + is_sensitive => true + })) + end). diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 8db8b0eaf..fff6ba87e 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.10", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -278,7 +279,8 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.10", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_logger_jsonfmt.erl b/src/emqx_logger_jsonfmt.erl index 640df8de4..0541f721b 100644 --- a/src/emqx_logger_jsonfmt.erl +++ b/src/emqx_logger_jsonfmt.erl @@ -217,6 +217,7 @@ json_key(Term) -> throw({badkey, Term}) end. + -ifdef(TEST). -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index d84e9189d..f276c3ec2 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -66,6 +66,8 @@ nolink_apply/2 ]). +-export([redact/1]). + -define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). -define(DEFAULT_PMAP_TIMEOUT, 5000). @@ -468,6 +470,67 @@ maybe_mute_rpc_log(Node) -> ok end. +is_sensitive_key(token) -> true; +is_sensitive_key("token") -> true; +is_sensitive_key(<<"token">>) -> true; +is_sensitive_key(password) -> true; +is_sensitive_key("password") -> true; +is_sensitive_key(<<"password">>) -> true; +is_sensitive_key(secret) -> true; +is_sensitive_key("secret") -> true; +is_sensitive_key(<<"secret">>) -> true; +is_sensitive_key(passcode) -> true; +is_sensitive_key("passcode") -> true; +is_sensitive_key(<<"passcode">>) -> true; +is_sensitive_key(passphrase) -> true; +is_sensitive_key("passphrase") -> true; +is_sensitive_key(<<"passphrase">>) -> true; +is_sensitive_key(key) -> true; +is_sensitive_key("key") -> true; +is_sensitive_key(<<"key">>) -> true; +is_sensitive_key(aws_secret_access_key) -> true; +is_sensitive_key("aws_secret_access_key") -> true; +is_sensitive_key(<<"aws_secret_access_key">>) -> true; +is_sensitive_key(secret_key) -> true; +is_sensitive_key("secret_key") -> true; +is_sensitive_key(<<"secret_key">>) -> true; +is_sensitive_key(bind_password) -> true; +is_sensitive_key("bind_password") -> true; +is_sensitive_key(<<"bind_password">>) -> true; +is_sensitive_key(_) -> false. + +redact(L) when is_list(L) -> + lists:map(fun redact/1, L); +redact(M) when is_map(M) -> + maps:map(fun(K, V) -> + redact(K, V) + end, M); +redact({Key, Value}) -> + case is_sensitive_key(Key) of + true -> + {Key, redact_v(Value)}; + false -> + {redact(Key), redact(Value)} + end; +redact(T) when is_tuple(T) -> + Elements = erlang:tuple_to_list(T), + Redact = redact(Elements), + erlang:list_to_tuple(Redact); +redact(Any) -> + Any. + +redact(K, V) -> + case is_sensitive_key(K) of + true -> + redact_v(V); + false -> + redact(V) + end. + +-define(REDACT_VAL, "******"). +redact_v(V) when is_binary(V) -> <>; +redact_v(_V) -> ?REDACT_VAL. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -500,4 +563,53 @@ is_sane_id_test() -> ?assertMatch({error, _}, is_sane_id(list_to_binary(Bad))), ok. +redact_test_() -> + Case = fun(Type, KeyT) -> + Key = + case Type of + atom -> KeyT; + string -> erlang:atom_to_list(KeyT); + binary -> erlang:atom_to_binary(KeyT) + end, + + ?assert(is_sensitive_key(Key)), + + %% direct + ?assertEqual({Key, ?REDACT_VAL}, redact({Key, foo})), + ?assertEqual(#{Key => ?REDACT_VAL}, redact(#{Key => foo})), + ?assertEqual({Key, Key, Key}, redact({Key, Key, Key})), + ?assertEqual({[{Key, ?REDACT_VAL}], bar}, redact({[{Key, foo}], bar})), + + %% 1 level nested + ?assertEqual([{Key, ?REDACT_VAL}], redact([{Key, foo}])), + ?assertEqual([#{Key => ?REDACT_VAL}], redact([#{Key => foo}])), + + %% 2 level nested + ?assertEqual(#{opts => [{Key, ?REDACT_VAL}]}, redact(#{opts => [{Key, foo}]})), + ?assertEqual(#{opts => #{Key => ?REDACT_VAL}}, redact(#{opts => #{Key => foo}})), + ?assertEqual({opts, [{Key, ?REDACT_VAL}]}, redact({opts, [{Key, foo}]})), + + %% 3 level nested + ?assertEqual([#{opts => [{Key, ?REDACT_VAL}]}], redact([#{opts => [{Key, foo}]}])), + ?assertEqual([{opts, [{Key, ?REDACT_VAL}]}], redact([{opts, [{Key, foo}]}])), + ?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}])) + end, + + Types = [atom, string, binary], + Keys = [ + token, + password, + secret, + passcode, + passphrase, + key, + aws_secret_access_key, + secret_key, + bind_password + ], + [{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types]. + +case_name(Type, Key) -> + lists:concat([Type, "-", Key]). + -endif.