Merge remote-tracking branch 'upstream/release-56' into 0415-sync-release-56

This commit is contained in:
Ivan Dyachkov 2024-04-15 08:09:03 +02:00
commit 3ef160eed2
28 changed files with 289 additions and 37 deletions

View File

@ -151,7 +151,23 @@ jobs:
with:
ref: ${{ github.event.inputs.ref }}
fetch-depth: 0
- name: build emqx packages
- name: build tgz
env:
PROFILE: ${{ matrix.profile }}
ARCH: ${{ matrix.arch }}
OS: ${{ matrix.os }}
IS_ELIXIR: ${{ matrix.with_elixir }}
BUILDER: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }}"
BUILDER_SYSTEM: force_docker
run: |
./scripts/buildx.sh \
--profile $PROFILE \
--arch $ARCH \
--builder $BUILDER \
--elixir $IS_ELIXIR \
--pkgtype tgz
- name: build pkg
if: matrix.with_elixir == 'no'
env:
PROFILE: ${{ matrix.profile }}
ARCH: ${{ matrix.arch }}

View File

@ -67,12 +67,13 @@ jobs:
BUCKET=${{ secrets.AWS_S3_BUCKET }}
OUTPUT_DIR=${{ steps.profile.outputs.s3dir }}
aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ env.ref_name }} packages
- uses: emqx/upload-assets@8d2083b4dbe3151b0b735572eaa153b6acb647fe # 0.5.0
- uses: emqx/upload-assets@974befcf0e72a1811360a81c798855efb66b0551 # 0.5.2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
asset_paths: '["packages/*"]'
tag_name: "${{ env.ref_name }}"
skip_existing: true
- name: update to emqx.io
if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts)
run: |

View File

@ -20,8 +20,8 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.8.0
export EMQX_EE_DASHBOARD_VERSION ?= e1.6.0
export EMQX_DASHBOARD_VERSION ?= v1.8.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.6.1
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.6.0").
-define(EMQX_RELEASE_CE, "5.6.1-beta.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.6.0").
-define(EMQX_RELEASE_EE, "5.6.1-beta.1").

View File

@ -28,7 +28,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.1"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},

View File

@ -16,9 +16,14 @@
-module(emqx_config_backup).
-type ok_result() :: #{
root_key => emqx_utils_maps:config_key(),
changed => [emqx_utils_maps:config_key_path()]
}.
-type error_result() :: #{root_key => emqx_utils_maps:config_key(), reason => term()}.
-callback import_config(RawConf :: map()) ->
{ok, #{
root_key => emqx_utils_maps:config_key(),
changed => [emqx_utils_maps:config_key_path()]
}}
| {error, #{root_key => emqx_utils_maps:config_key(), reason => term()}}.
{ok, ok_result()}
| {error, error_result()}
| {results, {[ok_result()], [error_result()]}}.

View File

@ -1030,7 +1030,26 @@ bridge_v2_type_to_connector_type(Type) ->
import_config(RawConf) ->
%% actions structure
emqx_bridge:import_config(RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path()).
ActionRes = emqx_bridge:import_config(
RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path()
),
SourceRes = emqx_bridge:import_config(
RawConf, <<"sources">>, ?ROOT_KEY_SOURCES, config_key_path_sources()
),
group_import_results([ActionRes, SourceRes]).
group_import_results(Results0) ->
Results = lists:foldr(
fun
({ok, OkRes}, {OkAcc, ErrAcc}) ->
{[OkRes | OkAcc], ErrAcc};
({error, ErrRes}, {OkAcc, ErrAcc}) ->
{OkAcc, [ErrRes | ErrAcc]}
end,
{[], []},
Results0
),
{results, Results}.
%%====================================================================
%% Config Update Handler API

View File

@ -224,6 +224,7 @@ reset() -> gen_server:call(?MODULE, reset).
status() ->
transaction(fun ?MODULE:trans_status/0, []).
%% DO NOT delete this on_leave_clean/0, It's use when rpc before v560.
on_leave_clean() ->
on_leave_clean(node()).
@ -367,7 +368,7 @@ handle_call({fast_forward_to_commit, ToTnxId}, _From, State) ->
NodeId = do_fast_forward_to_commit(ToTnxId, State),
{reply, NodeId, State, catch_up(State)};
handle_call(on_leave, _From, State) ->
{atomic, ok} = transaction(fun ?MODULE:on_leave_clean/0, []),
{atomic, ok} = transaction(fun ?MODULE:on_leave_clean/1, [node()]),
{reply, ok, State#{is_leaving := true}};
handle_call(_, _From, State) ->
{reply, ok, State, catch_up(State)}.

View File

@ -144,7 +144,13 @@ t_open_ports_check(Config) ->
?assertEqual(ok, erpc:call(Core2, emqx_machine, open_ports_check, [])),
?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])),
true = erlang:monitor_node(Core2, true),
ok = emqx_cth_cluster:stop_node(Core2),
receive
{nodedown, Core2} -> ok
after 10000 ->
ct:fail("nodedown message not received after 10 seconds.")
end,
?assertEqual(ok, erpc:call(Replicant, emqx_machine, open_ports_check, [])),
?retry(200, 20, begin

View File

@ -713,5 +713,24 @@ call_conn(ConnMod, Pid, Req) ->
exit:R when R =:= shutdown; R =:= normal ->
{error, shutdown};
exit:{R, _} when R =:= shutdown; R =:= noproc ->
{error, shutdown}
{error, shutdown};
exit:{{shutdown, _OOMInfo}, _Location} ->
{error, shutdown};
exit:timeout ->
LogData = #{
msg => "call_client_connection_process_timeout",
request => Req,
pid => Pid,
module => ConnMod
},
LogData1 =
case node(Pid) =:= node() of
true ->
LogData#{stacktrace => erlang:process_info(Pid, current_stacktrace)};
false ->
LogData
end,
?SLOG(warning, LogData1),
{error, timeout}
end.

