diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 5576f579d..1ca2e8b24 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -457,6 +457,17 @@ bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath]. bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B. +atom(Bin) when is_binary(Bin), size(Bin) > 255 -> + erlang:throw( + iolist_to_binary( + io_lib:format( + "Name is is too long." + " Please provide a shorter name (<= 255 bytes)." + " The name that is too long: \"~s\"", + [Bin] + ) + ) + ); atom(Bin) when is_binary(Bin) -> binary_to_atom(Bin, utf8); atom(Str) when is_list(Str) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index dd04fad78..8c520d14f 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -3018,7 +3018,7 @@ non_empty_string(_) -> {error, invalid_string}. %% hosts can be successfully parsed. %% 3. parsing: Done at runtime in each module which uses this config servers_sc(Meta0, ParseOpts) -> - %% if this filed has a default value + %% if this field has a default value %% then it is not NOT required %% NOTE: maps:is_key is not the solution because #{default => undefined} is legit HasDefault = (maps:get(default, Meta0, undefined) =/= undefined), @@ -3079,7 +3079,7 @@ servers_validator(Opts, Required) -> %% we should remove field from config if it's empty throw("cannot_be_empty"); "undefined" when Required -> - %% when the filed is not set in config file + %% when the field is not set in config file %% NOTE: assuming nobody is going to name their server "undefined" throw("cannot_be_empty"); "undefined" -> diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index d68ea342e..35aa0449f 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -181,7 +181,7 @@ t_create_with_config_values_wont_work(_Config) -> InvalidConfigs ). -%% creating without a require filed should return error +%% creating without a require field should return error t_create_invalid_config(_Config) -> AuthzConfig = raw_redis_authz_config(), C = maps:without([<<"server">>], AuthzConfig), diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index c0b36dd76..a233a53ef 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -602,8 +602,8 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> case emqx_bridge:create(BridgeType, BridgeName, Conf) of {ok, _} -> lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode); - {error, #{kind := validation_error} = Reason} -> - ?BAD_REQUEST(map_to_json(Reason)) + {error, Reason} when is_map(Reason) -> + ?BAD_REQUEST(map_to_json(emqx_utils:redact(Reason))) end. get_metrics_from_local_node(BridgeType, BridgeName) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 2f5d65259..6f5c669a0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -421,6 +421,26 @@ t_http_crud_apis(Config) -> ), %% Test bad updates + %% ================ + + %% Add bridge with a name that is too long + %% We only support bridge names up to 255 characters + LongName = list_to_binary(lists:duplicate(256, $a)), + NameTooLongRequestResult = request_json( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, LongName), + Config + ), + ?assertMatch( + {ok, 400, _}, + NameTooLongRequestResult + ), + {ok, 400, #{<<"message">> := NameTooLongMessage}} = NameTooLongRequestResult, + %% Use regex to check that the message contains the name + Match = re:run(NameTooLongMessage, LongName), + ?assertMatch({match, _}, Match), + %% Add bridge without the URL field {ok, 400, PutFail1} = request_json( put, uri(["bridges", BridgeID]), diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index c2a04e93d..b178f77e0 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -95,33 +95,33 @@ namespace() -> "bridge_influxdb". roots() -> []. fields("post_api_v1") -> - method_fileds(post, influxdb_api_v1); + method_fields(post, influxdb_api_v1); fields("post_api_v2") -> - method_fileds(post, influxdb_api_v2); + method_fields(post, influxdb_api_v2); fields("put_api_v1") -> - method_fileds(put, influxdb_api_v1); + method_fields(put, influxdb_api_v1); fields("put_api_v2") -> - method_fileds(put, influxdb_api_v2); + method_fields(put, influxdb_api_v2); fields("get_api_v1") -> - method_fileds(get, influxdb_api_v1); + method_fields(get, influxdb_api_v1); fields("get_api_v2") -> - method_fileds(get, influxdb_api_v2); + method_fields(get, influxdb_api_v2); fields(Type) when Type == influxdb_api_v1 orelse Type == influxdb_api_v2 -> influxdb_bridge_common_fields() ++ connector_fields(Type). -method_fileds(post, ConnectorType) -> +method_fields(post, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType); -method_fileds(get, ConnectorType) -> +method_fields(get, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ emqx_bridge_schema:status_fields(); -method_fileds(put, ConnectorType) -> +method_fields(put, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType). diff --git a/apps/emqx_conf/etc/emqx_conf.conf b/apps/emqx_conf/etc/emqx_conf.conf index 76e3c0805..2d7b8d910 100644 --- a/apps/emqx_conf/etc/emqx_conf.conf +++ b/apps/emqx_conf/etc/emqx_conf.conf @@ -6,7 +6,7 @@ ## are stored in data/configs/cluster.hocon. ## To avoid confusion, please do not store the same configs in both files. ## -## See https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html +## See {{ emqx_configuration_doc }} for more details. ## Configuration full example can be found in emqx.conf.example node { diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 9240d2116..530e4bfcb 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -22,36 +22,32 @@ unload/0 ]). +-include_lib("hocon/include/hoconsc.hrl"). + +%% kept cluster_call for compatibility -define(CLUSTER_CALL, cluster_call). -define(CONF, conf). load() -> - emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, []), + emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]), emqx_ctl:register_command(?CONF, {?MODULE, conf}, []). unload() -> emqx_ctl:unregister_command(?CLUSTER_CALL), emqx_ctl:unregister_command(?CONF). -conf(["show", "--keys-only"]) -> - print(emqx_config:get_root_names()); +conf(["show_keys" | _]) -> + print_keys(get_config()); conf(["show"]) -> print_hocon(get_config()); conf(["show", Key]) -> print_hocon(get_config(Key)); conf(["load", Path]) -> load_config(Path); +conf(["cluster_sync" | Args]) -> + admins(Args); conf(_) -> - emqx_ctl:usage( - [ - %% TODO add reload - %{"conf reload", "reload etc/emqx.conf on local node"}, - {"conf show --keys-only", "print all keys"}, - {"conf show", "print all running configures"}, - {"conf show ", "print a specific configuration"}, - {"conf load ", "load a hocon file to all nodes"} - ] - ). + emqx_ctl:usage(usage_conf() ++ usage_sync()). admins(["status"]) -> status(); @@ -87,14 +83,34 @@ admins(["fast_forward", Node0, ToTnxId]) -> emqx_cluster_rpc:fast_forward_to_commit(Node, TnxId), status(); admins(_) -> - emqx_ctl:usage( - [ - {"cluster_call status", "status"}, - {"cluster_call skip [node]", "increase one commit on specific node"}, - {"cluster_call tnxid ", "get detailed about TnxId"}, - {"cluster_call fast_forward [node] [tnx_id]", "fast forwards to tnx_id"} - ] - ). + emqx_ctl:usage(usage_sync()). + +usage_conf() -> + [ + %% TODO add reload + %{"conf reload", "reload etc/emqx.conf on local node"}, + {"conf show_keys", "Print all config keys"}, + {"conf show []", + "Print in-use configs (including default values) under the given key. " + "Print ALL keys if key is not provided"}, + {"conf load ", + "Load a HOCON format config file." + "The config is overlay on top of the existing configs. " + "The current node will initiate a cluster wide config change " + "transaction to sync the changes to other nodes in the cluster. " + "NOTE: do not make runtime config changes during rolling upgrade."} + ]. + +usage_sync() -> + [ + {"conf cluster_sync status", "Show cluster config sync status summary"}, + {"conf cluster_sync skip [node]", "Increase one commit on specific node"}, + {"conf cluster_sync tnxid ", + "Display detailed information of the config change transaction at TnxId"}, + {"conf cluster_sync fast_forward [node] [tnx_id]", + "Fast-forward config change transaction to tnx_id on the given node." + "WARNING: This results in inconsistent configs among the clustered nodes."} + ]. status() -> emqx_ctl:print("-----------------------------------------------\n"), @@ -116,14 +132,39 @@ status() -> ), emqx_ctl:print("-----------------------------------------------\n"). +print_keys(Config) -> + print(lists:sort(maps:keys(Config))). + print(Json) -> emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]). print_hocon(Hocon) -> emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]). -get_config() -> emqx_config:fill_defaults(emqx:get_raw_config([])). -get_config(Key) -> emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}). +get_config() -> + drop_hidden_roots(emqx_config:fill_defaults(emqx:get_raw_config([]))). + +drop_hidden_roots(Conf) -> + Hidden = hidden_roots(), + maps:without(Hidden, Conf). + +hidden_roots() -> + SchemaModule = emqx_conf:schema_module(), + Roots = hocon_schema:roots(SchemaModule), + lists:filtermap( + fun({BinName, {_RefName, Schema}}) -> + case hocon_schema:field_schema(Schema, importance) =/= ?IMPORTANCE_HIDDEN of + true -> + false; + false -> + {true, BinName} + end + end, + Roots + ). + +get_config(Key) -> + emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}). -define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}). load_config(Path) -> diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index bf500de26..9725c2da9 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -1323,7 +1323,7 @@ roots(Module) -> lists:map(fun({_BinName, Root}) -> Root end, hocon_schema:roots(Module)). %% Like authentication schema, authorization schema is incomplete in emqx_schema -%% module, this function replaces the root filed "authorization" with a new schema +%% module, this function replaces the root field "authorization" with a new schema emqx_schema_high_prio_roots() -> Roots = emqx_schema:roots(high), Authz = diff --git a/apps/emqx_ctl/src/emqx_ctl.erl b/apps/emqx_ctl/src/emqx_ctl.erl index d2ced7268..76068d361 100644 --- a/apps/emqx_ctl/src/emqx_ctl.erl +++ b/apps/emqx_ctl/src/emqx_ctl.erl @@ -157,18 +157,21 @@ help() -> print("No commands available.~n"); Cmds -> print("Usage: ~ts~n", ["emqx ctl"]), - lists:foreach( - fun({_, {Mod, Cmd}, _}) -> - print("~110..-s~n", [""]), - apply(Mod, Cmd, [usage]) - end, - Cmds - ) + lists:foreach(fun print_usage/1, Cmds) end; false -> print("Command table is initializing.~n") end. +print_usage({_, {Mod, Cmd}, Opts}) -> + case proplists:get_bool(hidden, Opts) of + true -> + ok; + false -> + print("~110..-s~n", [""]), + apply(Mod, Cmd, [usage]) + end. + -spec print(io:format()) -> ok. print(Msg) -> io:format("~ts", [format(Msg, [])]). diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 4145a92a7..52f96bcd2 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -970,6 +970,12 @@ close_socket(State = #state{socket = Socket}) -> %% Inc incoming/outgoing stats inc_incoming_stats(Ctx, FrameMod, Packet) -> + do_inc_incoming_stats(FrameMod:type(Packet), Ctx, FrameMod, Packet). + +%% If a mailformed packet is received, the type of the packet is undefined. +do_inc_incoming_stats(undefined, _Ctx, _FrameMod, _Packet) -> + ok; +do_inc_incoming_stats(Type, Ctx, FrameMod, Packet) -> inc_counter(recv_pkt, 1), case FrameMod:is_message(Packet) of true -> @@ -978,9 +984,7 @@ inc_incoming_stats(Ctx, FrameMod, Packet) -> false -> ok end, - Name = list_to_atom( - lists:concat(["packets.", FrameMod:type(Packet), ".received"]) - ), + Name = list_to_atom(lists:concat(["packets.", Type, ".received"])), emqx_gateway_ctx:metrics_inc(Ctx, Name). inc_outgoing_stats(Ctx, FrameMod, Packet) -> diff --git a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src index 38da1e18b..1fda99700 100644 --- a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src +++ b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_stomp, [ {description, "Stomp Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 316432dea..07dfd5f46 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -499,7 +499,7 @@ handle_in( [{MountedTopic, SubOpts} | _] -> NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - handle_out(receipt, receipt_id(Headers), NChannel1) + handle_out_and_update(receipt, receipt_id(Headers), NChannel1) end; {error, ErrMsg, NChannel} -> ?SLOG(error, #{ @@ -541,7 +541,7 @@ handle_in( false -> {ok, Channel} end, - handle_out(receipt, receipt_id(Headers), NChannel); + handle_out_and_update(receipt, receipt_id(Headers), NChannel); %% XXX: How to ack a frame ??? handle_in(Frame = ?PACKET(?CMD_ACK, Headers), Channel) -> case header(<<"transaction">>, Headers) of @@ -638,12 +638,12 @@ handle_in( ] end, {ok, Outgoings, Channel}; +handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> + shutdown(Reason, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> - ?SLOG(error, #{ - msg => "unexpected_frame_error", - reason => Reason - }), - shutdown(Reason, Channel). + ErrMsg = io_lib:format("Frame error: ~0p", [Reason]), + Frame = error_frame(undefined, ErrMsg), + shutdown(Reason, Frame, Channel). with_transaction(Headers, Channel = #channel{transaction = Trans}, Fun) -> Id = header(<<"transaction">>, Headers), @@ -769,6 +769,12 @@ handle_out(receipt, ReceiptId, Channel) -> Frame = receipt_frame(ReceiptId), {ok, {outgoing, Frame}, Channel}. +handle_out_and_update(receipt, undefined, Channel) -> + {ok, [{event, updated}], Channel}; +handle_out_and_update(receipt, ReceiptId, Channel) -> + Frame = receipt_frame(ReceiptId), + {ok, [{outgoing, Frame}, {event, updated}], Channel}. + %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- @@ -812,7 +818,7 @@ handle_call( ), NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts} | Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - reply({ok, {MountedTopic, NSubOpts}}, NChannel1); + reply({ok, {MountedTopic, NSubOpts}}, [{event, updated}], NChannel1); {error, ErrMsg, NChannel} -> ?SLOG(error, #{ msg => "failed_to_subscribe_topic", @@ -841,6 +847,7 @@ handle_call( ), reply( ok, + [{event, updated}], Channel#channel{ subscriptions = lists:keydelete(MountedTopic, 2, Subs) } @@ -1107,6 +1114,9 @@ terminate(Reason, #channel{ reply(Reply, Channel) -> {reply, Reply, Channel}. +reply(Reply, Msgs, Channel) -> + {reply, Reply, Msgs, Channel}. + shutdown(Reason, Channel) -> {shutdown, Reason, Channel}. diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl index 4913d6b2a..561f9e229 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl @@ -129,8 +129,8 @@ initial_parse_state(Opts) -> limit(Opts) -> #frame_limit{ - max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM), - max_header_length = g(max_header_length, Opts, ?MAX_HEADER_LENGTH), + max_header_num = g(max_headers, Opts, ?MAX_HEADER_NUM), + max_header_length = g(max_headers_length, Opts, ?MAX_HEADER_LENGTH), max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH) }. @@ -243,7 +243,9 @@ content_len(#parser_state{headers = Headers}) -> false -> none end. -new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc}) -> +new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc, limit = Limit}) -> + ok = check_max_headers(Headers, Limit), + ok = check_max_body(Acc, Limit), #stomp_frame{command = Cmd, headers = Headers, body = Acc}. acc(Chunk, State = #parser_state{acc = Acc}) when is_binary(Chunk) -> @@ -261,6 +263,57 @@ unescape($c) -> ?COLON; unescape($\\) -> ?BSL; unescape(_Ch) -> error(cannnot_unescape). +check_max_headers( + Headers, + #frame_limit{ + max_header_num = MaxNum, + max_header_length = MaxLen + } +) -> + HeadersLen = length(Headers), + case HeadersLen > MaxNum of + true -> + error( + {too_many_headers, #{ + max_header_num => MaxNum, + received_headers_num => length(Headers) + }} + ); + false -> + ok + end, + lists:foreach( + fun({Name, Val}) -> + Len = byte_size(Name) + byte_size(Val), + case Len > MaxLen of + true -> + error( + {too_long_header, #{ + max_header_length => MaxLen, + found_header_length => Len + }} + ); + false -> + ok + end + end, + Headers + ). + +check_max_body(Acc, #frame_limit{max_body_length = MaxLen}) -> + Len = byte_size(Acc), + case Len > MaxLen of + true -> + error( + {too_long_body, #{ + max_body_length => MaxLen, + received_body_length => Len + }} + ); + false -> + ok + end. + %%-------------------------------------------------------------------- %% Serialize funcs %%-------------------------------------------------------------------- @@ -330,7 +383,10 @@ make(Command, Headers, Body) -> #stomp_frame{command = Command, headers = Headers, body = Body}. %% @doc Format a frame -format(Frame) -> serialize_pkt(Frame, #{}). +format({frame_error, _Reason} = Error) -> + Error; +format(Frame) -> + serialize_pkt(Frame, #{}). is_message(#stomp_frame{command = CMD}) when CMD == ?CMD_SEND; @@ -373,4 +429,6 @@ type(?CMD_RECEIPT) -> type(?CMD_ERROR) -> error; type(?CMD_HEARTBEAT) -> - heartbeat. + heartbeat; +type(_) -> + undefined. diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 4323cf32f..196ed703c 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -40,7 +40,12 @@ " username = \"${Packet.headers.login}\"\n" " password = \"${Packet.headers.passcode}\"\n" " }\n" - " listeners.tcp.default {\n" + " frame {\n" + " max_headers = 10\n" + " max_headers_length = 100\n" + " max_body_length = 1024\n" + " }\n" + " listeners.tcp.default {\n" " bind = 61613\n" " }\n" "}\n" @@ -256,6 +261,10 @@ t_subscribe(_) -> ] ), + %% assert subscription stats + [ClientInfo1] = clients(), + ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1), + %% Unsubscribe gen_tcp:send( Sock, @@ -278,6 +287,10 @@ t_subscribe(_) -> }, _, _} = parse(Data2), + %% assert subscription stats + [ClientInfo2] = clients(), + ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2), + gen_tcp:send( Sock, serialize( @@ -697,6 +710,129 @@ t_sticky_packets_truncate_after_headers(_) -> ?assert(false, "waiting message timeout") end end). + +t_frame_error_in_connect(_) -> + with_connection(fun(Sock) -> + gen_tcp:send( + Sock, + serialize( + <<"CONNECT">>, + [ + {<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}, + {<<"custome_header1">>, <<"val">>}, + {<<"custome_header2">>, <<"val">>}, + {<<"custome_header3">>, <<"val">>}, + {<<"custome_header4">>, <<"val">>}, + {<<"custome_header5">>, <<"val">>}, + {<<"custome_header6">>, <<"val">>} + ] + ) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end). + +t_frame_error_too_many_headers(_) -> + Frame = serialize( + <<"SEND">>, + [ + {<<"destination">>, <<"/queue/foo">>}, + {<<"custome_header1">>, <<"val">>}, + {<<"custome_header2">>, <<"val">>}, + {<<"custome_header3">>, <<"val">>}, + {<<"custome_header4">>, <<"val">>}, + {<<"custome_header5">>, <<"val">>}, + {<<"custome_header6">>, <<"val">>}, + {<<"custome_header7">>, <<"val">>}, + {<<"custome_header8">>, <<"val">>}, + {<<"custome_header9">>, <<"val">>}, + {<<"custome_header10">>, <<"val">>} + ], + <<"test">> + ), + Assert = + fun(Sock) -> + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ErrorFrame, _, _} = parse(Data), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertMatch( + match, re:run(ErrorFrame#stomp_frame.body, "too_many_headers", [{capture, none}]) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + test_frame_error(Frame, Assert). + +t_frame_error_too_long_header(_) -> + LongHeaderVal = emqx_utils:bin_to_hexstr(crypto:strong_rand_bytes(50), upper), + Frame = serialize( + <<"SEND">>, + [ + {<<"destination">>, <<"/queue/foo">>}, + {<<"custome_header10">>, LongHeaderVal} + ], + <<"test">> + ), + Assert = + fun(Sock) -> + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ErrorFrame, _, _} = parse(Data), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertMatch( + match, re:run(ErrorFrame#stomp_frame.body, "too_long_header", [{capture, none}]) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + test_frame_error(Frame, Assert). + +t_frame_error_too_long_body(_) -> + LongBody = emqx_utils:bin_to_hexstr(crypto:strong_rand_bytes(513), upper), + Frame = serialize( + <<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}], + LongBody + ), + Assert = + fun(Sock) -> + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ErrorFrame, _, _} = parse(Data), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertMatch( + match, re:run(ErrorFrame#stomp_frame.body, "too_long_body", [{capture, none}]) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + test_frame_error(Frame, Assert). + +test_frame_error(Frame, AssertFun) -> + with_connection(fun(Sock) -> + gen_tcp:send( + Sock, + serialize( + <<"CONNECT">>, + [ + {<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>} + ] + ) + ), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, + #stomp_frame{ + command = <<"CONNECTED">>, + headers = _, + body = _ + }, + _, _} = parse(Data), + gen_tcp:send(Sock, Frame), + AssertFun(Sock) + end). + t_rest_clienit_info(_) -> with_connection(fun(Sock) -> gen_tcp:send( @@ -802,10 +938,14 @@ t_rest_clienit_info(_) -> {200, Subs1} = request(get, ClientPath ++ "/subscriptions"), ?assertEqual(2, length(Subs1)), + {200, StompClient2} = request(get, ClientPath), + ?assertMatch(#{subscriptions_cnt := 2}, StompClient2), {204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"), {200, Subs2} = request(get, ClientPath ++ "/subscriptions"), ?assertEqual(1, length(Subs2)), + {200, StompClient3} = request(get, ClientPath), + ?assertMatch(#{subscriptions_cnt := 1}, StompClient3), %% kickout {204, _} = request(delete, ClientPath), @@ -844,9 +984,9 @@ serialize(Command, Headers, Body) -> parse(Data) -> ProtoEnv = #{ - max_headers => 10, - max_header_length => 1024, - max_body_length => 8192 + max_headers => 1024, + max_header_length => 10240, + max_body_length => 81920 }, Parser = emqx_stomp_frame:initial_parse_state(ProtoEnv), emqx_stomp_frame:parse(Data, Parser). @@ -855,3 +995,7 @@ get_field(command, #stomp_frame{command = Command}) -> Command; get_field(body, #stomp_frame{body = Body}) -> Body. + +clients() -> + {200, Clients} = request(get, "/gateways/stomp/clients"), + maps:get(data, Clients). diff --git a/apps/emqx_prometheus/grafana_template/ErlangVM.json b/apps/emqx_prometheus/grafana_template/ErlangVM.json index 6e443d3fc..5d9a715bc 100644 --- a/apps/emqx_prometheus/grafana_template/ErlangVM.json +++ b/apps/emqx_prometheus/grafana_template/ErlangVM.json @@ -1207,7 +1207,7 @@ "type": "prometheus", "uid": "${datasource}" }, - "expr": "erlang_vm_statistics_run_queues_length_total{job=~\"$job\", instance=\"$instance\"}", + "expr": "erlang_vm_statistics_run_queues_length{job=~\"$job\", instance=\"$instance\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "Run queue length", diff --git a/apps/emqx_prometheus/rebar.config b/apps/emqx_prometheus/rebar.config index 7b9a6cc48..12aa9060b 100644 --- a/apps/emqx_prometheus/rebar.config +++ b/apps/emqx_prometheus/rebar.config @@ -3,7 +3,7 @@ {deps, [ {emqx, {path, "../emqx"}}, {emqx_utils, {path, "../emqx_utils"}}, - {prometheus, {git, "https://github.com/deadtrickster/prometheus.erl", {tag, "v4.8.1"}}} + {prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.1"}}} ]}. {edoc_opts, [{preprocess, true}]}. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl index a33eaa5e7..d657e0772 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl @@ -170,4 +170,7 @@ validate_push_gateway_server(Url) -> %% for CI test, CI don't load the whole emqx_conf_schema. translation(Name) -> + %% translate 'vm_dist_collector', 'mnesia_collector', 'vm_statistics_collector', + %% 'vm_system_info_collector', 'vm_memory_collector', 'vm_msacc_collector' + %% to prometheus envrionments emqx_conf_schema:translation(Name). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0dbc3067f..169b326c8 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -388,7 +388,11 @@ call_start(ResId, Mod, Config) -> throw:Error -> {error, Error}; Kind:Error:Stacktrace -> - {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} + {error, #{ + exception => Kind, + reason => Error, + stacktrace => emqx_utils:redact(Stacktrace) + }} end. -spec call_health_check(resource_id(), module(), resource_state()) -> diff --git a/changes/ce/feat-10985.en.md b/changes/ce/feat-10985.en.md new file mode 100644 index 000000000..89c0838a9 --- /dev/null +++ b/changes/ce/feat-10985.en.md @@ -0,0 +1,2 @@ +Renamed emqx ctl command 'cluster_call' to 'conf cluster_sync'. +The old command 'cluster_call' is still a valid command, but not included in usage info. diff --git a/changes/ce/fix-10911.en.md b/changes/ce/fix-10911.en.md new file mode 100644 index 000000000..8fafb7ce4 --- /dev/null +++ b/changes/ce/fix-10911.en.md @@ -0,0 +1 @@ +The error message and log entry that appear when one tries to create a bridge with a name the exceeds 255 bytes is now easier to understand. diff --git a/changes/ce/fix-10977.en.md b/changes/ce/fix-10977.en.md new file mode 100644 index 000000000..9bd0d6b60 --- /dev/null +++ b/changes/ce/fix-10977.en.md @@ -0,0 +1 @@ +Fix delay in updating subscription count metric and correct configuration issues in Stomp gateway. diff --git a/changes/ce/perf-10941.en.md b/changes/ce/perf-10941.en.md new file mode 100644 index 000000000..01e65f4c6 --- /dev/null +++ b/changes/ce/perf-10941.en.md @@ -0,0 +1,3 @@ +Improve the collection speed of Prometheus metrics when setting +`prometheus.vm_dist_collector=disabled` and +metric `erlang_vm_statistics_run_queues_length_total` is renamed to `erlang_vm_statistics_run_queues_length` diff --git a/changes/ce/perf-10988.en.md b/changes/ce/perf-10988.en.md new file mode 100644 index 000000000..2ebb563c3 --- /dev/null +++ b/changes/ce/perf-10988.en.md @@ -0,0 +1 @@ +Improve log security when data bridge creation fails to ensure sensitive data is always obfuscated. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index a728ecb7e..ff15aa00f 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -101,23 +101,23 @@ namespace() -> "bridge_redis". roots() -> []. fields("post_single") -> - method_fileds(post, redis_single); + method_fields(post, redis_single); fields("post_sentinel") -> - method_fileds(post, redis_sentinel); + method_fields(post, redis_sentinel); fields("post_cluster") -> - method_fileds(post, redis_cluster); + method_fields(post, redis_cluster); fields("put_single") -> - method_fileds(put, redis_single); + method_fields(put, redis_single); fields("put_sentinel") -> - method_fileds(put, redis_sentinel); + method_fields(put, redis_sentinel); fields("put_cluster") -> - method_fileds(put, redis_cluster); + method_fields(put, redis_cluster); fields("get_single") -> - method_fileds(get, redis_single); + method_fields(get, redis_single); fields("get_sentinel") -> - method_fileds(get, redis_sentinel); + method_fields(get, redis_sentinel); fields("get_cluster") -> - method_fileds(get, redis_cluster); + method_fields(get, redis_cluster); fields(Type) when Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster -> @@ -126,16 +126,16 @@ fields(Type) when fields("creation_opts_" ++ Type) -> resource_creation_fields(Type). -method_fileds(post, ConnectorType) -> +method_fields(post, ConnectorType) -> redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType); -method_fileds(get, ConnectorType) -> +method_fields(get, ConnectorType) -> redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType) ++ type_name_fields(ConnectorType) ++ emqx_bridge_schema:status_fields(); -method_fileds(put, ConnectorType) -> +method_fields(put, ConnectorType) -> redis_bridge_common_fields(ConnectorType) ++ connector_fields(ConnectorType). diff --git a/mix.exs b/mix.exs index fbd88e61d..c5158b256 100644 --- a/mix.exs +++ b/mix.exs @@ -6,7 +6,7 @@ defmodule EMQXUmbrella.MixProject do The purpose of this file is to configure the release of EMQX under Mix. Since EMQX uses its own configuration conventions and startup procedures, one cannot simply use `iex -S mix`. Instead, it's - recommendd to build and use the release. + recommended to build and use the release. ## Profiles @@ -736,6 +736,7 @@ defmodule EMQXUmbrella.MixProject do defp template_vars(release, release_type, :bin = _package_type, edition_type) do [ emqx_default_erlang_cookie: default_cookie(), + emqx_configuration_doc: emqx_configuration_doc(edition_type), platform_data_dir: "data", platform_etc_dir: "etc", platform_plugins_dir: "plugins", @@ -758,6 +759,7 @@ defmodule EMQXUmbrella.MixProject do defp template_vars(release, release_type, :pkg = _package_type, edition_type) do [ emqx_default_erlang_cookie: default_cookie(), + emqx_configuration_doc: emqx_configuration_doc(edition_type), platform_data_dir: "/var/lib/emqx", platform_etc_dir: "/etc/emqx", platform_plugins_dir: "/var/lib/emqx/plugins", @@ -791,6 +793,12 @@ defmodule EMQXUmbrella.MixProject do end end + defp emqx_configuration_doc(:enterprise), + do: "https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html" + + defp emqx_configuration_doc(:community), + do: "https://www.emqx.io/docs/en/v5.0/configuration/configuration.html" + defp emqx_schema_mod(:enterprise), do: :emqx_enterprise_schema defp emqx_schema_mod(:community), do: :emqx_conf_schema diff --git a/rebar.config.erl b/rebar.config.erl index d265f53cd..572d82d49 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -345,11 +345,15 @@ overlay_vars(cloud, PkgType, Edition) -> overlay_vars_edition(ce) -> [ {emqx_schema_mod, emqx_conf_schema}, + {emqx_configuration_doc, + "https://www.emqx.io/docs/en/v5.0/configuration/configuration.html"}, {is_enterprise, "no"} ]; overlay_vars_edition(ee) -> [ {emqx_schema_mod, emqx_enterprise_schema}, + {emqx_configuration_doc, + "https://docs.emqx.com/en/enterprise/v5.0/configuration/configuration.html"}, {is_enterprise, "yes"} ]. diff --git a/rel/emqx_conf.template.en.md b/rel/emqx_conf.template.en.md index c1259869c..2dcb83896 100644 --- a/rel/emqx_conf.template.en.md +++ b/rel/emqx_conf.template.en.md @@ -84,7 +84,7 @@ There are 4 complex data types in EMQX's HOCON config: 1. Array: `[ElementType]` ::: tip Tip -If map filed name is a positive integer number, it is interpreted as an alternative representation of an `Array`. +If map field name is a positive integer number, it is interpreted as an alternative representation of an `Array`. For example: ``` myarray.1 = 74 @@ -120,7 +120,7 @@ If we consider the whole EMQX config as a tree, to reference a primitive value, we can use a dot-separated names form string for the path from the tree-root (always a Struct) down to the primitive values at tree-leaves. -Each segment of the dotted string is a Struct filed name or Map key. +Each segment of the dotted string is a Struct field name or Map key. For Array elements, 1-based index is used. below are some examples