Merge remote-tracking branch 'origin/main-v4.3' into main-v4.4

This commit is contained in:
Zaiming (Stone) Shi 2022-10-27 17:31:15 +02:00
commit 537f088616
30 changed files with 642 additions and 142 deletions

View File

@ -41,12 +41,18 @@ check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Params)
ok; ok;
check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path := Path}}) -> check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path := Path}}) ->
ClientInfo1 = ClientInfo#{access => access(PubSub), topic => Topic}, ClientInfo1 = ClientInfo#{access => access(PubSub), topic => Topic},
Username = maps:get(username, ClientInfo1, undefined),
case check_acl_request(ACLParams, ClientInfo1) of case check_acl_request(ACLParams, ClientInfo1) of
{ok, 200, <<"ignore">>} -> ok; {ok, 200, <<"ignore">>} -> ok;
{ok, 200, _Body} -> {stop, allow}; {ok, 200, _Body} -> {stop, allow};
{ok, _Code, _Body} -> {stop, deny}; {ok, Code, _Body} ->
{error, Error} -> ?LOG(error, "Deny ~s to topic ~ts, username: ~ts, http response code: ~p",
?LOG(error, "Request ACL path ~s, error: ~p", [Path, Error]), [PubSub, Topic, Username, Code]),
{stop, deny};
{error, Error} ->
?LOG(error, "Deny ~s to topic ~ts, username: ~ts, due to request "
"http server failure, path: ~p, error: ~0p",
[PubSub, Topic, Username, Path, Error]),
ok ok
end. end.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_http, {application, emqx_auth_http,
[{description, "EMQ X Authentication/ACL with HTTP API"}, [{description, "EMQ X Authentication/ACL with HTTP API"},
{vsn, "4.3.8"}, % strict semver, bump manually! {vsn, "4.3.9"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_http_sup]}, {registered, [emqx_auth_http_sup]},
{applications, [kernel,stdlib,ehttpc]}, {applications, [kernel,stdlib,ehttpc]},

View File

@ -1,7 +1,10 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.7", [{"4.3.8",
[{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
@ -32,7 +35,10 @@
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.7", [{"4.3.8",
[{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},

View File

@ -36,6 +36,7 @@
check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path}, check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path},
super := SuperParams}) -> super := SuperParams}) ->
Username = maps:get(username, ClientInfo, undefined),
case authenticate(AuthParms, ClientInfo) of case authenticate(AuthParms, ClientInfo) of
{ok, 200, <<"ignore">>} -> {ok, 200, <<"ignore">>} ->
ok; ok;
@ -46,12 +47,15 @@ check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path},
anonymous => false, anonymous => false,
mountpoint => mountpoint(Body, ClientInfo)}}; mountpoint => mountpoint(Body, ClientInfo)}};
{ok, Code, _Body} -> {ok, Code, _Body} ->
?LOG(error, "Deny connection from path: ~s, response http code: ~p", ?LOG(error, "Deny connection from path: ~s, username: ~ts, http "
[Path, Code]), "response code: ~p",
[Path, Username, Code]),
{stop, AuthResult#{auth_result => http_to_connack_error(Code), {stop, AuthResult#{auth_result => http_to_connack_error(Code),
anonymous => false}}; anonymous => false}};
{error, Error} -> {error, Error} ->
?LOG(error, "Request auth path: ~s, error: ~p", [Path, Error]), ?LOG(error, "Deny connection from path: ~s, username: ~ts, due to "
"request http-server failed: ~0p",
[Path, Username, Error]),
%%FIXME later: server_unavailable is not right. %%FIXME later: server_unavailable is not right.
{stop, AuthResult#{auth_result => server_unavailable, {stop, AuthResult#{auth_result => server_unavailable,
anonymous => false}} anonymous => false}}

View File

