diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index e71b3b9f4..7a6ec9810 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,10 +28,10 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 932afca15..d9da5da9f 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -16,6 +16,8 @@ -module(emqx_banned). +-feature(maybe_expr, enable). + -behaviour(gen_server). -behaviour(emqx_db_backup). @@ -49,6 +51,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3 ]). @@ -137,7 +140,7 @@ format(#banned{ until => to_rfc3339(Until) }. --spec parse(map()) -> emqx_types:banned() | {error, term()}. +-spec parse(map()) -> {ok, emqx_types:banned()} | {error, term()}. parse(Params) -> case parse_who(Params) of {error, Reason} -> @@ -149,13 +152,13 @@ parse(Params) -> Until = maps:get(<<"until">>, Params, At + ?EXPIRATION_TIME), case Until > erlang:system_time(second) of true -> - #banned{ + {ok, #banned{ who = Who, by = By, reason = Reason, at = At, until = Until - }; + }}; false -> ErrorReason = io_lib:format("Cannot create expired banned, ~p to ~p", [At, Until]), @@ -239,12 +242,139 @@ who(peerhost_net, CIDR) when is_tuple(CIDR) -> {peerhost_net, CIDR}; who(peerhost_net, CIDR) when is_binary(CIDR) -> {peerhost_net, esockd_cidr:parse(binary_to_list(CIDR), true)}. +%%-------------------------------------------------------------------- +%% Import From CSV +%%-------------------------------------------------------------------- +init_from_csv(undefined) -> + ok; +init_from_csv(File) -> + maybe + core ?= mria_rlog:role(), + '$end_of_table' ?= mnesia:dirty_first(?BANNED_RULE_TAB), + '$end_of_table' ?= mnesia:dirty_first(?BANNED_INDIVIDUAL_TAB), + {ok, Bin} ?= file:read_file(File), + Stream = emqx_utils_stream:csv(Bin, #{nullable => true, filter_null => true}), + {ok, List} ?= parse_stream(Stream), + import_from_stream(List), + ?SLOG(info, #{ + msg => "load_banned_bootstrap_file_succeeded", + file => File + }) + else + replicant -> + ok; + {Name, _} when + Name == peerhost; + Name == peerhost_net; + Name == clientid_re; + Name == username_re; + Name == clientid; + Name == username + -> + ok; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "load_banned_bootstrap_file_failed", + reason => Reason, + file => File + }), + Error + end. + +import_from_stream(Stream) -> + Groups = maps:groups_from_list( + fun(#banned{who = Who}) -> table(Who) end, Stream + ), + maps:foreach( + fun(Tab, Items) -> + Trans = fun() -> + lists:foreach( + fun(Item) -> + mnesia:write(Tab, Item, write) + end, + Items + ) + end, + + case trans(Trans) of + {ok, _} -> + ?SLOG(info, #{ + msg => "import_banned_from_stream_succeeded", + items => Items + }); + {error, Reason} -> + ?SLOG(error, #{ + msg => "import_banned_from_stream_failed", + reason => Reason, + items => Items + }) + end + end, + Groups + ). + +parse_stream(Stream) -> + try + List = emqx_utils_stream:consume(Stream), + parse_stream(List, [], []) + catch + error:Reason -> + {error, Reason} + end. + +parse_stream([Item | List], Ok, Error) -> + maybe + {ok, Item1} ?= normalize_parse_item(Item), + {ok, Banned} ?= parse(Item1), + parse_stream(List, [Banned | Ok], Error) + else + {error, _} -> + parse_stream(List, Ok, [Item | Error]) + end; +parse_stream([], Ok, []) -> + {ok, Ok}; +parse_stream([], Ok, Error) -> + ?SLOG(warning, #{ + msg => "invalid_banned_items", + items => Error + }), + {ok, Ok}. + +normalize_parse_item(#{<<"as">> := As} = Item) -> + ParseTime = fun(Name, Input) -> + maybe + #{Name := Time} ?= Input, + {ok, Epoch} ?= emqx_utils_calendar:to_epoch_second(emqx_utils_conv:str(Time)), + {ok, Input#{Name := Epoch}} + else + {error, _} = Error -> + Error; + NoTime when is_map(NoTime) -> + {ok, NoTime} + end + end, + + maybe + {ok, Type} ?= emqx_utils:safe_to_existing_atom(As), + {ok, Item1} ?= ParseTime(<<"at">>, Item#{<<"as">> := Type}), + ParseTime(<<"until">>, Item1) + end; +normalize_parse_item(_Item) -> + {error, invalid_item}. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> - {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. + {ok, ensure_expiry_timer(#{expiry_timer => undefined}), {continue, init_from_csv}}. + +handle_continue(init_from_csv, State) -> + File = emqx_schema:naive_env_interpolation( + emqx:get_config([banned, bootstrap_file], undefined) + ), + _ = init_from_csv(File), + {noreply, State}. handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -255,7 +385,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - _ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [ + _ = trans(fun ?MODULE:expire_banned_items/1, [ erlang:system_time(second) ]), {noreply, ensure_expiry_timer(State), hibernate}; @@ -396,3 +526,15 @@ on_banned(_) -> all_rules() -> ets:tab2list(?BANNED_RULE_TAB). + +trans(Fun) -> + case mria:transaction(?COMMON_SHARD, Fun) of + {atomic, Res} -> {ok, Res}; + {aborted, Reason} -> {error, Reason} + end. + +trans(Fun, Args) -> + case mria:transaction(?COMMON_SHARD, Fun, Args) of + {atomic, Res} -> {ok, Res}; + {aborted, Reason} -> {error, Reason} + end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 8a10a7be2..32c92c8f0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -212,16 +212,29 @@ short_paths_fields() -> short_paths_fields(Importance) -> [ {Name, - ?HOCON(rate_type(), #{ - desc => ?DESC(Name), - required => false, - importance => Importance, - example => Example - })} + ?HOCON( + rate_type(), + maps:merge( + #{ + desc => ?DESC(Name), + required => false, + importance => Importance, + example => Example + }, + short_paths_fields_extra(Name) + ) + )} || {Name, Example} <- lists:zip(short_paths(), [<<"1000/s">>, <<"1000/s">>, <<"100MB/s">>]) ]. +short_paths_fields_extra(max_conn_rate) -> + #{ + default => infinity + }; +short_paths_fields_extra(_Name) -> + #{}. + desc(limiter) -> "Settings for the rate limiter."; desc(node_opts) -> diff --git a/apps/emqx/src/emqx_passwd.erl b/apps/emqx/src/emqx_passwd.erl index c243442ba..dc3622411 100644 --- a/apps/emqx/src/emqx_passwd.erl +++ b/apps/emqx/src/emqx_passwd.erl @@ -102,7 +102,11 @@ hash({SimpleHash, _Salt, disable}, Password) when is_binary(Password) -> hash({SimpleHash, Salt, prefix}, Password) when is_binary(Password), is_binary(Salt) -> hash_data(SimpleHash, <>); hash({SimpleHash, Salt, suffix}, Password) when is_binary(Password), is_binary(Salt) -> - hash_data(SimpleHash, <>). + hash_data(SimpleHash, <>); +hash({_SimpleHash, Salt, _SaltPos}, _Password) when not is_binary(Salt) -> + error({salt_not_string, Salt}); +hash({_SimpleHash, _Salt, _SaltPos}, Password) when not is_binary(Password) -> + error({password_not_string, Password}). -spec hash_data(hash_type(), binary()) -> binary(). hash_data(plain, Data) when is_binary(Data) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 9dfd6a17f..db1d5350f 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -63,6 +63,7 @@ -type json_binary() :: binary(). -type template() :: binary(). -type template_str() :: string(). +-type binary_kv() :: #{binary() => binary()}. -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). @@ -167,7 +168,8 @@ json_binary/0, port_number/0, template/0, - template_str/0 + template_str/0, + binary_kv/0 ]). -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]). @@ -319,6 +321,11 @@ roots(low) -> sc( ref("crl_cache"), #{importance => ?IMPORTANCE_HIDDEN} + )}, + {banned, + sc( + ref("banned"), + #{importance => ?IMPORTANCE_HIDDEN} )} ]. @@ -1762,6 +1769,17 @@ fields("client_attrs_init") -> desc => ?DESC("client_attrs_init_set_as_attr"), validator => fun restricted_string/1 })} + ]; +fields("banned") -> + [ + {bootstrap_file, + sc( + binary(), + #{ + desc => ?DESC("banned_bootstrap_file"), + require => false + } + )} ]. compile_variform(undefined, _Opts) -> @@ -2101,6 +2119,8 @@ desc(durable_storage) -> ?DESC(durable_storage); desc("client_attrs_init") -> ?DESC(client_attrs_init); +desc("banned") -> + "Banned ."; desc(_) -> undefined. diff --git a/apps/emqx/test/data/banned/error.csv b/apps/emqx/test/data/banned/error.csv new file mode 100644 index 000000000..49046a5a3 --- /dev/null +++ b/apps/emqx/test/data/banned/error.csv @@ -0,0 +1,4 @@ +as,who,reason,at,until,by +clientid,c1,right,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot +username,u1,reason 1,abc,2025-10-25T21:53:47+08:00,boot +usernamx,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot diff --git a/apps/emqx/test/data/banned/full.csv b/apps/emqx/test/data/banned/full.csv new file mode 100644 index 000000000..f6abf9de2 --- /dev/null +++ b/apps/emqx/test/data/banned/full.csv @@ -0,0 +1,3 @@ +as,who,reason,at,until,by +clientid,c1,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot +username,u1,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot diff --git a/apps/emqx/test/data/banned/full2.csv b/apps/emqx/test/data/banned/full2.csv new file mode 100644 index 000000000..e033d2e60 --- /dev/null +++ b/apps/emqx/test/data/banned/full2.csv @@ -0,0 +1,3 @@ +as,who,reason,at,until,by +clientid,c2,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot +username,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot diff --git a/apps/emqx/test/data/banned/omitted.csv b/apps/emqx/test/data/banned/omitted.csv new file mode 100644 index 000000000..5db11d7cf --- /dev/null +++ b/apps/emqx/test/data/banned/omitted.csv @@ -0,0 +1,3 @@ +as,who,reason,at,until,by +clientid,c1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00, +username,u1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00, diff --git a/apps/emqx/test/data/banned/optional.csv b/apps/emqx/test/data/banned/optional.csv new file mode 100644 index 000000000..4e752e29a --- /dev/null +++ b/apps/emqx/test/data/banned/optional.csv @@ -0,0 +1,3 @@ +as,who +clientid,c1 +username,u1 diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index e941e24a8..f62c8e5d3 100644 --- a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -254,6 +254,45 @@ t_session_taken(_) -> {ok, #{}, [0]} = emqtt:unsubscribe(C3, Topic), ok = emqtt:disconnect(C3). +t_full_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full.csv">>))), + FullDatas = lists:sort([ + {banned, {username, <<"u1">>}, <<"boot">>, <<"reason 2">>, 1635170027, 1761400427}, + {banned, {clientid, <<"c1">>}, <<"boot">>, <<"reason 1">>, 1635170027, 1761400427} + ]), + ?assertMatch(FullDatas, lists:sort(get_banned_list())), + + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full2.csv">>))), + ?assertMatch(FullDatas, lists:sort(get_banned_list())), + ok. + +t_optional_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"optional.csv">>))), + Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]), + ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])), + ok. + +t_omitted_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"omitted.csv">>))), + Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]), + ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])), + ok. + +t_error_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual( + {error, enoent}, emqx_banned:init_from_csv(mk_bootstrap_file(<<"not_exists.csv">>)) + ), + ?assertEqual( + ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"error.csv">>)) + ), + Keys = [{clientid, <<"c1">>}], + ?assertMatch(Keys, [element(2, Data) || Data <- get_banned_list()]), + ok. + receive_messages(Count) -> receive_messages(Count, []). receive_messages(0, Msgs) -> @@ -269,3 +308,17 @@ receive_messages(Count, Msgs) -> after 1200 -> Msgs end. + +mk_bootstrap_file(File) -> + Dir = code:lib_dir(emqx, test), + filename:join([Dir, <<"data/banned">>, File]). + +get_banned_list() -> + Tabs = emqx_banned:tables(), + lists:foldl( + fun(Tab, Acc) -> + Acc ++ ets:tab2list(Tab) + end, + [], + Tabs + ). diff --git a/apps/emqx/test/emqx_passwd_SUITE.erl b/apps/emqx/test/emqx_passwd_SUITE.erl index fd032bdb1..3078a5805 100644 --- a/apps/emqx/test/emqx_passwd_SUITE.erl +++ b/apps/emqx/test/emqx_passwd_SUITE.erl @@ -124,4 +124,18 @@ t_hash(_) -> false = emqx_passwd:check_pass({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Pbkdf2, Password), %% Invalid derived_length, pbkdf2 fails - ?assertException(error, _, emqx_passwd:hash({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Password)). + ?assertException(error, _, emqx_passwd:hash({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Password)), + + %% invalid salt (not binary) + ?assertException( + error, + {salt_not_string, false}, + emqx_passwd:hash({sha256, false, suffix}, Password) + ), + + %% invalid password (not binary) + ?assertException( + error, + {password_not_string, bad_password_type}, + emqx_passwd:hash({sha256, Salt, suffix}, bad_password_type) + ). diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 28a05ce23..b76c5dd33 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -816,8 +816,8 @@ t_no_limiter_for_listener(_) -> CfgStr = <<>>, ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr), ListenerOpt = emqx:get_config([listeners, tcp, default]), - ?assertEqual( - undefined, + ?assertMatch( + #{connection := #{rate := infinity}}, emqx_limiter_utils:get_listener_opts(ListenerOpt) ). diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_api_sources.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_api_sources.erl index cb7c1664f..9d16216a4 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_api_sources.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_api_sources.erl @@ -470,7 +470,13 @@ make_result_map(ResList) -> lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList). restructure_map(#{ - counters := #{deny := Failed, total := Total, allow := Succ, nomatch := Nomatch}, + counters := #{ + ignore := Ignore, + deny := Failed, + total := Total, + allow := Succ, + nomatch := Nomatch + }, rate := #{total := #{current := Rate, last5m := Rate5m, max := RateMax}} }) -> #{ @@ -478,6 +484,7 @@ restructure_map(#{ allow => Succ, deny => Failed, nomatch => Nomatch, + ignore => Ignore, rate => Rate, rate_last5m => Rate5m, rate_max => RateMax diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl index e19080edf..35f585e44 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl @@ -68,7 +68,8 @@ -export_type([ permission_resolution/0, action_condition/0, - topic_condition/0 + topic_condition/0, + rule/0 ]). %%-------------------------------------------------------------------- diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_rule_raw.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_rule_raw.erl index a638af550..a02526668 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_rule_raw.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_rule_raw.erl @@ -21,7 +21,7 @@ -module(emqx_authz_rule_raw). --export([parse_rule/1, format_rule/1]). +-export([parse_rule/1, parse_and_compile_rules/1, format_rule/1]). -include("emqx_authz.hrl"). @@ -55,6 +55,27 @@ %% API %%-------------------------------------------------------------------- +%% @doc Parse and compile raw ACL rules. +%% If any bad rule is found, `{bad_acl_rule, ..}' is thrown. +-spec parse_and_compile_rules([rule_raw()]) -> [emqx_authz_rule:rule()]. +parse_and_compile_rules(Rules) -> + lists:map( + fun(Rule) -> + case parse_rule(Rule) of + {ok, {Permission, Action, Topics}} -> + try + emqx_authz_rule:compile({Permission, all, Action, Topics}) + catch + throw:Reason -> + throw({bad_acl_rule, Reason}) + end; + {error, Reason} -> + throw({bad_acl_rule, Reason}) + end + end, + Rules + ). + -spec parse_rule(rule_raw()) -> {ok, { emqx_authz_rule:permission_resolution_precompile(), diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_schema.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_schema.erl index f7bd59b82..24deb0161 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_schema.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_schema.erl @@ -88,6 +88,7 @@ fields("metrics_status_fields") -> fields("metrics") -> [ {"total", ?HOCON(integer(), #{desc => ?DESC("metrics_total")})}, + {"ignore", ?HOCON(integer(), #{desc => ?DESC("ignore")})}, {"allow", ?HOCON(integer(), #{desc => ?DESC("allow")})}, {"deny", ?HOCON(integer(), #{desc => ?DESC("deny")})}, {"nomatch", ?HOCON(float(), #{desc => ?DESC("nomatch")})} diff --git a/apps/emqx_auth/test/data/bad_public_key_file.pem b/apps/emqx_auth/test/data/bad_public_key_file.pem new file mode 100644 index 000000000..526dbf577 --- /dev/null +++ b/apps/emqx_auth/test/data/bad_public_key_file.pem @@ -0,0 +1,3 @@ +-----BEGIN PUBLIC KEY----- +XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +-----END PUBLIC KEY----- diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index 608cbef57..9cf62ae15 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_http, [ {description, "EMQX External HTTP API Authentication and Authorization"}, - {vsn, "0.2.3"}, + {vsn, "0.3.0"}, {registered, []}, {mod, {emqx_auth_http_app, []}}, {applications, [ diff --git a/apps/emqx_auth_http/src/emqx_authn_http.erl b/apps/emqx_auth_http/src/emqx_authn_http.erl index fd8136e49..818e355e5 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http.erl @@ -186,19 +186,7 @@ handle_response(Headers, Body) -> ContentType = proplists:get_value(<<"content-type">>, Headers), case safely_parse_body(ContentType, Body) of {ok, NBody} -> - case maps:get(<<"result">>, NBody, <<"ignore">>) of - <<"allow">> -> - IsSuperuser = emqx_authn_utils:is_superuser(NBody), - Attrs = emqx_authn_utils:client_attrs(NBody), - Result = maps:merge(IsSuperuser, Attrs), - {ok, Result}; - <<"deny">> -> - {error, not_authorized}; - <<"ignore">> -> - ignore; - _ -> - ignore - end; + body_to_auth_data(NBody); {error, Reason} -> ?TRACE_AUTHN_PROVIDER( error, @@ -208,6 +196,91 @@ handle_response(Headers, Body) -> ignore end. +body_to_auth_data(Body) -> + case maps:get(<<"result">>, Body, <<"ignore">>) of + <<"allow">> -> + IsSuperuser = emqx_authn_utils:is_superuser(Body), + Attrs = emqx_authn_utils:client_attrs(Body), + try + ExpireAt = expire_at(Body), + ACL = acl(ExpireAt, Body), + Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]), + {ok, Result} + catch + throw:{bad_acl_rule, Reason} -> + %% it's a invalid token, so ok to log + ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}), + {error, bad_username_or_password}; + throw:Reason -> + ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}), + {error, bad_username_or_password} + end; + <<"deny">> -> + {error, not_authorized}; + <<"ignore">> -> + ignore; + _ -> + ignore + end. + +merge_maps([]) -> #{}; +merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)). + +%% Return either an empty map, or a map with `expire_at` at millisecond precision +%% Millisecond precision timestamp is required by `auth_expire_at` +%% emqx_channel:schedule_connection_expire/1 +expire_at(Body) -> + case expire_sec(Body) of + undefined -> + #{}; + Sec -> + #{expire_at => erlang:convert_time_unit(Sec, second, millisecond)} + end. + +expire_sec(#{<<"expire_at">> := ExpireTime}) when is_integer(ExpireTime) -> + Now = erlang:system_time(second), + NowMs = erlang:convert_time_unit(Now, second, millisecond), + case ExpireTime < Now of + true -> + throw(#{ + cause => "'expire_at' is in the past.", + system_time => Now, + expire_at => ExpireTime + }); + false when ExpireTime > (NowMs div 2) -> + throw(#{ + cause => "'expire_at' does not appear to be a Unix epoch time in seconds.", + system_time => Now, + expire_at => ExpireTime + }); + false -> + ExpireTime + end; +expire_sec(#{<<"expire_at">> := _}) -> + throw(#{cause => "'expire_at' is not an integer (Unix epoch time in seconds)."}); +expire_sec(_) -> + undefined. + +acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) -> + #{ + acl => #{ + source_for_logging => http, + rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules), + %% It's seconds level precision (like JWT) for authz + %% see emqx_authz_client_info:check/1 + expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second) + } + }; +acl(_NoExpire, #{<<"acl">> := Rules}) -> + #{ + acl => #{ + source_for_logging => http, + rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules) + } + }; +acl(_, _) -> + #{}. + safely_parse_body(ContentType, Body) -> try parse_body(ContentType, Body) diff --git a/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl index 48c406946..e09d42b3e 100644 --- a/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -define(PATH, [?CONF_NS_ATOM]). @@ -49,6 +50,21 @@ }) ). +-define(SERVER_RESPONSE_WITH_ACL_JSON(ACL), + emqx_utils_json:encode(#{ + result => allow, + acl => ACL + }) +). + +-define(SERVER_RESPONSE_WITH_ACL_JSON(ACL, Expire), + emqx_utils_json:encode(#{ + result => allow, + acl => ACL, + expire_at => Expire + }) +). + -define(SERVER_RESPONSE_URLENCODE(Result, IsSuperuser), list_to_binary( "result=" ++ @@ -510,6 +526,129 @@ test_ignore_allow_deny({ExpectedValue, ServerResponse}) -> ) end. +t_acl(_Config) -> + ACL = acl_rules(), + Config = raw_http_auth_config(), + {ok, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, Config} + ), + ok = emqx_authn_http_test_server:set_handler( + fun(Req0, State) -> + Req = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + ?SERVER_RESPONSE_WITH_ACL_JSON(ACL), + Req0 + ), + {ok, Req, State} + end + ), + {ok, C} = emqtt:start_link( + [ + {clean_start, true}, + {proto_ver, v5}, + {clientid, <<"clientid">>}, + {username, <<"username">>}, + {password, <<"password">>} + ] + ), + {ok, _} = emqtt:connect(C), + Cases = [ + {allow, <<"http-authn-acl/#">>}, + {deny, <<"http-authn-acl/1">>}, + {deny, <<"t/#">>} + ], + try + lists:foreach( + fun(Case) -> + test_acl(Case, C) + end, + Cases + ) + after + ok = emqtt:disconnect(C) + end. + +t_auth_expire(_Config) -> + ACL = acl_rules(), + Config = raw_http_auth_config(), + {ok, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, Config} + ), + ExpireSec = 3, + WaitTime = timer:seconds(ExpireSec + 1), + Tests = [ + {<<"ok-to-connect-but-expire-on-pub">>, erlang:system_time(second) + ExpireSec, fun(C) -> + {ok, _} = emqtt:connect(C), + receive + {'DOWN', _Ref, process, C, Reason} -> + ?assertMatch({disconnected, ?RC_NOT_AUTHORIZED, _}, Reason) + after WaitTime -> + error(timeout) + end + end}, + {<<"past">>, erlang:system_time(second) - 1, fun(C) -> + ?assertMatch({error, {bad_username_or_password, _}}, emqtt:connect(C)), + receive + {'DOWN', _Ref, process, C, Reason} -> + ?assertMatch({shutdown, bad_username_or_password}, Reason) + end + end}, + {<<"invalid">>, erlang:system_time(millisecond), fun(C) -> + ?assertMatch({error, {bad_username_or_password, _}}, emqtt:connect(C)), + receive + {'DOWN', _Ref, process, C, Reason} -> + ?assertMatch({shutdown, bad_username_or_password}, Reason) + end + end} + ], + ok = emqx_authn_http_test_server:set_handler( + fun(Req0, State) -> + QS = cowboy_req:parse_qs(Req0), + {_, Username} = lists:keyfind(<<"username">>, 1, QS), + {_, ExpireTime, _} = lists:keyfind(Username, 1, Tests), + Req = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + ?SERVER_RESPONSE_WITH_ACL_JSON(ACL, ExpireTime), + Req0 + ), + {ok, Req, State} + end + ), + lists:foreach(fun test_auth_expire/1, Tests). + +test_auth_expire({Username, _ExpireTime, TestFn}) -> + {ok, C} = emqtt:start_link( + [ + {clean_start, true}, + {proto_ver, v5}, + {clientid, <<"clientid">>}, + {username, Username}, + {password, <<"password">>} + ] + ), + _ = monitor(process, C), + unlink(C), + try + TestFn(C) + after + [ok = emqtt:disconnect(C) || is_process_alive(C)] + end. + +test_acl({allow, Topic}, C) -> + ?assertMatch( + {ok, #{}, [0]}, + emqtt:subscribe(C, Topic) + ); +test_acl({deny, Topic}, C) -> + ?assertMatch( + {ok, #{}, [?RC_NOT_AUTHORIZED]}, + emqtt:subscribe(C, Topic) + ). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -874,3 +1013,27 @@ to_list(B) when is_binary(B) -> binary_to_list(B); to_list(L) when is_list(L) -> L. + +acl_rules() -> + [ + #{ + <<"permission">> => <<"allow">>, + <<"action">> => <<"pub">>, + <<"topics">> => [ + <<"http-authn-acl/1">> + ] + }, + #{ + <<"permission">> => <<"allow">>, + <<"action">> => <<"sub">>, + <<"topics">> => + [ + <<"eq http-authn-acl/#">> + ] + }, + #{ + <<"permission">> => <<"deny">>, + <<"action">> => <<"all">>, + <<"topics">> => [<<"#">>] + } + ]. diff --git a/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl index e07a9f69e..057478f12 100644 --- a/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl @@ -48,7 +48,7 @@ init_per_suite(Config) -> emqx_auth, emqx_auth_http ], - #{work_dir => ?config(priv_dir, Config)} + #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{suite_apps, Apps} | Config]. @@ -56,12 +56,22 @@ end_per_suite(_Config) -> ok = emqx_authz_test_lib:restore_authorizers(), emqx_cth_suite:stop(?config(suite_apps, _Config)). -init_per_testcase(_Case, Config) -> +init_per_testcase(t_bad_response = TestCase, Config) -> + TCApps = emqx_cth_suite:start_apps( + [emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + init_per_testcase(common, [{tc_apps, TCApps} | Config]); +init_per_testcase(_TestCase, Config) -> ok = emqx_authz_test_lib:reset_authorizers(), {ok, _} = emqx_authz_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH), Config. -end_per_testcase(_Case, _Config) -> +end_per_testcase(t_bad_response, Config) -> + TCApps = ?config(tc_apps, Config), + emqx_cth_suite:stop_apps(TCApps), + end_per_testcase(common, Config); +end_per_testcase(_TestCase, _Config) -> _ = emqx_authz:set_feature_available(rich_actions, true), try ok = emqx_authz_http_test_server:stop() @@ -589,6 +599,29 @@ t_bad_response(_Config) -> }, get_metrics() ), + ?assertMatch( + {200, #{ + <<"metrics">> := #{ + <<"ignore">> := 1, + <<"nomatch">> := 0, + <<"allow">> := 0, + <<"deny">> := 0, + <<"total">> := 1 + }, + <<"node_metrics">> := [ + #{ + <<"metrics">> := #{ + <<"ignore">> := 1, + <<"nomatch">> := 0, + <<"allow">> := 0, + <<"deny">> := 0, + <<"total">> := 1 + } + } + ] + }}, + get_status_api() + ), ok. t_no_value_for_placeholder(_Config) -> @@ -806,3 +839,11 @@ get_metrics() -> 'authorization.nomatch' ] ). + +get_status_api() -> + Path = emqx_mgmt_api_test_util:uri(["authorization", "sources", "http", "status"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + Res0 = emqx_mgmt_api_test_util:request_api(get, Path, _QParams = [], Auth, _Body = [], Opts), + {Status, RawBody} = emqx_mgmt_api_test_util:simplify_result(Res0), + {Status, emqx_utils_json:decode(RawBody, [return_maps])}. diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwks_client.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwks_client.erl index dc03cafef..f574a421f 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwks_client.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwks_client.erl @@ -133,11 +133,13 @@ code_change(_OldVsn, State, _Extra) -> handle_options(#{ endpoint := Endpoint, + headers := Headers, refresh_interval := RefreshInterval0, ssl_opts := SSLOpts }) -> #{ endpoint => Endpoint, + headers => to_httpc_headers(Headers), refresh_interval => limit_refresh_interval(RefreshInterval0), ssl_opts => maps:to_list(SSLOpts), jwks => [], @@ -147,6 +149,7 @@ handle_options(#{ refresh_jwks( #{ endpoint := Endpoint, + headers := Headers, ssl_opts := SSLOpts } = State ) -> @@ -159,7 +162,7 @@ refresh_jwks( case httpc:request( get, - {Endpoint, [{"Accept", "application/json"}]}, + {Endpoint, Headers}, HTTPOpts, [{body_format, binary}, {sync, false}, {receiver, self()}] ) @@ -185,6 +188,9 @@ limit_refresh_interval(Interval) when Interval < 10 -> limit_refresh_interval(Interval) -> Interval. +to_httpc_headers(Headers) -> + [{binary_to_list(bin(K)), V} || {K, V} <- maps:to_list(Headers)]. + cancel_http_request(#{request_id := undefined} = State) -> State; cancel_http_request(#{request_id := RequestID} = State) -> @@ -195,3 +201,10 @@ cancel_http_request(#{request_id := RequestID} = State) -> ok end, State#{request_id => undefined}. + +bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8); +bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); +bin(Bin) when is_binary(Bin) -> + Bin. diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl index f582d4ae9..fff7d056e 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl @@ -19,6 +19,7 @@ -include_lib("emqx_auth/include/emqx_authn.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). +-include_lib("jose/include/jose_jwk.hrl"). -export([ create/2, @@ -40,7 +41,7 @@ create(_AuthenticatorID, Config) -> create(Config). create(#{verify_claims := VerifyClaims} = Config) -> - create2(Config#{verify_claims => handle_verify_claims(VerifyClaims)}). + do_create(Config#{verify_claims => handle_verify_claims(VerifyClaims)}). update( #{use_jwks := false} = Config, @@ -85,6 +86,7 @@ authenticate( } ) -> JWT = maps:get(From, Credential), + %% XXX: Only supports single public key JWKs = [JWK], VerifyClaims = render_expected(VerifyClaims0, Credential), verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire); @@ -121,7 +123,7 @@ destroy(_) -> %% Internal functions %%-------------------------------------------------------------------- -create2(#{ +do_create(#{ use_jwks := false, algorithm := 'hmac-based', secret := Secret0, @@ -144,24 +146,35 @@ create2(#{ from => From }} end; -create2(#{ - use_jwks := false, - algorithm := 'public-key', - public_key := PublicKey, - verify_claims := VerifyClaims, - disconnect_after_expire := DisconnectAfterExpire, - acl_claim_name := AclClaimName, - from := From -}) -> - JWK = create_jwk_from_public_key(PublicKey), - {ok, #{ - jwk => JWK, - verify_claims => VerifyClaims, - disconnect_after_expire => DisconnectAfterExpire, - acl_claim_name => AclClaimName, - from => From - }}; -create2( +do_create( + #{ + use_jwks := false, + algorithm := 'public-key', + public_key := PublicKey, + verify_claims := VerifyClaims, + disconnect_after_expire := DisconnectAfterExpire, + acl_claim_name := AclClaimName, + from := From + } = Config +) -> + case + create_jwk_from_public_key( + maps:get(enable, Config, false), + PublicKey + ) + of + {ok, JWK} -> + {ok, #{ + jwk => JWK, + verify_claims => VerifyClaims, + disconnect_after_expire => DisconnectAfterExpire, + acl_claim_name => AclClaimName, + from => From + }}; + {error, _Reason} = Err -> + Err + end; +do_create( #{ use_jwks := true, verify_claims := VerifyClaims, @@ -185,9 +198,23 @@ create2( from => From }}. -create_jwk_from_public_key(PublicKey) when +create_jwk_from_public_key(true, PublicKey) when is_binary(PublicKey); is_list(PublicKey) -> + try do_create_jwk_from_public_key(PublicKey) of + %% XXX: Only supports single public key + #jose_jwk{} = Res -> + {ok, Res}; + _ -> + {error, invalid_public_key} + catch + _:_ -> + {error, invalid_public_key} + end; +create_jwk_from_public_key(false, _PublicKey) -> + {ok, []}. + +do_create_jwk_from_public_key(PublicKey) -> case filelib:is_file(PublicKey) of true -> jose_jwk:from_pem_file(PublicKey); @@ -384,20 +411,7 @@ binary_to_number(Bin) -> parse_rules(Rules) when is_map(Rules) -> Rules; parse_rules(Rules) when is_list(Rules) -> - lists:map(fun parse_rule/1, Rules). - -parse_rule(Rule) -> - case emqx_authz_rule_raw:parse_rule(Rule) of - {ok, {Permission, Action, Topics}} -> - try - emqx_authz_rule:compile({Permission, all, Action, Topics}) - catch - throw:Reason -> - throw({bad_acl_rule, Reason}) - end; - {error, Reason} -> - throw({bad_acl_rule, Reason}) - end. + emqx_authz_rule_raw:parse_and_compile_rules(Rules). merge_maps([]) -> #{}; merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)). diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl index 274f2b72b..f0d5ede37 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl @@ -100,6 +100,15 @@ fields(jwt_jwks) -> [ {use_jwks, sc(hoconsc:enum([true]), #{required => true, desc => ?DESC(use_jwks)})}, {endpoint, fun endpoint/1}, + {headers, + sc( + typerefl:alias("map", emqx_schema:binary_kv()), + #{ + default => #{<<"Accept">> => <<"application/json">>}, + validator => fun validate_headers/1, + desc => ?DESC("jwks_headers") + } + )}, {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {refresh_interval, fun refresh_interval/1}, {ssl, #{ @@ -235,3 +244,26 @@ to_binary(B) when is_binary(B) -> B. sc(Type, Meta) -> hoconsc:mk(Type, Meta). + +validate_headers(undefined) -> + ok; +validate_headers(Headers) -> + BadKeys0 = + lists:filter( + fun(K) -> + re:run(K, <<"[^-0-9a-zA-Z_ ]">>, [{capture, none}]) =:= match + end, + maps:keys(Headers) + ), + case BadKeys0 of + [] -> + ok; + _ -> + BadKeys = lists:join(", ", BadKeys0), + Msg0 = io_lib:format( + "headers should contain only characters matching [-0-9a-zA-Z_ ]; bad headers: ~s", + [BadKeys] + ), + Msg = iolist_to_binary(Msg0), + {error, Msg} + end. diff --git a/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl b/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl index 89336e396..7e1197dd4 100644 --- a/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl +++ b/apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl @@ -22,18 +22,21 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). -define(AUTHN_ID, <<"mechanism:jwt">>). -define(JWKS_PORT, 31333). -define(JWKS_PATH, "/jwks.json"). +-import(emqx_common_test_helpers, [on_exit/1]). + all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_jwt], #{ - work_dir => ?config(priv_dir, Config) + work_dir => emqx_cth_suite:work_dir(Config) }), [{apps, Apps} | Config]. @@ -41,6 +44,10 @@ end_per_suite(Config) -> ok = emqx_cth_suite:stop(?config(apps, Config)), ok. +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), + ok. + %%------------------------------------------------------------------------------ %% Tests %%------------------------------------------------------------------------------ @@ -178,6 +185,7 @@ t_public_key(_) -> from => password, acl_claim_name => <<"acl">>, use_jwks => false, + enable => true, algorithm => 'public-key', public_key => PublicKey, verify_claims => [], @@ -199,6 +207,51 @@ t_public_key(_) -> ?assertEqual(ok, emqx_authn_jwt:destroy(State)), ok. +t_bad_public_keys(_) -> + BaseConfig = #{ + mechanism => jwt, + from => password, + acl_claim_name => <<"acl">>, + use_jwks => false, + algorithm => 'public-key', + verify_claims => [], + disconnect_after_expire => false + }, + + %% try create with invalid public key + ?assertMatch( + {error, invalid_public_key}, + emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{ + enable => true, + public_key => <<"bad_public_key">> + }) + ), + + %% no such file + ?assertMatch( + {error, invalid_public_key}, + emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{ + enable => true, + public_key => data_file("bad_flie_path.pem") + }) + ), + + %% bad public key file content + ?assertMatch( + {error, invalid_public_key}, + emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{ + enable => true, + public_key => data_file("bad_public_key_file.pem") + }) + ), + + %% assume jwk authenticator is disabled + {ok, State} = + emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{public_key => <<"bad_public_key">>}), + + ?assertEqual(ok, emqx_authn_jwt:destroy(State)), + ok. + t_jwt_in_username(_) -> Secret = <<"abcdef">>, Config = #{ @@ -276,6 +329,7 @@ t_jwks_renewal(_Config) -> disconnect_after_expire => false, use_jwks => true, endpoint => "https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT + 1) ++ ?JWKS_PATH, + headers => #{<<"Accept">> => <<"application/json">>}, refresh_interval => 1000, pool_size => 1 }, @@ -360,6 +414,102 @@ t_jwks_renewal(_Config) -> ?assertEqual(ok, emqx_authn_jwt:destroy(State2)), ok = emqx_authn_http_test_server:stop(). +t_jwks_custom_headers(_Config) -> + {ok, _} = emqx_authn_http_test_server:start_link(?JWKS_PORT, ?JWKS_PATH, server_ssl_opts()), + on_exit(fun() -> ok = emqx_authn_http_test_server:stop() end), + ok = emqx_authn_http_test_server:set_handler(jwks_handler_spy()), + + PrivateKey = test_rsa_key(private), + Payload = #{ + <<"username">> => <<"myuser">>, + <<"foo">> => <<"myuser">>, + <<"exp">> => erlang:system_time(second) + 10 + }, + Endpoint = iolist_to_binary("https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT) ++ ?JWKS_PATH), + Config0 = #{ + <<"mechanism">> => <<"jwt">>, + <<"use_jwks">> => true, + <<"from">> => <<"password">>, + <<"endpoint">> => Endpoint, + <<"headers">> => #{ + <<"Accept">> => <<"application/json">>, + <<"Content-Type">> => <<>>, + <<"foo">> => <<"bar">> + }, + <<"pool_size">> => 1, + <<"refresh_interval">> => 1_000, + <<"ssl">> => #{ + <<"keyfile">> => cert_file("client.key"), + <<"certfile">> => cert_file("client.crt"), + <<"cacertfile">> => cert_file("ca.crt"), + <<"enable">> => true, + <<"verify">> => <<"verify_peer">>, + <<"server_name_indication">> => <<"authn-server">> + }, + <<"verify_claims">> => #{<<"foo">> => <<"${username}">>} + }, + {ok, Config} = hocon:binary(hocon_pp:do(Config0, #{})), + ChainName = 'mqtt:global', + AuthenticatorId = <<"jwt">>, + ?check_trace( + #{timetrap => 10_000}, + begin + %% bad header keys + BadConfig1 = emqx_utils_maps:deep_put( + [<<"headers">>, <<"ça-va"/utf8>>], Config, <<"bien">> + ), + ?assertMatch( + {error, #{ + kind := validation_error, + reason := <<"headers should contain only characters matching ", _/binary>> + }}, + emqx_authn_api:update_config( + [authentication], + {create_authenticator, ChainName, BadConfig1} + ) + ), + BadConfig2 = emqx_utils_maps:deep_put( + [<<"headers">>, <<"test_哈哈"/utf8>>], + Config, + <<"test_haha">> + ), + ?assertMatch( + {error, #{ + kind := validation_error, + reason := <<"headers should contain only characters matching ", _/binary>> + }}, + emqx_authn_api:update_config( + [authentication], + {create_authenticator, ChainName, BadConfig2} + ) + ), + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqx_authn_api:update_config( + [authentication], + {create_authenticator, ChainName, Config} + ), + #{?snk_kind := jwks_endpoint_response}, + 5_000 + ), + ?assertReceive( + {http_request, #{ + headers := #{ + <<"accept">> := <<"application/json">>, + <<"foo">> := <<"bar">> + } + }} + ), + {ok, _} = emqx_authn_api:update_config( + [authentication], + {delete_authenticator, ChainName, AuthenticatorId} + ), + ok + end, + [] + ), + ok. + t_verify_claims(_) -> Secret = <<"abcdef">>, Config0 = #{ @@ -613,6 +763,16 @@ jwks_handler(Req0, State) -> ), {ok, Req, State}. +jwks_handler_spy() -> + TestPid = self(), + fun(Req, State) -> + ReqHeaders = cowboy_req:headers(Req), + ReqMap = #{headers => ReqHeaders}, + ct:pal("jwks request:\n ~p", [ReqMap]), + TestPid ! {http_request, ReqMap}, + jwks_handler(Req, State) + end. + test_rsa_key(public) -> data_file("public_key.pem"); test_rsa_key(private) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index 7273df000..e510dda7f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -216,7 +216,7 @@ t_create_with_bad_name(_Config) -> ok. t_create_with_bad_name_root(_Config) -> - BadBridgeName = <<"test_哈哈">>, + BadBridgeName = <<"test_哈哈"/utf8>>, BridgeConf = #{ <<"bridge_mode">> => false, <<"clean_start">> => true, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 08b3270ea..6b160f3b3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1431,7 +1431,7 @@ t_cluster_later_join_metrics(Config) -> t_create_with_bad_name(Config) -> Port = ?config(port, Config), URL1 = ?URL(Port, "path1"), - Name = <<"test_哈哈">>, + Name = <<"test_哈哈"/utf8>>, BadBridgeParams = emqx_utils_maps:deep_merge( ?HTTP_BRIDGE(URL1, Name), @@ -1457,7 +1457,7 @@ t_create_with_bad_name(Config) -> ?assertMatch( #{ <<"kind">> := <<"validation_error">>, - <<"reason">> := <<"Invalid name format.", _/binary>> + <<"reason">> := <<"invalid_map_key">> }, Msg ), diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index c0b8acc0d..0fc13ffb0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -604,8 +604,7 @@ deprecated_config() -> t_name_too_long(_Config) -> LongName = list_to_binary(lists:duplicate(256, $a)), ?assertMatch( - {error, - {{_, 400, _}, _, #{<<"message">> := #{<<"reason">> := <<"Name is too long", _/binary>>}}}}, + {error, {{_, 400, _}, _, #{<<"message">> := #{<<"reason">> := <<"invalid_map_key">>}}}}, create_bridge_http_api_v1(#{name => LongName}) ), ok. @@ -942,7 +941,7 @@ t_scenario_2(Config) -> ok. t_create_with_bad_name(_Config) -> - BadBridgeName = <<"test_哈哈">>, + BadBridgeName = <<"test_哈哈"/utf8>>, %% Note: must contain SSL options to trigger bug. Cacertfile = emqx_common_test_helpers:app_path( emqx, @@ -960,7 +959,7 @@ t_create_with_bad_name(_Config) -> <<"code">> := <<"BAD_REQUEST">>, <<"message">> := #{ <<"kind">> := <<"validation_error">>, - <<"reason">> := <<"Invalid name format.", _/binary>> + <<"reason">> := <<"invalid_map_key">> } }}} = create_bridge_http_api_v1(Opts), ok. diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index f671c90df..a88cddf76 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -728,8 +728,8 @@ t_prepared_statement_exists(Config) -> emqx_common_test_helpers:on_exit(fun() -> meck:unload() end), - MeckOpts = [passthrough, no_link, no_history, non_strict], - meck:new(emqx_postgresql, MeckOpts), + MeckOpts = [passthrough, no_link, no_history], + meck:new(epgsql, MeckOpts), InsertPrepStatementDupAndThenRemoveMeck = fun(Conn, Key, SQL, List) -> meck:passthrough([Conn, Key, SQL, List]), @@ -795,6 +795,7 @@ t_prepared_statement_exists(Config) -> ok end ), + meck:unload(), ok. t_table_removed(Config) -> diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index 1b210e7fb..fbdece6ff 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -275,7 +275,7 @@ t_create_with_bad_name_root_path({'end', _Config}) -> ok; t_create_with_bad_name_root_path(_Config) -> Path = [connectors], - BadConnectorName = <<"test_哈哈">>, + BadConnectorName = <<"test_哈哈"/utf8>>, ConnConfig0 = connector_config(), %% Note: must contain SSL options to trigger original bug. Cacertfile = emqx_common_test_helpers:app_path( diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 7c7bc432c..f3e91ef12 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -697,7 +697,7 @@ t_connectors_probe(Config) -> ok. t_create_with_bad_name(Config) -> - ConnectorName = <<"test_哈哈">>, + ConnectorName = <<"test_哈哈"/utf8>>, Conf0 = ?KAFKA_CONNECTOR(ConnectorName), %% Note: must contain SSL options to trigger original bug. Cacertfile = emqx_common_test_helpers:app_path( diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl index dbc0d7f0b..1d2520d0f 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl @@ -181,12 +181,16 @@ create(#{name_var := NameVar} = Config) -> end. update(Config, State) -> - destroy(State), + destroy(State, false), create(Config). destroy(State) -> + destroy(State, true). + +destroy(State, TryDelete) -> emqx_dashboard_sso_oidc_session:stop(), - try_delete_jwks_file(State). + _ = TryDelete andalso try_delete_jwks_file(State), + ok. -dialyzer({nowarn_function, login/2}). login( diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl index b28bcc64d..c5756def9 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl @@ -35,6 +35,8 @@ -define(DEFAULT_RANDOM_LEN, 32). -define(NOW, erlang:system_time(millisecond)). +-define(BACKOFF_MIN, 5000). +-define(BACKOFF_MAX, 10000). %%------------------------------------------------------------------------------ %% API @@ -49,7 +51,10 @@ start(Name, #{issuer := Issuer, session_expiry := SessionExpiry0}) -> [ #{ issuer => Issuer, - name => {local, Name} + name => {local, Name}, + backoff_min => ?BACKOFF_MIN, + backoff_max => ?BACKOFF_MAX, + backoff_type => random } ] ) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 88d249d5e..2dc0414f5 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -57,7 +57,7 @@ -record(state, { %% TCP/SSL/UDP/DTLS Wrapped Socket - socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, + socket :: {esockd_transport, esockd:socket()} | {udp, _, _} | {esockd_udp_proxy, _, _}, %% Peername of the connection peername :: emqx_types:peername(), %% Sockname of the connection @@ -122,6 +122,9 @@ start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) -> start_link(esockd_transport, Sock, Options) -> Socket = {esockd_transport, Sock}, Args = [self(), Socket, undefined, Options] ++ callback_modules(Options), + {ok, proc_lib:spawn_link(?MODULE, init, Args)}; +start_link(Socket = {esockd_udp_proxy, _ProxyId, _Sock}, Peername, Options) -> + Args = [self(), Socket, Peername, Options] ++ callback_modules(Options), {ok, proc_lib:spawn_link(?MODULE, init, Args)}. callback_modules(Options) -> @@ -196,10 +199,14 @@ esockd_peername({udp, _SockPid, _Sock}, Peername) -> Peername; esockd_peername({esockd_transport, Sock}, _Peername) -> {ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]), + Peername; +esockd_peername({esockd_udp_proxy, _ProxyId, _Sock}, Peername) -> Peername. esockd_wait(Socket = {udp, _SockPid, _Sock}) -> {ok, Socket}; +esockd_wait(Socket = {esockd_udp_proxy, _ProxyId, _Sock}) -> + {ok, Socket}; esockd_wait({esockd_transport, Sock}) -> case esockd_transport:wait(Sock) of {ok, NSock} -> {ok, {esockd_transport, NSock}}; @@ -211,29 +218,41 @@ esockd_close({udp, _SockPid, _Sock}) -> %%gen_udp:close(Sock); ok; esockd_close({esockd_transport, Sock}) -> - esockd_transport:fast_close(Sock). + esockd_transport:fast_close(Sock); +esockd_close({esockd_udp_proxy, ProxyId, _Sock}) -> + esockd_udp_proxy:close(ProxyId). esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) -> nossl; esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) -> esockd_transport:ensure_ok_or_exit(Fun, [Sock]); esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) -> - esockd_transport:ensure_ok_or_exit(Fun, [Socket]). + esockd_transport:ensure_ok_or_exit(Fun, [Socket]); +esockd_ensure_ok_or_exit(Fun, {esockd_udp_proxy, _ProxyId, Sock}) -> + esockd_transport:ensure_ok_or_exit(Fun, [Sock]). esockd_type({udp, _, _}) -> udp; esockd_type({esockd_transport, Socket}) -> - esockd_transport:type(Socket). + esockd_transport:type(Socket); +esockd_type({esockd_udp_proxy, _ProxyId, Sock}) when is_port(Sock) -> + udp; +esockd_type({esockd_udp_proxy, _ProxyId, _Sock}) -> + ssl. esockd_setopts({udp, _, _}, _) -> ok; esockd_setopts({esockd_transport, Socket}, Opts) -> %% FIXME: DTLS works?? + esockd_transport:setopts(Socket, Opts); +esockd_setopts({esockd_udp_proxy, _ProxyId, Socket}, Opts) -> esockd_transport:setopts(Socket, Opts). esockd_getstat({udp, _SockPid, Sock}, Stats) -> inet:getstat(Sock, Stats); esockd_getstat({esockd_transport, Sock}, Stats) -> + esockd_transport:getstat(Sock, Stats); +esockd_getstat({esockd_udp_proxy, _ProxyId, Sock}, Stats) -> esockd_transport:getstat(Sock, Stats). esockd_send(Data, #state{ @@ -242,7 +261,9 @@ esockd_send(Data, #state{ }) -> gen_udp:send(Sock, Ip, Port, Data); esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> - esockd_transport:send(Sock, Data). + esockd_transport:send(Sock, Data); +esockd_send(Data, #state{socket = {esockd_udp_proxy, ProxyId, _Sock}}) -> + esockd_udp_proxy:send(ProxyId, Data). keepalive_stats(recv) -> emqx_pd:get_counter(recv_pkt); @@ -250,7 +271,8 @@ keepalive_stats(send) -> emqx_pd:get_counter(send_pkt). is_datadram_socket({esockd_transport, _}) -> false; -is_datadram_socket({udp, _, _}) -> true. +is_datadram_socket({udp, _, _}) -> true; +is_datadram_socket({esockd_udp_proxy, _ProxyId, Sock}) -> erlang:is_port(Sock). %%-------------------------------------------------------------------- %% callbacks @@ -461,6 +483,21 @@ handle_msg({'$gen_cast', Req}, State) -> with_channel(handle_cast, [Req], State); handle_msg({datagram, _SockPid, Data}, State) -> parse_incoming(Data, State); +handle_msg( + {{esockd_udp_proxy, _ProxyId, _Socket} = NSock, Data, Packets}, + State = #state{ + chann_mod = ChannMod, + channel = Channel + } +) -> + ?SLOG(debug, #{msg => "RECV_data", data => Data}), + Oct = iolist_size(Data), + inc_counter(incoming_bytes, Oct), + Ctx = ChannMod:info(ctx, Channel), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct), + + NState = State#state{socket = NSock}, + {ok, next_incoming_msgs(Packets), NState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl @@ -506,6 +543,9 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_msg({close, Reason}, State) -> ?tp(debug, force_socket_close, #{reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); +handle_msg(udp_proxy_closed, State) -> + ?tp(debug, udp_proxy_closed, #{reason => normal}), + handle_info({sock_closed, normal}, close_socket(State)); handle_msg( {event, connected}, State = #state{ diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 00555c67a..11488d1a3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -139,6 +139,16 @@ fields(websocket) -> fields(udp_listener) -> [ %% some special configs for udp listener + {health_check, + sc( + ref(udp_health_check), + #{ + desc => ?DESC( + udp_health_check + ), + required => false + } + )} ] ++ udp_opts() ++ common_listener_opts(); @@ -175,7 +185,12 @@ fields(dtls_opts) -> versions => dtls_all_available }, _IsRanchListener = false - ). + ); +fields(udp_health_check) -> + [ + {request, sc(binary(), #{desc => ?DESC(udp_health_check_request), required => false})}, + {reply, sc(binary(), #{desc => ?DESC(udp_health_check_reply), required => false})} + ]. desc(gateway) -> "EMQX Gateway configuration root."; @@ -201,6 +216,8 @@ desc(dtls_opts) -> "Settings for DTLS protocol."; desc(websocket) -> "Websocket options"; +desc(udp_health_check) -> + "UDP health check"; desc(_) -> undefined. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index e6a5be8ab..88a537613 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -151,7 +151,12 @@ find_sup_child(Sup, ChildId) -> {ok, [pid()]} | {error, term()} when - ModCfg :: #{frame_mod := atom(), chann_mod := atom(), connection_mod => atom()}. + ModCfg :: #{ + frame_mod := atom(), + chann_mod := atom(), + connection_mod => atom(), + esockd_proxy_opts => map() + }. start_listeners(Listeners, GwName, Ctx, ModCfg) -> start_listeners(Listeners, GwName, Ctx, ModCfg, []). @@ -519,7 +524,8 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) -> max_connections, max_conn_rate, proxy_protocol, - proxy_protocol_timeout + proxy_protocol_timeout, + health_check ], Opts0 ), diff --git a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl index e09d9356e..7df3a01f4 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl @@ -92,6 +92,7 @@ end_per_suite(Config) -> %%------------------------------------------------------------------------------ t_case_coap(_) -> + emqx_coap_SUITE:restart_coap_with_connection_mode(false), Login = fun(URI, Checker) -> Action = fun(Channel) -> Req = emqx_coap_SUITE:make_req(post), diff --git a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl index 923b71ced..23702db7d 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl @@ -98,7 +98,8 @@ t_case_coap_publish(_) -> end, Case = fun(Channel, Token) -> Fun(Channel, Token, <<"/publish">>, ?checkMatch({ok, changed, _})), - Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized})) + Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized})), + true end, Mod:with_connection(Case). @@ -114,7 +115,8 @@ t_case_coap_subscribe(_) -> end, Case = fun(Channel, Token) -> Fun(Channel, Token, <<"/subscribe">>, ?checkMatch({ok, content, _})), - Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized})) + Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized})), + true end, Mod:with_connection(Case). diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index 844677d12..b2aac2a4c 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -430,7 +430,6 @@ check_token( clientinfo = ClientInfo } = Channel ) -> - IsDeleteConn = is_delete_connection_request(Msg), #{clientid := ClientId} = ClientInfo, case emqx_coap_message:extract_uri_query(Msg) of #{ @@ -438,39 +437,18 @@ check_token( <<"token">> := Token } -> call_session(handle_request, Msg, Channel); - #{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} -> - case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of - undefined when IsDeleteConn -> + Any -> + %% This channel is create by this DELETE command, so here can safely close this channel + case Token =:= undefined andalso is_delete_connection_request(Msg) of + true -> Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), {shutdown, normal, Reply, Channel}; - undefined -> - ?SLOG(info, #{ - msg => "remote_connection_not_found", - clientid => ReqClientId, - token => ReqToken - }), - Reply = emqx_coap_message:reset(Msg), - {shutdown, normal, Reply, Channel}; false -> - ?SLOG(info, #{ - msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken - }), - Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), - {shutdown, normal, Reply, Channel}; - true -> - %% hack: since each message request can spawn a new connection - %% process, we can't rely on the `inc_incoming_stats' call in - %% `emqx_gateway_conn:handle_incoming' to properly keep track of - %% bumping incoming requests for an existing channel. Since this - %% number is used by keepalive, we have to bump it inside the - %% requested channel/connection pid so heartbeats actually work. - emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt), - call_session(handle_request, Msg, Channel) - end; - _ -> - ErrMsg = <<"Missing token or clientid in connection mode">>, - Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg), - {shutdown, normal, Reply, Channel} + io:format(">>> C1:~p, T1:~p~nC2:~p~n", [ClientId, Token, Any]), + ErrMsg = <<"Missing token or clientid in connection mode">>, + Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg), + {ok, {outgoing, Reply}, Channel} + end end. run_conn_hooks( @@ -785,6 +763,7 @@ process_connection( ) when ConnState == connected -> + %% TODO should take over the session here Queries = emqx_coap_message:extract_uri_query(Req), ErrMsg0 = case Queries of diff --git a/apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl b/apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl new file mode 100644 index 000000000..2eb8419f4 --- /dev/null +++ b/apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_coap_proxy_conn). + +-behaviour(esockd_udp_proxy_connection). + +-include("emqx_coap.hrl"). + +-export([initialize/1, find_or_create/4, get_connection_id/4, dispatch/3, close/2]). + +%%-------------------------------------------------------------------- +%% Callbacks +%%-------------------------------------------------------------------- +initialize(_Opts) -> + emqx_coap_frame:initial_parse_state(#{}). + +find_or_create(CId, Transport, Peer, Opts) -> + case emqx_gateway_cm_registry:lookup_channels(coap, CId) of + [Pid] -> + {ok, Pid}; + [] -> + emqx_gateway_conn:start_link(Transport, Peer, Opts) + end. + +get_connection_id(_Transport, _Peer, State, Data) -> + case parse_incoming(Data, [], State) of + {[Msg | _] = Packets, NState} -> + case emqx_coap_message:extract_uri_query(Msg) of + #{ + <<"clientid">> := ClientId + } -> + {ok, ClientId, Packets, NState}; + _ -> + ErrMsg = <<"Missing token or clientid in connection mode">>, + Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg), + Bin = emqx_coap_frame:serialize_pkt(Reply, emqx_coap_frame:serialize_opts()), + {error, Bin} + end; + _Error -> + invalid + end. + +dispatch(Pid, _State, Packet) -> + erlang:send(Pid, Packet). + +close(Pid, _State) -> + erlang:send(Pid, udp_proxy_closed). + +parse_incoming(<<>>, Packets, State) -> + {Packets, State}; +parse_incoming(Data, Packets, State) -> + {ok, Packet, Rest, NParseState} = emqx_coap_frame:parse(Data, State), + parse_incoming(Rest, [Packet | Packets], NParseState). diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl b/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl index c92103bfc..cc18c3351 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl @@ -20,7 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_gateway/include/emqx_gateway.hrl"). -%% define a gateway named stomp +%% define a gateway named coap -gateway(#{ name => coap, callback_module => ?MODULE, @@ -58,10 +58,11 @@ on_gateway_load( Ctx ) -> Listeners = normalize_config(Config), - ModCfg = #{ + ModCfg = maps:merge(connection_opts(Config), #{ frame_mod => emqx_coap_frame, chann_mod => emqx_coap_channel - }, + }), + case start_listeners( Listeners, GwName, Ctx, ModCfg @@ -105,3 +106,13 @@ on_gateway_unload( ) -> Listeners = normalize_config(Config), stop_listeners(GwName, Listeners). + +connection_opts(#{connection_required := false}) -> + #{}; +connection_opts(_) -> + #{ + connection_mod => esockd_udp_proxy, + esockd_proxy_opts => #{ + connection_mod => emqx_coap_proxy_conn + } + }. diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index bd403a463..5a4a027ab 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -165,7 +165,8 @@ t_connection(_) -> emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) ) end, - do(Action). + do(Action), + ok. t_connection_with_short_param_name(_) -> Action = fun(Channel) -> @@ -330,7 +331,8 @@ t_publish(_) -> ?assertEqual(Payload, Msg#message.payload) after 500 -> ?assert(false) - end + end, + true end, with_connection(Topics, Action). @@ -360,7 +362,9 @@ t_publish_with_retain_qos_expiry(_) -> ?assertEqual(Payload, Msg#message.payload) after 500 -> ?assert(false) - end + end, + + true end, with_connection(Topics, Action), @@ -392,7 +396,8 @@ t_subscribe(_) -> #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) + ?assertEqual(Payload, PayloadRecv), + true end, with_connection(Topics, Fun), @@ -431,7 +436,8 @@ t_subscribe_with_qos_opt(_) -> #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) + ?assertEqual(Payload, PayloadRecv), + true end, with_connection(Topics, Fun), @@ -468,7 +474,8 @@ t_un_subscribe(_) -> {ok, nocontent, _} = do_request(Channel, URI, UnReq), ?LOGT("un observer topic:~ts~n", [Topic]), timer:sleep(100), - ?assertEqual([], emqx:subscribers(Topic)) + ?assertEqual([], emqx:subscribers(Topic)), + true end, with_connection(Topics, Fun). @@ -497,7 +504,8 @@ t_observe_wildcard(_) -> #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) + ?assertEqual(Payload, PayloadRecv), + true end, with_connection(Fun). @@ -530,7 +538,8 @@ t_clients_api(_) -> {204, _} = request(delete, "/gateways/coap/clients/client1"), timer:sleep(200), - {200, #{data := []}} = request(get, "/gateways/coap/clients") + {200, #{data := []}} = request(get, "/gateways/coap/clients"), + false end, with_connection(Fun). @@ -560,7 +569,8 @@ t_clients_subscription_api(_) -> {204, _} = request(delete, Path ++ "/tx"), - {200, []} = request(get, Path) + {200, []} = request(get, Path), + true end, with_connection(Fun). @@ -578,7 +588,8 @@ t_clients_get_subscription_api(_) -> observe(Channel, Token, false), - {200, []} = request(get, Path) + {200, []} = request(get, Path), + true end, with_connection(Fun). @@ -773,8 +784,7 @@ with_connection(Action) -> Fun = fun(Channel) -> Token = connection(Channel), timer:sleep(100), - Action(Channel, Token), - disconnection(Channel, Token), + _ = Action(Channel, Token) andalso disconnection(Channel, Token), timer:sleep(100) end, do(Fun). diff --git a/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl index 79975d331..6bfa3a90b 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl @@ -207,7 +207,8 @@ test_recv_coap_request(UdpSock) -> test_send_coap_response(UdpSock, Host, Port, Code, Content, Request) -> is_list(Host) orelse error("Host is not a string"), {ok, IpAddr} = inet:getaddr(Host, inet), - Response = emqx_coap_message:piggyback(Code, Content, Request), + Response0 = emqx_coap_message:piggyback(Code, Content, Request), + Response = Response0#coap_message{options = #{uri_query => [<<"clientid=client1">>]}}, ?LOGT("test_send_coap_response Response=~p", [Response]), Binary = emqx_coap_frame:serialize_pkt(Response, undefined), ok = gen_udp:send(UdpSock, IpAddr, Port, Binary). diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index 0a24dab15..659eed48a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -171,7 +171,7 @@ banned(post, #{body := Body}) -> {error, Reason} -> ErrorReason = io_lib:format("~p", [Reason]), {400, 'BAD_REQUEST', list_to_binary(ErrorReason)}; - Ban -> + {ok, Ban} -> case emqx_banned:create(Ban) of {ok, Banned} -> {200, format(Banned)}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 02a098f28..227ae0107 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -418,6 +418,35 @@ t_update_listener_zone(_Config) -> ?assertMatch({error, {_, 400, _}}, request(put, Path, [], AddConf1)), ?assertMatch(#{<<"zone">> := <<"zone1">>}, request(put, Path, [], AddConf2)). +t_update_listener_max_conn_rate({init, Config}) -> + Config; +t_update_listener_max_conn_rate({'end', _Config}) -> + ok; +t_update_listener_max_conn_rate(_Config) -> + ListenerId = <<"tcp:default">>, + Path = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), + Conf = request(get, Path, [], []), + %% Check that default is infinity + ?assertMatch(#{<<"max_conn_rate">> := <<"infinity">>}, Conf), + %% Update to infinity + UpdateConfToInfinity = Conf#{<<"max_conn_rate">> => <<"infinity">>}, + ?assertMatch( + #{<<"max_conn_rate">> := <<"infinity">>}, + request(put, Path, [], UpdateConfToInfinity) + ), + %% Update to 42/s + UpdateConfTo42PerSec = Conf#{<<"max_conn_rate">> => <<"42/s">>}, + ?assertMatch( + #{<<"max_conn_rate">> := <<"42/s">>}, + request(put, Path, [], UpdateConfTo42PerSec) + ), + %% Update back to infinity + UpdateConfToInfinity = Conf#{<<"max_conn_rate">> => <<"infinity">>}, + ?assertMatch( + #{<<"max_conn_rate">> := <<"infinity">>}, + request(put, Path, [], UpdateConfToInfinity) + ). + t_delete_nonexistent_listener(Config) when is_list(Config) -> NonExist = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:nonexistent"]), ?assertMatch( diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 99d1b6cb8..106a65a9c 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -154,6 +154,14 @@ do_request_api(Method, Request, Opts) -> {error, Reason} end. +simplify_result(Res) -> + case Res of + {error, {{_, Status, _}, _, Body}} -> + {Status, Body}; + {ok, {{_, Status, _}, _, Body}} -> + {Status, Body} + end. + auth_header_() -> emqx_common_test_http:default_auth_header(). diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl index 1ba5cee8a..9b4afb914 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl @@ -452,18 +452,12 @@ ref(Struct) -> hoconsc:ref(?MODULE, Struct). mk(Type, Opts) -> hoconsc:mk(Type, Opts). array(Type) -> hoconsc:array(Type). -%% FIXME: all examples example_input_create() -> #{ - <<"sql_check">> => + <<"message_transformation">> => #{ - summary => <<"Using a SQL check">>, - value => example_transformation([example_sql_check()]) - }, - <<"avro_check">> => - #{ - summary => <<"Using an Avro schema check">>, - value => example_transformation([example_avro_check()]) + summary => <<"Simple message transformation">>, + value => example_transformation() } }. @@ -472,7 +466,7 @@ example_input_update() -> <<"update">> => #{ summary => <<"Update">>, - value => example_transformation([example_sql_check()]) + value => example_transformation() } }. @@ -493,20 +487,28 @@ example_input_dryrun_transformation() -> #{ summary => <<"Test an input against a configuration">>, value => #{ - todo => true + message => #{ + client_attrs => #{}, + payload => <<"{}">>, + qos => 2, + retain => true, + topic => <<"t/u/v">>, + user_property => #{} + }, + transformation => example_transformation() } } }. example_return_list() -> - OtherVal0 = example_transformation([example_avro_check()]), + OtherVal0 = example_transformation(), OtherVal = OtherVal0#{name => <<"other_transformation">>}, #{ <<"list">> => #{ summary => <<"List">>, value => [ - example_transformation([example_sql_check()]), + example_transformation(), OtherVal ] } @@ -547,29 +549,23 @@ example_return_metrics() -> } }. -example_transformation(Checks) -> +example_transformation() -> #{ name => <<"my_transformation">>, enable => true, description => <<"my transformation">>, tags => [<<"transformation">>], topics => [<<"t/+">>], - strategy => <<"all_pass">>, failure_action => <<"drop">>, log_failure => #{<<"level">> => <<"info">>}, - checks => Checks - }. - -example_sql_check() -> - #{ - type => <<"sql">>, - sql => <<"select payload.temp as t where t > 10">> - }. - -example_avro_check() -> - #{ - type => <<"avro">>, - schema => <<"my_avro_schema">> + payload_decoder => #{<<"type">> => <<"json">>}, + payload_encoder => #{<<"type">> => <<"json">>}, + operations => [ + #{ + key => <<"topic">>, + value => <<"concat([topic, '/', payload.t])">> + } + ] }. error_schema(Code, Message) -> diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl index 9dee9d5b0..169743e5b 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl @@ -231,6 +231,8 @@ do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(N do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) -> do_validate_unique_names(Rest, Acc#{Name => true}). +validate_unique_topics([]) -> + {error, <<"at least one topic filter must be defined">>}; validate_unique_topics(Topics) -> Grouped = maps:groups_from_list( fun(T) -> T end, diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl index 3e86e3862..545773c00 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl @@ -87,6 +87,19 @@ schema_test_() -> ) ]) )}, + {"topics must be non-empty", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"at least one topic filter must be defined", _/binary>>, + value := [], + kind := validation_error + } + ]}, + parse_and_check([ + transformation(<<"foo">>, [dummy_operation()], #{<<"topics">> => []}) + ]) + )}, {"names are unique", ?_assertThrow( {_Schema, [ diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index aa97c6584..f98bf5b5f 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -906,7 +906,8 @@ group_t_cluster_leave(Config) -> %% hooks added by the plugin's `application:start/2' callback are indeed in place. %% See also: https://github.com/emqx/emqx/issues/13378 t_start_node_with_plugin_enabled({init, Config}) -> - #{package := Package, shdir := InstallDir} = get_demo_plugin_package(), + #{package := Package} = get_demo_plugin_package(), + Basename = filename:basename(Package), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), AppSpecs = [ emqx, @@ -917,7 +918,7 @@ t_start_node_with_plugin_enabled({init, Config}) -> #{ plugins => #{ - install_dir => InstallDir, + install_dir => <<"plugins">>, states => [ #{ @@ -938,6 +939,14 @@ t_start_node_with_plugin_enabled({init, Config}) -> ], #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} ), + lists:foreach( + fun(#{work_dir := WorkDir}) -> + Destination = filename:join([WorkDir, "plugins", Basename]), + ok = filelib:ensure_dir(Destination), + {ok, _} = file:copy(Package, Destination) + end, + Specs + ), Names = [Name1, Name2], Nodes = [emqx_cth_cluster:node_name(N) || N <- Names], [ @@ -955,7 +964,9 @@ t_start_node_with_plugin_enabled(Config) when is_list(Config) -> ?check_trace( #{timetrap => 10_000}, begin - [N1, N2 | _] = emqx_cth_cluster:start(NodeSpecs), + %% Hack: we use `restart' here to disable the clean slate verification, as we + %% just created and populated the `plugins' directory... + [N1, N2 | _] = lists:flatmap(fun emqx_cth_cluster:restart/1, NodeSpecs), ?ON(N1, assert_started_and_hooks_loaded()), ?ON(N2, assert_started_and_hooks_loaded()), %% Now make them join. diff --git a/apps/emqx_prometheus/.gitignore b/apps/emqx_prometheus/.gitignore index b89cbebfd..233b9f887 100644 --- a/apps/emqx_prometheus/.gitignore +++ b/apps/emqx_prometheus/.gitignore @@ -13,7 +13,6 @@ rel/example_project emqx_prometheus.d ct.coverdata logs/ -data/ test/ct.cover.spec cover/ erlang.mk diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index c1d30e604..219ca1846 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -78,6 +78,10 @@ do_stop/0 ]). +-ifdef(TEST). +-export([cert_expiry_at_from_path/1]). +-endif. + %%-------------------------------------------------------------------- %% Macros %%-------------------------------------------------------------------- @@ -950,10 +954,8 @@ cert_expiry_at_from_path(Path0) -> {ok, PemBin} -> [CertEntry | _] = public_key:pem_decode(PemBin), Cert = public_key:pem_entry_decode(CertEntry), - %% TODO: Not fully tested for all certs type - {'utcTime', NotAfterUtc} = - Cert#'Certificate'.'tbsCertificate'#'TBSCertificate'.validity#'Validity'.'notAfter', - utc_time_to_epoch(NotAfterUtc); + %% XXX: Only pem cert supported by listeners + not_after_epoch(Cert); {error, Reason} -> ?SLOG(error, #{ msg => "read_cert_file_failed", @@ -976,21 +978,17 @@ cert_expiry_at_from_path(Path0) -> 0 end. -utc_time_to_epoch(UtcTime) -> - date_to_expiry_epoch(utc_time_to_datetime(UtcTime)). - -utc_time_to_datetime(Str) -> - {ok, [Year, Month, Day, Hour, Minute, Second], _} = io_lib:fread( - "~2d~2d~2d~2d~2d~2dZ", Str - ), - %% Always Assuming YY is in 2000 - {{2000 + Year, Month, Day}, {Hour, Minute, Second}}. - %% 62167219200 =:= calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}). -define(EPOCH_START, 62167219200). --spec date_to_expiry_epoch(calendar:datetime()) -> Seconds :: non_neg_integer(). -date_to_expiry_epoch(DateTime) -> - calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START. +not_after_epoch(#'Certificate'{ + 'tbsCertificate' = #'TBSCertificate'{ + validity = + #'Validity'{'notAfter' = NotAfter} + } +}) -> + pubkey_cert:'time_str_2_gregorian_sec'(NotAfter) - ?EPOCH_START; +not_after_epoch(_) -> + 0. %%======================================== %% Mria diff --git a/apps/emqx_prometheus/test/data/cert.crt b/apps/emqx_prometheus/test/data/cert.crt new file mode 100644 index 000000000..6f09b5609 --- /dev/null +++ b/apps/emqx_prometheus/test/data/cert.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDfzCCAmegAwIBAgIUJ3pE/Dwffa5gKNHY2L8HmazicmowDQYJKoZIhvcNAQEL +BQAwZzELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcM +DVNhbiBGcmFuY2lzY28xFTATBgNVBAoMDEV4YW1wbGUgSW5jLjEUMBIGA1UEAwwL +ZXhhbXBsZS5jb20wIBcNMjQwNzAzMTAyOTMzWhgPMjA1NDA2MjYxMDI5MzNaMGcx +CzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4g +RnJhbmNpc2NvMRUwEwYDVQQKDAxFeGFtcGxlIEluYy4xFDASBgNVBAMMC2V4YW1w +bGUuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArFZKxzsxCaGP +rVhilTd4PKk9jVrBLQ4xaFG6tmmlzjBCp+E35EulND4gpWZSUs9bYO/C+qykKmrL +J7TddGBVXe6lbl6mMHqZzHUp9mJdvPBSHcqOHc2E/UiBwOpN4tatx6UdK+VEQySr +z+dtc0Az5Itkoy/SvAu1Zzdq3d3MfxaTUvCmWfeR2huTalNQkG1jQ0C2CjCU9Z1f +Ex+y1MzxNhVrrdExC8Vwrb4TDlue8/XwJ4A4gBJYNbVAwALcSKnF56nRib3evE3J +Irvy2Rt4aC694JawWLPzJ1e2Rz8WBzCRPJAmaV4iD66sU8BMkmbCV+mMmF673s3R +sS4kGqklvQIDAQABoyEwHzAdBgNVHQ4EFgQU0tDKnCDey6fKrzs7caDfS41Dii4w +DQYJKoZIhvcNAQELBQADggEBAEIKvrSuUgpkIEUDV+UMr/5xUKkDyjNi4rwkBA6X +Ej0HskXg6u9wOIkBKwpQbleDFICdyqXMhGMjN4050PQCizaInBJBz77ah47UwGGQ +P+wavbcdHR9cbhewhCo6EtbCclPY1LXq4OFkgHMToLFzXC4S/kLX/KrhVApGHskO +Ad4U4gmMtIalruz5Mzc4YuSaAjbRI9v0IxhvS8JU0uoOwhIstkrMlFc26SU6EcZ9 +k88gVmmqEnsvmJi4gn4XPgvJB8hPs0/OMDBCVjAM8VaxZZ6sqlTT9FTGaKbIJdDc +KjT7VdbhVcuZo4s1u9gQzJNU2WHlHLwZi1wCjTC1vTE/HrQ= +-----END CERTIFICATE----- diff --git a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl index c97913ec2..dd125f97b 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl @@ -211,6 +211,16 @@ t_push_gateway(_) -> ok. +t_cert_expiry_epoch(_) -> + Path = some_pem_path(), + ?assertEqual( + 2666082573, + emqx_prometheus:cert_expiry_at_from_path(Path) + ). + +%%-------------------------------------------------------------------- +%% Helper functions + start_mock_pushgateway(Port) -> ensure_loaded(cowboy), ensure_loaded(ranch), @@ -249,3 +259,7 @@ init(Req0, Opts) -> RespHeader = #{<<"content-type">> => <<"text/plain; charset=utf-8">>}, Req = cowboy_req:reply(200, RespHeader, <<"OK">>, Req0), {ok, Req, Opts}. + +some_pem_path() -> + Dir = code:lib_dir(emqx_prometheus, test), + _Path = filename:join([Dir, "data", "cert.crt"]). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c3b746d8e..50a25620c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -143,6 +143,15 @@ perform_health_check => boolean() }. +%% calls/casts/generic timeouts +-record(add_channel, {channel_id :: channel_id(), config :: map()}). +-record(start_channel_health_check, {channel_id :: channel_id()}). + +-type generic_timeout(Id, Content) :: {{timeout, Id}, timeout(), Content}. +-type start_channel_health_check_action() :: generic_timeout( + #start_channel_health_check{}, #start_channel_health_check{} +). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -405,7 +414,7 @@ add_channel(ResId, ChannelId, Config) -> ) -> ok | {error, term()}. add_channel(ResId, ChannelId, Config, Opts) -> - Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION), + Result = safe_call(ResId, #add_channel{channel_id = ChannelId, config = Config}, ?T_OPERATION), maybe true ?= maps:get(perform_health_check, Opts, true), %% Wait for health_check to finish @@ -570,7 +579,9 @@ handle_event({call, From}, health_check, _State, Data) -> handle_manual_resource_health_check(From, Data); handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) -> handle_manual_channel_health_check(From, Data, ChannelId); -% State: CONNECTING +%%-------------------------- +%% State: CONNECTING +%%-------------------------- handle_event(enter, _OldState, ?state_connecting = State, Data) -> ok = log_status_consistency(State, Data), {keep_state_and_data, [{state_timeout, 0, health_check}]}; @@ -582,25 +593,39 @@ handle_event( {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data ) -> handle_remove_channel(From, ChannelId, Data); +%%-------------------------- %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks +%%-------------------------- handle_event(enter, _OldState, ?state_connected = State, Data) -> ok = log_status_consistency(State, Data), _ = emqx_alarm:safe_deactivate(Data#data.id), ?tp(resource_connected_enter, #{}), - {keep_state_and_data, health_check_actions(Data)}; + {keep_state_and_data, resource_health_check_actions(Data)}; handle_event(state_timeout, health_check, ?state_connected, Data) -> start_resource_health_check(Data); handle_event( - {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data + {call, From}, + #add_channel{channel_id = ChannelId, config = Config}, + ?state_connected = _State, + Data ) -> handle_add_channel(From, Data, ChannelId, Config); handle_event( {call, From}, {remove_channel, ChannelId}, ?state_connected = _State, Data ) -> handle_remove_channel(From, ChannelId, Data); +handle_event( + {timeout, #start_channel_health_check{channel_id = ChannelId}}, + _, + ?state_connected = _State, + Data +) -> + handle_start_channel_health_check(Data, ChannelId); +%%-------------------------- %% State: DISCONNECTED +%%-------------------------- handle_event(enter, _OldState, ?state_disconnected = State, Data) -> ok = log_status_consistency(State, Data), ?tp(resource_disconnected_enter, #{}), @@ -608,14 +633,18 @@ handle_event(enter, _OldState, ?state_disconnected = State, Data) -> handle_event(state_timeout, auto_retry, ?state_disconnected, Data) -> ?tp(resource_auto_reconnect, #{}), start_resource(Data, undefined); +%%-------------------------- %% State: STOPPED %% The stopped state is entered after the resource has been explicitly stopped +%%-------------------------- handle_event(enter, _OldState, ?state_stopped = State, Data) -> ok = log_status_consistency(State, Data), {keep_state_and_data, []}; +%%-------------------------- %% The following events can be handled in any other state +%%-------------------------- handle_event( - {call, From}, {add_channel, ChannelId, Config}, State, Data + {call, From}, #add_channel{channel_id = ChannelId, config = Config}, State, Data ) -> handle_not_connected_add_channel(From, ChannelId, Config, State, Data); handle_event( @@ -645,6 +674,9 @@ handle_event( is_map_key(Pid, CHCWorkers) -> handle_channel_health_check_worker_down(Data0, Pid, Res); +handle_event({timeout, #start_channel_health_check{channel_id = _}}, _, _State, _Data) -> + %% Stale health check action; currently, we only probe channel health when connected. + keep_state_and_data; % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -702,7 +734,7 @@ retry_actions(Data) -> [{state_timeout, RetryInterval, auto_retry}] end. -health_check_actions(Data) -> +resource_health_check_actions(Data) -> [{state_timeout, health_check_interval(Data#data.opts), health_check}]. handle_remove_event(From, ClearMetrics, Data) -> @@ -1079,7 +1111,7 @@ continue_resource_health_check_connected(NewStatus, Data0) -> {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0), Data2 = channels_health_check(?status_connected, Data1), Data = update_state(Data2, Data0), - Actions = Replies ++ health_check_actions(Data), + Actions = Replies ++ resource_health_check_actions(Data), {keep_state, Data, Actions}; _ -> ?SLOG(warning, #{ @@ -1091,23 +1123,28 @@ continue_resource_health_check_connected(NewStatus, Data0) -> %% subset of resource manager state... But there should be a conversion %% between the two here, as resource manager also has `stopped', which is %% not a valid status at the time of writing. - {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0), - {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies} + {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0), + Data = channels_health_check(NewStatus, Data1), + Actions = Replies, + {next_state, NewStatus, Data, Actions} end. %% Continuation to be used when the current resource state is not `?state_connected'. continue_resource_health_check_not_connected(NewStatus, Data0) -> - {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0), + {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0), case NewStatus of ?status_connected -> - {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies}; + Data = channels_health_check(?status_connected, Data1), + Actions = Replies, + {next_state, ?state_connected, Data, Actions}; ?status_connecting -> - Actions = Replies ++ health_check_actions(Data), - {next_state, ?status_connecting, channels_health_check(?status_connecting, Data), - Actions}; + Data = channels_health_check(?status_connecting, Data1), + Actions = Replies ++ resource_health_check_actions(Data), + {next_state, ?status_connecting, Data, Actions}; ?status_disconnected -> - {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data), - Replies} + Data = channels_health_check(?status_disconnected, Data1), + Actions = Replies, + {next_state, ?state_disconnected, Data, Actions} end. handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) -> @@ -1269,38 +1306,60 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) -> ) ). +-spec generic_timeout_action(Id, timeout(), Content) -> generic_timeout(Id, Content). +generic_timeout_action(Id, Timeout, Content) -> + {{timeout, Id}, Timeout, Content}. + +-spec start_channel_health_check_action(channel_id(), map(), map(), data() | timeout()) -> + [start_channel_health_check_action()]. +start_channel_health_check_action(ChannelId, NewChanStatus, PreviousChanStatus, Data = #data{}) -> + Timeout = get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data), + Event = #start_channel_health_check{channel_id = ChannelId}, + [generic_timeout_action(Event, Timeout, Event)]. + +get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data) -> + emqx_utils:foldl_while( + fun + (#{config := #{resource_opts := #{health_check_interval := HCInterval}}}, _Acc) -> + {halt, HCInterval}; + (_, Acc) -> + {cont, Acc} + end, + ?HEALTHCHECK_INTERVAL, + [ + NewChanStatus, + PreviousChanStatus, + maps:get(ChannelId, Data#data.added_channels, #{}) + ] + ). + %% Currently, we only call resource channel health checks when the underlying resource is %% `?status_connected'. -spec trigger_health_check_for_added_channels(data()) -> data(). trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) -> #{ - channel := CHCWorkers0 = + channel := #{ - pending := CPending0, + %% TODO: rm pending + %% pending := CPending0, ongoing := Ongoing0 } } = HCWorkers0, NewOngoing = maps:filter( fun(ChannelId, OldStatus) -> - not is_map_key(ChannelId, Ongoing0) and + (not is_map_key(ChannelId, Ongoing0)) andalso channel_status_is_channel_added(OldStatus) end, Data0#data.added_channels ), ChannelsToCheck = maps:keys(NewOngoing), - case ChannelsToCheck of - [] -> - %% Nothing to do. - Data0; - [ChannelId | Rest] -> - %% Shooting one check at a time. We could increase concurrency in the future. - CHCWorkers = CHCWorkers0#{ - pending := CPending0 ++ Rest, - ongoing := maps:merge(Ongoing0, NewOngoing) - }, - Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, - start_channel_health_check(Data1, ChannelId) - end. + lists:foldl( + fun(ChannelId, Acc) -> + start_channel_health_check(Acc, ChannelId) + end, + Data0, + ChannelsToCheck + ). -spec continue_channel_health_check_connected( channel_id(), channel_status_map(), channel_status_map(), data() @@ -1338,12 +1397,29 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta end, Data. +-spec handle_start_channel_health_check(data(), channel_id()) -> + gen_statem:event_handler_result(state(), data()). +handle_start_channel_health_check(Data0, ChannelId) -> + Data = start_channel_health_check(Data0, ChannelId), + {keep_state, Data}. + -spec start_channel_health_check(data(), channel_id()) -> data(). -start_channel_health_check(#data{} = Data0, ChannelId) -> +start_channel_health_check( + #data{added_channels = AddedChannels, hc_workers = #{channel := #{ongoing := CHCOngoing0}}} = + Data0, + ChannelId +) when + is_map_key(ChannelId, AddedChannels) andalso (not is_map_key(ChannelId, CHCOngoing0)) +-> #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0, WorkerPid = spawn_channel_health_check_worker(Data0, ChannelId), - HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerPid => ChannelId}}, - Data0#data{hc_workers = HCWorkers}. + ChannelStatus = maps:get(ChannelId, AddedChannels), + CHCOngoing = CHCOngoing0#{ChannelId => ChannelStatus}, + CHCWorkers = CHCWorkers0#{WorkerPid => ChannelId, ongoing := CHCOngoing}, + HCWorkers = HCWorkers0#{channel := CHCWorkers}, + Data0#data{hc_workers = HCWorkers}; +start_channel_health_check(Data, _ChannelId) -> + Data. -spec spawn_channel_health_check_worker(data(), channel_id()) -> pid(). spawn_channel_health_check_worker(#data{} = Data, ChannelId) -> @@ -1380,33 +1456,19 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> #{ongoing := Ongoing0} = CHCWorkers1, {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0), CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1}, - CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2), Data1 = Data0#data{added_channels = AddedChannels}, {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1), - case CHCWorkers1 of - #{pending := [NextChannelId | Rest]} -> - CHCWorkers = CHCWorkers3#{pending := Rest}, - HCWorkers = HCWorkers0#{channel := CHCWorkers}, - Data3 = Data2#data{hc_workers = HCWorkers}, - Data4 = continue_channel_health_check_connected( - ChannelId, - PreviousChanStatus, - CurrentStatus, - Data3 - ), - Data = start_channel_health_check(Data4, NextChannelId), - {keep_state, update_state(Data, Data0), Replies}; - #{pending := []} -> - HCWorkers = HCWorkers0#{channel := CHCWorkers3}, - Data3 = Data2#data{hc_workers = HCWorkers}, - Data = continue_channel_health_check_connected( - ChannelId, - PreviousChanStatus, - CurrentStatus, - Data3 - ), - {keep_state, update_state(Data, Data0), Replies} - end. + HCWorkers = HCWorkers0#{channel := CHCWorkers2}, + Data3 = Data2#data{hc_workers = HCWorkers}, + Data = continue_channel_health_check_connected( + ChannelId, + PreviousChanStatus, + CurrentStatus, + Data3 + ), + CHCActions = start_channel_health_check_action(ChannelId, NewStatus, PreviousChanStatus, Data), + Actions = Replies ++ CHCActions, + {keep_state, update_state(Data, Data0), Actions}. handle_channel_health_check_worker_down_new_channels_and_status( ChannelId, diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.erl b/apps/emqx_schema_registry/src/emqx_schema_registry.erl index 6d894ab90..f8d760ddc 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.erl @@ -44,6 +44,12 @@ get_serde/1 ]). +%%------------------------------------------------------------------------------------------------- +%% Type definitions +%%------------------------------------------------------------------------------------------------- + +-define(BAD_SCHEMA_NAME, <<"bad_schema_name">>). + -type schema() :: #{ type := serde_type(), source := binary(), @@ -87,6 +93,8 @@ get_schema(SchemaName) -> Config -> {ok, Config} catch + throw:#{reason := ?BAD_SCHEMA_NAME} -> + {error, not_found}; throw:not_found -> {error, not_found} end. @@ -343,16 +351,20 @@ to_bin(A) when is_atom(A) -> atom_to_binary(A); to_bin(B) when is_binary(B) -> B. schema_name_bin_to_atom(Bin) when size(Bin) > 255 -> - throw( - iolist_to_binary( - io_lib:format( - "Name is is too long." - " Please provide a shorter name (<= 255 bytes)." - " The name that is too long: \"~s\"", - [Bin] - ) + Msg = iolist_to_binary( + io_lib:format( + "Name is is too long." + " Please provide a shorter name (<= 255 bytes)." + " The name that is too long: \"~s\"", + [Bin] ) - ); + ), + Reason = #{ + kind => validation_error, + reason => ?BAD_SCHEMA_NAME, + hint => Msg + }, + throw(Reason); schema_name_bin_to_atom(Bin) -> try binary_to_existing_atom(Bin, utf8) diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl index 3e5726c99..d587417d5 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl @@ -407,7 +407,8 @@ t_empty_sparkplug(_Config) -> ), ok. -%% Tests that we can't create names that are too long and get a decent error message. +%% Tests that we can't create or lookup names that are too long and get a decent error +%% message. t_name_too_long(Config) -> SerdeType = ?config(serde_type, Config), SourceBin = ?config(schema_source, Config), @@ -428,4 +429,11 @@ t_name_too_long(Config) -> }}, request({post, Params}) ), + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Schema not found">> + }}, + request({get, SchemaName}) + ), ok. diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation.app.src b/apps/emqx_schema_validation/src/emqx_schema_validation.app.src index 2dfe710db..31b2a30fc 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation.app.src +++ b/apps/emqx_schema_validation/src/emqx_schema_validation.app.src @@ -1,6 +1,6 @@ {application, emqx_schema_validation, [ {description, "EMQX Schema Validation"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, [emqx_schema_validation_sup, emqx_schema_validation_registry]}, {mod, {emqx_schema_validation_app, []}}, {applications, [ diff --git a/apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl b/apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl index fa9461745..8984d1066 100644 --- a/apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl +++ b/apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl @@ -259,6 +259,8 @@ do_validate_unique_schema_checks( do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) -> do_validate_unique_schema_checks(Rest, Seen, Duplicated). +validate_unique_topics([]) -> + {error, <<"at least one topic filter must be defined">>}; validate_unique_topics(Topics) -> Grouped = maps:groups_from_list( fun(T) -> T end, diff --git a/apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl b/apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl index a75e4b556..d6e18da92 100644 --- a/apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl +++ b/apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl @@ -117,6 +117,19 @@ schema_test_() -> ) ]) )}, + {"topics must be non-empty", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"at least one topic filter must be defined", _/binary>>, + value := [], + kind := validation_error + } + ]}, + parse_and_check([ + validation(<<"foo">>, [sql_check()], #{<<"topics">> => []}) + ]) + )}, {"foreach expression is not allowed", ?_assertThrow( {_Schema, [ diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index a6efcb443..0f5349e07 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -290,8 +290,10 @@ do_check_oom([{Val, Max, Reason} | Rest]) -> end. tune_heap_size(#{enable := false}) -> - ok; + ignore; %% If the max_heap_size is set to zero, the limit is disabled. +tune_heap_size(#{max_heap_size := 0}) -> + ignore; tune_heap_size(#{max_heap_size := MaxHeapSize}) when MaxHeapSize > 0 -> MaxSize = case erlang:system_info(wordsize) of diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index bab09b6b3..53186ab76 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -49,16 +49,19 @@ %% Streams from .csv data -export([ - csv/1 + csv/1, + csv/2 ]). --export_type([stream/1]). +-export_type([stream/1, csv_parse_opts/0]). %% @doc A stream is essentially a lazy list. -type stream_tail(T) :: fun(() -> next(T) | []). -type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T). -type next(T) :: nonempty_improper_list(T, stream_tail(T)). +-type csv_parse_opts() :: #{nullable => boolean(), filter_null => boolean()}. + -dialyzer(no_improper_lists). -elvis([{elvis_style, nesting_level, disable}]). @@ -325,13 +328,42 @@ ets(Cont, ContF) -> %% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once. %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row. -spec csv(binary()) -> stream(map()). -csv(Bin) when is_binary(Bin) -> +csv(Bin) -> + csv(Bin, #{}). + +-spec csv(binary(), csv_parse_opts()) -> stream(map()). +csv(Bin, Opts) when is_binary(Bin) -> + Liner = + case Opts of + #{nullable := true} -> + fun csv_read_nullable_line/1; + _ -> + fun csv_read_line/1 + end, + Maper = + case Opts of + #{filter_null := true} -> + fun(Headers, Fields) -> + maps:from_list( + lists:filter( + fun({_, Value}) -> + Value =/= undefined + end, + lists:zip(Headers, Fields) + ) + ) + end; + _ -> + fun(Headers, Fields) -> + maps:from_list(lists:zip(Headers, Fields)) + end + end, Reader = fun _Iter(Headers, Lines) -> - case csv_read_line(Lines) of + case Liner(Lines) of {Fields, Rest} -> case length(Fields) == length(Headers) of true -> - User = maps:from_list(lists:zip(Headers, Fields)), + User = Maper(Headers, Fields), [User | fun() -> _Iter(Headers, Rest) end]; false -> error(bad_format) @@ -355,6 +387,23 @@ csv_read_line([Line | Lines]) -> csv_read_line([]) -> eof. +csv_read_nullable_line([Line | Lines]) -> + %% XXX: not support ' ' for the field value + Fields = lists:map( + fun(Bin) -> + case string:trim(Bin, both) of + <<>> -> + undefined; + Any -> + Any + end + end, + binary:split(Line, [<<",">>], [global]) + ), + {Fields, Lines}; +csv_read_nullable_line([]) -> + eof. + do_interleave(_Cont, _, [], []) -> []; do_interleave(Cont, N, [{N, S} | Rest], Rev) -> diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index c124f5384..9887fc41c 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -154,6 +154,22 @@ t_check(_) -> emqx_utils:check_oom(Policy) ). +t_tune_heap_size(_Config) -> + Policy = #{ + max_mailbox_size => 10, + max_heap_size => 1024 * 1024 * 8, + enable => true + }, + ?assertEqual(ignore, emqx_utils:tune_heap_size(Policy#{enable := false})), + %% Setting it to 0 disables the check. + ?assertEqual(ignore, emqx_utils:tune_heap_size(Policy#{max_heap_size := 0})), + {max_heap_size, PreviousHeapSize} = process_info(self(), max_heap_size), + try + ?assertMatch(PreviousHeapSize, emqx_utils:tune_heap_size(Policy)) + after + process_flag(max_heap_size, PreviousHeapSize) + end. + t_rand_seed(_) -> ?assert(is_tuple(emqx_utils:rand_seed())). diff --git a/changes/ce/feat-13436.en.md b/changes/ce/feat-13436.en.md new file mode 100644 index 000000000..cda52a137 --- /dev/null +++ b/changes/ce/feat-13436.en.md @@ -0,0 +1 @@ +Added the option to add custom request headers to JWKS requests. diff --git a/changes/ce/fix-13375.en.md b/changes/ce/fix-13375.en.md new file mode 100644 index 000000000..e09bc649b --- /dev/null +++ b/changes/ce/fix-13375.en.md @@ -0,0 +1 @@ +The value infinity has been added as default value to the listener configuration fields max_conn_rate, messages_rate and bytes_rate. diff --git a/changes/ce/fix-13398.en.md b/changes/ce/fix-13398.en.md new file mode 100644 index 000000000..fb2f891e8 --- /dev/null +++ b/changes/ce/fix-13398.en.md @@ -0,0 +1 @@ +Fix acl rule clearing when reloading built-in-database for authorization using command line. diff --git a/changes/ce/fix-13403.en.md b/changes/ce/fix-13403.en.md new file mode 100644 index 000000000..1a02fffe7 --- /dev/null +++ b/changes/ce/fix-13403.en.md @@ -0,0 +1 @@ +Fixed environment variable config override logging behaviour to avoid logging passwords. diff --git a/changes/ce/fix-13408.en.md b/changes/ce/fix-13408.en.md new file mode 100644 index 000000000..e27482d91 --- /dev/null +++ b/changes/ce/fix-13408.en.md @@ -0,0 +1 @@ +Fix function_clause crash that occurs when attempting to authenticate with an invalid type of salt or password. diff --git a/changes/ce/fix-13419.en.md b/changes/ce/fix-13419.en.md new file mode 100644 index 000000000..95f82e970 --- /dev/null +++ b/changes/ce/fix-13419.en.md @@ -0,0 +1 @@ +Fix garbled hints in crash log message when calling /configs API diff --git a/changes/ce/fix-13422.en.md b/changes/ce/fix-13422.en.md new file mode 100644 index 000000000..78b66c72a --- /dev/null +++ b/changes/ce/fix-13422.en.md @@ -0,0 +1 @@ +Fixed an issue where the option `force_shutdown.max_heap_size` could not be set to 0 to disable this tuning. diff --git a/changes/ce/fix-13442.en.md b/changes/ce/fix-13442.en.md new file mode 100644 index 000000000..05aaee8a0 --- /dev/null +++ b/changes/ce/fix-13442.en.md @@ -0,0 +1 @@ +Fixed an issue where the health check interval values of actions/sources were not being taken into account. diff --git a/changes/ce/perf-13441.en.md b/changes/ce/perf-13441.en.md new file mode 100644 index 000000000..949b3601f --- /dev/null +++ b/changes/ce/perf-13441.en.md @@ -0,0 +1 @@ +Enhanced CoAP gateway connection mode, UDP connection will always be bound to the corresponding gateway connection through the `clientid`. diff --git a/changes/ee/breaking-13420.en.md b/changes/ee/breaking-13420.en.md new file mode 100644 index 000000000..dc7a05134 --- /dev/null +++ b/changes/ee/breaking-13420.en.md @@ -0,0 +1 @@ +Added a schema validation that prevents configuring an empty set of topic filters for a Schema Validation. Any such configurations will have to define at least one topic filter to be valid. Such configurations, though, are probably very rare, as a Schema Validation with empty topics is essentially the same as having no validation at all. diff --git a/changes/ee/feat-13386.en.md b/changes/ee/feat-13386.en.md new file mode 100644 index 000000000..e038e0144 --- /dev/null +++ b/changes/ee/feat-13386.en.md @@ -0,0 +1,17 @@ +Added a bootstrap file to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database. + + +This file is a CSV file with `,` as its delimiter. + +The first line of this file must be a header line. All valid headers are listed here: +- as :: required +- who :: required +- by :: optional +- reason :: optional +- at :: optional +- until :: optional + +See the documentation for details on each field. + +Each row in the rest of this file must contain the same number of columns as the header line, +and column can be omitted then its value will be `undefined`. diff --git a/changes/ee/fix-13420.en.md b/changes/ee/fix-13420.en.md new file mode 100644 index 000000000..994ee8a88 --- /dev/null +++ b/changes/ee/fix-13420.en.md @@ -0,0 +1 @@ +Added a schema validation to forbid empty topic filter lists when configuring a Schema Validation. diff --git a/changes/fix-13412.en.md b/changes/fix-13412.en.md new file mode 100644 index 000000000..0afc6cceb --- /dev/null +++ b/changes/fix-13412.en.md @@ -0,0 +1 @@ +Fixed an issue in the Prometheus API where the certificate expiration time format incorrectly returned `0` due to the use of `generalTime`. diff --git a/changes/fix-13432.en.md b/changes/fix-13432.en.md new file mode 100644 index 000000000..d1f56f052 --- /dev/null +++ b/changes/fix-13432.en.md @@ -0,0 +1 @@ +Fixed the issue where JWT authentication was silently bypassed when an invalid public key (or invalid public key file path) was used. diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index 4f068fab6..89524b4a7 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -2,17 +2,17 @@ ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-9:1.15.7-26.2.5-3-debian12 ARG RUN_FROM=public.ecr.aws/debian/debian:stable-20240612-slim ARG SOURCE_TYPE=src # tgz -FROM ${BUILD_FROM} as builder_src +FROM ${BUILD_FROM} AS builder_src ONBUILD COPY . /emqx -FROM ${RUN_FROM} as builder_tgz +FROM ${RUN_FROM} AS builder_tgz ARG PROFILE=emqx ARG PKG_VSN ARG SUFFIX ARG TARGETARCH ONBUILD COPY ${PROFILE}-${PKG_VSN}${SUFFIX}-debian12-$TARGETARCH.tar.gz /${PROFILE}.tar.gz -FROM builder_${SOURCE_TYPE} as builder +FROM builder_${SOURCE_TYPE} AS builder ARG PROFILE=emqx ARG IS_ELIXIR=no diff --git a/mix.exs b/mix.exs index 5368d1509..dc2953683 100644 --- a/mix.exs +++ b/mix.exs @@ -182,9 +182,9 @@ defmodule EMQXUmbrella.MixProject do end def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true} - def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true} + def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true} def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true} - def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true} + def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.1", override: true} def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true} # in conflict by ehttpc and emqtt def common_dep(:gun), do: {:gun, github: "emqx/gun", tag: "1.3.11", override: true} diff --git a/rebar.config b/rebar.config index 91befeea8..c7d9b019b 100644 --- a/rebar.config +++ b/rebar.config @@ -82,7 +82,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, @@ -98,7 +98,7 @@ {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.5"}}}, {getopt, "1.0.2"}, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}}, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}, diff --git a/rel/i18n/emqx_authn_jwt_schema.hocon b/rel/i18n/emqx_authn_jwt_schema.hocon index f99b8c7f1..a5e4e3298 100644 --- a/rel/i18n/emqx_authn_jwt_schema.hocon +++ b/rel/i18n/emqx_authn_jwt_schema.hocon @@ -152,4 +152,9 @@ disconnect_after_expire.desc: disconnect_after_expire.label: """Disconnect After Expire""" +jwks_headers.label: +"""HTTP Headers""" +jwks_headers.desc: +"""List of HTTP headers to send with the JWKS request.""" + } diff --git a/rel/i18n/emqx_authz_schema.hocon b/rel/i18n/emqx_authz_schema.hocon index 5e11f26a6..7a454f046 100644 --- a/rel/i18n/emqx_authz_schema.hocon +++ b/rel/i18n/emqx_authz_schema.hocon @@ -78,6 +78,11 @@ failed.desc: failed.label: """Failed""" +ignore.desc: +"""Count of query ignored. This counter is increased whenever the authorization source attempts to authorize a request, but either it's not applicable, or an error was encountered and the result is undecidable""" +ignore.label: +"""Ignored""" + metrics.desc: """The metrics of the resource.""" diff --git a/rel/i18n/emqx_gateway_schema.hocon b/rel/i18n/emqx_gateway_schema.hocon index 1a04d57ac..9f7bd8462 100644 --- a/rel/i18n/emqx_gateway_schema.hocon +++ b/rel/i18n/emqx_gateway_schema.hocon @@ -195,4 +195,13 @@ Relevant when the EMQX cluster is deployed behind a load-balancer.""" fields_ws_opts_proxy_address_header.label: """Proxy address header""" +udp_health_check.desc: +"""Some Cloud platform use a `request-reply` mechanism to check whether a UDP port is healthy, here can configure this pair.""" + +udp_health_check_request.desc: +"""The content of the request.""" + +udp_health_check_reply.desc: +"""The content to reply.""" + } diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index f9978fe6f..e2ec44cc9 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -801,7 +801,7 @@ mqtt_max_topic_levels.label: """Max Topic Levels""" force_shutdown_max_heap_size.desc: -"""Total heap size""" +"""Total heap size. Setting this to 0 disables this limitation.""" force_shutdown_max_heap_size.label: """Total heap size""" @@ -1630,4 +1630,22 @@ client_attrs_init_set_as_attr { The extracted attribute will be stored in the `client_attrs` property with this name.""" } +banned_bootstrap_file.desc: +"""The bootstrap file is a CSV file used to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database. + +The delimiter for this file is `,`. + +The first line of this file must be a header line. All valid headers are listed here: +- as :: required +- who :: required +- by :: optional +- reason :: optional +- at :: optional +- until :: optional + +See the documentation for details on each field. + +Each row in the rest of this file must contain the same number of columns as the header line, +and column can be omitted then its value will be `undefined`.""" + } diff --git a/scripts/ui-tests/dashboard_test.py b/scripts/ui-tests/dashboard_test.py index 6005a9403..c349739b6 100644 --- a/scripts/ui-tests/dashboard_test.py +++ b/scripts/ui-tests/dashboard_test.py @@ -3,6 +3,7 @@ import time import unittest import pytest import requests +import logging from urllib.parse import urljoin from selenium import webdriver from selenium.webdriver.common.by import By @@ -12,6 +13,9 @@ from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.common import utils from selenium.common.exceptions import NoSuchElementException +logger = logging.getLogger() +logger.setLevel(logging.INFO) + @pytest.fixture def driver(): options = Options() @@ -31,39 +35,52 @@ def dashboard_url(dashboard_host, dashboard_port): time.sleep(1) return f"http://{dashboard_host}:{dashboard_port}" -@pytest.fixture def login(driver, dashboard_url): # admin is set in CI jobs, hence as default value password = os.getenv("EMQX_DASHBOARD__DEFAULT_PASSWORD", "admin") driver.get(dashboard_url) assert "EMQX Dashboard" == driver.title assert f"{dashboard_url}/#/login?to=/dashboard/overview" == driver.current_url - driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='text']").send_keys("admin") - driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='password']").send_keys(password) - driver.find_element(By.XPATH, "//div[@class='login']//form[1]//button[1]").click() + driver.execute_script("window.localStorage.setItem('licenseTipVisible','false');") + driver.find_element(By.XPATH, "//div[@class='login']//form//input[@type='text']").send_keys("admin") + driver.find_element(By.XPATH, "//div[@class='login']//form//input[@type='password']").send_keys(password) + driver.find_element(By.XPATH, "//div[@class='login']//form//button").click() dest_url = urljoin(dashboard_url, "/#/dashboard/overview") - driver.get(dest_url) ensure_current_url(driver, dest_url) + assert len(driver.find_elements(By.XPATH, "//div[@class='login']")) == 0 + logger.info(f"Logged in to {dashboard_url}") -def ensure_current_url(driver, url): +def ensure_current_url(d, url): count = 0 - while url != driver.current_url: + while url != d.current_url: if count == 10: raise Exception(f"Failed to load {url}") count += 1 time.sleep(1) -def title(driver): - return driver.find_element("xpath", "//div[@id='app']//h1[@class='header-title']") +def title(d): + title = '' + for _ in range(5): + try: + title = d.find_element("xpath", "//div[@id='app']//h1[@class='header-title']") + break + except NoSuchElementException: + time.sleep(1) + else: + raise AssertionError("Cannot find the title element") + return title -def wait_title_text(driver, text): - return WebDriverWait(driver, 10).until(lambda x: title(x).text == text) +def wait_title_text(d, text): + return WebDriverWait(d, 10).until(lambda x: title(x).text == text) -def test_basic(driver, login, dashboard_url): - driver.get(dashboard_url) +def test_basic(driver, dashboard_url): + login(driver, dashboard_url) + logger.info(f"Current URL: {driver.current_url}") wait_title_text(driver, "Cluster Overview") -def test_log(driver, login, dashboard_url): +def test_log(driver, dashboard_url): + login(driver, dashboard_url) + logger.info(f"Current URL: {driver.current_url}") dest_url = urljoin(dashboard_url, "/#/log") driver.get(dest_url) ensure_current_url(driver, dest_url) @@ -95,10 +112,9 @@ def fetch_version(url): version_str = info['rel_vsn'] return parse_version(version_str) -def test_docs_link(driver, login, dashboard_url): - dest_url = urljoin(dashboard_url, "/#/dashboard/overview") - driver.get(dest_url) - ensure_current_url(driver, dest_url) +def test_docs_link(driver, dashboard_url): + login(driver, dashboard_url) + logger.info(f"Current URL: {driver.current_url}") xpath_link_help = "//div[@id='app']//div[@class='nav-header']//a[contains(@class, 'link-help')]" # retry up to 5 times for _ in range(5): diff --git a/scripts/ui-tests/docker-compose.yaml b/scripts/ui-tests/docker-compose.yaml index 952b0f382..ce87613d2 100644 --- a/scripts/ui-tests/docker-compose.yaml +++ b/scripts/ui-tests/docker-compose.yaml @@ -1,14 +1,14 @@ -version: '3.9' - services: emqx: image: ${_EMQX_DOCKER_IMAGE_TAG:-emqx/emqx:latest} environment: EMQX_DASHBOARD__DEFAULT_PASSWORD: admin + EMQX_LOG__CONSOLE__LEVEL: debug selenium: shm_size: '2gb' image: ghcr.io/emqx/selenium-chrome:1.0.0 + platform: linux/amd64 volumes: - ./:/app depends_on: