Merge tag 'v5.0.8' into merge-release-v5.0.8-into-master

This commit is contained in:
JimMoen 2022-09-23 16:06:44 +08:00
commit 85835256f1
38 changed files with 712 additions and 196 deletions

View File

@ -23,14 +23,21 @@
* Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857)
* Fix delayed publish inaccurate caused by os time change. [#8926](https://github.com/emqx/emqx/pull/8926)
* Fix that EMQX can't start when the retainer is disabled [#8911](https://github.com/emqx/emqx/pull/8911)
* Fix that redis authn will deny the unknown users [#8934](https://github.com/emqx/emqx/pull/8934)
* Fix ExProto UDP client keepalive checking error.
This causes the clients to not expire as long as a new UDP packet arrives [#8866](https://github.com/emqx/emqx/pull/8866)
* Fix that MQTT Bridge message payload could be empty string. [#8949](https://github.com/emqx/emqx/pull/8949)
## Enhancements
* Print a warning message when boot with the default (insecure) Erlang cookie. [#8905](https://github.com/emqx/emqx/pull/8905)
* Change the `/gateway` API path to plural form. [#8823](https://github.com/emqx/emqx/pull/8823)
* Don't allow updating config items when they already exist in `local-override.conf`. [#8851](https://github.com/emqx/emqx/pull/8851)
* Remove `node.etc_dir` from emqx.conf, because it is never used.
Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892)
* Added a new API `POST /listeners` for creating listener. [#8876](https://github.com/emqx/emqx/pull/8876)
* Close ExProto client process immediately if it's keepalive timeouted. [#8866](https://github.com/emqx/emqx/pull/8866)
* Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8866](https://github.com/emqx/emqx/pull/8866)
# 5.0.7

View File

@ -6,7 +6,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-d
export EMQX_DEFAULT_RUNNER = debian:11-slim
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
export EMQX_DASHBOARD_VERSION ?= v1.0.8
export EMQX_DASHBOARD_VERSION ?= v1.0.9
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.0
export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1

View File

@ -112,6 +112,27 @@ make
_build/emqx/rel/emqx/bin/emqx console
```
### 在 Apple 芯片M1,M2上编译
基于 Apple 芯片的 Homebrew 将[默认的 home 目录](https://github.com/Homebrew/brew/issues/9177)从 `/usr/local` 改成了 `/opt/homebrew`,这个改变导致了一些兼容性问题。
具体到 EMQX 来说,主要影响的是 `unixodbc`,如果使用 Homebrew 安装的 `unixodbc` 包,那么在使用 [kerl](https://github.com/kerl/kerl) 编译 Erlang/OTP 的时候kerl 会找不到 `unixodbc`
解决此问题的方法如下:
```bash
brew install unixodbc kerl
sudo ln -s $(realpath $(brew --prefix unixodbc)) /usr/local/odbc
export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include"
export LDFLAGS="-L$(brew --prefix unixodbc)/lib"
kerl build 24.3
mkdir ~/.kerl/installations
kerl install 24.3 ~/.kerl/installations/24.3
. ~/.kerl/installations/24.3/activate
```
然后再使用 `make` 继续编译就可以了。
## 开源许可
详见 [LICENSE](./LICENSE)。

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'
%% Community edition
-define(EMQX_RELEASE_CE, "5.0.7").
-define(EMQX_RELEASE_CE, "5.0.8").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.0-alpha.1").

View File

@ -476,7 +476,7 @@ read_override_conf(#{} = Opts) ->
override_conf_file(Opts) when is_map(Opts) ->
Key =
case maps:get(override_to, Opts, local) of
case maps:get(override_to, Opts, cluster) of
local -> local_override_conf_file;
cluster -> cluster_override_conf_file
end,

View File

@ -43,6 +43,7 @@
terminate/2,
code_change/3
]).
-export([is_mutable/3]).
-define(MOD, {mod}).
-define(WKEY, '?').
@ -229,15 +230,26 @@ process_update_request([_], _Handlers, {remove, _Opts}) ->
process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) ->
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
BinKeyPath = bin_path(ConfKeyPath),
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
OverrideConf = remove_from_override_config(BinKeyPath, Opts),
{ok, NewRawConf, OverrideConf, Opts};
case check_permissions(remove, BinKeyPath, OldRawConf, Opts) of
allow ->
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
OverrideConf = remove_from_override_config(BinKeyPath, Opts),
{ok, NewRawConf, OverrideConf, Opts};
{deny, Reason} ->
{error, {permission_denied, Reason}}
end;
process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) of
{ok, NewRawConf} ->
OverrideConf = update_override_config(NewRawConf, Opts),
{ok, NewRawConf, OverrideConf, Opts};
BinKeyPath = bin_path(ConfKeyPath),
case check_permissions(update, BinKeyPath, NewRawConf, Opts) of
allow ->
OverrideConf = update_override_config(NewRawConf, Opts),
{ok, NewRawConf, OverrideConf, Opts};
{deny, Reason} ->
{error, {permission_denied, Reason}}
end;
Error ->
Error
end.
@ -272,12 +284,11 @@ check_and_save_configs(
UpdateArgs,
Opts
) ->
OldConf = emqx_config:get_root(ConfKeyPath),
Schema = schema(SchemaModule, ConfKeyPath),
{AppEnvs, NewConf} = emqx_config:check_config(Schema, NewRawConf),
OldConf = emqx_config:get_root(ConfKeyPath),
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
{ok, Result0} ->
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
ok = emqx_config:save_configs(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts),
Result1 = return_change_result(ConfKeyPath, UpdateArgs),
{ok, Result1#{post_config_update => Result0}};
@ -430,16 +441,6 @@ merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf)
merge_to_old_config(UpdateReq, _RawConf) ->
{ok, UpdateReq}.
%% local-override.conf priority is higher than cluster-override.conf
%% If we want cluster to take effect, we must remove the local.
remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
Opts1 = Opts#{override_to => local},
Local = remove_from_override_config(BinKeyPath, Opts1),
_ = emqx_config:save_to_override_conf(Local, Opts1),
ok;
remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
ok.
remove_from_override_config(_BinKeyPath, #{persistent := false}) ->
undefined;
remove_from_override_config(BinKeyPath, Opts) ->
@ -544,3 +545,98 @@ load_prev_handlers() ->
save_handlers(Handlers) ->
application:set_env(emqx, ?MODULE, Handlers).
check_permissions(_Action, _ConfKeyPath, _NewRawConf, #{override_to := local}) ->
allow;
check_permissions(Action, ConfKeyPath, NewRawConf, _Opts) ->
case emqx_map_lib:deep_find(ConfKeyPath, NewRawConf) of
{ok, NewRaw} ->
LocalOverride = emqx_config:read_override_conf(#{override_to => local}),
case emqx_map_lib:deep_find(ConfKeyPath, LocalOverride) of
{ok, LocalRaw} ->
case is_mutable(Action, NewRaw, LocalRaw) of
ok ->
allow;
{error, Error} ->
?SLOG(error, #{
msg => "prevent_remove_local_override_conf",
config_key_path => ConfKeyPath,
error => Error
}),
{deny, "Disable changed from local-override.conf"}
end;
{not_found, _, _} ->
allow
end;
{not_found, _, _} ->
allow
end.
is_mutable(Action, NewRaw, LocalRaw) ->
try
KeyPath = [],
is_mutable(KeyPath, Action, NewRaw, LocalRaw)
catch
throw:Error -> Error
end.
-define(REMOVE_FAILED, "remove_failed").
-define(UPDATE_FAILED, "update_failed").
is_mutable(KeyPath, Action, New = #{}, Local = #{}) ->
maps:foreach(
fun(Key, SubLocal) ->
case maps:find(Key, New) of
error -> ok;
{ok, SubNew} -> is_mutable(KeyPath ++ [Key], Action, SubNew, SubLocal)
end
end,
Local
);
is_mutable(KeyPath, remove, Update, Origin) ->
throw({error, {?REMOVE_FAILED, KeyPath, Update, Origin}});
is_mutable(_KeyPath, update, Val, Val) ->
ok;
is_mutable(KeyPath, update, Update, Origin) ->
throw({error, {?UPDATE_FAILED, KeyPath, Update, Origin}}).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
is_mutable_update_test() ->
Action = update,
?assertEqual(ok, is_mutable(Action, #{}, #{})),
?assertEqual(ok, is_mutable(Action, #{a => #{b => #{c => #{}}}}, #{a => #{b => #{c => #{}}}})),
?assertEqual(ok, is_mutable(Action, #{a => #{b => #{c => 1}}}, #{a => #{b => #{c => 1}}})),
?assertEqual(
{error, {?UPDATE_FAILED, [a, b, c], 1, 2}},
is_mutable(Action, #{a => #{b => #{c => 1}}}, #{a => #{b => #{c => 2}}})
),
?assertEqual(
{error, {?UPDATE_FAILED, [a, b, d], 2, 3}},
is_mutable(Action, #{a => #{b => #{c => 1, d => 2}}}, #{a => #{b => #{c => 1, d => 3}}})
),
ok.
is_mutable_remove_test() ->
Action = remove,
?assertEqual(ok, is_mutable(Action, #{}, #{})),
?assertEqual(ok, is_mutable(Action, #{a => #{b => #{c => #{}}}}, #{a1 => #{b => #{c => #{}}}})),
?assertEqual(ok, is_mutable(Action, #{a => #{b => #{c => 1}}}, #{a => #{b1 => #{c => 1}}})),
?assertEqual(ok, is_mutable(Action, #{a => #{b => #{c => 1}}}, #{a => #{b => #{c1 => 1}}})),
?assertEqual(
{error, {?REMOVE_FAILED, [a, b, c], 1, 1}},
is_mutable(Action, #{a => #{b => #{c => 1}}}, #{a => #{b => #{c => 1}}})
),
?assertEqual(
{error, {?REMOVE_FAILED, [a, b, c], 1, 2}},
is_mutable(Action, #{a => #{b => #{c => 1}}}, #{a => #{b => #{c => 2}}})
),
?assertEqual(
{error, {?REMOVE_FAILED, [a, b, c], 1, 1}},
is_mutable(Action, #{a => #{b => #{c => 1, d => 2}}}, #{a => #{b => #{c => 1, d => 3}}})
),
ok.
-endif.

View File

@ -21,6 +21,8 @@
-define(MOD, {mod}).
-define(WKEY, '?').
-define(LOCAL_CONF, "/tmp/local-override.conf").
-define(CLUSTER_CONF, "/tmp/cluster-override.conf").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -36,6 +38,8 @@ end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(_Case, Config) ->
_ = file:delete(?LOCAL_CONF),
_ = file:delete(?CLUSTER_CONF),
Config.
end_per_testcase(_Case, _Config) ->
@ -196,6 +200,62 @@ t_sub_key_update_remove(_Config) ->
ok = emqx_config_handler:remove_handler(KeyPath2),
ok.
t_local_override_update_remove(_Config) ->
application:set_env(emqx, local_override_conf_file, ?LOCAL_CONF),
application:set_env(emqx, cluster_override_conf_file, ?CLUSTER_CONF),
KeyPath = [sysmon, os, cpu_high_watermark],
ok = emqx_config_handler:add_handler(KeyPath, ?MODULE),
LocalOpts = #{override_to => local},
{ok, Res} = emqx:update_config(KeyPath, <<"70%">>, LocalOpts),
?assertMatch(
#{
config := 0.7,
post_config_update := #{},
raw_config := <<"70%">>
},
Res
),
ClusterOpts = #{override_to => cluster},
?assertMatch(
{error, {permission_denied, _}}, emqx:update_config(KeyPath, <<"71%">>, ClusterOpts)
),
?assertMatch(0.7, emqx:get_config(KeyPath)),
KeyPath2 = [sysmon, os, cpu_low_watermark],
ok = emqx_config_handler:add_handler(KeyPath2, ?MODULE),
?assertMatch(
{error, {permission_denied, _}}, emqx:update_config(KeyPath2, <<"40%">>, ClusterOpts)
),
%% remove
?assertMatch({error, {permission_denied, _}}, emqx:remove_config(KeyPath)),
?assertEqual(
{ok, #{post_config_update => #{}}},
emqx:remove_config(KeyPath, #{override_to => local})
),
?assertEqual(
{ok, #{post_config_update => #{}}},
emqx:remove_config(KeyPath)
),
?assertError({config_not_found, KeyPath}, emqx:get_raw_config(KeyPath)),
OSKey = maps:keys(emqx:get_raw_config([sysmon, os])),
?assertEqual(false, lists:member(<<"cpu_high_watermark">>, OSKey)),
?assert(length(OSKey) > 0),
?assertEqual(
{ok, #{config => 0.8, post_config_update => #{}, raw_config => <<"80%">>}},
emqx:reset_config(KeyPath, ClusterOpts)
),
OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])),
?assertEqual(true, lists:member(<<"cpu_high_watermark">>, OSKey1)),
?assert(length(OSKey1) > 1),
ok = emqx_config_handler:remove_handler(KeyPath),
ok = emqx_config_handler:remove_handler(KeyPath2),
application:unset_env(emqx, local_override_conf_file),
application:unset_env(emqx, cluster_override_conf_file),
ok.
t_check_failed(_Config) ->
KeyPath = [sysmon, os, cpu_check_interval],
Opts = #{rawconf_with_defaults => true},
@ -219,7 +279,7 @@ t_stop(_Config) ->
ok.
t_callback_crash(_Config) ->
CrashPath = [sysmon, os, cpu_high_watermark],
CrashPath = [sysmon, os, procmem_high_watermark],
Opts = #{rawconf_with_defaults => true},
ok = emqx_config_handler:add_handler(CrashPath, ?MODULE),
Old = emqx:get_raw_config(CrashPath),
@ -334,6 +394,8 @@ pre_config_update([sysmon, os, cpu_check_interval], UpdateReq, _RawConf) ->
{ok, UpdateReq};
pre_config_update([sysmon, os, cpu_low_watermark], UpdateReq, _RawConf) ->
{ok, UpdateReq};
pre_config_update([sysmon, os, cpu_high_watermark], UpdateReq, _RawConf) ->
{ok, UpdateReq};
pre_config_update([sysmon, os, sysmem_high_watermark], UpdateReq, _RawConf) ->
{ok, UpdateReq};
pre_config_update([sysmon, os, mem_check_interval], _UpdateReq, _RawConf) ->
@ -347,6 +409,8 @@ post_config_update([sysmon, os, cpu_check_interval], _UpdateReq, _NewConf, _OldC
{ok, ok};
post_config_update([sysmon, os, cpu_low_watermark], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
ok;
post_config_update([sysmon, os, cpu_high_watermark], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
ok;
post_config_update([sysmon, os, sysmem_high_watermark], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
{error, post_config_update_error}.

View File

@ -111,7 +111,8 @@ parse_sql(Template, ReplaceWith) ->
Template,
#{
replace_with => ReplaceWith,
placeholders => ?AUTHN_PLACEHOLDERS
placeholders => ?AUTHN_PLACEHOLDERS,
strip_double_quote => true
}
).

View File

@ -133,33 +133,47 @@ authenticate(
password_hash_algorithm := Algorithm
}
) ->
NKey = emqx_authn_utils:render_str(KeyTemplate, Credential),
Command = [CommandName, NKey | Fields],
case emqx_resource:query(ResourceId, {cmd, Command}) of
{ok, []} ->
ignore;
{ok, Values} ->
Selected = merge(Fields, Values),
case
emqx_authn_utils:check_password_from_selected_map(
Algorithm, Selected, Password
)
of
ok ->
{ok, emqx_authn_utils:is_superuser(Selected)};
{error, _Reason} ->
?WITH_SUCCESSFUL_RENDER(
begin
NKey = emqx_authn_utils:render_str(KeyTemplate, Credential),
Command = [CommandName, NKey | Fields],
case emqx_resource:query(ResourceId, {cmd, Command}) of
{ok, []} ->
ignore;
{ok, Values} ->
case merge(Fields, Values) of
Selected when Selected =/= #{} ->
case
emqx_authn_utils:check_password_from_selected_map(
Algorithm, Selected, Password
)
of
ok ->
{ok, emqx_authn_utils:is_superuser(Selected)};
{error, _Reason} = Error ->
Error
end;
_ ->
?TRACE_AUTHN_PROVIDER(info, "redis_query_not_matched", #{
resource => ResourceId,
cmd => Command,
keys => NKey,
fields => Fields
}),
ignore
end;
{error, Reason} ->
?TRACE_AUTHN_PROVIDER(error, "redis_query_failed", #{
resource => ResourceId,
cmd => Command,
keys => NKey,
fields => Fields,
reason => Reason
}),
ignore
end;
{error, Reason} ->
?TRACE_AUTHN_PROVIDER(error, "redis_query_failed", #{
resource => ResourceId,
cmd => Command,
keys => NKey,
fields => Fields,
reason => Reason
}),
ignore
end.
end
end
).
%%------------------------------------------------------------------------------
%% Internal functions

View File

@ -318,6 +318,32 @@ user_seeds() ->
result => {ok, #{is_superuser => true}}
},
%% strip double quote support
#{
data => #{
username => "sha256",
password_hash => "ac63a624e7074776d677dd61a003b8c803eb11db004d0ec6ae032a5d7c9c5caf",
salt => "salt",
is_superuser_int => 1
},
credentials => #{
username => <<"sha256">>,
password => <<"sha256">>
},
config_params => #{
<<"query">> =>
<<
"SELECT password_hash, salt, is_superuser_int as is_superuser\n"
" FROM users where username = \"${username}\" LIMIT 1"
>>,
<<"password_hash_algorithm">> => #{
<<"name">> => <<"sha256">>,
<<"salt_position">> => <<"prefix">>
}
},
result => {ok, #{is_superuser => true}}
},
#{
data => #{
username => "sha256",

View File

@ -380,6 +380,32 @@ user_seeds() ->
result => {ok, #{is_superuser => true}}
},
%% strip double quote support
#{
data => #{
username => "sha256",
password_hash => "ac63a624e7074776d677dd61a003b8c803eb11db004d0ec6ae032a5d7c9c5caf",
salt => "salt",
is_superuser_int => 1
},
credentials => #{
username => <<"sha256">>,
password => <<"sha256">>
},
config_params => #{
<<"query">> =>
<<
"SELECT password_hash, salt, is_superuser_int as is_superuser\n"
" FROM users where username = \"${username}\" LIMIT 1"
>>,
<<"password_hash_algorithm">> => #{
<<"name">> => <<"sha256">>,
<<"salt_position">> => <<"prefix">>
}
},
result => {ok, #{is_superuser => true}}
},
#{
data => #{
username => "sha256",

View File

@ -161,11 +161,13 @@ t_authenticate(_Config) ->
user_seeds()
).
test_user_auth(#{
credentials := Credentials0,
config_params := SpecificConfigParams,
result := Result
}) ->
test_user_auth(
#{
credentials := Credentials0,
config_params := SpecificConfigParams,
result := Result
} = Config
) ->
AuthConfig = maps:merge(raw_redis_auth_config(), SpecificConfigParams),
{ok, _} = emqx:update_config(
@ -183,14 +185,12 @@ test_user_auth(#{
?assertEqual(Result, emqx_access_control:authenticate(Credentials)),
AuthnResult =
case Result of
{error, _} ->
ignore;
Any ->
Any
end,
?assertEqual(AuthnResult, emqx_authn_redis:authenticate(Credentials, State)),
case maps:get(redis_result, Config, undefined) of
undefined ->
ok;
RedisResult ->
?assertEqual(RedisResult, emqx_authn_redis:authenticate(Credentials, State))
end,
emqx_authn_test_lib:delete_authenticators(
[authentication],
@ -533,6 +533,23 @@ user_seeds() ->
}
},
result => {ok, #{is_superuser => true}}
},
%% user not exists
#{
data => #{
password_hash => <<"plainsalt">>,
salt => <<"salt">>,
is_superuser => <<"1">>
},
credentials => #{
username => <<"not_exists">>,
password => <<"plain">>
},
key => <<"mqtt_user:plain">>,
config_params => #{},
result => {error, not_authorized},
redis_result => ignore
}
].

View File

@ -223,7 +223,7 @@ sources(get, _) ->
])
end;
(Source, AccIn) ->
lists:append(AccIn, [drop_invalid_certs(Source)])
lists:append(AccIn, [Source])
end,
[],
get_raw_sources()
@ -257,7 +257,7 @@ source(get, #{bindings := #{type := Type}}) ->
}}
end;
[Source] ->
{200, drop_invalid_certs(Source)}
{200, Source}
end;
source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>} = Body}) ->
update_authz_file(Body);
@ -511,11 +511,6 @@ update_config(Cmd, Sources) ->
}}
end.
drop_invalid_certs(#{<<"ssl">> := SSL} = Source) when SSL =/= undefined ->
Source#{<<"ssl">> => emqx_tls_lib:drop_invalid_certs(SSL)};
drop_invalid_certs(Source) ->
Source.
parameters_field() ->
[
{type,

View File

@ -110,7 +110,8 @@ parse_sql(Template, ReplaceWith, PlaceHolders) ->
Template,
#{
replace_with => ReplaceWith,
placeholders => PlaceHolders
placeholders => PlaceHolders,
strip_double_quote => true
}
).

View File

@ -202,6 +202,34 @@ t_lookups(_Config) ->
}
),
ok = emqx_authz_test_lib:test_samples(
ClientInfo,
[
{allow, subscribe, <<"a">>},
{deny, subscribe, <<"b">>}
]
),
%% strip double quote support
ok = init_table(),
ok = q(
<<
"INSERT INTO acl(clientid, topic, permission, action)"
"VALUES(?, ?, ?, ?)"
>>,
[<<"clientid">>, <<"a">>, <<"allow">>, <<"subscribe">>]
),
ok = setup_config(
#{
<<"query">> => <<
"SELECT permission, action, topic "
"FROM acl WHERE clientid = \"${clientid}\""
>>
}
),
ok = emqx_authz_test_lib:test_samples(
ClientInfo,
[

View File

@ -202,6 +202,34 @@ t_lookups(_Config) ->
}
),
ok = emqx_authz_test_lib:test_samples(
ClientInfo,
[
{allow, subscribe, <<"a">>},
{deny, subscribe, <<"b">>}
]
),
%% strip double quote support
ok = init_table(),
ok = insert(
<<
"INSERT INTO acl(clientid, topic, permission, action)"
"VALUES($1, $2, $3, $4)"
>>,
[<<"clientid">>, <<"a">>, <<"allow">>, <<"subscribe">>]
),
ok = setup_config(
#{
<<"query">> => <<
"SELECT permission, action, topic "
"FROM acl WHERE clientid = \"${clientid}\""
>>
}
),
ok = emqx_authz_test_lib:test_samples(
ClientInfo,
[

View File

@ -584,10 +584,9 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
format_bridge_info([FirstBridge | _] = Bridges) ->
Res = maps:remove(node, FirstBridge),
NRes = emqx_connector_ssl:drop_invalid_certs(Res),
NodeStatus = collect_status(Bridges),
NodeMetrics = collect_metrics(Bridges),
NRes#{
Res#{
status => aggregate_status(NodeStatus),
node_status => NodeStatus,
metrics => aggregate_metrics(NodeMetrics),

View File

@ -536,6 +536,15 @@ fields("node") ->
desc => ?DESC(node_applications)
}
)},
{"etc_dir",
sc(
string(),
#{
desc => ?DESC(node_etc_dir),
'readOnly' => true,
deprecated => {since, "5.0.8"}
}
)},
{"cluster_call",
sc(
?R_REF("cluster_call"),

View File

@ -222,20 +222,20 @@ make_forward_confs(undefined) ->
make_forward_confs(FrowardConf) ->
FrowardConf.
basic_config(#{
server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMode,
username := User,
password := Password,
clean_start := CleanStart,
keepalive := KeepAlive,
retry_interval := RetryIntv,
max_inflight := MaxInflight,
replayq := ReplayQ,
ssl := #{enable := EnableSsl} = Ssl
}) ->
basic_config(
#{
server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMode,
clean_start := CleanStart,
keepalive := KeepAlive,
retry_interval := RetryIntv,
max_inflight := MaxInflight,
replayq := ReplayQ,
ssl := #{enable := EnableSsl} = Ssl
} = Conf
) ->
#{
replayq => ReplayQ,
%% connection opts
@ -251,8 +251,9 @@ basic_config(#{
%% non-standard mqtt connection packets will be filtered out by LB.
%% So let's disable bridge_mode.
bridge_mode => BridgeMode,
username => User,
password => Password,
%% should be iolist for emqtt
username => maps:get(username, Conf, <<>>),
password => maps:get(password, Conf, <<>>),
clean_start => CleanStart,
keepalive => ms_to_s(KeepAlive),
retry_interval => RetryIntv,

View File

@ -18,7 +18,6 @@
-export([
convert_certs/2,
drop_invalid_certs/1,
clear_certs/2
]).
@ -61,28 +60,6 @@ clear_certs(RltvDir, #{ssl := OldSSL} = _Config) ->
clear_certs(_RltvDir, _) ->
ok.
drop_invalid_certs(#{<<"connector">> := Connector} = Config) when
is_map(Connector)
->
SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
new_ssl_config(Config, NewSSL);
drop_invalid_certs(#{connector := Connector} = Config) when
is_map(Connector)
->
SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
new_ssl_config(Config, NewSSL);
drop_invalid_certs(#{<<"ssl">> := SSL} = Config) ->
NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
new_ssl_config(Config, NewSSL);
drop_invalid_certs(#{ssl := SSL} = Config) ->
NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
new_ssl_config(Config, NewSSL);
%% for bridges use connector name
drop_invalid_certs(Config) ->
Config.
new_ssl_config(RltvDir, Config, SSL) ->
case emqx_tls_lib:ensure_ssl_files(RltvDir, SSL) of
{ok, NewSSL} ->

View File

@ -177,7 +177,7 @@ fields("ingress") ->
sc(
binary(),
#{
default => <<"${payload}">>,
default => undefined,
desc => ?DESC("payload")
}
)}
@ -224,7 +224,7 @@ fields("egress") ->
sc(
binary(),
#{
default => <<"${payload}">>,
default => undefined,
desc => ?DESC("payload")
}
)}

View File

@ -43,6 +43,8 @@
{'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
]).
-define(SERVER_FORCE_SHUTDOWN_TIMEOUT, 5000).
-endif.
-define(CMD_MOVE_FRONT, front).

View File

@ -483,16 +483,11 @@ err_msg(Msg) -> emqx_misc:readable_error_msg(Msg).
get_raw_config() ->
RawConfig = emqx:get_raw_config([exhook, servers], []),
Schema = #{roots => emqx_exhook_schema:fields(exhook), fields => #{}},
Conf = #{<<"servers">> => lists:map(fun drop_invalid_certs/1, RawConfig)},
Conf = #{<<"servers">> => RawConfig},
Options = #{only_fill_defaults => true},
#{<<"servers">> := Servers} = hocon_tconf:check_plain(Schema, Conf, Options),
Servers.
drop_invalid_certs(#{<<"ssl">> := SSL} = Conf) when SSL =/= undefined ->
Conf#{<<"ssl">> => emqx_tls_lib:drop_invalid_certs(SSL)};
drop_invalid_certs(Conf) ->
Conf.
position_example() ->
#{
front =>

View File

@ -21,6 +21,7 @@
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% APIs
-export([start_link/0]).
@ -297,7 +298,8 @@ handle_info(refresh_tick, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State = #{servers := Servers}) ->
terminate(Reason, State = #{servers := Servers}) ->
_ = unload_exhooks(),
_ = maps:fold(
fun(Name, _, AccIn) ->
do_unload_server(Name, AccIn)
@ -305,7 +307,7 @@ terminate(_Reason, State = #{servers := Servers}) ->
State,
Servers
),
_ = unload_exhooks(),
?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -179,13 +179,16 @@ filter(Ls) ->
-spec unload(server()) -> ok.
unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
_ = do_deinit(Name, ReqOpts),
_ = may_unload_hooks(HookSpecs),
_ = do_deinit(Name, ReqOpts),
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
ok.
do_deinit(Name, ReqOpts) ->
_ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts),
%% Override the request timeout to deinit grpc server to
%% avoid emqx_exhook_mgr force killed by upper supervisor
NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT},
_ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts),
ok.
do_init(ChannName, ReqOpts) ->

View File

@ -16,6 +16,8 @@
-module(emqx_exhook_sup).
-include("emqx_exhook.hrl").
-behaviour(supervisor).
-export([
@ -28,11 +30,13 @@
stop_grpc_client_channel/1
]).
-define(CHILD(Mod, Type, Args), #{
-define(DEFAULT_TIMEOUT, 5000).
-define(CHILD(Mod, Type, Args, Timeout), #{
id => Mod,
start => {Mod, start_link, Args},
type => Type,
shutdown => 15000
shutdown => Timeout
}).
%%--------------------------------------------------------------------
@ -45,7 +49,7 @@ start_link() ->
init([]) ->
_ = emqx_exhook_metrics:init(),
_ = emqx_exhook_mgr:init_ref_counter_table(),
Mngr = ?CHILD(emqx_exhook_mgr, worker, []),
Mngr = ?CHILD(emqx_exhook_mgr, worker, [], force_shutdown_timeout()),
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
%%--------------------------------------------------------------------
@ -70,3 +74,9 @@ stop_grpc_client_channel(Name) ->
_:_:_ ->
ok
end.
%% Calculate the maximum timeout, which will help to shutdown the
%% emqx_exhook_mgr process correctly.
force_shutdown_timeout() ->
Factor = max(3, length(emqx:get_config([exhook, servers])) + 1),
Factor * ?SERVER_FORCE_SHUTDOWN_TIMEOUT.

View File

@ -24,6 +24,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
@ -313,6 +314,40 @@ t_cluster_name(_) ->
),
emqx_exhook_mgr:disable(<<"default">>).
t_stop_timeout(_) ->
snabbkaffe:start_trace(),
meck:new(emqx_exhook_demo_svr, [passthrough, no_history]),
meck:expect(
emqx_exhook_demo_svr,
on_provider_unloaded,
fun(Req, Md) ->
%% ensure sleep time greater than emqx_exhook_mgr shutdown timeout
timer:sleep(20000),
meck:passthrough([Req, Md])
end
),
%% stop application
application:stop(emqx_exhook),
?block_until(#{?snk_kind := exhook_mgr_terminated}, 20000),
%% all exhook hooked point should be unloaded
Mods = lists:flatten(
lists:map(
fun({hook, _, Cbs}) ->
lists:map(fun({callback, {M, _, _}, _, _}) -> M end, Cbs)
end,
ets:tab2list(emqx_hooks)
)
),
?assertEqual(false, lists:any(fun(M) -> M == emqx_exhook_handler end, Mods)),
%% ensure started for other tests
emqx_common_test_helpers:start_apps([emqx_exhook]),
snabbkaffe:stop(),
meck:unload(emqx_exhook_demo_svr).
%%--------------------------------------------------------------------
%% Cases Helpers
%%--------------------------------------------------------------------

View File

@ -80,7 +80,16 @@ stop() ->
stop(Name) ->
grpc:stop_server(Name),
to_atom_name(Name) ! stop.
case whereis(to_atom_name(Name)) of
undefined ->
ok;
Pid ->
Ref = erlang:monitor(process, Pid),
Pid ! stop,
receive
{'DOWN', Ref, process, Pid, _Reason} -> ok
end
end.
take() ->
to_atom_name(?NAME) ! {take, self()},

View File

@ -19,6 +19,7 @@
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
@ -51,6 +52,9 @@
%% Internal callback
-export([wakeup_from_hib/2, recvloop/2]).
%% for channel module
-export([keepalive_stats/1]).
-record(state, {
%% TCP/SSL/UDP/DTLS Wrapped Socket
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
@ -240,6 +244,11 @@ esockd_send(Data, #state{
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
esockd_transport:async_send(Sock, Data).
keepalive_stats(recv) ->
emqx_pd:get_counter(recv_pkt);
keepalive_stats(send) ->
emqx_pd:get_counter(send_pkt).
is_datadram_socket({esockd_transport, _}) -> false;
is_datadram_socket({udp, _, _}) -> true.
@ -568,9 +577,15 @@ terminate(
channel = Channel
}
) ->
?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}),
_ = ChannMod:terminate(Reason, Channel),
_ = close_socket(State),
ClientId =
try ChannMod:info(clientid, Channel) of
Id -> Id
catch
_:_ -> undefined
end,
?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}),
exit(Reason).
%%--------------------------------------------------------------------
@ -635,28 +650,22 @@ handle_timeout(
Keepalive,
State = #state{
chann_mod = ChannMod,
socket = Socket,
channel = Channel
}
) when
Keepalive == keepalive;
Keepalive == keepalive_send
->
Stat =
StatVal =
case Keepalive of
keepalive -> recv_oct;
keepalive_send -> send_oct
keepalive -> keepalive_stats(recv);
keepalive_send -> keepalive_stats(send)
end,
case ChannMod:info(conn_state, Channel) of
disconnected ->
{ok, State};
_ ->
case esockd_getstat(Socket, [Stat]) of
{ok, [{Stat, RecvOct}]} ->
handle_timeout(TRef, {Keepalive, RecvOct}, State);
{error, Reason} ->
handle_info({sock_error, Reason}, State)
end
handle_timeout(TRef, {Keepalive, StatVal}, State)
end;
handle_timeout(
_TRef,

View File

@ -78,7 +78,8 @@
-define(TIMER_TABLE, #{
alive_timer => keepalive,
force_timer => force_close
force_timer => force_close,
idle_timer => force_close_idle
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@ -151,14 +152,17 @@ init(
Ctx = maps:get(ctx, Options),
GRpcChann = maps:get(handler, Options),
PoolName = maps:get(pool_name, Options),
NConnInfo = default_conninfo(ConnInfo),
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
ListenerId =
case maps:get(listener, Options, undefined) of
undefined -> undefined;
{GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
end,
EnableAuthn = maps:get(enable_authn, Options, true),
DefaultClientInfo = default_clientinfo(ConnInfo),
DefaultClientInfo = default_clientinfo(NConnInfo),
ClientInfo = DefaultClientInfo#{
listener => ListenerId,
enable_authn => EnableAuthn
@ -183,7 +187,9 @@ init(
}
)
},
try_dispatch(on_socket_created, wrap(Req), Channel).
start_idle_checking_timer(
try_dispatch(on_socket_created, wrap(Req), Channel)
).
%% @private
peercert(NoSsl, ConnInfo) when
@ -217,6 +223,12 @@ socktype(dtls) -> 'DTLS'.
address({Host, Port}) ->
#{host => inet:ntoa(Host), port => Port}.
%% avoid udp connection process leak
start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
ensure_timer(idle_timer, Channel);
start_idle_checking_timer(Channel) ->
Channel.
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
@ -285,10 +297,15 @@ handle_timeout(
{ok, reset_timer(alive_timer, NChannel)};
{error, timeout} ->
Req = #{type => 'KEEPALIVE'},
{ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)}
NChannel = remove_timer_ref(alive_timer, Channel),
%% close connection if keepalive timeout
Replies = [{event, disconnected}, {close, keepalive_timeout}],
{ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
end;
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, {error, {force_close, Reason}}, Channel};
handle_timeout(_TRef, force_close_idle, Channel) ->
{shutdown, idle_timeout, Channel};
handle_timeout(_TRef, Msg, Channel) ->
?SLOG(warning, #{
msg => "unexpected_timeout_signal",
@ -390,7 +407,7 @@ handle_call(
NConnInfo = ConnInfo#{keepalive => Interval},
NClientInfo = ClientInfo#{keepalive => Interval},
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
{reply, ok, ensure_keepalive(NChannel)};
{reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))};
handle_call(
{subscribe_from_client, TopicFilter, Qos},
_From,
@ -405,21 +422,21 @@ handle_call(
{reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
_ ->
{ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
{reply, ok, NChannel}
{reply, ok, [{event, updated}], NChannel}
end;
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
{ok, [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
{reply, {ok, {NTopicFilter, NSubOpts}}, NChannel};
{reply, {ok, {NTopicFilter, NSubOpts}}, [{event, updated}], NChannel};
handle_call(
{unsubscribe_from_client, TopicFilter},
_From,
Channel = #channel{conn_state = connected}
) ->
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
{reply, ok, NChannel};
{reply, ok, [{event, updated}], NChannel};
handle_call({unsubscribe, Topic}, _From, Channel) ->
{ok, NChannel} = do_unsubscribe([Topic], Channel),
{reply, ok, NChannel};
{reply, ok, [{event, update}], NChannel};
handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
{reply, {ok, maps:to_list(Subs)}, Channel};
handle_call(
@ -446,7 +463,7 @@ handle_call(
{reply, ok, Channel}
end;
handle_call(kick, _From, Channel) ->
{shutdown, kicked, ok, ensure_disconnected(kicked, Channel)};
{reply, ok, [{event, disconnected}, {close, kicked}], Channel};
handle_call(discard, _From, Channel) ->
{shutdown, discarded, ok, Channel};
handle_call(Req, _From, Channel) ->
@ -648,7 +665,8 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) ->
ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
Channel;
ensure_keepalive_timer(Interval, Channel) ->
Keepalive = emqx_keepalive:init(timer:seconds(Interval)),
StatVal = emqx_gateway_conn:keepalive_stats(recv),
Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
@ -666,11 +684,17 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).
ensure_timer(Name, remove_timer_ref(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
remove_timer_ref(Name, Channel).
remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout;
interval(force_timer, _) ->
15000;
interval(alive_timer, #channel{keepalive = Keepalive}) ->
@ -725,7 +749,7 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
default_conninfo(ConnInfo) ->
ConnInfo#{
clean_start => true,
clientid => undefined,
clientid => anonymous_clientid(),
username => undefined,
conn_props => #{},
connected => true,
@ -739,14 +763,15 @@ default_conninfo(ConnInfo) ->
default_clientinfo(#{
peername := {PeerHost, _},
sockname := {_, SockPort}
sockname := {_, SockPort},
clientid := ClientId
}) ->
#{
zone => default,
protocol => exproto,
peerhost => PeerHost,
sockport => SockPort,
clientid => undefined,
clientid => ClientId,
username => undefined,
is_bridge => false,
is_superuser => false,
@ -764,3 +789,6 @@ proto_name_to_protocol(<<>>) ->
exproto;
proto_name_to_protocol(ProtoName) when is_binary(ProtoName) ->
binary_to_atom(ProtoName).
anonymous_clientid() ->
iolist_to_binary(["exproto-", emqx_misc:gen_id()]).

View File

@ -56,12 +56,19 @@ start_link(Pool, Id) ->
[]
).
-spec async_call(atom(), map(), map()) -> ok.
async_call(
FunName,
Req = #{conn := Conn},
Options = #{pool_name := PoolName}
) ->
cast(pick(PoolName, Conn), {rpc, FunName, Req, Options, self()}).
case pick(PoolName, Conn) of
false ->
reply(self(), FunName, {error, no_available_grpc_client});
Pid when is_pid(Pid) ->
cast(Pid, {rpc, FunName, Req, Options, self()})
end,
ok.
%%--------------------------------------------------------------------
%% cast, pick
@ -72,6 +79,7 @@ async_call(
cast(Deliver, Msg) ->
gen_server:cast(Deliver, Msg).
-spec pick(term(), term()) -> pid() | false.
pick(PoolName, Conn) ->
gproc_pool:pick_worker(PoolName, Conn).

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(
emqx_exproto_echo_svr,
@ -38,6 +39,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TCPOPTS, [binary, {active, false}]).
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
@ -62,6 +64,9 @@
all() ->
[{group, Name} || Name <- metrics()].
suite() ->
[{timetrap, {seconds, 30}}].
groups() ->
Cases = emqx_common_test_helpers:all(?MODULE),
[{Name, Cases} || Name <- metrics()].
@ -87,6 +92,7 @@ set_special_cfg(emqx_gateway) ->
[gateway, exproto],
#{
server => #{bind => 9100},
idle_timeout => 5000,
handler => #{address => "http://127.0.0.1:9001"},
listeners => listener_confs(LisType)
}
@ -223,14 +229,16 @@ t_acl_deny(Cfg) ->
close(Sock).
t_keepalive_timeout(Cfg) ->
ok = snabbkaffe:start_trace(),
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
ClientId1 = <<"keepalive_test_client1">>,
Client = #{
proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>,
keepalive => 2
clientid => ClientId1,
keepalive => 5
},
Password = <<"123456">>,
@ -238,16 +246,42 @@ t_keepalive_timeout(Cfg) ->
ConnAckBin = frame_connack(0),
send(Sock, ConnBin),
{ok, ConnAckBin} = recv(Sock, 5000),
{ok, ConnAckBin} = recv(Sock),
DisconnectBin = frame_disconnect(),
{ok, DisconnectBin} = recv(Sock, 10000),
SockType =/= udp andalso
begin
{error, closed} = recv(Sock, 5000)
end,
ok.
case SockType of
udp ->
%% another udp client should not affect the first
%% udp client keepalive check
timer:sleep(4000),
Sock2 = open(SockType),
ConnBin2 = frame_connect(
Client#{clientid => <<"keepalive_test_client2">>},
Password
),
send(Sock2, ConnBin2),
%% first client will be keepalive timeouted in 6s
?assertMatch(
{ok, #{
clientid := ClientId1,
reason := {shutdown, {sock_closed, keepalive_timeout}}
}},
?block_until(#{?snk_kind := conn_process_terminated}, 8000)
);
_ ->
?assertMatch(
{ok, #{
clientid := ClientId1,
reason := {shutdown, {sock_closed, keepalive_timeout}}
}},
?block_until(#{?snk_kind := conn_process_terminated}, 12000)
),
Trace = snabbkaffe:collect_trace(),
%% conn process should be terminated
?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))),
%% socket port should be closed
?assertEqual({error, closed}, recv(Sock, 5000))
end,
snabbkaffe:stop().
t_hook_connected_disconnected(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
@ -337,6 +371,8 @@ t_hook_session_subscribed_unsubscribed(Cfg) ->
error(hook_is_not_running)
end,
send(Sock, frame_disconnect()),
close(Sock),
emqx_hooks:del('session.subscribed', {?MODULE, hook_fun3}),
emqx_hooks:del('session.unsubscribed', {?MODULE, hook_fun4}).
@ -373,6 +409,48 @@ t_hook_message_delivered(Cfg) ->
close(Sock),
emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}).
t_idle_timeout(Cfg) ->
ok = snabbkaffe:start_trace(),
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
%% need to create udp client by sending something
case SockType of
udp ->
%% nothing to do
ok = meck:new(emqx_exproto_gcli, [passthrough, no_history]),
ok = meck:expect(
emqx_exproto_gcli,
async_call,
fun(FunName, _Req, _GClient) ->
self() ! {hreply, FunName, ok},
ok
end
),
%% send request, but nobody can respond to it
ClientId = <<"idle_test_client1">>,
Client = #{
proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => ClientId,
keepalive => 5
},
Password = <<"123456">>,
ConnBin = frame_connect(Client, Password),
send(Sock, ConnBin),
?assertMatch(
{ok, #{reason := {shutdown, idle_timeout}}},
?block_until(#{?snk_kind := conn_process_terminated}, 10000)
),
ok = meck:unload(emqx_exproto_gcli);
_ ->
?assertMatch(
{ok, #{reason := {shutdown, idle_timeout}}},
?block_until(#{?snk_kind := conn_process_terminated}, 10000)
)
end,
snabbkaffe:stop().
%%--------------------------------------------------------------------
%% Utils
@ -422,6 +500,9 @@ send({ssl, Sock}, Bin) ->
send({dtls, Sock}, Bin) ->
ssl:send(Sock, Bin).
recv(Sock) ->
recv(Sock, infinity).
recv({tcp, Sock}, Ts) ->
gen_tcp:recv(Sock, 0, Ts);
recv({udp, Sock}, Ts) ->

View File

@ -141,7 +141,8 @@ schema("/configs_reset/:rootname") ->
],
responses => #{
200 => <<"Rest config successfully">>,
400 => emqx_dashboard_swagger:error_codes(['NO_DEFAULT_VALUE', 'REST_FAILED'])
400 => emqx_dashboard_swagger:error_codes(['NO_DEFAULT_VALUE', 'REST_FAILED']),
403 => emqx_dashboard_swagger:error_codes(['REST_FAILED'])
}
}
};
@ -160,7 +161,8 @@ schema("/configs/global_zone") ->
'requestBody' => Schema,
responses => #{
200 => Schema,
400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED']),
403 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
}
}
};
@ -226,7 +228,8 @@ schema(Path) ->
'requestBody' => Schema,
responses => #{
200 => Schema,
400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED']),
403 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
}
}
}.
@ -254,6 +257,8 @@ config(put, #{body := Body}, Req) ->
case emqx_conf:update(Path, Body, ?OPTS) of
{ok, #{raw_config := RawConf}} ->
{200, RawConf};
{error, {permission_denied, Reason}} ->
{403, #{code => 'UPDATE_FAILED', message => Reason}};
{error, Reason} ->
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
end.
@ -297,6 +302,8 @@ config_reset(post, _Params, Req) ->
case emqx_conf:reset(Path, ?OPTS) of
{ok, _} ->
{200};
{error, {permission_denied, Reason}} ->
{403, #{code => 'REST_FAILED', message => Reason}};
{error, no_default_value} ->
{400, #{code => 'NO_DEFAULT_VALUE', message => <<"No Default Value.">>}};
{error, Reason} ->

View File

@ -39,7 +39,10 @@
sql_data/1
]).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\}|\"\\$\\{[a-zA-Z0-9\\._]+\\}\")").
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
-define(EX_PLACE_HOLDER_DOUBLE_QUOTE, "(\\$\\{[a-zA-Z0-9\\._]+\\}|\"\\$\\{[a-zA-Z0-9\\._]+\\}\")").
%% Space and CRLF
-define(EX_WITHE_CHARS, "\\s").
@ -57,7 +60,8 @@
-type preproc_sql_opts() :: #{
placeholders => list(binary()),
replace_with => '?' | '$n'
replace_with => '?' | '$n',
strip_double_quote => boolean()
}.
-type preproc_deep_opts() :: #{
@ -89,7 +93,7 @@ preproc_tmpl(Str) ->
preproc_tmpl(Str, Opts) ->
RE = preproc_var_re(Opts),
Tokens = re:split(Str, RE, [{return, binary}, group, trim]),
do_preproc_tmpl(Tokens, []).
do_preproc_tmpl(Opts, Tokens, []).
-spec proc_tmpl(tmpl_token(), map()) -> binary().
proc_tmpl(Tokens, Data) ->
@ -140,10 +144,11 @@ preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) ->
preproc_sql(Sql, #{replace_with => ReplaceWith});
preproc_sql(Sql, Opts) ->
RE = preproc_var_re(Opts),
Strip = maps:get(strip_double_quote, Opts, false),
ReplaceWith = maps:get(replace_with, Opts, '?'),
case re:run(Sql, RE, [{capture, all_but_first, binary}, global]) of
{match, PlaceHolders} ->
PhKs = [parse_nested(unwrap(Phld)) || [Phld | _] <- PlaceHolders],
PhKs = [parse_nested(unwrap(Phld, Strip)) || [Phld | _] <- PlaceHolders],
{replace_with(Sql, RE, ReplaceWith), [{var, Phld} || Phld <- PhKs]};
nomatch ->
{Sql, []}
@ -234,29 +239,36 @@ get_phld_var(Fun, Data) when is_function(Fun) ->
get_phld_var(Phld, Data) ->
emqx_rule_maps:nested_get(Phld, Data).
preproc_var_re(#{placeholders := PHs}) ->
preproc_var_re(#{placeholders := PHs, strip_double_quote := true}) ->
Res = [ph_to_re(PH) || PH <- PHs],
QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res],
"(" ++ string:join(Res ++ QuoteRes, "|") ++ ")";
preproc_var_re(#{placeholders := PHs}) ->
"(" ++ string:join([ph_to_re(PH) || PH <- PHs], "|") ++ ")";
preproc_var_re(#{strip_double_quote := true}) ->
?EX_PLACE_HOLDER_DOUBLE_QUOTE;
preproc_var_re(#{}) ->
?EX_PLACE_HOLDER.
ph_to_re(VarPH) ->
re:replace(VarPH, "[\\$\\{\\}]", "\\\\&", [global, {return, list}]).
do_preproc_tmpl([], Acc) ->
do_preproc_tmpl(_Opts, [], Acc) ->
lists:reverse(Acc);
do_preproc_tmpl([[Str, Phld] | Tokens], Acc) ->
do_preproc_tmpl(Opts, [[Str, Phld] | Tokens], Acc) ->
Strip = maps:get(strip_double_quote, Opts, false),
do_preproc_tmpl(
Opts,
Tokens,
put_head(
var,
parse_nested(unwrap(Phld)),
parse_nested(unwrap(Phld, Strip)),
put_head(str, Str, Acc)
)
);
do_preproc_tmpl([[Str] | Tokens], Acc) ->
do_preproc_tmpl(Opts, [[Str] | Tokens], Acc) ->
do_preproc_tmpl(
Opts,
Tokens,
put_head(str, Str, Acc)
).
@ -293,10 +305,10 @@ parse_nested(Attr) ->
Nested -> {path, [{key, P} || P <- Nested]}
end.
unwrap(<<"${", Val/binary>>) ->
binary:part(Val, {0, byte_size(Val) - 1});
unwrap(<<"\"${", Val/binary>>) ->
binary:part(Val, {0, byte_size(Val) - 2}).
unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) ->
binary:part(Val, {0, byte_size(Val) - 2});
unwrap(<<"${", Val/binary>>, _StripDoubleQuote) ->
binary:part(Val, {0, byte_size(Val) - 1}).
quote_sql(Str) ->
quote(Str, <<"\\\\'">>).

View File

@ -150,20 +150,25 @@ t_preproc_sql6(_) ->
emqx_placeholder:proc_sql(ParamsTokens, Selected)
).
t_preproc_sql7(_) ->
t_preproc_sql_strip_double_quote(_) ->
Selected = #{a => <<"a">>, b => <<"b">>},
Opts = #{replace_with => '$n', placeholders => [<<"${a}">>]},
%% no strip_double_quote option: "${key}" -> "value"
{PrepareStatement, ParamsTokens} = emqx_placeholder:preproc_sql(
<<"a:\"${a}\",b:\"${b}\"">>,
#{
replace_with => '$n',
placeholders => [<<"${a}">>]
}
Opts
),
?assertEqual(<<"a:$1,b:\"${b}\"">>, PrepareStatement),
?assertEqual(
[<<"a">>],
emqx_placeholder:proc_sql(ParamsTokens, Selected)
).
?assertEqual(<<"a:\"$1\",b:\"${b}\"">>, PrepareStatement),
?assertEqual([<<"a">>], emqx_placeholder:proc_sql(ParamsTokens, Selected)),
%% strip_double_quote = true: "${key}" -> value
{PrepareStatement1, ParamsTokens1} = emqx_placeholder:preproc_sql(
<<"a:\"${a}\",b:\"${b}\"">>,
Opts#{strip_double_quote => true}
),
?assertEqual(<<"a:$1,b:\"${b}\"">>, PrepareStatement1),
?assertEqual([<<"a">>], emqx_placeholder:proc_sql(ParamsTokens1, Selected)).
t_preproc_tmpl_deep(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.0.7
version: 5.0.8
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.0.7
appVersion: 5.0.8

View File

@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
{:esockd, github: "emqx/esockd", tag: "5.9.4", override: true},
{:ekka, github: "emqx/ekka", tag: "0.13.5", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
{:ecpool, github: "emqx/ecpool", tag: "0.5.2"},
{:replayq, "0.3.4", override: true},

View File

@ -56,7 +56,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.5"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
, {replayq, "0.3.4"}