@ -113,11 +113,20 @@ string_to_number(_) ->
%% Verify Claims %% Verify Claims
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
verify_acl(ClientInfo, #{<<"sub">> := SubTopics}, subscribe, Topic) when is_list(SubTopics) -> verify_acl(ClientInfo, Acl, PubSub, Topic) ->
verify_acl(ClientInfo, SubTopics, Topic); Key = case PubSub of
verify_acl(ClientInfo, #{<<"pub">> := PubTopics}, publish, Topic) when is_list(PubTopics) -> subscribe -> <<"sub">>;
verify_acl(ClientInfo, PubTopics, Topic); publish -> <<"pub">>
verify_acl(_ClientInfo, _Acl, _PubSub, _Topic) -> {stop, deny}. end,
Rules0 = lists:map(
fun(K) ->
case maps:get(K, Acl, undefined) of
R when is_list(R) -> R;
_ -> []
end
end, [<<"all">>, Key]),
Rules = lists:append(Rules0),
verify_acl(ClientInfo, Rules, Topic).
verify_acl(_ClientInfo, [], _Topic) -> {stop, deny}; verify_acl(_ClientInfo, [], _Topic) -> {stop, deny};
verify_acl(ClientInfo, [AclTopic | AclTopics], Topic) -> verify_acl(ClientInfo, [AclTopic | AclTopics], Topic) ->

View File

@ -297,7 +297,8 @@ t_check_jwt_acl(_Config) ->
{username, <<"plain">>}, {username, <<"plain">>},
{sub, value}, {sub, value},
{acl, [{sub, [<<"a/b">>]}, {acl, [{sub, [<<"a/b">>]},
{pub, [<<"c/d">>]}]}, {pub, [<<"c/d">>]},
{all, [<<"all">>]}]},
{exp, erlang:system_time(seconds) + 10}], {exp, erlang:system_time(seconds) + 10}],
<<"HS256">>, <<"HS256">>,
<<"emqxsecret">>), <<"emqxsecret">>),
@ -329,6 +330,19 @@ t_check_jwt_acl(_Config) ->
after 100 -> ok after 100 -> ok
end, end,
%% can pub/sub to all rules
?assertMatch(
{ok, #{}, [0]},
emqtt:subscribe(C, <<"all">>, 0)),
?assertMatch(
ok,
emqtt:publish(C, <<"all">>, <<"hi">>, 0)),
receive
{publish, #{topic := <<"all">>}} -> ok
after 2000 ->
?assert(false, "Publish to `all` should be allowed")
end,
ok = emqtt:disconnect(C). ok = emqtt:disconnect(C).
t_check_jwt_acl_no_recs(init, _Config) -> t_check_jwt_acl_no_recs(init, _Config) ->

View File

@ -56,19 +56,7 @@ start(Module, Config) ->
{ok, Conn} -> {ok, Conn} ->
{ok, Conn}; {ok, Conn};
{error, Reason} -> {error, Reason} ->
Config1 = obfuscate(Config), ?LOG_SENSITIVE(error, "Failed to connect with module=~p\n"
?LOG(error, "Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config, Reason]),
"config=~p\nreason:~p", [Module, Config1, Reason]),
{error, Reason} {error, Reason}
end. end.
obfuscate(Map) ->
maps:fold(fun(K, V, Acc) ->
case is_sensitive(K) of
true -> [{K, '***'} | Acc];
false -> [{K, V} | Acc]
end
end, [], Map).
is_sensitive(password) -> true;
is_sensitive(_) -> false.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mqtt, {application, emqx_bridge_mqtt,
[{description, "EMQ X Bridge to MQTT Broker"}, [{description, "EMQ X Bridge to MQTT Broker"},
{vsn, "4.3.6"}, % strict semver, bump manually! {vsn, "4.3.7"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,replayq,emqtt]}, {applications, [kernel,stdlib,replayq,emqtt]},

View File

@ -1,29 +1,43 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{<<"4\\.3\\.[4-5]">>, [{"4.3.6",
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]}, {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>, {<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[4-5]">>, [{"4.3.6",
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]}, {load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>, {<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}, [{load_module,emqx_bridge_connect,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -421,7 +421,7 @@ start_resource(ResId, PoolName, Options) ->
on_resource_destroy(ResId, #{<<"pool">> => PoolName}), on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
start_resource(ResId, PoolName, Options); start_resource(ResId, PoolName, Options);
{error, Reason} -> {error, Reason} ->
?LOG(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]), ?LOG_SENSITIVE(error, "Initiate Resource ~p failed, ResId: ~p, ~p", [?RESOURCE_TYPE_MQTT, ResId, Reason]),
on_resource_destroy(ResId, #{<<"pool">> => PoolName}), on_resource_destroy(ResId, #{<<"pool">> => PoolName}),
error({{?RESOURCE_TYPE_MQTT, ResId}, create_failed}) error({{?RESOURCE_TYPE_MQTT, ResId}, create_failed})
end. end.

View File

@ -73,7 +73,7 @@
export(_Bindings, _Params) -> export(_Bindings, _Params) ->
case emqx_mgmt_data_backup:export() of case emqx_mgmt_data_backup:export() of
{ok, File = #{filename := Filename}} -> {ok, File = #{filename := Filename}} ->
minirest:return({ok, File#{filename => list_to_binary(filename:basename(Filename))}}); minirest:return({ok, File#{filename => unicode:characters_to_binary(filename:basename(Filename))}});
Return -> minirest:return(Return) Return -> minirest:return(Return)
end. end.
@ -162,7 +162,7 @@ import_content(Content) ->
tmp_filename() -> tmp_filename() ->
Seconds = erlang:system_time(second), Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
list_to_binary(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])). iolist_to_binary(io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S])).
filename_decode(Filename) -> filename_decode(Filename) ->
uri_string:percent_decode(Filename). uri_string:percent_decode(Filename).

View File

@ -28,13 +28,17 @@ all() ->
emqx_ct:all(?MODULE). emqx_ct:all(?MODULE).
init_per_suite(Cfg) -> init_per_suite(Cfg) ->
ekka_mnesia:start(),
ok = emqx_dashboard_admin:mnesia(boot),
application:load(emqx_modules), application:load(emqx_modules),
application:load(emqx_bridge_mqtt), application:load(emqx_bridge_mqtt),
emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
application:ensure_all_started(emqx_dashboard),
Cfg. Cfg.
end_per_suite(Cfg) -> end_per_suite(Cfg) ->
emqx_mgmt_data_backup:delete_all_backup_file(), emqx_mgmt_data_backup:delete_all_backup_file(),
application:stop(emqx_dashboard),
emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]), emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]),
Cfg. Cfg.

View File

@ -28,14 +28,18 @@ all() ->
emqx_ct:all(?MODULE). emqx_ct:all(?MODULE).
init_per_suite(Cfg) -> init_per_suite(Cfg) ->
ekka_mnesia:start(),
ok = emqx_dashboard_admin:mnesia(boot),
application:load(emqx_modules), application:load(emqx_modules),
application:load(emqx_web_hook), application:load(emqx_web_hook),
emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
application:ensure_all_started(emqx_dashboard),
ok = emqx_rule_registry:mnesia(boot), ok = emqx_rule_registry:mnesia(boot),
ok = emqx_rule_engine:load_providers(), ok = emqx_rule_engine:load_providers(),
Cfg. Cfg.
end_per_suite(Cfg) -> end_per_suite(Cfg) ->
application:stop(emqx_dashboard),
emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]), emqx_ct_helpers:stop_apps([emqx_management, emqx_rule_engine]),
Cfg. Cfg.
@ -46,11 +50,8 @@ remove_resource(Id) ->
emqx_rule_registry:remove_resource(Id), emqx_rule_registry:remove_resource(Id),
emqx_rule_registry:remove_resource_params(Id). emqx_rule_registry:remove_resource_params(Id).
import(FilePath0, Version) -> import_and_check(Filename, Version) ->
Filename = filename:basename(FilePath0), {ok, #{code := 0}} = emqx_mgmt_api_data:import(#{}, [{<<"filename">>, Filename}]),
FilePath = filename:join([get_data_path(), FilePath0]),
{ok, Bin} = file:read_file(FilePath),
ok = emqx_mgmt_data_backup:upload_backup_file(Filename, Bin),
lists:foreach(fun(#resource{id = Id, config = Config} = _Resource) -> lists:foreach(fun(#resource{id = Id, config = Config} = _Resource) ->
case Id of case Id of
<<"webhook">> -> <<"webhook">> ->
@ -61,34 +62,51 @@ import(FilePath0, Version) ->
end end
end, emqx_rule_registry:get_resources()). end, emqx_rule_registry:get_resources()).
upload_import_export_list_download(NameVsnTable) ->
lists:foreach(fun({Filename0, Vsn}) ->
Filename = unicode:characters_to_binary(Filename0),
FullPath = filename:join([get_data_path(), Filename]),
ct:pal("testing upload_import_export_list_download for file: ~ts, version: ~p", [FullPath, Vsn]),
%% upload
{ok, FileCnt} = file:read_file(FullPath),
{ok, #{code := 0}} = emqx_mgmt_api_data:upload(#{},
[{<<"filename">>, Filename}, {<<"file">>, FileCnt}]),
%% import
ok = import_and_check(Filename, Vsn),
%% export
{ok, #{data := #{created_at := CAt, filename := FName, size := Size}}}
= emqx_mgmt_api_data:export(#{}, []),
?assert(true, is_binary(CAt)),
?assert(true, is_binary(FName)),
?assert(true, is_integer(Size)),
%% list exported files
lists:foreach(fun({Seconds, Content}) ->
?assert(true, is_integer(Seconds)),
?assert(true, is_binary(proplists:get_value(filename, Content))),
?assert(true, is_binary(proplists:get_value(created_at, Content))),
?assert(true, is_integer(proplists:get_value(size, Content)))
end, emqx_mgmt_api_data:get_list_exported()),
%% download
?assertMatch({ok, #{filename := FName}},
emqx_mgmt_api_data:download(#{filename => FName}, []))
end, NameVsnTable).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Cases %% Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-ifdef(EMQX_ENTERPRISE). -ifdef(EMQX_ENTERPRISE).
t_importee4010(_) -> t_upload_import_export_list_download(_) ->
import("ee4010.json", ee4010), NameVsnTable = [
{ok, _} = emqx_mgmt_data_backup:export(). {"ee4010.json", ee4010},
{"ee410.json", ee410},
t_importee410(_) -> {"ee411.json", ee411},
import("ee410.json", ee410), {"ee420.json", ee420},
{ok, _} = emqx_mgmt_data_backup:export(). {"ee425.json", ee425},
{"ee430.json", ee430},
t_importee411(_) -> {"ee430-中文.json", ee430}
import("ee411.json", ee411), ],
{ok, _} = emqx_mgmt_data_backup:export(). upload_import_export_list_download(NameVsnTable).
t_importee420(_) ->
import("ee420.json", ee420),
{ok, _} = emqx_mgmt_data_backup:export().
t_importee425(_) ->
import("ee425.json", ee425),
{ok, _} = emqx_mgmt_data_backup:export().
t_importee430(_) ->
import("ee430.json", ee430),
{ok, _} = emqx_mgmt_data_backup:export().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% handle_config %% handle_config
@ -134,29 +152,17 @@ handle_config(_, _) -> ok.
-ifndef(EMQX_ENTERPRISE). -ifndef(EMQX_ENTERPRISE).
t_import422(_) -> t_upload_import_export_list_download(_) ->
import("422.json", 422), NameVsnTable = [
{ok, _} = emqx_mgmt_data_backup:export(). {"422.json", 422},
{"423.json", 423},
t_import423(_) -> {"425.json", 425},
import("423.json", 423), {"430.json", 430},
{ok, _} = emqx_mgmt_data_backup:export(). {"430-中文.json", 430},
{"409.json", 409},
t_import425(_) -> {"415.json", 415}
import("425.json", 425), ],
{ok, _} = emqx_mgmt_data_backup:export(). upload_import_export_list_download(NameVsnTable).
t_import430(_) ->
import("430.json", 430),
{ok, _} = emqx_mgmt_data_backup:export().
t_import409(_) ->
import("409.json", 409),
{ok, _} = emqx_mgmt_data_backup:export().
t_import415(_) ->
import("415.json", 415),
{ok, _} = emqx_mgmt_data_backup:export().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% handle_config %% handle_config

View File

@ -0,0 +1,52 @@
{
"version": "4.3",
"rules": [],
"resources": [
{
"id": "webhook",
"type": "web_hook",
"config": {
"cacertfile": {
"filename": "",
"file": ""
},
"certfile": {
"filename": "",
"file": ""
},
"keyfile": {
"filename": "",
"file": ""
},
"connect_timeout": "5s",
"pool_size": 8,
"request_timeout": "5s",
"url": "http://www.emqx.io",
"verify": false
},
"created_at": 1616581851001,
"description": "webhook"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "q8v7hISIMz+iKn/ZuAaogvAxKbA=",
"tags": "administrator"
}
],
"auth_mnesia": [],
"acl_mnesia": [],
"date": "2021-03-24 18:31:21"
}

View File

@ -0,0 +1,98 @@
{
"version": "4.3",
"rules": [],
"resources": [
{
"id": "webhook",
"type": "web_hook",
"config": {
"cacertfile": {
"filename": "",
"file": ""
},
"certfile": {
"filename": "",
"file": ""
},
"connect_timeout": "5s",
"keyfile": {
"filename": "",
"file": ""
},
"pool_size": 8,
"request_timeout": "5s",
"url": "http://www.emqx.io",
"verify": false
},
"created_at": 1618304340172,
"description": "webhook"
}
],
"blacklist": [],
"apps": [
{
"id": "admin",
"secret": "public",
"name": "Default",
"desc": "Application user",
"status": true,
"expired": "undefined"
}
],
"users": [
{
"username": "admin",
"password": "qq8hg9pOkmYiHqzi3+bcUaK2CGA=",
"tags": "administrator"
}
],
"auth_mnesia": [],
"acl_mnesia": [],
"modules": [
{
"id": "module:aabeddbf",
"type": "recon",
"config": {},
"enabled": true,
"created_at": 1618304311061,
"description": ""
},
{
"id": "module:cbe6d976",
"type": "internal_acl",
"config": {
"acl_rule_file": "etc/acl.conf"
},
"enabled": true,
"created_at": 1618304311061,
"description": ""
},
{
"id": "module:46375e06",
"type": "retainer",
"config": {
"storage_type": "ram",
"max_retained_messages": 0,
"max_payload_size": "1MB",
"expiry_interval": 0
},
"enabled": true,
"created_at": 1618304311061,
"description": ""
},
{
"id": "module:091eb7c3",
"type": "presence",
"config": {
"qos": 0
},
"enabled": true,
"created_at": 1618304311061,
"description": ""
}
],
"schemas": [],
"configs": [],
"listeners_state": [],
"date": "2021-04-13 17:59:52"
}

View File

@ -176,10 +176,12 @@
end end
end()). end()).
-define(RPC_TIMEOUT, 30000).
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). -define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
-define(CLUSTER_CALL(Func, Args, ResParttern), -define(CLUSTER_CALL(Func, Args, ResParttern),
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, ?RPC_TIMEOUT) of
{ResL, []} -> {ResL, []} ->
case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
[] -> ResL; [] -> ResL;
@ -192,6 +194,37 @@
throw({Func, {failed_on_nodes, BadNodes}}) throw({Func, {failed_on_nodes, BadNodes}})
end end()). end end()).
%% like CLUSTER_CALL/3, but recall the remote node using FallbackFunc if Func is undefined
-define(CLUSTER_CALL(Func, Args, ResParttern, FallbackFunc, FallbackArgs),
fun() ->
RNodes = ekka_mnesia:running_nodes(),
ResL = erpc:multicall(RNodes, ?MODULE, Func, Args, ?RPC_TIMEOUT),
Res = lists:zip(RNodes, ResL),
BadRes = lists:filtermap(fun
({_Node, {ok, ResParttern}}) ->
false;
({Node, {error, {exception, undef, _}}}) ->
try erpc:call(Node, ?MODULE, FallbackFunc, FallbackArgs, ?RPC_TIMEOUT) of
ResParttern ->
false;
OtherRes ->
{true, #{rpc_type => call, func => FallbackFunc,
result => OtherRes, node => Node}}
catch
Err:Reason ->
{true, #{rpc_type => call, func => FallbackFunc,
exception => {Err, Reason}, node => Node}}
end;
({Node, OtherRes}) ->
{true, #{rpc_type => multicall, func => FallbackFunc,
result => OtherRes, node => Node}}
end, Res),
case BadRes of
[] -> Res;
_ -> throw(BadRes)
end
end()).
%% Tables %% Tables
-define(RULE_TAB, emqx_rule). -define(RULE_TAB, emqx_rule).
-define(ACTION_TAB, emqx_rule_action). -define(ACTION_TAB, emqx_rule_action).

View File

@ -24,7 +24,7 @@
, refresh_resources/0 , refresh_resources/0
, refresh_resource/1 , refresh_resource/1
, refresh_rule/1 , refresh_rule/1
, refresh_rules/0 , refresh_rules_when_boot/0
, refresh_actions/1 , refresh_actions/1
, refresh_actions/2 , refresh_actions/2
, refresh_resource_status/0 , refresh_resource_status/0
@ -47,6 +47,7 @@
]). ]).
-export([ init_resource/4 -export([ init_resource/4
, init_resource_with_retrier/4
, init_action/4 , init_action/4
, clear_resource/4 , clear_resource/4
, clear_rule/1 , clear_rule/1
@ -80,6 +81,19 @@
-define(T_RETRY, 60000). -define(T_RETRY, 60000).
%% redefine this macro to confine the appup scope
-undef(RAISE).
-define(RAISE(_EXP_, _ERROR_CONTEXT_),
fun() ->
try (_EXP_)
catch
throw : Reason ->
throw({_ERROR_CONTEXT_, Reason});
_EXCLASS_:_EXCPTION_:_ST_ ->
throw({_ERROR_CONTEXT_, {_EXCPTION_, _EXCPTION_, _ST_}})
end
end()).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Load resource/action providers from all available applications %% Load resource/action providers from all available applications
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -255,15 +269,20 @@ create_resource(#{type := Type, config := Config0} = Params, Retry) ->
created_at = erlang:system_time(millisecond) created_at = erlang:system_time(millisecond)
}, },
ok = emqx_rule_registry:add_resource(Resource), ok = emqx_rule_registry:add_resource(Resource),
InitArgs = [M, F, ResId, Config],
case Retry of case Retry of
with_retry -> with_retry ->
%% Note that we will return OK in case of resource creation failure, %% Note that we will return OK in case of resource creation failure,
%% A timer is started to re-start the resource later. %% A timer is started to re-start the resource later.
_ = (catch (?CLUSTER_CALL(init_resource, [M, F, ResId, Config]))), _ = try ?CLUSTER_CALL(init_resource_with_retrier, InitArgs, ok,
init_resource, InitArgs)
catch throw : Reason ->
?LOG(error, "create_resource failed: ~0p", [Reason])
end,
{ok, Resource}; {ok, Resource};
no_retry -> no_retry ->
try try
_ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), _ = ?CLUSTER_CALL(init_resource, InitArgs),
{ok, Resource} {ok, Resource}
catch throw : Reason -> catch throw : Reason ->
{error, Reason} {error, Reason}
@ -290,7 +309,7 @@ check_and_update_resource(Id, NewParams) ->
do_check_and_update_resource(#{id => Id, config => Conifg, type => Type, do_check_and_update_resource(#{id => Id, config => Conifg, type => Type,
description => Descr}) description => Descr})
catch Error:Reason:ST -> catch Error:Reason:ST ->
?LOG(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]), ?LOG_SENSITIVE(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]),
{error, Reason} {error, Reason}
end; end;
_Other -> _Other ->
@ -327,7 +346,7 @@ start_resource(ResId) ->
{ok, #resource_type{on_create = {Mod, Create}}} {ok, #resource_type{on_create = {Mod, Create}}}
= emqx_rule_registry:find_resource_type(ResType), = emqx_rule_registry:find_resource_type(ResType),
try try
init_resource(Mod, Create, ResId, Config), init_resource_with_retrier(Mod, Create, ResId, Config),
refresh_actions_of_a_resource(ResId) refresh_actions_of_a_resource(ResId)
catch catch
throw:Reason -> {error, Reason} throw:Reason -> {error, Reason}
@ -358,7 +377,7 @@ test_resource(#{type := Type} = Params) ->
{error, Reason} {error, Reason}
end end
catch E:R:S -> catch E:R:S ->
?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]), ?LOG_SENSITIVE(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
{error, R} {error, R}
after after
_ = ?CLUSTER_CALL(ensure_resource_deleted, [ResId]), _ = ?CLUSTER_CALL(ensure_resource_deleted, [ResId]),
@ -476,20 +495,22 @@ refresh_resource(Type) when is_atom(Type) ->
emqx_rule_registry:get_resources_by_type(Type)); emqx_rule_registry:get_resources_by_type(Type));
refresh_resource(#resource{id = ResId, type = Type, config = Config}) -> refresh_resource(#resource{id = ResId, type = Type, config = Config}) ->
try {ok, #resource_type{on_create = {M, F}}} =
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
emqx_rule_registry:find_resource_type(Type), ok = emqx_rule_engine:init_resource_with_retrier(M, F, ResId, Config).
ok = emqx_rule_engine:init_resource(M, F, ResId, Config)
catch _:_ ->
emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY)
end.
-spec(refresh_rules() -> ok). -spec(refresh_rules_when_boot() -> ok).
refresh_rules() -> refresh_rules_when_boot() ->
lists:foreach(fun lists:foreach(fun
(#rule{enabled = true} = Rule) -> (#rule{enabled = true} = Rule) ->
try refresh_rule(Rule) try refresh_rule(Rule)
catch _:_ -> catch _:_ ->
%% We set the enable = false when rule init failed to avoid bad rules running
%% without actions created properly.
%% The init failure might be caused by a disconnected resource, in this case the
%% actions can not be created, so the rules won't work.
%% After the user fixed the problem he can enable it manually,
%% doing so will also recreate the actions.
emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup})
end; end;
(_) -> ok (_) -> ok
@ -648,18 +669,29 @@ action_instance_id(ActionName) ->
iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]). iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]).
init_resource(Module, OnCreate, ResId, Config) -> init_resource(Module, OnCreate, ResId, Config) ->
Params = ?RAISE(Module:OnCreate(ResId, Config), Params = ?RAISE(Module:OnCreate(ResId, Config), {Module, OnCreate}),
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
ResParams = #resource_params{id = ResId, ResParams = #resource_params{id = ResId,
params = Params, params = Params,
status = #{is_alive => true}}, status = #{is_alive => true}},
emqx_rule_registry:add_resource_params(ResParams). emqx_rule_registry:add_resource_params(ResParams).
init_resource_with_retrier(Module, OnCreate, ResId, Config) ->
try
Params = Module:OnCreate(ResId, Config),
ResParams = #resource_params{id = ResId,
params = Params,
status = #{is_alive => true}},
emqx_rule_registry:add_resource_params(ResParams)
catch Class:Reason:ST ->
Interval = persistent_term:get({emqx_rule_engine, resource_restart_interval}, ?T_RETRY),
emqx_rule_monitor:ensure_resource_retrier(ResId, Interval),
erlang:raise(Class, {init_resource, Reason}, ST)
end.
init_action(Module, OnCreate, ActionInstId, Params) -> init_action(Module, OnCreate, ActionInstId, Params) ->
ok = emqx_rule_metrics:create_metrics(ActionInstId), ok = emqx_rule_metrics:create_metrics(ActionInstId),
case ?RAISE(Module:OnCreate(ActionInstId, Params), case ?RAISE(Module:OnCreate(ActionInstId, Params),
{{init_action_failure, node()}, {init_action_failure, node(), Module, OnCreate}) of
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}) of
{Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2 {Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2
ok = emqx_rule_registry:add_action_instance_params( ok = emqx_rule_registry:add_action_instance_params(
#action_instance_params{id = ActionInstId, params = NewParams, apply = Apply}); #action_instance_params{id = ActionInstId, params = NewParams, apply = Apply});
@ -683,7 +715,7 @@ clear_resource(Module, Destroy, ResId, Type) ->
case emqx_rule_registry:find_resource_params(ResId) of case emqx_rule_registry:find_resource_params(ResId) of
{ok, #resource_params{params = Params}} -> {ok, #resource_params{params = Params}} ->
?RAISE(Module:Destroy(ResId, Params), ?RAISE(Module:Destroy(ResId, Params),
{{destroy_resource_failure, node()}, {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}), {destroy_resource_failure, node(), Module, Destroy}),
ok = emqx_rule_registry:remove_resource_params(ResId); ok = emqx_rule_registry:remove_resource_params(ResId);
not_found -> not_found ->
ok ok
@ -711,8 +743,8 @@ clear_action(Module, Destroy, ActionInstId) ->
emqx_rule_metrics:clear_metrics(ActionInstId), emqx_rule_metrics:clear_metrics(ActionInstId),
case emqx_rule_registry:get_action_instance_params(ActionInstId) of case emqx_rule_registry:get_action_instance_params(ActionInstId) of
{ok, #action_instance_params{params = Params}} -> {ok, #action_instance_params{params = Params}} ->
?RAISE(Module:Destroy(ActionInstId, Params),{{destroy_action_failure, node()}, ?RAISE(Module:Destroy(ActionInstId, Params),
{{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}), {destroy_action_failure, node(), Module, Destroy}),
ok = emqx_rule_registry:remove_action_instance_params(ActionInstId); ok = emqx_rule_registry:remove_action_instance_params(ActionInstId);
not_found -> not_found ->
ok ok

View File

@ -28,8 +28,7 @@ start(_Type, _Args) ->
{ok, Sup} = emqx_rule_engine_sup:start_link(), {ok, Sup} = emqx_rule_engine_sup:start_link(),
_ = emqx_rule_engine_sup:start_locker(), _ = emqx_rule_engine_sup:start_locker(),
ok = emqx_rule_engine:load_providers(), ok = emqx_rule_engine:load_providers(),
ok = emqx_rule_engine:refresh_resources(), ok = emqx_rule_monitor:async_refresh_resources_rules(),
ok = emqx_rule_engine:refresh_rules(),
ok = emqx_rule_engine_cli:load(), ok = emqx_rule_engine_cli:load(),
{ok, Sup}. {ok, Sup}.

View File

@ -853,5 +853,9 @@ printable_maps(Headers) ->
value => Value value => Value
} || {Key, Value} <- V0] } || {Key, Value} <- V0]
}; };
(K, V0, AccIn) -> AccIn#{K => V0} (_K, V, AccIn) when is_tuple(V) ->
%% internal header
AccIn;
(K, V, AccIn) ->
AccIn#{K => V}
end, #{}, Headers). end, #{}, Headers).

View File

@ -31,6 +31,7 @@
-export([ start_link/0 -export([ start_link/0
, stop/0 , stop/0
, async_refresh_resources_rules/0
, ensure_resource_retrier/2 , ensure_resource_retrier/2
, retry_loop/3 , retry_loop/3
]). ]).
@ -45,12 +46,22 @@ init([]) ->
_ = erlang:process_flag(trap_exit, true), _ = erlang:process_flag(trap_exit, true),
{ok, #{retryers => #{}}}. {ok, #{retryers => #{}}}.
async_refresh_resources_rules() ->
gen_server:cast(?MODULE, async_refresh).
ensure_resource_retrier(ResId, Interval) -> ensure_resource_retrier(ResId, Interval) ->
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
handle_call(_Msg, _From, State) -> handle_call(_Msg, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast(async_refresh, #{boot_refresh_pid := Pid} = State) when is_pid(Pid) ->
%% the refresh task is already in progress, we discard the duplication
{noreply, State};
handle_cast(async_refresh, State) ->
Pid = spawn_link(fun do_async_refresh/0),
{noreply, State#{boot_refresh_pid => Pid}};
handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
Objects = maps:get(Tag, State, #{}), Objects = maps:get(Tag, State, #{}),
NewState = case maps:find(Obj, Objects) of NewState = case maps:find(Obj, Objects) of
@ -65,7 +76,13 @@ handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'EXIT', Pid, _Reason}, State = #{boot_refresh_pid := Pid}) ->
{noreply, State#{boot_refresh_pid => undefined}};
handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) ->
%% We won't try to restart the 'retryers' event if the 'EXIT' Reason is not 'normal'.
%% Instead we rely on the user to trigger a manual retry for the resources, and then enable
%% the rules after resources are connected.
case maps:take(Pid, Retryers) of case maps:take(Pid, Retryers) of
{{Tag, Obj}, Retryers2} -> {{Tag, Obj}, Retryers2} ->
Objects = maps:get(Tag, State, #{}), Objects = maps:get(Tag, State, #{}),
@ -117,6 +134,12 @@ retry_loop(resource, ResId, Interval) ->
ok ok
end. end.
do_async_refresh() ->
%% NOTE: the order matters.
%% We should always refresh the resources first and then the rules.
ok = emqx_rule_engine:refresh_resources(),
ok = emqx_rule_engine:refresh_rules_when_boot().
refresh_and_enable_rules_of_resource(ResId) -> refresh_and_enable_rules_of_resource(ResId) ->
lists:foreach( lists:foreach(
fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) -> fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) ->

View File

@ -22,10 +22,16 @@ t_mod_hook_fun(_) ->
t_printable_maps(_) -> t_printable_maps(_) ->
Headers = #{peerhost => {127,0,0,1}, Headers = #{peerhost => {127,0,0,1},
peername => {{127,0,0,1}, 9980}, peername => {{127,0,0,1}, 9980},
sockname => {{127,0,0,1}, 1883} sockname => {{127,0,0,1}, 1883},
redispatch_to => {<<"group">>, <<"sub/topic/+">>},
shared_dispatch_ack => {self(), ref}
}, },
Converted = emqx_rule_events:printable_maps(Headers),
?assertMatch( ?assertMatch(
#{peerhost := <<"127.0.0.1">>, #{peerhost := <<"127.0.0.1">>,
peername := <<"127.0.0.1:9980">>, peername := <<"127.0.0.1:9980">>,
sockname := <<"127.0.0.1:1883">> sockname := <<"127.0.0.1:1883">>
}, emqx_rule_events:printable_maps(Headers)). }, Converted),
?assertNot(maps:is_key(redispatch_to, Converted)),
?assertNot(maps:is_key(shared_dispatch_ack, Converted)),
ok.

View File

@ -35,6 +35,7 @@ suite() ->
groups() -> groups() ->
[{resource, [sequence], [{resource, [sequence],
[ t_restart_resource [ t_restart_resource
, t_refresh_resources_rules
]} ]}
]. ].
@ -47,24 +48,53 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(t_restart_resource, Config) -> init_per_testcase(t_restart_resource, Config) ->
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 100),
Opts = [public, named_table, set, {read_concurrency, true}], Opts = [public, named_table, set, {read_concurrency, true}],
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
ets:new(t_restart_resource, [named_table, public]), ets:new(t_restart_resource, [named_table, public]),
ets:insert(t_restart_resource, {failed_count, 0}), ets:insert(t_restart_resource, {failed_count, 0}),
ets:insert(t_restart_resource, {succ_count, 0}), ets:insert(t_restart_resource, {succ_count, 0}),
common_init_per_testcase(),
Config;
init_per_testcase(t_refresh_resources_rules, Config) ->
meck:unload(),
ets:new(t_refresh_resources_rules, [named_table, public]),
ok = meck:new(emqx_rule_engine, [no_link, passthrough]),
meck:expect(emqx_rule_engine, refresh_resources, fun() ->
timer:sleep(500),
ets:update_counter(t_refresh_resources_rules, refresh_resources, 1, {refresh_resources, 0}),
ok
end),
meck:expect(emqx_rule_engine, refresh_rules_when_boot, fun() ->
timer:sleep(500),
ets:update_counter(t_refresh_resources_rules, refresh_rules, 1, {refresh_rules, 0}),
ok
end),
common_init_per_testcase(),
Config; Config;
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
common_init_per_testcase(),
Config. Config.
end_per_testcase(t_restart_resource, Config) -> end_per_testcase(t_restart_resource, Config) ->
persistent_term:put({emqx_rule_engine, resource_restart_interval}, 60000),
ets:delete(t_restart_resource), ets:delete(t_restart_resource),
common_end_per_testcases(),
Config;
end_per_testcase(t_refresh_resources_rules, Config) ->
meck:unload(),
common_end_per_testcases(),
Config; Config;
end_per_testcase(_, Config) -> end_per_testcase(_, Config) ->
common_end_per_testcases(),
Config. Config.
common_init_per_testcase() ->
{ok, _} = emqx_rule_monitor:start_link().
common_end_per_testcases() ->
emqx_rule_monitor:stop().
t_restart_resource(_) -> t_restart_resource(_) ->
{ok, _} = emqx_rule_monitor:start_link(),
ok = emqx_rule_registry:register_resource_types( ok = emqx_rule_registry:register_resource_types(
[#resource_type{ [#resource_type{
name = test_res_1, name = test_res_1,
@ -79,11 +109,12 @@ t_restart_resource(_) ->
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
#{type => test_res_1, #{type => test_res_1,
config => #{}, config => #{},
restart_interval => 100,
description => <<"debug resource">>}), description => <<"debug resource">>}),
[{_, 1}] = ets:lookup(t_restart_resource, failed_count), ?assertMatch([{_, 0}], ets:lookup(t_restart_resource, succ_count)),
[{_, 0}] = ets:lookup(t_restart_resource, succ_count), ?assertMatch([{_, N}] when N == 1 orelse N == 2 orelse N == 3,
ets:lookup(t_restart_resource, failed_count)),
ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]),
emqx_rule_monitor:ensure_resource_retrier(ResId, 100),
timer:sleep(1000), timer:sleep(1000),
[{_, 5}] = ets:lookup(t_restart_resource, failed_count), [{_, 5}] = ets:lookup(t_restart_resource, failed_count),
[{_, 1}] = ets:lookup(t_restart_resource, succ_count), [{_, 1}] = ets:lookup(t_restart_resource, succ_count),
@ -91,9 +122,21 @@ t_restart_resource(_) ->
?assertEqual(0, map_size(Pids)), ?assertEqual(0, map_size(Pids)),
ok = emqx_rule_engine:unload_providers(), ok = emqx_rule_engine:unload_providers(),
emqx_rule_registry:remove_resource(ResId), emqx_rule_registry:remove_resource(ResId),
emqx_rule_monitor:stop(),
ok. ok.
t_refresh_resources_rules(_) ->
ok = emqx_rule_monitor:async_refresh_resources_rules(),
ok = emqx_rule_monitor:async_refresh_resources_rules(),
%% there should be only one refresh handler at the same time
?assertMatch(#{boot_refresh_pid := Pid} when is_pid(Pid), sys:get_state(whereis(emqx_rule_monitor))),
timer:sleep(1200),
?assertEqual([{refresh_resources, 1}], ets:lookup(t_refresh_resources_rules, refresh_resources)),
?assertEqual([{refresh_rules, 1}], ets:lookup(t_refresh_resources_rules, refresh_rules)),
ok = emqx_rule_monitor:async_refresh_resources_rules(),
timer:sleep(1200),
?assertEqual([{refresh_resources, 2}], ets:lookup(t_refresh_resources_rules, refresh_resources)),
?assertEqual([{refresh_rules, 2}], ets:lookup(t_refresh_resources_rules, refresh_rules)).
on_resource_create(Id, _) -> on_resource_create(Id, _) ->
case ets:lookup(t_restart_resource, failed_count) of case ets:lookup(t_restart_resource, failed_count) of
[{_, 5}] -> [{_, 5}] ->

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook, {application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"}, [{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.14"}, % strict semver, bump manually! {vsn, "4.3.15"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_web_hook_sup]}, {registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]}, {applications, [kernel,stdlib,ehttpc]},

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{<<"4\\.3\\.[0-7]">>, [{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-7]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -24,7 +25,8 @@
{<<"4\\.3\\.1[2-3]">>, {<<"4\\.3\\.1[2-3]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[0-7]">>, [{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-7]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},

View File

@ -384,7 +384,7 @@ test_http_connect(Conf) ->
false false
catch catch
Err:Reason:ST -> Err:Reason:ST ->
?LOG(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]), ?LOG_SENSITIVE(error, "check http_connectivity failed: ~p, ~0p", [Conf, {Err, Reason, ST}]),
false false
end. end.
l2b(L) when is_list(L) -> iolist_to_binary(L); l2b(L) when is_list(L) -> iolist_to_binary(L);

View File

@ -1,14 +1,26 @@
### Enhancements # v4.3.22
## Enhancements
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
This is to avoid slowing down the boot if some resources spend long time establishing the connection.
- Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124). - Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124).
This is to make the ACL deny logging for subscription behave the same as for publish. This is to make the ACL deny logging for subscription behave the same as for publish.
### Bug fixes - JWT ACL claim supports `all` action to imply the rules applie to both `pub` and `sub` [#9044](https://github.com/emqx/emqx/pull/9044).
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places. [#9185](https://github.com/emqx/emqx/pull/9185) - Added a log censor to avoid logging sensitive data [#9189](https://github.com/emqx/emqx/pull/9189).
If the data to be logged is a map or key-value list which contains sensitive key words such as `password`, the value is obfuscated as `******`.
## Bug fixes
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places [#9185](https://github.com/emqx/emqx/pull/9185).
This is to avoid displaying floats like `0.30000000000000004` on the dashboard. This is to avoid displaying floats like `0.30000000000000004` on the dashboard.
- Fix the issue that emqx prints too many error logs when connecting to mongodb but auth failed. [#9184](https://github.com/emqx/emqx/pull/9184) - Fix the issue that emqx prints too many error logs when connecting to mongodb but auth failed [#9184](https://github.com/emqx/emqx/pull/9184).
- Fix that after receiving publish in `idle mode` the emqx-sn gateway may panic [#9024](https://github.com/emqx/emqx/pull/9024). - Fix that after receiving publish in `idle mode` the emqx-sn gateway may panic [#9024](https://github.com/emqx/emqx/pull/9024).
@ -16,5 +28,10 @@
- Restore old `emqx_auth_jwt` module API, so the hook callback functions registered in older version will not be invalidated after hot-upgrade [#9144](https://github.com/emqx/emqx/pull/9144). - Restore old `emqx_auth_jwt` module API, so the hook callback functions registered in older version will not be invalidated after hot-upgrade [#9144](https://github.com/emqx/emqx/pull/9144).
- Fixed the response status code for the `/status` endpoint [#9210](https://github.com/emqx/emqx/pull/9210). - Fixed the response status code for the `/status` endpoint [#9210](https://github.com/emqx/emqx/pull/9210).
Before the fix, it always returned `200` even if the EMQX application was not running. Now it returns `503` in that case. Before the fix, it always returned `200` even if the EMQX application was not running. Now it returns `503` in that case.
- Fix message delivery related event encoding [#9226](https://github.com/emqx/emqx/pull/9226)
For rule-engine's input events like `$events/message_delivered`, and `$events/message_dropped`,
if the message was delivered to a shared-subscription, the encoding (to JSON) of the event will fail.
Affected versions: `v4.3.21`, `v4.4.10`, `e4.3.16` and `e4.4.10`.

View File

@ -1,14 +1,26 @@
### 增强 # v4.3.22
## 增强
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。
- 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。 - 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。
该行为的改变主要是为了跟发布失败时的行为保持一致。 该行为的改变主要是为了跟发布失败时的行为保持一致。
### 修复 - 基于 JWT 的 ACL 支持 `all` 动作,指定同时适用于 `pub``sub` 两个动作的规则列表 [#9044](https://github.com/emqx/emqx/pull/9044)。
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185) - 增强包含敏感数据的日志的安全性 [#9189](https://github.com/emqx/emqx/pull/9189)。
如果日志中包含敏感关键词,例如 `password`,那么关联的数据回被模糊化处理,替换成 `******`
## 修复
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)。
避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。 避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。
- 修复在尝试连接 MongoDB 数据库过程中,如果认证失败会不停打印错误日志的问题。[#9184](https://github.com/emqx/emqx/pull/9184) - 修复在尝试连接 MongoDB 数据库过程中,如果认证失败会不停打印错误日志的问题 [#9184](https://github.com/emqx/emqx/pull/9184)。
- 修复 emqx-sn 插件在“空闲”状态下收到消息发布请求时可能崩溃的情况 [#9024](https://github.com/emqx/emqx/pull/9024)。 - 修复 emqx-sn 插件在“空闲”状态下收到消息发布请求时可能崩溃的情况 [#9024](https://github.com/emqx/emqx/pull/9024)。
@ -18,3 +30,8 @@
- 修正了 `/status` API 的响应状态代码 [#9210](https://github.com/emqx/emqx/pull/9210)。 - 修正了 `/status` API 的响应状态代码 [#9210](https://github.com/emqx/emqx/pull/9210)。
在修复之前,它总是返回 `200`,即使 EMQX 应用程序没有运行。 现在它在这种情况下返回 `503` 在修复之前,它总是返回 `200`,即使 EMQX 应用程序没有运行。 现在它在这种情况下返回 `503`
- 修复规则引擎的消息事件编码失败 [#9226](https://github.com/emqx/emqx/pull/9226)。
带消息的规则引擎事件,例如 `$events/message_delivered``$events/message_dropped`,
如果消息事件是共享订阅产生的,在编码(到 JSON 格式)过程中会失败。
影响到的版本:`v4.3.21`, `v4.4.10`, `e4.3.16``e4.4.10`

View File

@ -48,3 +48,13 @@
line => ?LINE})) line => ?LINE}))
end). end).
%% Copy-paste to avoid changing the old macro which may cause beam md5 changes in a lot of modules
%% i.e. hot-upgrade hell
-define(LOG_SENSITIVE(Level, Format, Args),
begin
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), emqx_misc:redact(Args)} end,
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY},
line => ?LINE,
is_sensitive => true
}))
end).

