diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index f40db0039..b3c5e4613 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -8,7 +8,7 @@ on: tag: type: string required: true - publish_release_artefacts: + publish_release_artifacts: type: boolean required: true default: false @@ -75,7 +75,7 @@ jobs: tag_name: "${{ env.ref_name }}" skip_existing: true - name: update to emqx.io - if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts) + if: github.event_name == 'release' || inputs.publish_release_artifacts run: | set -eux curl -w %{http_code} \ @@ -86,7 +86,7 @@ jobs: -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.ref_name }}\" }" \ ${{ secrets.EMQX_IO_RELEASE_API }} - name: Push to packagecloud.io - if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts + if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artifacts env: PROFILE: ${{ steps.profile.outputs.profile }} VERSION: ${{ steps.profile.outputs.version }} diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index c391bc1c8..97769fe1f 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.4.0"}, + {vsn, "5.3.1"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 45697c1df..d8dcc77dc 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -270,6 +270,8 @@ json(L, Config) when is_list(L) -> end; json(Map, Config) when is_map(Map) -> best_effort_json_obj(Map, Config); +json({'$array$', List}, Config) when is_list(List) -> + [json(I, Config) || I <- List]; json(Term, Config) -> do_format_msg("~p", [Term], Config). @@ -448,6 +450,36 @@ best_effort_json_test() -> <<"[\n {\n \"key\" : [\n \n ]\n }\n]">>, best_effort_json([#{key => []}]) ), + %% List is IO Data + ?assertMatch( + #{<<"what">> := <<"hej\n">>}, + emqx_utils_json:decode(emqx_logger_jsonfmt:best_effort_json(#{what => [<<"hej">>, 10]})) + ), + %% Force list to be interpreted as an array + ?assertMatch( + #{<<"what">> := [<<"hej">>, 10]}, + emqx_utils_json:decode( + emqx_logger_jsonfmt:best_effort_json(#{what => {'$array$', [<<"hej">>, 10]}}) + ) + ), + %% IO Data inside an array + ?assertMatch( + #{<<"what">> := [<<"hej">>, 10, <<"hej\n">>]}, + emqx_utils_json:decode( + emqx_logger_jsonfmt:best_effort_json(#{ + what => {'$array$', [<<"hej">>, 10, [<<"hej">>, 10]]} + }) + ) + ), + %% Array inside an array + ?assertMatch( + #{<<"what">> := [<<"hej">>, 10, [<<"hej">>, 10]]}, + emqx_utils_json:decode( + emqx_logger_jsonfmt:best_effort_json(#{ + what => {'$array$', [<<"hej">>, 10, {'$array$', [<<"hej">>, 10]}]} + }) + ) + ), ok. config() -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 670ee4523..ce4840eb9 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2487,7 +2487,7 @@ converter_ciphers(Ciphers, _Opts) when is_binary(Ciphers) -> default_ciphers(Which) -> lists:map( - fun erlang:iolist_to_binary/1, + fun unicode:characters_to_binary/1, do_default_ciphers(Which) ). @@ -2510,7 +2510,7 @@ bin_str_converter(I, _) when is_integer(I) -> integer_to_binary(I); bin_str_converter(X, _) -> try - iolist_to_binary(X) + unicode:characters_to_binary(X) catch _:_ -> throw("must_quote") @@ -2665,7 +2665,7 @@ to_comma_separated_list(Str) -> {ok, string:tokens(Str, ", ")}. to_comma_separated_binary(Str) -> - {ok, lists:map(fun erlang:list_to_binary/1, string:tokens(Str, ", "))}. + {ok, lists:map(fun unicode:characters_to_binary/1, string:tokens(Str, ", "))}. to_comma_separated_atoms(Str) -> {ok, lists:map(fun to_atom/1, string:tokens(Str, ", "))}. @@ -2674,7 +2674,7 @@ to_url(Str) -> case emqx_http_lib:uri_parse(Str) of {ok, URIMap} -> URIString = emqx_http_lib:normalize(URIMap), - {ok, iolist_to_binary(URIString)}; + {ok, unicode:characters_to_binary(URIString)}; Error -> Error end. @@ -2682,13 +2682,13 @@ to_url(Str) -> to_json_binary(Str) -> case emqx_utils_json:safe_decode(Str) of {ok, _} -> - {ok, iolist_to_binary(Str)}; + {ok, unicode:characters_to_binary(Str)}; Error -> Error end. to_template(Str) -> - {ok, iolist_to_binary(Str)}. + {ok, unicode:characters_to_binary(Str, utf8)}. to_template_str(Str) -> {ok, unicode:characters_to_list(Str, utf8)}. @@ -2784,7 +2784,7 @@ validate_keepalive_multiplier(_Multiplier) -> {error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}. validate_tcp_keepalive(Value) -> - case iolist_to_binary(Value) of + case unicode:characters_to_binary(Value) of <<"none">> -> ok; _ -> @@ -2965,7 +2965,7 @@ convert_servers(undefined) -> convert_servers(Map) when is_map(Map) -> try List = convert_hocon_map_host_port(Map), - iolist_to_binary(string:join(List, ",")) + unicode:characters_to_binary(string:join(List, ",")) catch _:_ -> throw("bad_host_port") @@ -2973,13 +2973,13 @@ convert_servers(Map) when is_map(Map) -> convert_servers([H | _] = Array) when is_binary(H) orelse is_list(H) -> %% if the old config was a string array %% we want to make sure it's converted to a comma-separated - iolist_to_binary([[I, ","] || I <- Array]); + unicode:characters_to_binary([[I, ","] || I <- Array]); convert_servers(Str) -> normalize_host_port_str(Str). %% remove spaces around comma (,) normalize_host_port_str(Str) -> - iolist_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")). + unicode:characters_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")). %% @doc Shared validation function for both 'server' and 'servers' string. %% NOTE: Validator is called after converter. @@ -3458,8 +3458,10 @@ ensure_default_listener(Map, ListenerType) -> NewMap = Map#{<<"default">> => default_listener(ListenerType)}, keep_default_tombstone(NewMap, #{}). -cert_file(_File, client) -> undefined; -cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])). +cert_file(_File, client) -> + undefined; +cert_file(File, server) -> + unicode:characters_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])). mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) -> case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index 34469f835..e540ae82a 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -89,8 +89,17 @@ weight({packet, _}) -> {0, packet}; weight({payload, _}) -> {2, payload}; weight({K, _}) -> {1, K}. -format_packet(undefined, _) -> ""; -format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode). +format_packet(undefined, _) -> + ""; +format_packet(Packet, Encode) -> + try + emqx_packet:format(Packet, Encode) + catch + _:_ -> + %% We don't want to crash if there is a field named packet with + %% some other type of value + Packet + end. format_payload(undefined, _) -> ""; @@ -100,7 +109,11 @@ format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> unicode:characters_to_list(Payload); format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload); format_payload(<> = Payload, Type) -> - emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type). + emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type); +format_payload(Payload, _) -> + %% We don't want to crash if there is a field named payload with some other + %% type of value + Payload. to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom); to_iolist(Int) when is_integer(Int) -> integer_to_list(Int); diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index 2f3f2aa1d..8f72c3d48 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -930,3 +930,15 @@ timeout_types_test_() -> typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967001ms">>) ) ]. + +unicode_template_test() -> + Sc = #{ + roots => [root], + fields => #{root => [{template, #{type => emqx_schema:template()}}]} + }, + HoconText = <<"root = {template = \"中文\"}"/utf8>>, + {ok, Hocon} = hocon:binary(HoconText), + ?assertEqual( + #{<<"root">> => #{<<"template">> => <<"中文"/utf8>>}}, + hocon_tconf:check_plain(Sc, Hocon) + ). diff --git a/apps/emqx_auth/src/emqx_auth.app.src b/apps/emqx_auth/src/emqx_auth.app.src index cd77a5ffc..6db2d6213 100644 --- a/apps/emqx_auth/src/emqx_auth.app.src +++ b/apps/emqx_auth/src/emqx_auth.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth, [ {description, "EMQX Authentication and authorization"}, - {vsn, "0.4.0"}, + {vsn, "0.3.1"}, {modules, []}, {registered, [emqx_auth_sup]}, {applications, [ diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index c820cee06..8cd5ee427 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 2c109298c..a969ac83b 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index 69ace9289..fd7f03da8 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 2c6e99076..a01d2edf4 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -214,7 +214,7 @@ ensure_client(ClientId, Hosts, ClientConfig) -> case wolff_client_sup:find_client(ClientId) of {ok, _Pid} -> ok; - {error, no_such_client} -> + {error, #{reason := no_such_client}} -> case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> ?SLOG(info, #{ @@ -546,13 +546,13 @@ check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) -> {ok, Pid} -> ok = check_topic_status(ClientId, Pid, KafkaTopic), ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); - {error, no_such_client} -> + {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, kafka_client => ClientId, kafka_topic => KafkaTopic }); - {error, client_supervisor_not_initialized} -> + {error, #{reason := client_supervisor_not_initialized}} -> throw(#{ reason => restarting, kafka_client => ClientId, @@ -593,33 +593,54 @@ maybe_check_health_check_topic(_) -> %% Cannot infer further information. Maybe upgraded from older version. ?status_connected. +is_alive(Pid) -> + is_pid(Pid) andalso erlang:is_process_alive(Pid). + +error_summary(Map, [Error]) -> + Map#{error => Error}; +error_summary(Map, [Error | More]) -> + Map#{first_error => Error, total_errors => length(More) + 1}. + check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> - Leaders = - case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of - {ok, LeadersToCheck} -> - %% Kafka is considered healthy as long as any of the partition leader is reachable. - lists:filtermap( - fun({_Partition, Pid}) -> - case is_pid(Pid) andalso erlang:is_process_alive(Pid) of - true -> {true, Pid}; - _ -> false - end - end, - LeadersToCheck - ); - {error, _} -> - [] - end, - case Leaders of - [] -> + case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of + {ok, Leaders} -> + %% Kafka is considered healthy as long as any of the partition leader is reachable. + case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of + {[], Errors} -> + throw( + error_summary( + #{ + cause => "no_connected_partition_leader", + kafka_client => ClientId, + kafka_topic => KafkaTopic + }, + Errors + ) + ); + {_, []} -> + ok; + {_, Errors} -> + ?SLOG( + warning, + "not_all_kafka_partitions_connected", + error_summary( + #{ + kafka_client => ClientId, + kafka_topic => KafkaTopic + }, + Errors + ) + ), + ok + end; + {error, Reason} -> + %% If failed to fetch metadata, wolff_client logs a warning level message + %% which includes the reason for each seed host throw(#{ - error => no_connected_partition_leader, + cause => Reason, kafka_client => ClientId, - kafka_topic => KafkaTopic, - partitions_limit => MaxPartitions - }); - _ -> - ok + kafka_topic => KafkaTopic + }) end. check_topic_status(ClientId, WolffClientPid, KafkaTopic) -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 57b4d90f1..acd1837ba 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 535d6e13c..e12155bb1 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -6,6 +6,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -behaviour(emqx_resource). @@ -143,7 +144,13 @@ on_batch_query( [{ChannelID, _} | _] = BatchData, emqx_trace:rendered_action_template( ChannelID, - #{commands => Cmds, batch => ture} + #{ + commands => #emqx_trace_format_func_data{ + function = fun trace_format_commands/1, + data = Cmds + }, + batch => true + } ), Result = query(InstId, {cmds, Cmds}, RedisConnSt), ?tp( @@ -162,6 +169,10 @@ on_batch_query( Error end. +trace_format_commands(Commands0) -> + Commands1 = [lists:join(" ", C) || C <- Commands0], + unicode:characters_to_binary(lists:join("; ", Commands1)). + on_format_query_result({ok, Msg}) -> #{result => ok, message => Msg}; on_format_query_result(Res) -> diff --git a/apps/emqx_license/test/emqx_license_http_api_SUITE.erl b/apps/emqx_license/test/emqx_license_http_api_SUITE.erl index 380930527..be0b31dcf 100644 --- a/apps/emqx_license/test/emqx_license_http_api_SUITE.erl +++ b/apps/emqx_license/test/emqx_license_http_api_SUITE.erl @@ -245,7 +245,7 @@ t_license_setting_bc(_Config) -> ?assertMatch(#{<<"max_connections">> := 25}, request_dump()), %% get GetRes = request(get, uri(["license", "setting"]), []), - %% aslo check that the settings return correctly + %% also check that the settings return correctly validate_setting(GetRes, <<"75%">>, <<"80%">>, 25), %% update Low = <<"50%">>, diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index f01f5c7b9..7c99b70cd 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.3.0"}, + {vsn, "5.2.1"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index ce05bdfae..09aa4e589 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index e90665cc4..5b25e049a 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -8,6 +8,7 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(UNHEALTHY_TARGET_MSG, @@ -288,7 +289,7 @@ on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) -> type => Type, apply_mode => ApplyMode, name_or_sql => NameOrSQL, - data => Data + data => #emqx_trace_format_func_data{function = fun trace_format_data/1, data = Data} }), case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of {error, Reason} = Result -> @@ -317,6 +318,15 @@ on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) -> Result end. +trace_format_data(Data0) -> + %% In batch request, we get a two level list + {'$array$', lists:map(fun insert_array_marker_if_list/1, Data0)}. + +insert_array_marker_if_list(List) when is_list(List) -> + {'$array$', List}; +insert_array_marker_if_list(Item) -> + Item. + on_get_status(_InstId, #{pool_name := Pool} = _State) -> case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of true -> diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src index f05295579..efd9f1162 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src @@ -1,6 +1,6 @@ {application, emqx_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.3.0"}, + {vsn, "0.3.1"}, {registered, [emqx_schema_registry_sup]}, {mod, {emqx_schema_registry_app, []}}, {included_applications, [ diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index 638147fd1..e8da35449 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -64,7 +64,21 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) -> [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] ); handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) -> - decode(SchemaId, Data, MoreArgs); + try + decode(SchemaId, Data, MoreArgs) + catch + error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} -> + throw( + {schema_decode_error, #{ + error_type => decoding_failure, + schema_id => SchemaId, + data => Data, + more_args => MoreArgs, + explain => + <<"The given data could not be decoded. Please check the input data and the schema.">> + }} + ) + end; handle_rule_function(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) -> diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl index 22252b7c3..d9286f266 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl @@ -44,7 +44,8 @@ sparkplug_tests() -> t_sparkplug_decode, t_sparkplug_encode, t_sparkplug_decode_encode_with_message_name, - t_sparkplug_encode_float_to_uint64_key + t_sparkplug_encode_float_to_uint64_key, + t_decode_fail ]. init_per_suite(Config) -> @@ -532,6 +533,23 @@ t_encode(Config) -> end, ok. +t_decode_fail(_Config) -> + SerdeName = my_serde, + SerdeType = protobuf, + ok = create_serde(SerdeType, SerdeName), + Payload = <<"ss">>, + ?assertThrow( + {schema_decode_error, #{ + data := <<"ss">>, + error_type := decoding_failure, + explain := _, + more_args := [<<"Person">>], + schema_id := <<"my_serde">> + }}, + emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>) + ), + ok. + t_decode(Config) -> SerdeType = ?config(serde_type, Config), SerdeName = my_serde, diff --git a/changes/ce/fix-13140.en.md b/changes/ce/fix-13140.en.md new file mode 100644 index 000000000..921da9052 --- /dev/null +++ b/changes/ce/fix-13140.en.md @@ -0,0 +1 @@ +The issue causing text traces for the republish action to crash and not display correctly has been resolved. diff --git a/changes/ee/fix-13070.en.md b/changes/ee/fix-13070.en.md new file mode 100644 index 000000000..e73cca3cf --- /dev/null +++ b/changes/ee/fix-13070.en.md @@ -0,0 +1,5 @@ +Improve Kafka connector error logs. + +Previously, specific error details, such as unreachable advertised listeners, were not logged. +Now, error details are captured in the logs to provide more diagnostic information. +To manage log verbosity, only the first occurrence of an error is logged, accompanied by the total count of similar errors. diff --git a/changes/ee/fix-13079.en.md b/changes/ee/fix-13079.en.md new file mode 100644 index 000000000..c469bdf6e --- /dev/null +++ b/changes/ee/fix-13079.en.md @@ -0,0 +1,6 @@ +Improve Kafka producer error handling for `message_too_large`. + +Prior to this change, Kafka producers would retry sending oversized batches (`message_too_large` error) in hopes of a server side configuration fix (`max.message.bytes`). + +Now, oversized messages are automatically split into single-message batches for retry. +If a message still exceeds size limits, it will be dropped to maintain data flow. diff --git a/changes/ee/fix-13130.en.md b/changes/ee/fix-13130.en.md new file mode 100644 index 000000000..7c111cacd --- /dev/null +++ b/changes/ee/fix-13130.en.md @@ -0,0 +1 @@ +Traces for Redis action batch requests have got improved formatting. Spaces are now added between components of commands and semicolons are added between commands to make the trace message easier to read. diff --git a/changes/ee/fix-13136.en.md b/changes/ee/fix-13136.en.md new file mode 100644 index 000000000..8593c6fdb --- /dev/null +++ b/changes/ee/fix-13136.en.md @@ -0,0 +1 @@ +The template-rendered traces for Oracle actions have been enhanced for better readability. diff --git a/changes/ee/fix-13147.en.md b/changes/ee/fix-13147.en.md new file mode 100644 index 000000000..4f717892d --- /dev/null +++ b/changes/ee/fix-13147.en.md @@ -0,0 +1 @@ +Error messages for decoding failures in the rule engine protobuf decode functions have been improved by adding a clear descriptions to indicate what went wrong when message decoding fails. diff --git a/deploy/packages/deb/debian/postrm b/deploy/packages/deb/debian/postrm index 5df55135a..9ee1dca87 100755 --- a/deploy/packages/deb/debian/postrm +++ b/deploy/packages/deb/debian/postrm @@ -21,6 +21,8 @@ set -e case "$1" in purge) + # force kill all processes owned by emqx, if any + pkill -9 -u emqx || true rm -f /etc/default/emqx if [ -d /var/lib/emqx ]; then @@ -38,9 +40,8 @@ case "$1" in if [ -e /etc/init.d/emqx ]; then rm /etc/init.d/emqx fi - # Remove User & Group, killing any process owned by them + # Remove User & Group if getent passwd emqx >/dev/null; then - pkill -u emqx || true deluser --quiet --system emqx fi if getent group emqx >/dev/null; then diff --git a/mix.exs b/mix.exs index b4e7db9f8..556aae872 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}, - {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, + {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true}, {:ekka, github: "emqx/ekka", tag: "0.19.3", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, @@ -210,7 +210,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.10.3"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"}, diff --git a/rebar.config b/rebar.config index bb75bf3b8..768550fbd 100644 --- a/rebar.config +++ b/rebar.config @@ -82,7 +82,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, - {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}}, + {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, diff --git a/scripts/pkg-tests.sh b/scripts/pkg-tests.sh index a88020dd5..dfb5d098f 100755 --- a/scripts/pkg-tests.sh +++ b/scripts/pkg-tests.sh @@ -131,6 +131,21 @@ emqx_test(){ exit 1 fi + echo "try to install again and purge while the service is running" + dpkg -i "${PACKAGE_PATH}/${packagename}" + if [ "$(dpkg -l | grep ${EMQX_NAME} | awk '{print $1}')" != "ii" ] + then + echo "package install error" + exit 1 + fi + if ! /usr/bin/emqx start + then + echo "ERROR: failed_to_start_emqx" + cat /var/log/emqx/erlang.log.1 || true + cat /var/log/emqx/emqx.log.1 || true + exit 1 + fi + /usr/bin/emqx ping dpkg -P "${EMQX_NAME}" if dpkg -l |grep -q emqx then diff --git a/scripts/ui-tests/dashboard_test.py b/scripts/ui-tests/dashboard_test.py index 602d944fd..85d40cec8 100644 --- a/scripts/ui-tests/dashboard_test.py +++ b/scripts/ui-tests/dashboard_test.py @@ -100,7 +100,15 @@ def test_docs_link(driver, login, dashboard_url): driver.get(dest_url) ensure_current_url(driver, dest_url) xpath_link_help = "//div[@id='app']//div[@class='nav-header']//a[contains(@class, 'link-help')]" - link_help = driver.find_element(By.XPATH, xpath_link_help) + # retry up to 5 times + for _ in range(5): + try: + link_help = driver.find_element(By.XPATH, xpath_link_help) + break + except NoSuchElementException: + time.sleep(1) + else: + raise AssertionError("Cannot find the help link") driver.execute_script("arguments[0].click();", link_help) prefix, emqx_version = fetch_version(dashboard_url)