View File

@ -1559,6 +1559,8 @@ list_client_msgs(MsgType, ClientID, QString) ->
code => 'NOT_IMPLEMENTED',
message => <<"API not implemented for persistent sessions">>
}};
{error, Reason} ->
?INTERNAL_ERROR(Reason);
{Msgs, Meta = #{}} when is_list(Msgs) ->
format_msgs_resp(MsgType, Msgs, Meta, QString)
end

View File

@ -108,6 +108,7 @@ cluster(["join", SNode]) ->
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
end;
cluster(["leave"]) ->
_ = maybe_disable_autocluster(),
case mria:leave() of
ok ->
emqx_ctl:print("Leave the cluster successfully.~n"),
@ -139,12 +140,15 @@ cluster(["status"]) ->
cluster(["status", "--json"]) ->
Info = sort_map_list_fields(cluster_info()),
emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]);
cluster(["discovery", "enable"]) ->
enable_autocluster();
cluster(_) ->
emqx_ctl:usage([
{"cluster join <Node>", "Join the cluster"},
{"cluster leave", "Leave the cluster"},
{"cluster force-leave <Node>", "Force the node leave from cluster"},
{"cluster status [--json]", "Cluster status"}
{"cluster status [--json]", "Cluster status"},
{"cluster discovery enable", "Enable and run automatic cluster discovery (if configured)"}
]).
%% sort lists for deterministic output
@ -163,6 +167,25 @@ sort_map_list_field(Field, Map) ->
_ -> Map
end.
enable_autocluster() ->
ok = ekka:enable_autocluster(),
_ = ekka:autocluster(emqx),
emqx_ctl:print("Automatic cluster discovery enabled.~n").
maybe_disable_autocluster() ->
case ekka:autocluster_enabled() of
true ->
ok = ekka:disable_autocluster(),
emqx_ctl:print(
"Automatic cluster discovery is disabled on this node: ~p to avoid"
" re-joining the same cluster again, if the node is not stopped soon."
" To enable it run: 'emqx ctl cluster discovery enable' or restart the node.~n",
[node()]
);
false ->
ok
end.
%%--------------------------------------------------------------------
%% @doc Query clients

View File