View File

@ -217,6 +217,7 @@ json_key(Term) ->
throw({badkey, Term}) throw({badkey, Term})
end. end.
-ifdef(TEST). -ifdef(TEST).
-include_lib("proper/include/proper.hrl"). -include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -66,6 +66,8 @@
nolink_apply/2 nolink_apply/2
]). ]).
-export([redact/1]).
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). -define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
-define(DEFAULT_PMAP_TIMEOUT, 5000). -define(DEFAULT_PMAP_TIMEOUT, 5000).
@ -468,6 +470,67 @@ maybe_mute_rpc_log(Node) ->
ok ok
end. end.
is_sensitive_key(token) -> true;
is_sensitive_key("token") -> true;
is_sensitive_key(<<"token">>) -> true;
is_sensitive_key(password) -> true;
is_sensitive_key("password") -> true;
is_sensitive_key(<<"password">>) -> true;
is_sensitive_key(secret) -> true;
is_sensitive_key("secret") -> true;
is_sensitive_key(<<"secret">>) -> true;
is_sensitive_key(passcode) -> true;
is_sensitive_key("passcode") -> true;
is_sensitive_key(<<"passcode">>) -> true;
is_sensitive_key(passphrase) -> true;
is_sensitive_key("passphrase") -> true;
is_sensitive_key(<<"passphrase">>) -> true;
is_sensitive_key(key) -> true;
is_sensitive_key("key") -> true;
is_sensitive_key(<<"key">>) -> true;
is_sensitive_key(aws_secret_access_key) -> true;
is_sensitive_key("aws_secret_access_key") -> true;
is_sensitive_key(<<"aws_secret_access_key">>) -> true;
is_sensitive_key(secret_key) -> true;
is_sensitive_key("secret_key") -> true;
is_sensitive_key(<<"secret_key">>) -> true;
is_sensitive_key(bind_password) -> true;
is_sensitive_key("bind_password") -> true;
is_sensitive_key(<<"bind_password">>) -> true;
is_sensitive_key(_) -> false.
redact(L) when is_list(L) ->
lists:map(fun redact/1, L);
redact(M) when is_map(M) ->
maps:map(fun(K, V) ->
redact(K, V)
end, M);
redact({Key, Value}) ->
case is_sensitive_key(Key) of
true ->
{Key, redact_v(Value)};
false ->
{redact(Key), redact(Value)}
end;
redact(T) when is_tuple(T) ->
Elements = erlang:tuple_to_list(T),
Redact = redact(Elements),
erlang:list_to_tuple(Redact);
redact(Any) ->
Any.
redact(K, V) ->
case is_sensitive_key(K) of
true ->
redact_v(V);
false ->
redact(V)
end.
-define(REDACT_VAL, "******").
redact_v(V) when is_binary(V) -> <<?REDACT_VAL>>;
redact_v(_V) -> ?REDACT_VAL.
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -500,4 +563,53 @@ is_sane_id_test() ->
?assertMatch({error, _}, is_sane_id(list_to_binary(Bad))), ?assertMatch({error, _}, is_sane_id(list_to_binary(Bad))),
ok. ok.
redact_test_() ->
Case = fun(Type, KeyT) ->
Key =
case Type of
atom -> KeyT;
string -> erlang:atom_to_list(KeyT);
binary -> erlang:atom_to_binary(KeyT)
end,
?assert(is_sensitive_key(Key)),
%% direct
?assertEqual({Key, ?REDACT_VAL}, redact({Key, foo})),
?assertEqual(#{Key => ?REDACT_VAL}, redact(#{Key => foo})),
?assertEqual({Key, Key, Key}, redact({Key, Key, Key})),
?assertEqual({[{Key, ?REDACT_VAL}], bar}, redact({[{Key, foo}], bar})),
%% 1 level nested
?assertEqual([{Key, ?REDACT_VAL}], redact([{Key, foo}])),
?assertEqual([#{Key => ?REDACT_VAL}], redact([#{Key => foo}])),
%% 2 level nested
?assertEqual(#{opts => [{Key, ?REDACT_VAL}]}, redact(#{opts => [{Key, foo}]})),
?assertEqual(#{opts => #{Key => ?REDACT_VAL}}, redact(#{opts => #{Key => foo}})),
?assertEqual({opts, [{Key, ?REDACT_VAL}]}, redact({opts, [{Key, foo}]})),
%% 3 level nested
?assertEqual([#{opts => [{Key, ?REDACT_VAL}]}], redact([#{opts => [{Key, foo}]}])),
?assertEqual([{opts, [{Key, ?REDACT_VAL}]}], redact([{opts, [{Key, foo}]}])),
?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}]))
end,
Types = [atom, string, binary],
Keys = [
token,
password,
secret,
passcode,
passphrase,
key,
aws_secret_access_key,
secret_key,
bind_password
],
[{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types].
case_name(Type, Key) ->
lists:concat([Type, "-", Key]).
-endif. -endif.