Merge remote-tracking branch 'origin/release-v43' into release-v44

This commit is contained in:
Zaiming (Stone) Shi 2022-10-28 14:35:56 +02:00
commit 4a020d0e1a
29 changed files with 406 additions and 90 deletions

View File

@ -46,13 +46,13 @@ check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path :=
{ok, 200, <<"ignore">>} -> ok; {ok, 200, <<"ignore">>} -> ok;
{ok, 200, _Body} -> {stop, allow}; {ok, 200, _Body} -> {stop, allow};
{ok, Code, _Body} -> {ok, Code, _Body} ->
?LOG(error, "Deny ~s to topic ~ts, username: ~ts, http response code: ~p", ?LOG(warning, "Deny ~s to topic ~ts, username: ~ts, http response code: ~p",
[PubSub, Topic, Username, Code]), [PubSub, Topic, Username, Code]),
{stop, deny}; {stop, deny};
{error, Error} -> {error, Error} ->
?LOG(error, "Deny ~s to topic ~ts, username: ~ts, due to request " ?LOG(warning, "Deny ~s to topic ~ts, username: ~ts, due to request "
"http server failure, path: ~p, error: ~0p", "http server failure, path: ~p, error: ~0p",
[PubSub, Topic, Username, Path, Error]), [PubSub, Topic, Username, Path, Error]),
ok ok
end. end.

View File

