diff --git a/.github/workflows/run_emqx_app_tests.yaml b/.github/workflows/run_emqx_app_tests.yaml index 72fe2b0d5..e062cef2e 100644 --- a/.github/workflows/run_emqx_app_tests.yaml +++ b/.github/workflows/run_emqx_app_tests.yaml @@ -55,7 +55,7 @@ jobs: cd apps/emqx ./rebar3 xref ./rebar3 dialyzer - ./rebar3 eunit -v + ./rebar3 eunit -v --name 'eunit@127.0.0.1' ./rebar3 as standalone_test ct --name 'test@127.0.0.1' -v --readable=true ./rebar3 proper -d test/props - uses: actions/upload-artifact@v3 diff --git a/Makefile b/Makefile index e4470c5f5..42ae88e95 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,8 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.4.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.2.1 +export EMQX_DASHBOARD_VERSION ?= v1.5.0 +export EMQX_EE_DASHBOARD_VERSION ?= e1.3.0 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used # In make 4.4+, for backward-compatibility the value from the original environment is used. @@ -75,7 +75,7 @@ mix-deps-get: $(ELIXIR_COMMON_DEPS) .PHONY: eunit eunit: $(REBAR) merge-config - @ENABLE_COVER_COMPILE=1 $(REBAR) eunit -v -c --cover_export_name $(CT_COVER_EXPORT_PREFIX)-eunit + @ENABLE_COVER_COMPILE=1 $(REBAR) eunit --name eunit@127.0.0.1 -v -c --cover_export_name $(CT_COVER_EXPORT_PREFIX)-eunit .PHONY: proper proper: $(REBAR) diff --git a/apps/emqx/etc/ssl_dist.conf b/apps/emqx/etc/ssl_dist.conf index b4c16e2cc..82c523534 100644 --- a/apps/emqx/etc/ssl_dist.conf +++ b/apps/emqx/etc/ssl_dist.conf @@ -1,6 +1,6 @@ %% This additional config file is used when the config 'cluster.proto_dist' in emqx.conf is set to 'inet_tls'. %% Which means the EMQX nodes will connect to each other over TLS. -%% For more information about inter-broker security, see: https://docs.emqx.com/en/enterprise/v5.0/deploy/cluster/security.html +%% For more information about inter-broker security, see: https://docs.emqx.com/en/enterprise/v5.3/deploy/cluster/security.html %% For more information in technical details see: http://erlang.org/doc/apps/ssl/ssl_distribution.html diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 9b6252efb..801773663 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.2.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.3.0-alpha.1"). +-define(EMQX_RELEASE_EE, "5.3.0-alpha.2"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 9bff4b293..d803f67be 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -61,7 +61,7 @@ ) end). --define(AUDIT(_Level_, _Msg_, _Meta_), begin +-define(AUDIT(_Level_, _From_, _Meta_), begin case emqx_config:get([log, audit], #{enable => false}) of #{enable := false} -> ok; @@ -71,8 +71,8 @@ end). emqx_trace:log( _Level_, [{emqx_audit, fun(L, _) -> L end, undefined, undefined}], - {report, _Msg_}, - _Meta_ + _Msg = undefined, + _Meta_#{from => _From_} ); gt -> ok diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index 96d19b6d0..db54b6177 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -24,7 +24,7 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.200"}}}. +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.201"}}}. Dialyzer = fun(Config) -> {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config), diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl index 070fa6f32..c675edb52 100644 --- a/apps/emqx/src/config/emqx_config_logger.erl +++ b/apps/emqx/src/config/emqx_config_logger.erl @@ -151,13 +151,22 @@ tr_file_handlers(Conf) -> lists:map(fun tr_file_handler/1, Handlers). tr_file_handler({HandlerName, SubConf}) -> + FilePath = conf_get("path", SubConf), + RotationCount = conf_get("rotation_count", SubConf), + RotationSize = conf_get("rotation_size", SubConf), + Type = + case RotationSize of + infinity -> halt; + _ -> wrap + end, + HandlerConf = log_handler_conf(SubConf), {handler, atom(HandlerName), logger_disk_log_h, #{ level => conf_get("level", SubConf), - config => (log_handler_conf(SubConf))#{ - type => wrap, - file => conf_get("path", SubConf), - max_no_files => conf_get("rotation_count", SubConf), - max_no_bytes => conf_get("rotation_size", SubConf) + config => HandlerConf#{ + type => Type, + file => FilePath, + max_no_files => RotationCount, + max_no_bytes => RotationSize }, formatter => log_formatter(HandlerName, SubConf), filters => log_filter(HandlerName, SubConf), @@ -216,38 +225,26 @@ log_formatter(HandlerName, Conf) -> end, SingleLine = conf_get("single_line", Conf), Depth = conf_get("max_depth", Conf), + Format = + case HandlerName of + ?AUDIT_HANDLER -> + json; + _ -> + conf_get("formatter", Conf) + end, do_formatter( - HandlerName, conf_get("formatter", Conf), CharsLimit, SingleLine, TimeOffSet, Depth + Format, CharsLimit, SingleLine, TimeOffSet, Depth ). %% helpers -do_formatter(?AUDIT_HANDLER, _, CharsLimit, SingleLine, TimeOffSet, Depth) -> - {emqx_logger_jsonfmt, #{ - template => [ - time, - " [", - level, - "] ", - %% http api - {method, [code, " ", method, " ", operate_id, " ", username, " "], []}, - %% cli - {cmd, [cmd, " "], []}, - msg, - "\n" - ], - chars_limit => CharsLimit, - single_line => SingleLine, - time_offset => TimeOffSet, - depth => Depth - }}; -do_formatter(_, json, CharsLimit, SingleLine, TimeOffSet, Depth) -> +do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) -> {emqx_logger_jsonfmt, #{ chars_limit => CharsLimit, single_line => SingleLine, time_offset => TimeOffSet, depth => Depth }}; -do_formatter(_, text, CharsLimit, SingleLine, TimeOffSet, Depth) -> +do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) -> {emqx_logger_textfmt, #{ template => [time, " [", level, "] ", msg, "\n"], chars_limit => CharsLimit, diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index a189fc9e5..f38c5563a 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -678,7 +678,7 @@ return_change_result(ConfKeyPath, {{update, Req}, Opts}) -> case Req =/= ?TOMBSTONE_CONFIG_CHANGE_REQ of true -> #{ - config => emqx_config:get(ConfKeyPath), + config => emqx_config:get(ConfKeyPath, undefined), raw_config => return_rawconf(ConfKeyPath, Opts) }; false -> diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 09a1bfc10..9abff250b 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -437,6 +437,10 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> case maps:get(cacertfile, SSLOpts, undefined) of undefined -> []; + <<>> -> + []; + "" -> + []; CaCertFile -> [{cacertfile, emqx_schema:naive_env_interpolation(CaCertFile)}] end ++ diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 8710032e6..5df4157f4 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -51,7 +51,8 @@ -type config() :: #{ depth => pos_integer() | unlimited, report_cb => logger:report_cb(), - single_line => boolean() + single_line => boolean(), + chars_limit => unlimited | pos_integer() }. -define(IS_STRING(String), (is_list(String) orelse is_binary(String))). @@ -64,19 +65,17 @@ best_effort_json(Input) -> best_effort_json(Input, [pretty, force_utf8]). best_effort_json(Input, Opts) -> - Config = #{depth => unlimited, single_line => true}, + Config = #{depth => unlimited, single_line => true, chars_limit => unlimited}, JsonReady = best_effort_json_obj(Input, Config), emqx_utils_json:encode(JsonReady, Opts). -spec format(logger:log_event(), config()) -> iodata(). -format(#{level := Level, msg := Msg, meta := Meta} = Event, Config0) when is_map(Config0) -> +format(#{level := Level, msg := Msg, meta := Meta}, Config0) when is_map(Config0) -> Config = add_default_config(Config0), - MsgBin = format(Msg, Meta#{level => Level}, Config), - logger_formatter:format(Event#{msg => {string, MsgBin}}, Config). + [format(Msg, Meta#{level => Level}, Config), "\n"]. -format(Msg, Meta0, Config) -> - Meta = maps:without([time, level], Meta0), - Data0 = +format(Msg, Meta, Config) -> + Data = try maybe_format_msg(Msg, Meta, Config) of Map when is_map(Map) -> maps:merge(Map, Meta); @@ -92,9 +91,10 @@ format(Msg, Meta0, Config) -> fmt_stacktrace => S } end, - Data = maps:without([report_cb], Data0), - emqx_utils_json:encode(json_obj(Data, Config)). + emqx_utils_json:encode(json_obj_root(Data, Config)). +maybe_format_msg(undefined, _Meta, _Config) -> + #{}; maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) -> case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of true -> @@ -128,7 +128,7 @@ format_msg({report, Report}, #{report_cb := Fun} = Meta, Config) when is_functio end; format_msg({report, Report}, #{report_cb := Fun}, Config) when is_function(Fun, 2) -> %% a format callback function of arity 2 - case Fun(Report, maps:with([depth, single_line], Config)) of + case Fun(Report, maps:with([depth, single_line, chars_limit], Config)) of Chardata when ?IS_STRING(Chardata) -> try unicode:characters_to_binary(Chardata, utf8) @@ -152,11 +152,13 @@ format_msg({Fmt, Args}, _Meta, Config) -> do_format_msg(Format0, Args, #{ depth := Depth, - single_line := SingleLine + single_line := SingleLine, + chars_limit := Limit }) -> + Opts = chars_limit_to_opts(Limit), Format1 = io_lib:scan_format(Format0, Args), Format = reformat(Format1, Depth, SingleLine), - Text0 = io_lib:build_text(Format, []), + Text0 = io_lib:build_text(Format, Opts), Text = case SingleLine of true -> re:replace(Text0, ",?\r?\n\s*", ", ", [{return, list}, global, unicode]); @@ -164,6 +166,9 @@ do_format_msg(Format0, Args, #{ end, trim(unicode:characters_to_binary(Text, utf8)). +chars_limit_to_opts(unlimited) -> []; +chars_limit_to_opts(Limit) -> [{chars_limit, Limit}]. + %% Get rid of the leading spaces. %% leave alone the trailing spaces. trim(<<$\s, Rest/binary>>) -> trim(Rest); @@ -221,10 +226,6 @@ best_effort_json_obj(Map, Config) -> do_format_msg("~p", [Map], Config) end. -json([], _) -> - ""; -json(<<"">>, _) -> - "\"\""; json(A, _) when is_atom(A) -> atom_to_binary(A, utf8); json(I, _) when is_integer(I) -> I; json(F, _) when is_float(F) -> F; @@ -233,52 +234,76 @@ json(P, C) when is_port(P) -> json(port_to_list(P), C); json(F, C) when is_function(F) -> json(erlang:fun_to_list(F), C); json(B, Config) when is_binary(B) -> best_effort_unicode(B, Config); -json(L, Config) when is_list(L), is_integer(hd(L)) -> - best_effort_unicode(L, Config); json(M, Config) when is_list(M), is_tuple(hd(M)), tuple_size(hd(M)) =:= 2 -> best_effort_json_obj(M, Config); json(L, Config) when is_list(L) -> - [json(I, Config) || I <- L]; + case lists:all(fun erlang:is_binary/1, L) of + true -> + %% string array + L; + false -> + try unicode:characters_to_binary(L, utf8) of + B when is_binary(B) -> B; + _ -> [json(I, Config) || I <- L] + catch + _:_ -> + [json(I, Config) || I <- L] + end + end; json(Map, Config) when is_map(Map) -> best_effort_json_obj(Map, Config); json(Term, Config) -> do_format_msg("~p", [Term], Config). +json_obj_root(Data0, Config) -> + Time = maps:get(time, Data0, undefined), + Level = maps:get(level, Data0, undefined), + Msg1 = + case maps:get(msg, Data0, undefined) of + undefined -> + maps:get('$kind', Data0, undefined); + Msg0 -> + Msg0 + end, + Msg = + case Msg1 of + undefined -> + undefined; + _ -> + json(Msg1, Config) + end, + Mfal = emqx_utils:format_mfal(Data0), + Data = + maps:fold( + fun(K, V, D) -> + {K1, V1} = json_kv(K, V, Config), + [{K1, V1} | D] + end, + [], + maps:without( + [time, gl, file, report_cb, msg, '$kind', mfa, level, line, is_trace], Data0 + ) + ), + lists:filter( + fun({_, V}) -> V =/= undefined end, + [{time, Time}, {level, Level}, {msg, Msg}, {mfa, Mfal}] + ) ++ Data. + json_obj(Data, Config) -> maps:fold( fun(K, V, D) -> - json_kv(K, V, D, Config) + {K1, V1} = json_kv(K, V, Config), + maps:put(K1, V1, D) end, maps:new(), Data ). -json_kv(mfa, {M, F, A}, Data, _Config) -> - maps:put( - mfa, - << - (atom_to_binary(M, utf8))/binary, - $:, - (atom_to_binary(F, utf8))/binary, - $/, - (integer_to_binary(A))/binary - >>, - Data - ); -%% snabbkaffe -json_kv('$kind', Kind, Data, Config) -> - maps:put(msg, json(Kind, Config), Data); -json_kv(gl, _, Data, _Config) -> - %% drop gl because it's not interesting - Data; -json_kv(file, _, Data, _Config) -> - %% drop 'file' because we have mfa - Data; -json_kv(K0, V, Data, Config) -> +json_kv(K0, V, Config) -> K = json_key(K0), case is_map(V) of - true -> maps:put(json(K, Config), best_effort_json_obj(V, Config), Data); - false -> maps:put(json(K, Config), json(V, Config), Data) + true -> {K, best_effort_json_obj(V, Config)}; + false -> {K, json(V, Config)} end. json_key(A) when is_atom(A) -> json_key(atom_to_binary(A, utf8)); @@ -373,23 +398,83 @@ p_config() -> proper_types:shrink_list( [ {depth, p_limit()}, - {single_line, proper_types:boolean()} + {single_line, proper_types:boolean()}, + {chars_limit, p_limit()} ] ). +%% NOTE: pretty-printing format is asserted in the test +%% This affects the CLI output format, consult the team before changing +%% the format. best_effort_json_test() -> ?assertEqual( <<"{\n \n}">>, - emqx_logger_jsonfmt:best_effort_json([]) + best_effort_json([]) ), ?assertEqual( <<"{\n \"key\" : [\n \n ]\n}">>, - emqx_logger_jsonfmt:best_effort_json(#{key => []}) + best_effort_json(#{key => []}) ), ?assertEqual( <<"[\n {\n \"key\" : [\n \n ]\n }\n]">>, - emqx_logger_jsonfmt:best_effort_json([#{key => []}]) + best_effort_json([#{key => []}]) ), ok. +config() -> + #{ + chars_limit => unlimited, + depth => unlimited, + single_line => true + }. + +make_log(Report) -> + #{ + level => info, + msg => Report, + meta => #{time => 1111, report_cb => ?DEFAULT_FORMATTER} + }. + +ensure_json_output_test() -> + JSON = format(make_log({report, #{foo => bar}}), config()), + ?assert(is_map(emqx_utils_json:decode(JSON))), + ok. + +chars_limit_not_applied_on_raw_map_fields_test() -> + Limit = 32, + Len = 100, + LongStr = lists:duplicate(Len, $a), + Config0 = config(), + Config = Config0#{ + chars_limit => Limit + }, + JSON = format(make_log({report, #{foo => LongStr}}), Config), + #{<<"foo">> := LongStr1} = emqx_utils_json:decode(JSON), + ?assertEqual(Len, size(LongStr1)), + ok. + +chars_limit_applied_on_format_result_test() -> + Limit = 32, + Len = 100, + LongStr = lists:duplicate(Len, $a), + Config0 = config(), + Config = Config0#{ + chars_limit => Limit + }, + JSON = format(make_log({string, LongStr}), Config), + #{<<"msg">> := LongStr1} = emqx_utils_json:decode(JSON), + ?assertEqual(Limit, size(LongStr1)), + ok. + +string_array_test() -> + Array = #{<<"arr">> => [<<"a">>, <<"b">>]}, + Encoded = emqx_utils_json:encode(json(Array, config())), + ?assertEqual(Array, emqx_utils_json:decode(Encoded)). + +iolist_test() -> + Iolist = #{iolist => ["a", ["b"]]}, + Concat = #{<<"iolist">> => <<"ab">>}, + Encoded = emqx_utils_json:encode(json(Iolist, config())), + ?assertEqual(Concat, emqx_utils_json:decode(Encoded)). + -endif. diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index 3dce8a2ec..2e8718c37 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -56,8 +56,7 @@ enrich_report(ReportRaw, Meta) -> end, ClientId = maps:get(clientid, Meta, undefined), Peer = maps:get(peername, Meta, undefined), - MFA = maps:get(mfa, Meta, undefined), - Line = maps:get(line, Meta, undefined), + MFA = emqx_utils:format_mfal(Meta), Msg = maps:get(msg, ReportRaw, undefined), %% turn it into a list so that the order of the fields is determined lists:foldl( @@ -70,8 +69,7 @@ enrich_report(ReportRaw, Meta) -> {topic, try_format_unicode(Topic)}, {clientid, try_format_unicode(ClientId)}, {peername, Peer}, - {line, Line}, - {mfa, mfa(MFA)}, + {mfa, try_format_unicode(MFA)}, {msg, Msg} ] ). @@ -84,7 +82,7 @@ try_format_unicode(Char) -> case unicode:characters_to_list(Char) of {error, _, _} -> error; {incomplete, _, _} -> error; - Binary -> Binary + List1 -> List1 end catch _:_ -> @@ -95,8 +93,8 @@ try_format_unicode(Char) -> _ -> List end. -enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) -> - {Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]}; +enrich_mfa({Fmt, Args}, Data) when is_list(Fmt) -> + {Fmt ++ " mfa: ~ts", Args ++ [emqx_utils:format_mfal(Data)]}; enrich_mfa(Msg, _) -> Msg. @@ -113,6 +111,3 @@ enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) -> {" topic: ~ts" ++ Fmt, [Topic | Args]}; enrich_topic(Msg, _) -> Msg. - -mfa(undefined) -> undefined; -mfa({M, F, A}) -> [atom_to_list(M), ":", atom_to_list(F), "/" ++ integer_to_list(A)]. diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 86ec5937f..4675c3734 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -43,6 +43,10 @@ erpc_multicall/1 ]). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + -compile( {inline, [ rpc_node/1, @@ -75,15 +79,15 @@ -spec call(node(), module(), atom(), list()) -> call_result(). call(Node, Mod, Fun, Args) -> - filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)). + maybe_badrpc(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)). -spec call(term(), node(), module(), atom(), list()) -> call_result(). call(Key, Node, Mod, Fun, Args) -> - filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)). + maybe_badrpc(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)). -spec call(term(), node(), module(), atom(), list(), timeout()) -> call_result(). call(Key, Node, Mod, Fun, Args, Timeout) -> - filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args, Timeout)). + maybe_badrpc(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args, Timeout)). -spec multicall([node()], module(), atom(), list()) -> multicall_result(). multicall(Nodes, Mod, Fun, Args) -> @@ -127,18 +131,15 @@ rpc_nodes([], Acc) -> rpc_nodes([Node | Nodes], Acc) -> rpc_nodes(Nodes, [rpc_node(Node) | Acc]). -filter_result({Error, Reason}) when - Error =:= badrpc; Error =:= badtcp --> +maybe_badrpc({Error, Reason}) when Error =:= badrpc; Error =:= badtcp -> {badrpc, Reason}; -filter_result(Delivery) -> +maybe_badrpc(Delivery) -> Delivery. max_client_num() -> emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum). -spec unwrap_erpc(emqx_rpc:erpc(A) | [emqx_rpc:erpc(A)]) -> A | {error, _Err} | list(). - unwrap_erpc(Res) when is_list(Res) -> [unwrap_erpc(R) || R <- Res]; unwrap_erpc({ok, A}) -> @@ -151,3 +152,73 @@ unwrap_erpc({exit, Err}) -> {error, Err}; unwrap_erpc({error, {erpc, Err}}) -> {error, Err}. + +-ifdef(TEST). + +badrpc_call_test_() -> + application:ensure_all_started(gen_rpc), + Node = node(), + [ + {"throw", fun() -> + ?assertEqual(foo, call(Node, erlang, throw, [foo])) + end}, + {"error", fun() -> + ?assertMatch({badrpc, {'EXIT', {foo, _}}}, call(Node, erlang, error, [foo])) + end}, + {"exit", fun() -> + ?assertEqual({badrpc, {'EXIT', foo}}, call(Node, erlang, exit, [foo])) + end}, + {"timeout", fun() -> + ?assertEqual({badrpc, timeout}, call(key, Node, timer, sleep, [1000], 100)) + end}, + {"noconnection", fun() -> + %% mute crash report from gen_rpc + logger:set_primary_config(level, critical), + try + ?assertEqual( + {badrpc, nxdomain}, call(key, 'no@such.node', foo, bar, []) + ) + after + logger:set_primary_config(level, notice) + end + end} + ]. + +multicall_test() -> + application:ensure_all_started(gen_rpc), + logger:set_primary_config(level, critical), + BadNode = 'no@such.node', + ThisNode = node(), + Nodes = [ThisNode, BadNode], + Call4 = fun(M, F, A) -> multicall(Nodes, M, F, A) end, + Call5 = fun(Key, M, F, A) -> multicall(Key, Nodes, M, F, A) end, + try + ?assertMatch({[foo], [{BadNode, _}]}, Call4(erlang, throw, [foo])), + ?assertMatch({[], [{ThisNode, _}, {BadNode, _}]}, Call4(erlang, error, [foo])), + ?assertMatch({[], [{ThisNode, _}, {BadNode, _}]}, Call4(erlang, exit, [foo])), + ?assertMatch({[], [{ThisNode, _}, {BadNode, _}]}, Call5(key, foo, bar, [])) + after + logger:set_primary_config(level, notice) + end. + +unwrap_erpc_test_() -> + Nodes = [node()], + MultiC = fun(M, F, A) -> unwrap_erpc(erpc:multicall(Nodes, M, F, A, 100)) end, + [ + {"throw", fun() -> + ?assertEqual([{error, foo}], MultiC(erlang, throw, [foo])) + end}, + {"error", fun() -> + ?assertEqual([{error, foo}], MultiC(erlang, error, [foo])) + end}, + {"exit", fun() -> + ?assertEqual([{error, {exception, foo}}], MultiC(erlang, exit, [foo])) + end}, + {"noconnection", fun() -> + ?assertEqual( + [{error, noconnection}], unwrap_erpc(erpc:multicall(['no@such.node'], foo, bar, [])) + ) + end} + ]. + +-endif. diff --git a/apps/emqx/src/emqx_tls_certfile_gc.erl b/apps/emqx/src/emqx_tls_certfile_gc.erl index 9e2e98b7f..ccac02471 100644 --- a/apps/emqx/src/emqx_tls_certfile_gc.erl +++ b/apps/emqx/src/emqx_tls_certfile_gc.erl @@ -271,9 +271,12 @@ find_config_references(Root) -> is_file_reference(Stack) -> lists:any( fun(KP) -> lists:prefix(lists:reverse(KP), Stack) end, - emqx_tls_lib:ssl_file_conf_keypaths() + conf_keypaths() ). +conf_keypaths() -> + emqx_tls_lib:ssl_file_conf_keypaths(). + mk_fileref(AbsPath) -> case emqx_utils_fs:read_info(AbsPath) of {ok, Info} -> diff --git a/apps/emqx/src/emqx_tls_lib.erl b/apps/emqx/src/emqx_tls_lib.erl index 9113bd5e6..0b9bfe805 100644 --- a/apps/emqx/src/emqx_tls_lib.erl +++ b/apps/emqx/src/emqx_tls_lib.erl @@ -50,11 +50,17 @@ -define(IS_FALSE(Val), ((Val =:= false) orelse (Val =:= <<"false">>))). -define(SSL_FILE_OPT_PATHS, [ + %% common ssl options [<<"keyfile">>], [<<"certfile">>], [<<"cacertfile">>], - [<<"ocsp">>, <<"issuer_pem">>] + %% OCSP + [<<"ocsp">>, <<"issuer_pem">>], + %% SSO + [<<"sp_public_key">>], + [<<"sp_private_key">>] ]). + -define(SSL_FILE_OPT_PATHS_A, [ [keyfile], [certfile], diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 24105b2b4..5cbca3243 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -52,7 +52,7 @@ %% (e.g. in `init_per_suite/1` / `init_per_group/2`), providing the appspecs %% and unique work dir for the testrun (e.g. `work_dir/1`). Save the result %% in a context. -%% 3. Call `emqx_cth_sutie:stop/1` to stop the applications after the testrun +%% 3. Call `emqx_cth_suite:stop/1` to stop the applications after the testrun %% finishes (e.g. in `end_per_suite/1` / `end_per_group/2`), providing the %% result from step 2. -module(emqx_cth_suite). @@ -245,6 +245,9 @@ spec_fmt(ffun, {_, X}) -> X. maybe_configure_app(_App, #{config := false}) -> ok; +maybe_configure_app(_App, AppConfig = #{schema_mod := SchemaModule}) when is_atom(SchemaModule) -> + #{config := Config} = AppConfig, + configure_app(SchemaModule, Config); maybe_configure_app(App, #{config := Config}) -> case app_schema(App) of {ok, SchemaModule} -> diff --git a/apps/emqx/test/props/prop_emqx_rpc.erl b/apps/emqx/test/props/prop_emqx_rpc.erl deleted file mode 100644 index e544ae082..000000000 --- a/apps/emqx/test/props/prop_emqx_rpc.erl +++ /dev/null @@ -1,195 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_emqx_rpc). - --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(NODENAME, 'test@127.0.0.1'). - --define(ALL(Vars, Types, Exprs), - ?SETUP( - fun() -> - State = do_setup(), - fun() -> do_teardown(State) end - end, - ?FORALL(Vars, Types, Exprs) - ) -). - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_node() -> - ?ALL( - Node0, - nodename(), - begin - Node = punch(Node0), - ?assert(emqx_rpc:cast(Node, erlang, system_time, [])), - case emqx_rpc:call(Node, erlang, system_time, []) of - {badrpc, _Reason} -> true; - Delivery when is_integer(Delivery) -> true; - _Other -> false - end - end - ). - -prop_node_with_key() -> - ?ALL( - {Node0, Key}, - nodename_with_key(), - begin - Node = punch(Node0), - ?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])), - case emqx_rpc:call(Key, Node, erlang, system_time, []) of - {badrpc, _Reason} -> true; - Delivery when is_integer(Delivery) -> true; - _Other -> false - end - end - ). - -prop_nodes() -> - ?ALL( - Nodes0, - nodesname(), - begin - Nodes = punch(Nodes0), - case emqx_rpc:multicall(Nodes, erlang, system_time, []) of - {RealResults, RealBadNodes} when - is_list(RealResults); - is_list(RealBadNodes) - -> - true; - _Other -> - false - end - end - ). - -prop_nodes_with_key() -> - ?ALL( - {Nodes0, Key}, - nodesname_with_key(), - begin - Nodes = punch(Nodes0), - case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of - {RealResults, RealBadNodes} when - is_list(RealResults); - is_list(RealBadNodes) - -> - true; - _Other -> - false - end - end - ). - -%%-------------------------------------------------------------------- -%% Helper -%%-------------------------------------------------------------------- - -do_setup() -> - ensure_distributed_nodename(), - ok = logger:set_primary_config(#{level => warning}), - {ok, _Apps} = application:ensure_all_started(gen_rpc), - ok = application:set_env(gen_rpc, call_receive_timeout, 100), - ok = meck:new(gen_rpc, [passthrough, no_history]), - ok = meck:expect( - gen_rpc, - multicall, - fun(Nodes, Mod, Fun, Args) -> - gen_rpc:multicall(Nodes, Mod, Fun, Args, 100) - end - ). - -do_teardown(_) -> - ok = net_kernel:stop(), - ok = application:stop(gen_rpc), - ok = meck:unload(gen_rpc), - %% wait for tcp close - timer:sleep(2500). - -ensure_distributed_nodename() -> - case net_kernel:start([?NODENAME]) of - {ok, _} -> - ok; - {error, {already_started, _}} -> - net_kernel:stop(), - net_kernel:start([?NODENAME]); - {error, {{shutdown, {_, _, {'EXIT', nodistribution}}}, _}} -> - %% start epmd first - spawn_link(fun() -> os:cmd("epmd") end), - timer:sleep(100), - net_kernel:start([?NODENAME]) - end. - -%%-------------------------------------------------------------------- -%% Generator -%%-------------------------------------------------------------------- - -nodename() -> - ?LET( - {NodePrefix, HostName}, - {node_prefix(), hostname()}, - begin - Node = NodePrefix ++ "@" ++ HostName, - list_to_atom(Node) - end - ). - -nodename_with_key() -> - ?LET( - {NodePrefix, HostName, Key}, - {node_prefix(), hostname(), choose(0, 10)}, - begin - Node = NodePrefix ++ "@" ++ HostName, - {list_to_atom(Node), Key} - end - ). - -nodesname() -> - oneof([list(nodename()), [node()]]). - -nodesname_with_key() -> - oneof([{list(nodename()), choose(0, 10)}, {[node()], 1}]). - -node_prefix() -> - oneof(["emqxct", text_like()]). - -text_like() -> - ?SUCHTHAT(Text, list(range($a, $z)), (length(Text) =< 100 andalso length(Text) > 0)). - -hostname() -> - oneof(["127.0.0.1", "localhost"]). - -%%-------------------------------------------------------------------- -%% Utils -%%-------------------------------------------------------------------- - -%% After running the props, the `node()` () is only able to return an -%% incorrect node name - `nonode@nohost`, But we want a distributed nodename -%% So, just translate the `nonode@nohost` to ?NODENAME -punch(Nodes) when is_list(Nodes) -> - lists:map(fun punch/1, Nodes); -punch('nonode@nohost') -> - %% Equal to ?NODENAME - node(); -punch(GoodBoy) -> - GoodBoy. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index f476ded39..395761d48 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -139,6 +139,7 @@ kafka_consumer_test() -> ok. message_key_dispatch_validations_test() -> + Name = myproducer, Conf0 = kafka_producer_new_hocon(), Conf1 = Conf0 ++ @@ -155,7 +156,7 @@ message_key_dispatch_validations_test() -> <<"message">> := #{<<"key">> := <<>>} } }, - emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, <<"myproducer">>], Conf) + emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf) ), ?assertThrow( {_, [ @@ -166,8 +167,6 @@ message_key_dispatch_validations_test() -> ]}, check(Conf) ), - %% ensure atoms exist - _ = [myproducer], ?assertThrow( {_, [ #{ diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl index 031767063..7169ea3d2 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl @@ -11,6 +11,7 @@ %%=========================================================================== pulsar_producer_validations_test() -> + Name = my_producer, Conf0 = pulsar_producer_hocon(), Conf1 = Conf0 ++ @@ -24,7 +25,7 @@ pulsar_producer_validations_test() -> <<"strategy">> := <<"key_dispatch">>, <<"message">> := #{<<"key">> := <<>>} }, - emqx_utils_maps:deep_get([<<"bridges">>, <<"pulsar_producer">>, <<"my_producer">>], Conf) + emqx_utils_maps:deep_get([<<"bridges">>, <<"pulsar_producer">>, atom_to_binary(Name)], Conf) ), ?assertThrow( {_, [ @@ -35,8 +36,6 @@ pulsar_producer_validations_test() -> ]}, check(Conf) ), - %% ensure atoms exist - _ = [my_producer], ?assertThrow( {_, [ #{ diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 109e88b4d..6cb53f8f3 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -108,7 +108,17 @@ admins(_) -> emqx_ctl:usage(usage_sync()). audit(Level, From, Log) -> - ?AUDIT(Level, From, Log#{time => logger:timestamp()}). + Log1 = redact(Log#{time => logger:timestamp()}), + ?AUDIT(Level, From, Log1). + +redact(Logs = #{cmd := admins, args := ["add", Username, _Password | Rest]}) -> + Logs#{args => ["add", Username, "******" | Rest]}; +redact(Logs = #{cmd := admins, args := ["passwd", Username, _Password]}) -> + Logs#{args => ["passwd", Username, "******"]}; +redact(Logs = #{cmd := license, args := ["update", _License]}) -> + Logs#{args => ["update", "******"]}; +redact(Logs) -> + Logs. usage_conf() -> [ diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index e87c3c898..e4fb06ef5 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -43,6 +43,9 @@ ]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). +%% internal exports for `emqx_enterprise_schema' only. +-export([ensure_unicode_path/2, convert_rotation/2, log_handler_common_confs/2]). + %% Static apps which merge their configs into the merged emqx.conf %% The list can not be made a dynamic read at run-time as it is used %% by nodetool to generate app.