Merge remote-tracking branch 'upstream/release-57' into 0531-sync-release-57

This commit is contained in:
Ivan Dyachkov 2024-05-31 07:32:26 +02:00
commit 29ad07ce29
31 changed files with 237 additions and 65 deletions

View File

@ -8,7 +8,7 @@ on:
tag: tag:
type: string type: string
required: true required: true
publish_release_artefacts: publish_release_artifacts:
type: boolean type: boolean
required: true required: true
default: false default: false
@ -75,7 +75,7 @@ jobs:
tag_name: "${{ env.ref_name }}" tag_name: "${{ env.ref_name }}"
skip_existing: true skip_existing: true
- name: update to emqx.io - name: update to emqx.io
if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts) if: github.event_name == 'release' || inputs.publish_release_artifacts
run: | run: |
set -eux set -eux
curl -w %{http_code} \ curl -w %{http_code} \
@ -86,7 +86,7 @@ jobs:
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.ref_name }}\" }" \ -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }} ${{ secrets.EMQX_IO_RELEASE_API }}
- name: Push to packagecloud.io - name: Push to packagecloud.io
if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artifacts
env: env:
PROFILE: ${{ steps.profile.outputs.profile }} PROFILE: ${{ steps.profile.outputs.profile }}
VERSION: ${{ steps.profile.outputs.version }} VERSION: ${{ steps.profile.outputs.version }}

View File