@ -47,15 +47,14 @@ check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path},
anonymous => false, anonymous => false,
mountpoint => mountpoint(Body, ClientInfo)}}; mountpoint => mountpoint(Body, ClientInfo)}};
{ok, Code, _Body} -> {ok, Code, _Body} ->
?LOG(error, "Deny connection from path: ~s, username: ~ts, http " ?LOG(warning, "Deny connection from path: ~s, username: ~ts, http "
"response code: ~p", "response code: ~p",
[Path, Username, Code]), [Path, Username, Code]),
{stop, AuthResult#{auth_result => http_to_connack_error(Code), {stop, AuthResult#{auth_result => http_to_connack_error(Code),
anonymous => false}}; anonymous => false}};
{error, Error} -> {error, Error} ->
?LOG(error, "Deny connection from path: ~s, username: ~ts, due to " ?LOG_SENSITIVE(warning, "Deny connection from path: ~s, username: ~ts, due to "
"request http-server failed: ~0p", "request http-server failed: ~0p", [Path, Username, Error]),
[Path, Username, Error]),
%%FIXME later: server_unavailable is not right. %%FIXME later: server_unavailable is not right.
{stop, AuthResult#{auth_result => server_unavailable, {stop, AuthResult#{auth_result => server_unavailable,
anonymous => false}} anonymous => false}}
@ -89,10 +88,13 @@ is_superuser(SuperParams =
timeout := Timeout}, ClientInfo) -> timeout := Timeout}, ClientInfo) ->
Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES), Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
{ok, 200, _Body} -> true; {ok, 200, _Body} ->
{ok, _Code, _Body} -> false; true;
{error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), {ok, _Code, _Body} ->
false false;
{error, Error} ->
?LOG_SENSITIVE(warning, "Request superuser path ~s, error: ~p", [Path, Error]),
false
end. end.
mountpoint(Body, #{mountpoint := Mountpoint}) -> mountpoint(Body, #{mountpoint := Mountpoint}) ->

View File

@ -27,7 +27,7 @@
%% APIs %% APIs
-export([start_link/1]). -export([start_link/1]).
-export([verify/1]). -export([verify/1, trace/2]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
@ -143,7 +143,8 @@ request_jwks(Addr) ->
?tp(debug, emqx_auth_jwt_svr_jwks_updated, #{jwks => Jwks, pid => self()}), ?tp(debug, emqx_auth_jwt_svr_jwks_updated, #{jwks => Jwks, pid => self()}),
Jwks Jwks
catch _:_ -> catch _:_ ->
?LOG(error, "Invalid jwks server response: ~p~n", [Body]), ?MODULE:trace(jwks_server_reesponse, Body),
?LOG(error, "Invalid jwks server response, body is not logged for security reasons, trace it if inspection is required", []),
error(badarg) error(badarg)
end end
end. end.
@ -174,7 +175,7 @@ do_verify(JwsCompacted) ->
end end
catch catch
Class : Reason : Stk -> Class : Reason : Stk ->
?LOG(error, "verify JWK crashed: ~p, ~p, stacktrace: ~p~n", ?LOG_SENSITIVE(error, "verify JWK crashed: ~p, ~p, stacktrace: ~p~n",
[Class, Reason, Stk]), [Class, Reason, Stk]),
{error, invalid_signature} {error, invalid_signature}
end. end.
@ -249,13 +250,15 @@ key2jwt_value(Key, Func, Options) ->
V -> V ->
try Func(V) of try Func(V) of
{error, Reason} -> {error, Reason} ->
?LOG(warning, "Build ~p JWK ~p failed: {error, ~p}~n", ?LOG_SENSITIVE(warning, "Build ~p JWK ~p failed: {error, ~p}~n",
[Key, V, Reason]), [Key, V, Reason]),
undefined; undefined;
J -> J J -> J
catch T:R -> catch T:R ->
?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n", ?LOG_SENSITIVE(warning, "Build ~p JWK ~p failed: {~p, ~p}~n",
[Key, V, T, R]), [Key, V, T, R]),
undefined undefined
end end
end. end.
trace(_Tag, _Data) -> ok.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_ldap, {application, emqx_auth_ldap,
[{description, "EMQ X Authentication/ACL with LDAP"}, [{description, "EMQ X Authentication/ACL with LDAP"},
{vsn, "4.3.5"}, % strict semver, bump manually! {vsn, "4.3.6"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_ldap_sup]}, {registered, [emqx_auth_ldap_sup]},
{applications, [kernel,stdlib,eldap2,ecpool]}, {applications, [kernel,stdlib,eldap2,ecpool]},

View File

@ -1,11 +1,16 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{<<"4\\.3\\.[3-4]">>, [{"4.3.5",
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>, {<<"4\\.3\\.[0-1]">>,
@ -14,11 +19,16 @@
{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[3-4]">>, [{"4.3.5",
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_ldap_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_ldap,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_ldap,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>, {<<"4\\.3\\.[0-1]">>,

View File

@ -62,7 +62,7 @@ check(ClientInfo = #{username := Username, password := Password}, AuthResult,
{error, not_found} -> {error, not_found} ->
ok; ok;
{error, ResultCode} -> {error, ResultCode} ->
?LOG(error, "[LDAP] Auth from ldap failed: ~p", [ResultCode]), ?LOG_SENSITIVE(error, "[LDAP] Auth from ldap failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}} {stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end. end.

View File

@ -54,22 +54,22 @@ connect(Opts) ->
false -> false ->
[{port, Port}, {timeout, Timeout}] [{port, Port}, {timeout, Timeout}]
end, end,
?LOG(debug, "[LDAP] Connecting to OpenLDAP server: ~p, Opts:~p ...", [Servers, LdapOpts]), ?LOG_SENSITIVE(debug, "[LDAP] Connecting to OpenLDAP server: ~p, Opts:~p ...", [Servers, LdapOpts]),
case eldap2:open(Servers, LdapOpts) of case eldap2:open(Servers, LdapOpts) of
{ok, LDAP} -> {ok, LDAP} ->
try eldap2:simple_bind(LDAP, BindDn, BindPassword) of try eldap2:simple_bind(LDAP, BindDn, BindPassword) of
ok -> {ok, LDAP}; ok -> {ok, LDAP};
{error, Error} -> {error, Error} ->
?LOG(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Error]), ?LOG_SENSITIVE(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Error]),
{error, Error} {error, Error}
catch catch
error:Reason -> error:Reason ->
?LOG(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Reason]), ?LOG_SENSITIVE(error, "[LDAP] Can't authenticated to OpenLDAP server: ~p", [Reason]),
{error, Reason} {error, Reason}
end; end;
{error, Reason} -> {error, Reason} ->
?LOG(error, "[LDAP] Can't connect to OpenLDAP server: ~p", [Reason]), ?LOG_SENSITIVE(error, "[LDAP] Can't connect to OpenLDAP server: ~p", [Reason]),
{error, Reason} {error, Reason}
end. end.
@ -147,4 +147,3 @@ init_args(ENVS) ->
match_objectclass => ObjectClass, match_objectclass => ObjectClass,
username_attr => UidAttr, username_attr => UidAttr,
password_attr => PasswdAttr}}. password_attr => PasswdAttr}}.

View File

@ -55,7 +55,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
undefined -> ok; undefined -> ok;
{error, Reason} -> {error, Reason} ->
?tp(emqx_auth_mongo_check_authn_error, #{error => Reason}), ?tp(emqx_auth_mongo_check_authn_error, #{error => Reason}),
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), ?LOG_SENSITIVE(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
{stop, AuthResult#{auth_result => not_authorized, anonymous => false}}; {stop, AuthResult#{auth_result => not_authorized, anonymous => false}};
UserMap -> UserMap ->
Result = case [maps:get(Field, UserMap, undefined) || Field <- Fields] of Result = case [maps:get(Field, UserMap, undefined) || Field <- Fields] of
@ -72,7 +72,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
anonymous => false, anonymous => false,
auth_result => success}}; auth_result => success}};
{error, Error} -> {error, Error} ->
?LOG(error, "[MongoDB] check auth fail: ~p", [Error]), ?LOG_SENSITIVE(error, "[MongoDB] check auth fail: ~p", [Error]),
{stop, AuthResult#{auth_result => Error, anonymous => false}} {stop, AuthResult#{auth_result => Error, anonymous => false}}
end end
end. end.
@ -99,7 +99,7 @@ is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Sele
false; false;
{error, Reason} -> {error, Reason} ->
?tp(emqx_auth_mongo_superuser_query_error, #{error => Reason}), ?tp(emqx_auth_mongo_superuser_query_error, #{error => Reason}),
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), ?LOG_SENSITIVE(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
false; false;
Row -> Row ->
case maps:get(Field, Row, false) of case maps:get(Field, Row, false) of

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mysql, {application, emqx_auth_mysql,
[{description, "EMQ X Authentication/ACL with MySQL"}, [{description, "EMQ X Authentication/ACL with MySQL"},
{vsn, "4.3.3"}, % strict semver, bump manually! {vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_mysql_sup]}, {registered, [emqx_auth_mysql_sup]},
{applications, [kernel,stdlib,mysql,ecpool]}, {applications, [kernel,stdlib,mysql,ecpool]},

View File

@ -1,18 +1,28 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{<<"4\\.3\\.[1-2]">>, [{"4.3.3",
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[1-2]">>, [{"4.3.3",
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_mysql_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mysql,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}] {<<".*">>,[]}]

View File

@ -41,7 +41,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{ok, _Columns, []} -> {ok, _Columns, []} ->
{error, not_found}; {error, not_found};
{error, Reason} -> {error, Reason} ->
?LOG(error, "[MySQL] query '~p' failed: ~p", [AuthSql, Reason]), ?LOG_SENSITIVE(error, "[MySQL] query '~p' failed: ~p", [AuthSql, Reason]),
{error, Reason} {error, Reason}
end, end,
case CheckPass of case CheckPass of
@ -52,7 +52,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{error, not_found} -> {error, not_found} ->
ok; ok;
{error, ResultCode} -> {error, ResultCode} ->
?LOG(error, "[MySQL] Auth from mysql failed: ~p", [ResultCode]), ?LOG_SENSITIVE(error, "[MySQL] Auth from mysql failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}} {stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end. end.

View File

@ -54,10 +54,10 @@ connect(Options) ->
?LOG(error, "[MySQL] Can't connect to MySQL server: Connection refused."), ?LOG(error, "[MySQL] Can't connect to MySQL server: Connection refused."),
{error, Reason}; {error, Reason};
{error, Reason = {ErrorCode, _, Error}} -> {error, Reason = {ErrorCode, _, Error}} ->
?LOG(error, "[MySQL] Can't connect to MySQL server: ~p - ~p", [ErrorCode, Error]), ?LOG_SENSITIVE(error, "[MySQL] Can't connect to MySQL server: ~p - ~p", [ErrorCode, Error]),
{error, Reason}; {error, Reason};
{error, Reason} -> {error, Reason} ->
?LOG(error, "[MySQL] Can't connect to MySQL server: ~p", [Reason]), ?LOG_SENSITIVE(error, "[MySQL] Can't connect to MySQL server: ~p", [Reason]),
{error, Reason} {error, Reason}
end. end.

View File

@ -40,7 +40,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{ok, _, []} -> {ok, _, []} ->
{error, not_found}; {error, not_found};
{error, Reason} -> {error, Reason} ->
?LOG(error, "[Postgres] query '~p' failed: ~p", [AuthSql, Reason]), ?LOG_SENSITIVE(error, "[Postgres] query '~p' failed: ~p", [AuthSql, Reason]),
{error, not_found} {error, not_found}
end, end,
case CheckPass of case CheckPass of
@ -51,7 +51,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{error, not_found} -> {error, not_found} ->
ok; ok;
{error, ResultCode} -> {error, ResultCode} ->
?LOG(error, "[Postgres] Auth from pgsql failed: ~p", [ResultCode]), ?LOG_SENSITIVE(error, "[Postgres] Auth from pgsql failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}} {stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end. end.

View File

@ -82,7 +82,7 @@ connect(Opts) ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid password."), ?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid password."),
{error, Reason}; {error, Reason};
{error, Reason} -> {error, Reason} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: ~p", [Reason]), ?LOG_SENSITIVE(error, "[Postgres] Can't connect to Postgres server: ~p", [Reason]),
{error, Reason} {error, Reason}
end. end.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_redis, {application, emqx_auth_redis,
[{description, "EMQ X Authentication/ACL with Redis"}, [{description, "EMQ X Authentication/ACL with Redis"},
{vsn, "4.3.3"}, % strict semver, bump manually! {vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_redis_sup]}, {registered, [emqx_auth_redis_sup]},
{applications, [kernel,stdlib,eredis,eredis_cluster,ecpool]}, {applications, [kernel,stdlib,eredis,eredis_cluster,ecpool]},

View File

@ -1,19 +1,29 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{<<"4\\.3\\.[1-2]">>, [{"4.3.3",
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[1-2]">>, [{"4.3.3",
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_redis_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_redis,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_redis,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}] {<<".*">>,[]}]}.
}.

View File

@ -42,7 +42,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{ok, [PassHash, Salt|_]} -> {ok, [PassHash, Salt|_]} ->
check_pass({PassHash, Salt, Password}, HashType); check_pass({PassHash, Salt, Password}, HashType);
{error, Reason} -> {error, Reason} ->
?LOG(error, "[Redis] Command: ~p failed: ~p", [AuthCmd, Reason]), ?LOG_SENSITIVE(error, "[Redis] Command: ~p failed: ~p", [AuthCmd, Reason]),
{error, not_found} {error, not_found}
end, end,
case CheckPass of case CheckPass of
@ -54,7 +54,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
{error, not_found} -> {error, not_found} ->
ok; ok;
{error, ResultCode} -> {error, ResultCode} ->
?LOG(error, "[Redis] Auth from redis failed: ~p", [ResultCode]), ?LOG_SENSITIVE(error, "[Redis] Auth from redis failed: ~p", [ResultCode]),
{stop, AuthResult#{auth_result => ResultCode, anonymous => false}} {stop, AuthResult#{auth_result => ResultCode, anonymous => false}}
end. end.

View File

@ -56,7 +56,7 @@ connect(Opts) ->
?LOG(error, "[Redis] Can't connect to Redis server: Authentication failed."), ?LOG(error, "[Redis] Can't connect to Redis server: Authentication failed."),
{error, Reason}; {error, Reason};
{error, Reason} -> {error, Reason} ->
?LOG(error, "[Redis] Can't connect to Redis server: ~p", [Reason]), ?LOG_SENSITIVE(error, "[Redis] Can't connect to Redis server: ~p", [Reason]),
{error, Reason} {error, Reason}
end. end.
@ -86,4 +86,3 @@ repl(S, _Var, undefined) ->
repl(S, Var, Val) -> repl(S, Var, Val) ->
NVal = re:replace(Val, "&", "\\\\&", [global, {return, list}]), NVal = re:replace(Val, "&", "\\\\&", [global, {return, list}]),
re:replace(S, Var, NVal, [{return, list}]). re:replace(S, Var, NVal, [{return, list}]).

View File

@ -250,7 +250,8 @@ do_create_rule2(ParsedParams) ->
return({error, 400, ?ERR_BADARGS(Reason)}) return({error, 400, ?ERR_BADARGS(Reason)})
end. end.
update_rule(#{id := Id}, Params) -> update_rule(#{id := Id0}, Params) ->
Id = urldecode(Id0),
case parse_rule_params(Params, #{id => Id}) of case parse_rule_params(Params, #{id => Id}) of
{ok, ParsedParams} -> {ok, ParsedParams} ->
case emqx_rule_engine:update_rule(ParsedParams) of case emqx_rule_engine:update_rule(ParsedParams) of
@ -275,16 +276,21 @@ list_rules(_Bindings, Params) ->
return_all(emqx_rule_registry:get_rules_ordered_by_ts()) return_all(emqx_rule_registry:get_rules_ordered_by_ts())
end. end.
show_rule(#{id := Id}, _Params) -> show_rule(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
reply_with(fun emqx_rule_registry:get_rule/1, Id). reply_with(fun emqx_rule_registry:get_rule/1, Id).
delete_rule(#{id := Id}, _Params) -> delete_rule(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
ok = emqx_rule_engine:delete_rule(Id), ok = emqx_rule_engine:delete_rule(Id),
return(ok). return(ok).
reset_metrics_local(Id) -> emqx_rule_metrics:reset_metrics(Id). reset_metrics_local(Id0) ->
Id = urldecode(Id0),
emqx_rule_metrics:reset_metrics(Id).
reset_metrics(#{id := Id}, _Params) -> reset_metrics(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
_ = ?CLUSTER_CALL(reset_metrics_local, [Id]), _ = ?CLUSTER_CALL(reset_metrics_local, [Id]),
return(ok). return(ok).
@ -350,7 +356,8 @@ list_resources(#{}, _Params) ->
list_resources_by_type(#{type := Type}, _Params) -> list_resources_by_type(#{type := Type}, _Params) ->
return_all(emqx_rule_registry:get_resources_by_type(Type)). return_all(emqx_rule_registry:get_resources_by_type(Type)).
show_resource(#{id := Id}, _Params) -> show_resource(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
case emqx_rule_registry:find_resource(Id) of case emqx_rule_registry:find_resource(Id) of
{ok, R} -> {ok, R} ->
StatusFun = StatusFun =
@ -366,7 +373,8 @@ show_resource(#{id := Id}, _Params) ->
return({error, 404, <<"Not Found">>}) return({error, 404, <<"Not Found">>})
end. end.
get_resource_status(#{id := Id}, _Params) -> get_resource_status(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
case emqx_rule_engine:get_resource_status(Id) of case emqx_rule_engine:get_resource_status(Id) of
{ok, Status} -> {ok, Status} ->
return({ok, Status}); return({ok, Status});
@ -374,7 +382,8 @@ get_resource_status(#{id := Id}, _Params) ->
return({error, 400, ?ERR_NO_RESOURCE(Id)}) return({error, 400, ?ERR_NO_RESOURCE(Id)})
end. end.
start_resource(#{id := Id}, _Params) -> start_resource(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
case emqx_rule_engine:start_resource(Id) of case emqx_rule_engine:start_resource(Id) of
ok -> ok ->
return(ok); return(ok);
@ -385,7 +394,8 @@ start_resource(#{id := Id}, _Params) ->
return({error, 400, ?ERR_BADARGS(Reason)}) return({error, 400, ?ERR_BADARGS(Reason)})
end. end.
update_resource(#{id := Id}, NewParams) -> update_resource(#{id := Id0}, NewParams) ->
Id = urldecode(Id0),
P1 = case proplists:get_value(<<"description">>, NewParams) of P1 = case proplists:get_value(<<"description">>, NewParams) of
undefined -> #{}; undefined -> #{};
Value -> #{<<"description">> => Value} Value -> #{<<"description">> => Value}
@ -409,7 +419,8 @@ update_resource(#{id := Id}, NewParams) ->
return({error, 400, ?ERR_BADARGS(Reason)}) return({error, 400, ?ERR_BADARGS(Reason)})
end. end.
delete_resource(#{id := Id}, _Params) -> delete_resource(#{id := Id0}, _Params) ->
Id = urldecode(Id0),
case emqx_rule_engine:delete_resource(Id) of case emqx_rule_engine:delete_resource(Id) of
ok -> return(ok); ok -> return(ok);
{error, not_found} -> return(ok); {error, not_found} -> return(ok);
@ -660,3 +671,6 @@ run_fuzzy_match(E = #rule{for = Topics}, [{for, like, Pattern}|Fuzzy]) ->
lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics) lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics)
andalso run_fuzzy_match(E, Fuzzy); andalso run_fuzzy_match(E, Fuzzy);
run_fuzzy_match(_E, [{_Key, like, _SubStr}| _Fuzzy]) -> false. run_fuzzy_match(_E, [{_Key, like, _SubStr}| _Fuzzy]) -> false.
urldecode(S) ->
emqx_http_lib:uri_decode(S).

View File

@ -34,6 +34,7 @@
, proc_sql/2 , proc_sql/2
, proc_sql_param_str/2 , proc_sql_param_str/2
, proc_cql_param_str/2 , proc_cql_param_str/2
, if_contains_placeholder/1
]). ]).
%% type converting %% type converting
@ -175,6 +176,14 @@ proc_param_str(Tokens, Data, Quote) ->
iolist_to_binary( iolist_to_binary(
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})). proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
%% return true if the Str contains any placeholder in "${var}" format.
-spec(if_contains_placeholder(string() | binary()) -> boolean()).
if_contains_placeholder(Str) ->
case re:split(Str, ?EX_PLACE_HOLDER, [{return, list}, group, trim]) of
[[_]] -> false;
_ -> true
end.
%% backward compatibility for hot upgrading from =< e4.2.1 %% backward compatibility for hot upgrading from =< e4.2.1
get_phld_var(Fun, Data) when is_function(Fun) -> get_phld_var(Fun, Data) when is_function(Fun) ->
Fun(Data); Fun(Data);

View File

@ -28,6 +28,14 @@
-include("emqx_rule_test.hrl"). -include("emqx_rule_test.hrl").
-import(emqx_rule_test_lib, [make_simple_resource_type/1]). -import(emqx_rule_test_lib, [make_simple_resource_type/1]).
%% API request funcs
-import(emqx_rule_test_lib,
[ request_api/4
, request_api/5
, auth_header_/0
, api_path/1
]).
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())). %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
all() -> all() ->
@ -62,6 +70,7 @@ groups() ->
]}, ]},
{api, [], {api, [],
[t_crud_rule_api, [t_crud_rule_api,
t_rule_api_unicode_ids,
t_list_actions_api, t_list_actions_api,
t_show_action_api, t_show_action_api,
t_crud_resources_api, t_crud_resources_api,
@ -229,6 +238,10 @@ init_per_testcase(Test, Config)
{conn_event, TriggerConnEvent}, {conn_event, TriggerConnEvent},
{connsql, SQL} {connsql, SQL}
| Config]; | Config];
init_per_testcase(t_rule_api_unicode_ids, Config) ->
ok = emqx_dashboard_admin:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management, emqx_dashboard]),
Config;
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
ok = emqx_rule_registry:register_resource_types( ok = emqx_rule_registry:register_resource_types(
[make_simple_debug_resource_type()]), [make_simple_debug_resource_type()]),
@ -251,6 +264,10 @@ end_per_testcase(Test, Config)
emqtt:stop(?config(subclient, Config)), emqtt:stop(?config(subclient, Config)),
emqtt:stop(?config(connclient, Config)), emqtt:stop(?config(connclient, Config)),
Config; Config;
end_per_testcase(t_rule_api_unicode_ids, _Config) ->
application:stop(emqx_dashboard),
application:stop(emqx_management),
ok;
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
ok. ok.
@ -525,6 +542,46 @@ t_crud_rule_api(_Config) ->
?assertMatch({ok, #{code := 404, message := _Message}}, NotFound), ?assertMatch({ok, #{code := 404, message := _Message}}, NotFound),
ok. ok.
-define(PRED(Elem), fun(Elem) -> true; (_) -> false end).
t_rule_api_unicode_ids(_Config) ->
UData =
fun(Description) ->
#{<<"name">> => <<"debug-rule">>,
<<"rawsql">> => <<"select * from \"t/a\"">>,
<<"actions">> => [#{<<"name">> => <<"do_nothing">>,
<<"params">> => []}
],
<<"description">> => Description}
end,
CData = fun(Id, Description) -> maps:put(<<"id">>, Id, UData(Description)) end,
CDes = <<"Creating rules description">>,
UDes = <<"Updating rules description">>,
%% create rule
CFun = fun(Id) -> {ok, Return} = request_api(post, api_path(["rules"]), [], auth_header_(), CData(Id, CDes)), Return end,
%% update rule
UFun = fun(Id) -> {ok, Return} = request_api(put, api_path(["rules", cow_uri:urlencode(Id)]), [], auth_header_(), UData(UDes)), Return end,
%% show rule
SFun = fun(Id) -> {ok, Return} = request_api(get, api_path(["rules", cow_uri:urlencode(Id)]), [], auth_header_()), Return end,
%% delete rule
DFun = fun(Id) -> {ok, Return} = request_api(delete, api_path(["rules", cow_uri:urlencode(Id)]), [], auth_header_()), Return end,
Ids = [unicode:characters_to_binary([Char]) || Char <- lists:seq(0, 1000) -- [46]] ++ [<<"%2e">>],
Ress = [begin
{?assertMatch(#{<<"code">> := 0, <<"data">> := #{<<"description">> := CDes}}, decode_to_map(CFun(Id))),
?assertMatch(#{<<"code">> := 0}, decode_to_map(UFun(Id))),
?assertMatch(#{<<"code">> := 0, <<"data">> := #{<<"description">> := UDes}}, decode_to_map(SFun(Id))),
?assertMatch(#{<<"code">> := 0}, decode_to_map(DFun(Id)))}
end || Id <- Ids],
?assertEqual(true, lists:all(?PRED(true), [?PRED({ok, ok, ok, ok})(Res) || Res <- Ress ])).
decode_to_map(ResponseBody) ->
jiffy:decode(list_to_binary(ResponseBody), [return_maps]).
t_list_rule_api(_Config) -> t_list_rule_api(_Config) ->
AddIds = AddIds =
lists:map(fun(Seq) -> lists:map(fun(Seq) ->

View File

@ -128,6 +128,67 @@ make_simple_resource_type(ResTypeName) ->
init_events_counters() -> init_events_counters() ->
ets:new(events_record_tab, [named_table, bag, public]). ets:new(events_record_tab, [named_table, bag, public]).
%%------------------------------------------------------------------------------
%% rule test helper funcs
%%------------------------------------------------------------------------------
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
Headers = case Auth of
no_auth -> [];
Header -> [Header]
end,
do_request_api(Method, {NewUrl, Headers});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
Headers = case Auth of
no_auth -> [];
Header -> [Header]
end,
do_request_api(Method, {NewUrl, Headers, "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
%% ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
AppId = <<"admin">>,
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
api_path(Parts)->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal helper funcs %% Internal helper funcs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -134,3 +134,18 @@ t_preproc_sql5(_) ->
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, ?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)). emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
t_if_contains_placeholder(_) ->
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)),
ok.

View File

@ -22,7 +22,7 @@
{"4.3.11", {"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.1[2-3]">>, {<<"4\\.3\\.1[2-4]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
@ -46,6 +46,6 @@
{"4.3.11", {"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.1[2-3]">>, {<<"4\\.3\\.1[2-4]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -26,6 +26,13 @@
, on_action_data_to_webserver/2 , on_action_data_to_webserver/2
]). ]).
-ifdef(TEST).
-export([ preproc_and_normalise_headers/1
, maybe_proc_headers/3
, maybe_remove_content_type_header/2
]).
-endif.
-export_type([action_fun/0]). -export_type([action_fun/0]).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
@ -263,7 +270,8 @@ on_action_data_to_webserver(Selected, _Envs =
metadata := Metadata}) -> metadata := Metadata}) ->
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)), NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody), Headers1 = maybe_proc_headers(Headers, Method, Selected),
Req = create_req(Method, NPath, Headers1, NBody),
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]), ?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]),
@ -307,8 +315,9 @@ create_req(_, Path, Headers, Body) ->
parse_action_params(Params = #{<<"url">> := URL}) -> parse_action_params(Params = #{<<"url">> := URL}) ->
{ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL), {ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL),
Method = method(maps:get(<<"method">>, Params, <<"POST">>)), Method = method(maps:get(<<"method">>, Params, <<"POST">>)),
Headers = headers(maps:get(<<"headers">>, Params, #{})), Headers0 = maps:get(<<"headers">>, Params, #{}),
NHeaders = ensure_content_type_header(Headers, Method), Headers1 = preproc_and_normalise_headers(Headers0),
NHeaders = maybe_remove_content_type_header(Headers1, Method),
#{method => Method, #{method => Method,
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)), path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
headers => NHeaders, headers => NHeaders,
@ -316,9 +325,17 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))), request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
pool => maps:get(<<"pool">>, Params)}. pool => maps:get(<<"pool">>, Params)}.
ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =:= put -> %% According to https://www.rfc-editor.org/rfc/rfc7231#section-3.1.1.5, the
%% Content-Type HTTP header should be set only for PUT and POST requests.
maybe_remove_content_type_header({has_tmpl_token, Headers}, Method) ->
{has_tmpl_token, maybe_remove_content_type_header(Headers, Method)};
maybe_remove_content_type_header(Headers, Method) when is_map(Headers), (Method =:= post orelse Method =:= put) ->
maps:to_list(Headers);
maybe_remove_content_type_header(Headers, Method) when is_list(Headers), (Method =:= post orelse Method =:= put) ->
Headers; Headers;
ensure_content_type_header(Headers, _Method) -> maybe_remove_content_type_header(Headers, _Method) when is_map(Headers) ->
maps:to_list(maps:remove(<<"content-type">>, Headers));
maybe_remove_content_type_header(Headers, _Method) when is_list(Headers) ->
lists:keydelete(<<"content-type">>, 1, Headers). lists:keydelete(<<"content-type">>, 1, Headers).
merge_path(CommonPath, <<>>) -> merge_path(CommonPath, <<>>) ->
@ -335,8 +352,46 @@ method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put; method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete. method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete.
headers(Headers) -> normalize_key(K) ->
emqx_http_lib:normalise_headers(maps:to_list(Headers)). %% see emqx_http_lib:normalise_headers/1 for more info
K1 = re:replace(K, "_", "-", [{return, binary}]),
string:lowercase(K1).
preproc_and_normalise_headers(Headers) ->
Preproc = fun(Str) -> {tmpl_token, emqx_rule_utils:preproc_tmpl(Str)} end,
Res = maps:fold(fun(K, V, {Flag, Acc}) ->
case {emqx_rule_utils:if_contains_placeholder(K),
emqx_rule_utils:if_contains_placeholder(V)} of
{false, false} ->
{Flag, Acc#{normalize_key(K) => V}};
{false, true} ->
{has_tmpl_token, Acc#{normalize_key(K) => Preproc(V)}};
{true, false} ->
{has_tmpl_token, Acc#{Preproc(K) => V}};
{true, true} ->
{has_tmpl_token, Acc#{Preproc(K) => Preproc(V)}}
end
end, {no_token, #{}}, Headers),
case Res of
{no_token, RHeaders} -> RHeaders;
{has_tmpl_token, _} -> Res
end.
maybe_proc_headers({has_tmpl_token, HeadersTks}, Method, Data) ->
MaybeProc = fun
(key, {tmpl_token, Tokens}) ->
normalize_key(emqx_rule_utils:proc_tmpl(Tokens, Data));
(val, {tmpl_token, Tokens}) ->
emqx_rule_utils:proc_tmpl(Tokens, Data);
(_, Str) ->
Str
end,
Headers = [{MaybeProc(key, K), MaybeProc(val, V)} || {K, V} <- HeadersTks],
maybe_remove_content_type_header(Headers, Method);
maybe_proc_headers(Headers, _, _) ->
%% For headers of old emqx versions, and normal header without placeholders,
%% the Headers are not pre-processed
Headers.
str(Str) when is_list(Str) -> Str; str(Str) when is_list(Str) -> Str;
str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Atom) when is_atom(Atom) -> atom_to_list(Atom);

View File

@ -35,6 +35,7 @@ all() ->
, {group, ipv6http} , {group, ipv6http}
, {group, ipv6https} , {group, ipv6https}
, test_rule_webhook , test_rule_webhook
, test_preproc_headers
]. ].
groups() -> groups() ->
@ -138,6 +139,58 @@ set_special_cfgs() ->
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
test_preproc_headers(_) ->
TestTable = [
{#{<<"Content_TYPE">> => <<"application/JSON">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"${ContentTypeKey}">> => <<"application/JSON">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"content-type">> => <<"${ContentTypeVal}">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"Content_type">> => <<"${ContentTypeVal}">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"Content_${TypeKey}">> => <<"application/${TypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
}
],
SelectedData1 = #{
<<"ContentTypeKey">> => <<"content-type">>,
<<"ContentTypeVal">> => <<"application/JSON">>,
<<"TypeKey">> => <<"type">>,
<<"TypeVal">> => <<"JSON">>
},
SelectedData2 = #{
<<"ContentTypeKey">> => <<"ConTent_Type">>,
<<"ContentTypeVal">> => <<"application/JSON">>,
<<"TypeKey">> => <<"TYPe">>,
<<"TypeVal">> => <<"JSON">>
},
[begin
ct:pal("test_preproc_headers, input: ~p, method: ~p, selected: ~p", [Input, Method, Selected]),
Headers0 = emqx_web_hook_actions:preproc_and_normalise_headers(Input),
Headers1 = emqx_web_hook_actions:maybe_remove_content_type_header(Headers0, Method),
Result0 = emqx_web_hook_actions:maybe_proc_headers(Headers1, Method, Selected),
Expected1 = case Method =/= post andalso Method =/= put of
true -> maps:remove(<<"content-type">>, Expected);
false -> Expected
end,
?assertEqual(Expected1, maps:from_list(Result0))
end ||
{Input, Expected} <- TestTable,
Selected <- [SelectedData1, SelectedData2],
Method <- [post, put, get, delete]
].
test_rule_webhook(_) -> test_rule_webhook(_) ->
{ok, ServerPid} = http_server:start_link(self(), 9999, []), {ok, ServerPid} = http_server:start_link(self(), 9999, []),
receive {ServerPid, ready} -> ok receive {ServerPid, ready} -> ok

View File

@ -2,6 +2,8 @@
## Enhancements ## Enhancements
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). - Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
This is to avoid slowing down the boot if some resources spend long time establishing the connection. This is to avoid slowing down the boot if some resources spend long time establishing the connection.
@ -13,6 +15,8 @@
- Added a log censor to avoid logging sensitive data [#9189](https://github.com/emqx/emqx/pull/9189). - Added a log censor to avoid logging sensitive data [#9189](https://github.com/emqx/emqx/pull/9189).
If the data to be logged is a map or key-value list which contains sensitive key words such as `password`, the value is obfuscated as `******`. If the data to be logged is a map or key-value list which contains sensitive key words such as `password`, the value is obfuscated as `******`.
- Enhanced log security in ACL modules, sensitive data will be obscured. [#9242](https://github.com/emqx/emqx/pull/9242).
## Bug fixes ## Bug fixes
- Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224). - Fix that after uploading a backup file with an UTF8 filename, HTTP API `GET /data/export` fails with status code 500 [#9224](https://github.com/emqx/emqx/pull/9224).
@ -35,3 +39,7 @@
For rule-engine's input events like `$events/message_delivered`, and `$events/message_dropped`, For rule-engine's input events like `$events/message_delivered`, and `$events/message_dropped`,
if the message was delivered to a shared-subscription, the encoding (to JSON) of the event will fail. if the message was delivered to a shared-subscription, the encoding (to JSON) of the event will fail.
Affected versions: `v4.3.21`, `v4.4.10`, `e4.3.16` and `e4.4.10`. Affected versions: `v4.3.21`, `v4.4.10`, `e4.3.16` and `e4.4.10`.
- Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190).
Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`.
See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources).

View File

@ -2,6 +2,8 @@
## 增强 ## 增强
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 - 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。
@ -13,6 +15,8 @@
- 增强包含敏感数据的日志的安全性 [#9189](https://github.com/emqx/emqx/pull/9189)。 - 增强包含敏感数据的日志的安全性 [#9189](https://github.com/emqx/emqx/pull/9189)。
如果日志中包含敏感关键词,例如 `password`,那么关联的数据回被模糊化处理,替换成 `******` 如果日志中包含敏感关键词,例如 `password`,那么关联的数据回被模糊化处理,替换成 `******`
- 增强 ACL 模块中的日志安全性,敏感数据将被模糊化。[#9242](https://github.com/emqx/emqx/pull/9242)。
## 修复 ## 修复
- 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。 - 修复若上传的备份文件名中包含 UTF8 字符,`GET /data/export` HTTP 接口返回 500 错误 [#9224](https://github.com/emqx/emqx/pull/9224)。
@ -35,3 +39,7 @@
带消息的规则引擎事件,例如 `$events/message_delivered``$events/message_dropped`, 带消息的规则引擎事件,例如 `$events/message_delivered``$events/message_dropped`,
如果消息事件是共享订阅产生的,在编码(到 JSON 格式)过程中会失败。 如果消息事件是共享订阅产生的,在编码(到 JSON 格式)过程中会失败。
影响到的版本:`v4.3.21`, `v4.4.10`, `e4.3.16``e4.4.10` 影响到的版本:`v4.3.21`, `v4.4.10`, `e4.3.16``e4.4.10`
- 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id``resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。
注意在创建规则或资源时HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。
详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。

View File

@ -64,8 +64,8 @@
]). ]).
-export([all_channels/0, -export([all_channels/0,
channel_with_session_table/0, channel_with_session_table/1,
live_connection_table/0]). live_connection_table/1]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
@ -443,7 +443,7 @@ all_channels() ->
ets:select(?CHAN_TAB, Pat). ets:select(?CHAN_TAB, Pat).
%% @doc Get clientinfo for all clients with sessions %% @doc Get clientinfo for all clients with sessions
channel_with_session_table() -> channel_with_session_table(ConnModules) ->
Ms = ets:fun2ms( Ms = ets:fun2ms(
fun({{ClientId, _ChanPid}, fun({{ClientId, _ChanPid},
Info, Info,
@ -451,22 +451,25 @@ channel_with_session_table() ->
{ClientId, Info} {ClientId, Info}
end), end),
Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]), Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
ConnModuleMap = maps:from_list([{Mod, true} || Mod <- ConnModules]),
qlc:q([ {ClientId, ConnState, ConnInfo, ClientInfo} qlc:q([ {ClientId, ConnState, ConnInfo, ClientInfo}
|| {ClientId, || {ClientId,
#{conn_state := ConnState, #{conn_state := ConnState,
clientinfo := ClientInfo, clientinfo := ClientInfo,
conninfo := #{clean_start := false} = ConnInfo}} <- Table conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo}}
<- Table,
maps:is_key(ConnModule, ConnModuleMap)
]). ]).
%% @doc Get all local connection query handle %% @doc Get all local connection query handle
live_connection_table() -> live_connection_table(ConnModules) ->
Ms = ets:fun2ms( Ms = lists:map(fun live_connection_ms/1, ConnModules),
fun({{ClientId, ChanPid}, _}) ->
{ClientId, ChanPid}
end),
Table = ets:table(?CHAN_CONN_TAB, [{traverse, {select, Ms}}]), Table = ets:table(?CHAN_CONN_TAB, [{traverse, {select, Ms}}]),
qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- Table, is_channel_connected(ClientId, ChanPid)]). qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- Table, is_channel_connected(ClientId, ChanPid)]).
live_connection_ms(ConnModule) ->
{{{'$1','$2'},ConnModule},[],[{{'$1','$2'}}]}.
is_channel_connected(ClientId, ChanPid) when node(ChanPid) =:= node() -> is_channel_connected(ClientId, ChanPid) when node(ChanPid) =:= node() ->
case get_chan_info(ClientId, ChanPid) of case get_chan_info(ClientId, ChanPid) of
#{conn_state := disconnected} -> false; #{conn_state := disconnected} -> false;