@ -773,23 +773,42 @@ validate_cluster_hocon(RawConf) ->
do_import_conf(RawConf, Opts) ->
GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
maybe_print_conf_errors(GenConfErrs, Opts),
Errors =
lists:foldl(
fun(Module, ErrorsAcc) ->
case Module:import_config(RawConf) of
{ok, #{changed := Changed}} ->
maybe_print_changed(Changed, Opts),
ErrorsAcc;
{error, #{root_key := RootKey, reason := Reason}} ->
ErrorsAcc#{[RootKey] => Reason}
end
end,
GenConfErrs,
sort_importer_modules(find_behaviours(emqx_config_backup))
),
Modules = sort_importer_modules(find_behaviours(emqx_config_backup)),
Errors = lists:foldl(print_ok_results_collect_errors(RawConf, Opts), GenConfErrs, Modules),
maybe_print_conf_errors(Errors, Opts),
Errors.
print_ok_results_collect_errors(RawConf, Opts) ->
fun(Module, Errors) ->
case Module:import_config(RawConf) of
{results, {OkResults, ErrResults}} ->
print_ok_results(OkResults, Opts),
collect_errors(ErrResults, Errors);
{ok, OkResult} ->
print_ok_results([OkResult], Opts),
Errors;
{error, ErrResult} ->
collect_errors([ErrResult], Errors)
end
end.
print_ok_results(Results, Opts) ->
lists:foreach(
fun(#{changed := Changed}) ->
maybe_print_changed(Changed, Opts)
end,
Results
).
collect_errors(Results, Errors) ->
lists:foldr(
fun(#{root_key := RootKey, reason := Reason}, Acc) ->
Acc#{[RootKey] => Reason}
end,
Errors,
Results
).
sort_importer_modules(Modules) ->
lists:sort(
fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end,

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -31,6 +32,47 @@ init_per_suite(Config) ->
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
init_per_testcase(t_autocluster_leave = TC, Config) ->
[Core1, Core2, Repl1, Repl2] =
Nodes = [
t_autocluster_leave_core1,
t_autocluster_leave_core2,
t_autocluster_leave_replicant1,
t_autocluster_leave_replicant2
],
NodeNames = [emqx_cth_cluster:node_name(N) || N <- Nodes],
AppSpec = [
emqx,
{emqx_conf, #{
config => #{
cluster => #{
discovery_strategy => static,
static => #{seeds => NodeNames}
}
}
}},
emqx_management
],
Cluster = emqx_cth_cluster:start(
[
{Core1, #{role => core, apps => AppSpec}},
{Core2, #{role => core, apps => AppSpec}},
{Repl1, #{role => replicant, apps => AppSpec}},
{Repl2, #{role => replicant, apps => AppSpec}}
],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
[{cluster, Cluster} | Config];
init_per_testcase(_TC, Config) ->
Config.
end_per_testcase(_TC, Config) ->
case ?config(cluster, Config) of
undefined -> ok;
Cluster -> emqx_cth_cluster:stop(Cluster)
end.
t_status(_Config) ->
emqx_ctl:run_command([]),
emqx_ctl:run_command(["status"]),
@ -263,3 +305,44 @@ t_admin(_Config) ->
%% admins passwd <Username> <Password> # Reset dashboard user password
%% admins del <Username> # Delete dashboard user
ok.
t_autocluster_leave(Config) ->
[Core1, Core2, Repl1, Repl2] = Cluster = ?config(cluster, Config),
%% Mria membership updates are async, makes sense to wait a little
timer:sleep(300),
ClusterView = [lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster],
[View1, View2, View3, View4] = ClusterView,
?assertEqual(lists:sort(Cluster), View1),
?assertEqual(View1, View2),
?assertEqual(View1, View3),
?assertEqual(View1, View4),
rpc:call(Core2, emqx_mgmt_cli, cluster, [["leave"]]),
timer:sleep(1000),
%% Replicant nodes can discover Core2 which is now split from [Core1, Core2],
%% but they are expected to ignore Core2,
%% since mria_lb must filter out core nodes that disabled discovery.
?assertMatch([Core2], rpc:call(Core2, emqx, running_nodes, [])),
?assertEqual(undefined, rpc:call(Core1, erlang, whereis, [ekka_autocluster])),
?assertEqual(lists:sort([Core1, Repl1, Repl2]), rpc:call(Core1, emqx, running_nodes, [])),
?assertEqual(lists:sort([Core1, Repl1, Repl2]), rpc:call(Repl1, emqx, running_nodes, [])),
?assertEqual(lists:sort([Core1, Repl1, Repl2]), rpc:call(Repl2, emqx, running_nodes, [])),
rpc:call(Repl1, emqx_mgmt_cli, cluster, [["leave"]]),
timer:sleep(1000),
?assertEqual(lists:sort([Core1, Repl2]), rpc:call(Core1, emqx, running_nodes, [])),
rpc:call(Core2, emqx_mgmt_cli, cluster, [["discovery", "enable"]]),
rpc:call(Repl1, emqx_mgmt_cli, cluster, [["discovery", "enable"]]),
%% nodes will join and restart asyncly, may need more time to re-cluster
?assertEqual(
ok,
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME,
?LINE,
fun() ->
[lists:sort(rpc:call(N, emqx, running_nodes, [])) || N <- Cluster] =:= ClusterView
end,
10_000
)
).

View File

@ -18,6 +18,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -86,6 +87,33 @@ t_empty_export_import(_Config) ->
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
?assertEqual(ExpRawConf, emqx:get_raw_config([])).
t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) ->
case emqx_release:edition() of
ce ->
ok;
ee ->
FNameEmqx44 = "emqx-export-4.4.24-retainer-mqttsub.tar.gz",
BackupFile = filename:join(?config(data_dir, Config), FNameEmqx44),
Exp = {ok, #{db_errors => #{}, config_errors => #{}}},
?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)),
RawConfAfterImport = emqx:get_raw_config([]),
%% verify that MQTT sources are imported
?assertMatch(
#{<<"sources">> := #{<<"mqtt">> := Sources}} when map_size(Sources) > 0,
RawConfAfterImport
),
%% verify that retainer messages are imported
?assertMatch(
{ok, [#message{payload = <<"test-payload">>}]},
emqx_retainer:read_message(<<"test-retained-message/1">>)
),
%% Export and import again
{ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
?assertEqual(RawConfAfterImport, emqx:get_raw_config([]))
end,
ok.
t_cluster_hocon_export_import(Config) ->
RawConfBeforeImport = emqx:get_raw_config([]),
BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
{vsn, "5.0.21"},
{vsn, "5.0.22"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -17,6 +17,7 @@
-module(emqx_retainer_mnesia).
-behaviour(emqx_retainer).
-behaviour(emqx_db_backup).
-include("emqx_retainer.hrl").
-include_lib("emqx/include/logger.hrl").
@ -54,6 +55,8 @@
-export([populate_index_meta/0]).
-export([reindex/3]).
-export([backup_tables/0]).
-record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}).
-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
@ -73,6 +76,12 @@
topics() ->
[emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)].
%%--------------------------------------------------------------------
%% Data backup
%%--------------------------------------------------------------------
backup_tables() ->
[?TAB_MESSAGE].
%%--------------------------------------------------------------------
%% emqx_retainer callbacks
%%--------------------------------------------------------------------

View File

@ -0,0 +1,3 @@
Improve cluster discovery behaviour when a node is manually removed from a cluster using 'emqx ctl cluster leave' command.
Previously, if the configured cluster 'discovery_strategy' was not 'manual', the left node might re-discover and re-join the same cluster shortly after it left (unless it was stopped).
After this change, 'cluster leave' command disables automatic cluster_discovery, so that the left node won't re-join the same cluster again. Cluster discovery can be re-enabled by running 'emqx ctl discovery enable` or by restarting the left node.

View File

@ -0,0 +1,4 @@
Handle several errors in `/clients/{clientid}/mqueue_messages` and `/clients/{clientid}/inflight_messages` APIs:
- Internal timeout, which means that EMQX failed to get the list of Inflight/Mqueue messages within the default timeout of 5 s. This error may occur when the system is under a heavy load. The API will return 500 `{"code":"INTERNAL_ERROR","message":"timeout"}` response and log additional details.
- Client shutdown. The error may occur if the client connection is shutdown during the API call. The API will return 404 `{"code": "CLIENT_SHUTDOWN", "message": "Client connection has been shutdown"}` response in this case.

View File

@ -0,0 +1,2 @@
Make sure stats `'subscribers.count'` `'subscribers.max'` countains shared-subscribers.
It only contains non-shared subscribers previously.

View File

@ -0,0 +1,6 @@
Fixed an issue that prevented importing source data integrations and retained messages.
Before the fix:
- source data integrations are ignored from the backup file
- importing the `mnesia` table for retained messages are not supported

View File

@ -0,0 +1,2 @@
Fixed `cluster_rpc_commit` transaction ID cleanup procedure after `cluster leave` on replicant nodes.
Previously, the transaction id of the core node would be deleted prematurely, blocking configuration updates on the core node.

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.6.0
version: 5.6.1-beta.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.6.0
appVersion: 5.6.1-beta.1

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.6.0
version: 5.6.1-beta.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.6.0
appVersion: 5.6.1-beta.1

View File

@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.11.1", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true},
{:ekka, github: "emqx/ekka", tag: "0.19.1", override: true},
{:ekka, github: "emqx/ekka", tag: "0.19.3", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
{:minirest, github: "emqx/minirest", tag: "1.4.0", override: true},

View File

@ -83,7 +83,7 @@
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
{minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}},

View File

@ -105,6 +105,10 @@ matrix() {
entries+=("$(format_app_entry "$app" 1 emqx "$runner")")
entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")
;;
apps/emqx_management)
entries+=("$(format_app_entry "$app" 1 emqx "$runner")")
entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")
;;
apps/*)
if [[ -f "${app}/BSL.txt" ]]; then
profile='emqx-enterprise'