@ -2,7 +2,7 @@
{application, emqx, [ {application, emqx, [
{id, "emqx"}, {id, "emqx"},
{description, "EMQX Core"}, {description, "EMQX Core"},
{vsn, "5.4.0"}, {vsn, "5.3.1"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [ {applications, [

View File

@ -270,6 +270,8 @@ json(L, Config) when is_list(L) ->
end; end;
json(Map, Config) when is_map(Map) -> json(Map, Config) when is_map(Map) ->
best_effort_json_obj(Map, Config); best_effort_json_obj(Map, Config);
json({'$array$', List}, Config) when is_list(List) ->
[json(I, Config) || I <- List];
json(Term, Config) -> json(Term, Config) ->
do_format_msg("~p", [Term], Config). do_format_msg("~p", [Term], Config).
@ -448,6 +450,36 @@ best_effort_json_test() ->
<<"[\n {\n \"key\" : [\n \n ]\n }\n]">>, <<"[\n {\n \"key\" : [\n \n ]\n }\n]">>,
best_effort_json([#{key => []}]) best_effort_json([#{key => []}])
), ),
%% List is IO Data
?assertMatch(
#{<<"what">> := <<"hej\n">>},
emqx_utils_json:decode(emqx_logger_jsonfmt:best_effort_json(#{what => [<<"hej">>, 10]}))
),
%% Force list to be interpreted as an array
?assertMatch(
#{<<"what">> := [<<"hej">>, 10]},
emqx_utils_json:decode(
emqx_logger_jsonfmt:best_effort_json(#{what => {'$array$', [<<"hej">>, 10]}})
)
),
%% IO Data inside an array
?assertMatch(
#{<<"what">> := [<<"hej">>, 10, <<"hej\n">>]},
emqx_utils_json:decode(
emqx_logger_jsonfmt:best_effort_json(#{
what => {'$array$', [<<"hej">>, 10, [<<"hej">>, 10]]}
})
)
),
%% Array inside an array
?assertMatch(
#{<<"what">> := [<<"hej">>, 10, [<<"hej">>, 10]]},
emqx_utils_json:decode(
emqx_logger_jsonfmt:best_effort_json(#{
what => {'$array$', [<<"hej">>, 10, {'$array$', [<<"hej">>, 10]}]}
})
)
),
ok. ok.
config() -> config() ->

View File

@ -2487,7 +2487,7 @@ converter_ciphers(Ciphers, _Opts) when is_binary(Ciphers) ->
default_ciphers(Which) -> default_ciphers(Which) ->
lists:map( lists:map(
fun erlang:iolist_to_binary/1, fun unicode:characters_to_binary/1,
do_default_ciphers(Which) do_default_ciphers(Which)
). ).
@ -2510,7 +2510,7 @@ bin_str_converter(I, _) when is_integer(I) ->
integer_to_binary(I); integer_to_binary(I);
bin_str_converter(X, _) -> bin_str_converter(X, _) ->
try try
iolist_to_binary(X) unicode:characters_to_binary(X)
catch catch
_:_ -> _:_ ->
throw("must_quote") throw("must_quote")
@ -2665,7 +2665,7 @@ to_comma_separated_list(Str) ->
{ok, string:tokens(Str, ", ")}. {ok, string:tokens(Str, ", ")}.
to_comma_separated_binary(Str) -> to_comma_separated_binary(Str) ->
{ok, lists:map(fun erlang:list_to_binary/1, string:tokens(Str, ", "))}. {ok, lists:map(fun unicode:characters_to_binary/1, string:tokens(Str, ", "))}.
to_comma_separated_atoms(Str) -> to_comma_separated_atoms(Str) ->
{ok, lists:map(fun to_atom/1, string:tokens(Str, ", "))}. {ok, lists:map(fun to_atom/1, string:tokens(Str, ", "))}.
@ -2674,7 +2674,7 @@ to_url(Str) ->
case emqx_http_lib:uri_parse(Str) of case emqx_http_lib:uri_parse(Str) of
{ok, URIMap} -> {ok, URIMap} ->
URIString = emqx_http_lib:normalize(URIMap), URIString = emqx_http_lib:normalize(URIMap),
{ok, iolist_to_binary(URIString)}; {ok, unicode:characters_to_binary(URIString)};
Error -> Error ->
Error Error
end. end.
@ -2682,13 +2682,13 @@ to_url(Str) ->
to_json_binary(Str) -> to_json_binary(Str) ->
case emqx_utils_json:safe_decode(Str) of case emqx_utils_json:safe_decode(Str) of
{ok, _} -> {ok, _} ->
{ok, iolist_to_binary(Str)}; {ok, unicode:characters_to_binary(Str)};
Error -> Error ->
Error Error
end. end.
to_template(Str) -> to_template(Str) ->
{ok, iolist_to_binary(Str)}. {ok, unicode:characters_to_binary(Str, utf8)}.
to_template_str(Str) -> to_template_str(Str) ->
{ok, unicode:characters_to_list(Str, utf8)}. {ok, unicode:characters_to_list(Str, utf8)}.
@ -2784,7 +2784,7 @@ validate_keepalive_multiplier(_Multiplier) ->
{error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}. {error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}.
validate_tcp_keepalive(Value) -> validate_tcp_keepalive(Value) ->
case iolist_to_binary(Value) of case unicode:characters_to_binary(Value) of
<<"none">> -> <<"none">> ->
ok; ok;
_ -> _ ->
@ -2965,7 +2965,7 @@ convert_servers(undefined) ->
convert_servers(Map) when is_map(Map) -> convert_servers(Map) when is_map(Map) ->
try try
List = convert_hocon_map_host_port(Map), List = convert_hocon_map_host_port(Map),
iolist_to_binary(string:join(List, ",")) unicode:characters_to_binary(string:join(List, ","))
catch catch
_:_ -> _:_ ->
throw("bad_host_port") throw("bad_host_port")
@ -2973,13 +2973,13 @@ convert_servers(Map) when is_map(Map) ->
convert_servers([H | _] = Array) when is_binary(H) orelse is_list(H) -> convert_servers([H | _] = Array) when is_binary(H) orelse is_list(H) ->
%% if the old config was a string array %% if the old config was a string array
%% we want to make sure it's converted to a comma-separated %% we want to make sure it's converted to a comma-separated
iolist_to_binary([[I, ","] || I <- Array]); unicode:characters_to_binary([[I, ","] || I <- Array]);
convert_servers(Str) -> convert_servers(Str) ->
normalize_host_port_str(Str). normalize_host_port_str(Str).
%% remove spaces around comma (,) %% remove spaces around comma (,)
normalize_host_port_str(Str) -> normalize_host_port_str(Str) ->
iolist_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")). unicode:characters_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")).
%% @doc Shared validation function for both 'server' and 'servers' string. %% @doc Shared validation function for both 'server' and 'servers' string.
%% NOTE: Validator is called after converter. %% NOTE: Validator is called after converter.
@ -3458,8 +3458,10 @@ ensure_default_listener(Map, ListenerType) ->
NewMap = Map#{<<"default">> => default_listener(ListenerType)}, NewMap = Map#{<<"default">> => default_listener(ListenerType)},
keep_default_tombstone(NewMap, #{}). keep_default_tombstone(NewMap, #{}).
cert_file(_File, client) -> undefined; cert_file(_File, client) ->
cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])). undefined;
cert_file(File, server) ->
unicode:characters_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) -> mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) ->
case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of

View File

@ -89,8 +89,17 @@ weight({packet, _}) -> {0, packet};
weight({payload, _}) -> {2, payload}; weight({payload, _}) -> {2, payload};
weight({K, _}) -> {1, K}. weight({K, _}) -> {1, K}.
format_packet(undefined, _) -> ""; format_packet(undefined, _) ->
format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode). "";
format_packet(Packet, Encode) ->
try
emqx_packet:format(Packet, Encode)
catch
_:_ ->
%% We don't want to crash if there is a field named packet with
%% some other type of value
Packet
end.
format_payload(undefined, _) -> format_payload(undefined, _) ->
""; "";
@ -100,7 +109,11 @@ format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
unicode:characters_to_list(Payload); unicode:characters_to_list(Payload);
format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload); format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload);
format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) -> format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type). emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type);
format_payload(Payload, _) ->
%% We don't want to crash if there is a field named payload with some other
%% type of value
Payload.
to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom); to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
to_iolist(Int) when is_integer(Int) -> integer_to_list(Int); to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);

View File

@ -930,3 +930,15 @@ timeout_types_test_() ->
typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967001ms">>) typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967001ms">>)
) )
]. ].
unicode_template_test() ->
Sc = #{
roots => [root],
fields => #{root => [{template, #{type => emqx_schema:template()}}]}
},
HoconText = <<"root = {template = \"中文\"}"/utf8>>,
{ok, Hocon} = hocon:binary(HoconText),
?assertEqual(
#{<<"root">> => #{<<"template">> => <<"中文"/utf8>>}},
hocon_tconf:check_plain(Sc, Hocon)
).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_auth, [ {application, emqx_auth, [
{description, "EMQX Authentication and authorization"}, {description, "EMQX Authentication and authorization"},
{vsn, "0.4.0"}, {vsn, "0.3.1"},
{modules, []}, {modules, []},
{registered, [emqx_auth_sup]}, {registered, [emqx_auth_sup]},
{applications, [ {applications, [

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -214,7 +214,7 @@ ensure_client(ClientId, Hosts, ClientConfig) ->
case wolff_client_sup:find_client(ClientId) of case wolff_client_sup:find_client(ClientId) of
{ok, _Pid} -> {ok, _Pid} ->
ok; ok;
{error, no_such_client} -> {error, #{reason := no_such_client}} ->
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
{ok, _} -> {ok, _} ->
?SLOG(info, #{ ?SLOG(info, #{
@ -546,13 +546,13 @@ check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
{ok, Pid} -> {ok, Pid} ->
ok = check_topic_status(ClientId, Pid, KafkaTopic), ok = check_topic_status(ClientId, Pid, KafkaTopic),
ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
{error, no_such_client} -> {error, #{reason := no_such_client}} ->
throw(#{ throw(#{
reason => cannot_find_kafka_client, reason => cannot_find_kafka_client,
kafka_client => ClientId, kafka_client => ClientId,
kafka_topic => KafkaTopic kafka_topic => KafkaTopic
}); });
{error, client_supervisor_not_initialized} -> {error, #{reason := client_supervisor_not_initialized}} ->
throw(#{ throw(#{
reason => restarting, reason => restarting,
kafka_client => ClientId, kafka_client => ClientId,
@ -593,33 +593,54 @@ maybe_check_health_check_topic(_) ->
%% Cannot infer further information. Maybe upgraded from older version. %% Cannot infer further information. Maybe upgraded from older version.
?status_connected. ?status_connected.
is_alive(Pid) ->
is_pid(Pid) andalso erlang:is_process_alive(Pid).
error_summary(Map, [Error]) ->
Map#{error => Error};
error_summary(Map, [Error | More]) ->
Map#{first_error => Error, total_errors => length(More) + 1}.
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
Leaders = case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of {ok, Leaders} ->
{ok, LeadersToCheck} -> %% Kafka is considered healthy as long as any of the partition leader is reachable.
%% Kafka is considered healthy as long as any of the partition leader is reachable. case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
lists:filtermap( {[], Errors} ->
fun({_Partition, Pid}) -> throw(
case is_pid(Pid) andalso erlang:is_process_alive(Pid) of error_summary(
true -> {true, Pid}; #{
_ -> false cause => "no_connected_partition_leader",
end kafka_client => ClientId,
end, kafka_topic => KafkaTopic
LeadersToCheck },
); Errors
{error, _} -> )
[] );
end, {_, []} ->
case Leaders of ok;
[] -> {_, Errors} ->
?SLOG(
warning,
"not_all_kafka_partitions_connected",
error_summary(
#{
kafka_client => ClientId,
kafka_topic => KafkaTopic
},
Errors
)
),
ok
end;
{error, Reason} ->
%% If failed to fetch metadata, wolff_client logs a warning level message
%% which includes the reason for each seed host
throw(#{ throw(#{
error => no_connected_partition_leader, cause => Reason,
kafka_client => ClientId, kafka_client => ClientId,
kafka_topic => KafkaTopic, kafka_topic => KafkaTopic
partitions_limit => MaxPartitions })
});
_ ->
ok
end. end.
check_topic_status(ClientId, WolffClientPid, KafkaTopic) -> check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_redis, [ {application, emqx_bridge_redis, [
{description, "EMQX Enterprise Redis Bridge"}, {description, "EMQX Enterprise Redis Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -6,6 +6,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-behaviour(emqx_resource). -behaviour(emqx_resource).
@ -143,7 +144,13 @@ on_batch_query(
[{ChannelID, _} | _] = BatchData, [{ChannelID, _} | _] = BatchData,
emqx_trace:rendered_action_template( emqx_trace:rendered_action_template(
ChannelID, ChannelID,
#{commands => Cmds, batch => ture} #{
commands => #emqx_trace_format_func_data{
function = fun trace_format_commands/1,
data = Cmds
},
batch => true
}
), ),
Result = query(InstId, {cmds, Cmds}, RedisConnSt), Result = query(InstId, {cmds, Cmds}, RedisConnSt),
?tp( ?tp(
@ -162,6 +169,10 @@ on_batch_query(
Error Error
end. end.
trace_format_commands(Commands0) ->
Commands1 = [lists:join(" ", C) || C <- Commands0],
unicode:characters_to_binary(lists:join("; ", Commands1)).
on_format_query_result({ok, Msg}) -> on_format_query_result({ok, Msg}) ->
#{result => ok, message => Msg}; #{result => ok, message => Msg};
on_format_query_result(Res) -> on_format_query_result(Res) ->

View File

@ -245,7 +245,7 @@ t_license_setting_bc(_Config) ->
?assertMatch(#{<<"max_connections">> := 25}, request_dump()), ?assertMatch(#{<<"max_connections">> := 25}, request_dump()),
%% get %% get
GetRes = request(get, uri(["license", "setting"]), []), GetRes = request(get, uri(["license", "setting"]), []),
%% aslo check that the settings return correctly %% also check that the settings return correctly
validate_setting(GetRes, <<"75%">>, <<"80%">>, 25), validate_setting(GetRes, <<"75%">>, <<"80%">>, 25),
%% update %% update
Low = <<"50%">>, Low = <<"50%">>,

View File

@ -2,7 +2,7 @@
{application, emqx_management, [ {application, emqx_management, [
{description, "EMQX Management API and CLI"}, {description, "EMQX Management API and CLI"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.3.0"}, {vsn, "5.2.1"},
{modules, []}, {modules, []},
{registered, [emqx_management_sup]}, {registered, [emqx_management_sup]},
{applications, [ {applications, [

View File

@ -1,6 +1,6 @@
{application, emqx_oracle, [ {application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"}, {description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -8,6 +8,7 @@
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(UNHEALTHY_TARGET_MSG, -define(UNHEALTHY_TARGET_MSG,
@ -288,7 +289,7 @@ on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
type => Type, type => Type,
apply_mode => ApplyMode, apply_mode => ApplyMode,
name_or_sql => NameOrSQL, name_or_sql => NameOrSQL,
data => Data data => #emqx_trace_format_func_data{function = fun trace_format_data/1, data = Data}
}), }),
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of
{error, Reason} = Result -> {error, Reason} = Result ->
@ -317,6 +318,15 @@ on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
Result Result
end. end.
trace_format_data(Data0) ->
%% In batch request, we get a two level list
{'$array$', lists:map(fun insert_array_marker_if_list/1, Data0)}.
insert_array_marker_if_list(List) when is_list(List) ->
{'$array$', List};
insert_array_marker_if_list(Item) ->
Item.
on_get_status(_InstId, #{pool_name := Pool} = _State) -> on_get_status(_InstId, #{pool_name := Pool} = _State) ->
case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of
true -> true ->

View File

@ -1,6 +1,6 @@
{application, emqx_schema_registry, [ {application, emqx_schema_registry, [
{description, "EMQX Schema Registry"}, {description, "EMQX Schema Registry"},
{vsn, "0.3.0"}, {vsn, "0.3.1"},
{registered, [emqx_schema_registry_sup]}, {registered, [emqx_schema_registry_sup]},
{mod, {emqx_schema_registry_app, []}}, {mod, {emqx_schema_registry_app, []}},
{included_applications, [ {included_applications, [

View File

@ -64,7 +64,21 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
); );
handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) -> handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
decode(SchemaId, Data, MoreArgs); try
decode(SchemaId, Data, MoreArgs)
catch
error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
throw(
{schema_decode_error, #{
error_type => decoding_failure,
schema_id => SchemaId,
data => Data,
more_args => MoreArgs,
explain =>
<<"The given data could not be decoded. Please check the input data and the schema.">>
}}
)
end;
handle_rule_function(schema_decode, Args) -> handle_rule_function(schema_decode, Args) ->
error({args_count_error, {schema_decode, Args}}); error({args_count_error, {schema_decode, Args}});
handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) -> handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->

View File

@ -44,7 +44,8 @@ sparkplug_tests() ->
t_sparkplug_decode, t_sparkplug_decode,
t_sparkplug_encode, t_sparkplug_encode,
t_sparkplug_decode_encode_with_message_name, t_sparkplug_decode_encode_with_message_name,
t_sparkplug_encode_float_to_uint64_key t_sparkplug_encode_float_to_uint64_key,
t_decode_fail
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
@ -532,6 +533,23 @@ t_encode(Config) ->
end, end,
ok. ok.
t_decode_fail(_Config) ->
SerdeName = my_serde,
SerdeType = protobuf,
ok = create_serde(SerdeType, SerdeName),
Payload = <<"ss">>,
?assertThrow(
{schema_decode_error, #{
data := <<"ss">>,
error_type := decoding_failure,
explain := _,
more_args := [<<"Person">>],
schema_id := <<"my_serde">>
}},
emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
),
ok.
t_decode(Config) -> t_decode(Config) ->
SerdeType = ?config(serde_type, Config), SerdeType = ?config(serde_type, Config),
SerdeName = my_serde, SerdeName = my_serde,

View File

@ -0,0 +1 @@
The issue causing text traces for the republish action to crash and not display correctly has been resolved.

View File

@ -0,0 +1,5 @@
Improve Kafka connector error logs.
Previously, specific error details, such as unreachable advertised listeners, were not logged.
Now, error details are captured in the logs to provide more diagnostic information.
To manage log verbosity, only the first occurrence of an error is logged, accompanied by the total count of similar errors.

View File

@ -0,0 +1,6 @@
Improve Kafka producer error handling for `message_too_large`.
Prior to this change, Kafka producers would retry sending oversized batches (`message_too_large` error) in hopes of a server side configuration fix (`max.message.bytes`).
Now, oversized messages are automatically split into single-message batches for retry.
If a message still exceeds size limits, it will be dropped to maintain data flow.

View File

@ -0,0 +1 @@
Traces for Redis action batch requests have got improved formatting. Spaces are now added between components of commands and semicolons are added between commands to make the trace message easier to read.

View File

@ -0,0 +1 @@
The template-rendered traces for Oracle actions have been enhanced for better readability.

View File

@ -0,0 +1 @@
Error messages for decoding failures in the rule engine protobuf decode functions have been improved by adding a clear descriptions to indicate what went wrong when message decoding fails.

View File

@ -21,6 +21,8 @@ set -e
case "$1" in case "$1" in
purge) purge)
# force kill all processes owned by emqx, if any
pkill -9 -u emqx || true
rm -f /etc/default/emqx rm -f /etc/default/emqx
if [ -d /var/lib/emqx ]; then if [ -d /var/lib/emqx ]; then
@ -38,9 +40,8 @@ case "$1" in
if [ -e /etc/init.d/emqx ]; then if [ -e /etc/init.d/emqx ]; then
rm /etc/init.d/emqx rm /etc/init.d/emqx
fi fi
# Remove User & Group, killing any process owned by them # Remove User & Group
if getent passwd emqx >/dev/null; then if getent passwd emqx >/dev/null; then
pkill -u emqx || true
deluser --quiet --system emqx deluser --quiet --system emqx
fi fi
if getent group emqx >/dev/null; then if getent group emqx >/dev/null; then

View File

@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
{:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true},
{:ekka, github: "emqx/ekka", tag: "0.19.3", override: true}, {:ekka, github: "emqx/ekka", tag: "0.19.3", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
@ -210,7 +210,7 @@ defmodule EMQXUmbrella.MixProject do
{:hstreamdb_erl, {:hstreamdb_erl,
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.3"}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"},

View File

@ -82,7 +82,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},

View File

@ -131,6 +131,21 @@ emqx_test(){
exit 1 exit 1
fi fi
echo "try to install again and purge while the service is running"
dpkg -i "${PACKAGE_PATH}/${packagename}"
if [ "$(dpkg -l | grep ${EMQX_NAME} | awk '{print $1}')" != "ii" ]
then
echo "package install error"
exit 1
fi
if ! /usr/bin/emqx start
then
echo "ERROR: failed_to_start_emqx"
cat /var/log/emqx/erlang.log.1 || true
cat /var/log/emqx/emqx.log.1 || true
exit 1
fi
/usr/bin/emqx ping
dpkg -P "${EMQX_NAME}" dpkg -P "${EMQX_NAME}"
if dpkg -l |grep -q emqx if dpkg -l |grep -q emqx
then then

View File

@ -100,7 +100,15 @@ def test_docs_link(driver, login, dashboard_url):
driver.get(dest_url) driver.get(dest_url)
ensure_current_url(driver, dest_url) ensure_current_url(driver, dest_url)
xpath_link_help = "//div[@id='app']//div[@class='nav-header']//a[contains(@class, 'link-help')]" xpath_link_help = "//div[@id='app']//div[@class='nav-header']//a[contains(@class, 'link-help')]"
link_help = driver.find_element(By.XPATH, xpath_link_help) # retry up to 5 times
for _ in range(5):
try:
link_help = driver.find_element(By.XPATH, xpath_link_help)
break
except NoSuchElementException:
time.sleep(1)
else:
raise AssertionError("Cannot find the help link")
driver.execute_script("arguments[0].click();", link_help) driver.execute_script("arguments[0].click();", link_help)
prefix, emqx_version = fetch_version(dashboard_url) prefix, emqx_version = fetch_version(dashboard_url)