Merge remote-tracking branch 'origin/release-57' into 0527-sync-5.7.0-to-master

This commit is contained in:
zmstone 2024-05-27 20:10:44 +02:00
commit 41cbfcfaa1
43 changed files with 803 additions and 215 deletions

View File

@ -33,7 +33,7 @@ runs:
HOMEBREW_NO_INSTALL_UPGRADE: 1 HOMEBREW_NO_INSTALL_UPGRADE: 1
HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK: 1 HOMEBREW_NO_INSTALLED_DEPENDENTS_CHECK: 1
run: | run: |
brew install curl zip unzip coreutils openssl@1.1 brew install curl zip unzip coreutils openssl@1.1 unixodbc
echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH
echo "/usr/local/bin" >> $GITHUB_PATH echo "/usr/local/bin" >> $GITHUB_PATH
echo "emqx_name=${emqx_name}" >> $GITHUB_OUTPUT echo "emqx_name=${emqx_name}" >> $GITHUB_OUTPUT
@ -56,7 +56,7 @@ runs:
if: steps.prepare.outputs.SELF_HOSTED != 'true' if: steps.prepare.outputs.SELF_HOSTED != 'true'
with: with:
path: ${{ steps.prepare.outputs.OTP_INSTALL_PATH }} path: ${{ steps.prepare.outputs.OTP_INSTALL_PATH }}
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit-20240524-1
- name: build erlang - name: build erlang
if: steps.cache.outputs.cache-hit != 'true' if: steps.cache.outputs.cache-hit != 'true'
shell: bash shell: bash
@ -80,9 +80,10 @@ runs:
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH" git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH"
cd "$OTP_SOURCE_PATH" cd "$OTP_SOURCE_PATH"
if [ "$(arch)" = arm64 ]; then if [ "$(arch)" = arm64 ]; then
export CFLAGS="-O2 -g -I$(brew --prefix unixodbc)/include" ODBCHOME="$(brew --prefix unixodbc)"
export LDFLAGS="-L$(brew --prefix unixodbc)/lib" export CFLAGS="-O2 -g -I${ODBCHOME}/include"
WITH_ODBC="--with-odbc=$(brew --prefix unixodbc)" export LDFLAGS="-L${ODBCHOME}/lib"
WITH_ODBC="--with-odbc=${ODBCHOME}"
else else
WITH_ODBC="" WITH_ODBC=""
fi fi

View File

@ -156,7 +156,7 @@ jobs:
password: ${{ secrets.AWS_SECRET_ACCESS_KEY }} password: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
ecr: true ecr: true
- name: Build docker image - name: Build docker image for smoke test
env: env:
PROFILE: ${{ matrix.profile[0] }} PROFILE: ${{ matrix.profile[0] }}
DOCKER_REGISTRY: ${{ matrix.profile[1] }} DOCKER_REGISTRY: ${{ matrix.profile[1] }}
@ -164,7 +164,6 @@ jobs:
DOCKER_LATEST: ${{ inputs.latest }} DOCKER_LATEST: ${{ inputs.latest }}
DOCKER_PUSH: false DOCKER_PUSH: false
DOCKER_BUILD_NOCACHE: true DOCKER_BUILD_NOCACHE: true
DOCKER_PLATFORMS: linux/amd64,linux/arm64
DOCKER_LOAD: true DOCKER_LOAD: true
EMQX_RUNNER: 'public.ecr.aws/debian/debian:12-slim' EMQX_RUNNER: 'public.ecr.aws/debian/debian:12-slim'
EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' EMQX_DOCKERFILE: 'deploy/docker/Dockerfile'
@ -203,7 +202,8 @@ jobs:
docker exec -t -u root -w /root $CID bash -c 'apt-get -y update && apt-get -y install net-tools' docker exec -t -u root -w /root $CID bash -c 'apt-get -y update && apt-get -y install net-tools'
docker exec -t -u root $CID node_dump docker exec -t -u root $CID node_dump
docker rm -f $CID docker rm -f $CID
- name: Push docker image
- name: Build and push docker image
if: inputs.publish || github.repository_owner != 'emqx' if: inputs.publish || github.repository_owner != 'emqx'
env: env:
PROFILE: ${{ matrix.profile[0] }} PROFILE: ${{ matrix.profile[0] }}

View File

@ -205,18 +205,6 @@ jobs:
pattern: "${{ matrix.profile }}-*" pattern: "${{ matrix.profile }}-*"
path: packages/${{ matrix.profile }} path: packages/${{ matrix.profile }}
merge-multiple: true merge-multiple: true
- name: install dos2unix
run: sudo apt-get update -y && sudo apt install -y dos2unix
- name: get packages
run: |
set -eu
cd packages/${{ matrix.profile }}
# fix the .sha256 file format
for f in *.sha256; do
dos2unix $f
echo "$(cat $f) ${f%.*}" | sha256sum -c || exit 1
done
cd -
- uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 - uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with: with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}

View File

@ -47,9 +47,6 @@ jobs:
echo "_EMQX_DOCKER_IMAGE_TAG=$_EMQX_DOCKER_IMAGE_TAG" >> $GITHUB_ENV echo "_EMQX_DOCKER_IMAGE_TAG=$_EMQX_DOCKER_IMAGE_TAG" >> $GITHUB_ENV
- name: dashboard tests - name: dashboard tests
working-directory: ./scripts/ui-tests working-directory: ./scripts/ui-tests
env:
EMQX_VERSION: ${{ inputs.version-emqx }}
EMQX_ENTERPRISE_VERSION: ${{ inputs.version-emqx-enterprise }}
run: | run: |
set -eu set -eu
docker compose up --abort-on-container-exit --exit-code-from selenium docker compose up --abort-on-container-exit --exit-code-from selenium

1
.gitignore vendored
View File

@ -78,3 +78,4 @@ rebar-git-cache.tar
.git/ .git/
apps/emqx_utils/src/emqx_variform_parser.erl apps/emqx_utils/src/emqx_variform_parser.erl
apps/emqx_utils/src/emqx_variform_scan.erl apps/emqx_utils/src/emqx_variform_scan.erl
default-profile.mk

View File

@ -20,9 +20,10 @@ endif
# Dashboard version # Dashboard version
# from https://github.com/emqx/emqx-dashboard5 # from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.9.0-beta.1 export EMQX_DASHBOARD_VERSION ?= v1.9.0
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0-beta.9 export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0
-include default-profile.mk
PROFILE ?= emqx PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise REL_PROFILES := emqx emqx-enterprise
PKG_PROFILES := emqx-pkg emqx-enterprise-pkg PKG_PROFILES := emqx-pkg emqx-enterprise-pkg

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md' %% `apps/emqx/src/bpapi/README.md'
%% Opensource edition %% Opensource edition
-define(EMQX_RELEASE_CE, "5.7.0-alpha.1"). -define(EMQX_RELEASE_CE, "5.7.0").
%% Enterprise edition %% Enterprise edition
-define(EMQX_RELEASE_EE, "5.7.0-alpha.1"). -define(EMQX_RELEASE_EE, "5.7.0").

File diff suppressed because one or more lines are too long

View File

@ -238,12 +238,12 @@ convert_actions(undefined, _) ->
convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := ResourceOpts}, _) -> convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := ResourceOpts}, _) ->
case Params of case Params of
#{<<"mode">> := <<"direct">>} -> #{<<"mode">> := <<"aggregated">>} ->
Conf;
#{} ->
%% NOTE: Disable batching for direct uploads. %% NOTE: Disable batching for direct uploads.
NResourceOpts = ResourceOpts#{<<"batch_size">> => 1, <<"batch_time">> => 0}, NResourceOpts = ResourceOpts#{<<"batch_size">> => 1, <<"batch_time">> => 0},
Conf#{<<"resource_opts">> := NResourceOpts}; Conf#{<<"resource_opts">> := NResourceOpts}
#{} ->
Conf
end. end.
%% Interpreting options %% Interpreting options

View File

@ -73,14 +73,14 @@
-define(GAUGE_SAMPLER_LIST, [ -define(GAUGE_SAMPLER_LIST, [
disconnected_durable_sessions, disconnected_durable_sessions,
durable_subscriptions, subscriptions_durable,
subscriptions, subscriptions,
topics, topics,
connections, connections,
live_connections live_connections
]). ]).
-define(SAMPLER_LIST, ?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST). -define(SAMPLER_LIST, (?GAUGE_SAMPLER_LIST ++ ?DELTA_SAMPLER_LIST)).
-define(DELTA_SAMPLER_RATE_MAP, #{ -define(DELTA_SAMPLER_RATE_MAP, #{
received => received_msg_rate, received => received_msg_rate,
@ -102,6 +102,11 @@
] ++ ?LICENSE_QUOTA ] ++ ?LICENSE_QUOTA
). ).
-define(CLUSTERONLY_SAMPLER_LIST, [
subscriptions_durable,
disconnected_durable_sessions
]).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
-define(LICENSE_QUOTA, [license_quota]). -define(LICENSE_QUOTA, [license_quota]).
-else. -else.

View File

@ -118,8 +118,7 @@ current_rate(all) ->
current_rate_cluster(); current_rate_cluster();
current_rate(Node) when Node == node() -> current_rate(Node) when Node == node() ->
try try
{ok, Rate} = do_call(current_rate), do_call(current_rate)
{ok, adjust_individual_node_metrics(Rate)}
catch catch
_E:R -> _E:R ->
?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}), ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
@ -222,8 +221,11 @@ do_sample(Node, Time) ->
Res Res
end. end.
do_sample([], _Time, Res) -> do_sample([], _Time, Samples) ->
Res; maps:map(
fun(_TS, Sample) -> adjust_synthetic_cluster_metrics(Sample) end,
Samples
);
do_sample([Node | Nodes], Time, Res) -> do_sample([Node | Nodes], Time, Res) ->
case do_sample(Node, Time) of case do_sample(Node, Time) of
{badrpc, Reason} -> {badrpc, Reason} ->
@ -237,22 +239,27 @@ match_spec(infinity) ->
match_spec(Time) -> match_spec(Time) ->
[{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}]. [{{'_', '$1', '_'}, [{'>=', '$1', Time}], ['$_']}].
merge_cluster_samplers(Node, Cluster) -> merge_cluster_samplers(NodeSamples, Cluster) ->
maps:fold(fun merge_cluster_samplers/3, Cluster, Node). maps:fold(fun merge_cluster_samplers/3, Cluster, NodeSamples).
merge_cluster_samplers(TS, NodeData, Cluster) -> merge_cluster_samplers(TS, NodeSample, Cluster) ->
case maps:get(TS, Cluster, undefined) of case maps:get(TS, Cluster, undefined) of
undefined -> undefined ->
Cluster#{TS => NodeData}; Cluster#{TS => NodeSample};
ClusterData -> ClusterSample ->
Cluster#{TS => merge_cluster_sampler_map(NodeData, ClusterData)} Cluster#{TS => merge_cluster_sampler_map(NodeSample, ClusterSample)}
end. end.
merge_cluster_sampler_map(M1, M2) -> merge_cluster_sampler_map(M1, M2) ->
Fun = Fun =
fun fun
(topics, Map) -> (Key, Map) when
Map#{topics => maps:get(topics, M1)}; %% cluster-synced values
Key =:= topics;
Key =:= subscriptions_durable;
Key =:= disconnected_durable_sessions
->
Map#{Key => maps:get(Key, M1)};
(Key, Map) -> (Key, Map) ->
Map#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)} Map#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)}
end, end,
@ -283,10 +290,6 @@ merge_cluster_rate(Node, Cluster) ->
end, end,
maps:fold(Fun, Cluster, Node). maps:fold(Fun, Cluster, Node).
adjust_individual_node_metrics(Metrics0) ->
%% ensure renamed
emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0).
adjust_synthetic_cluster_metrics(Metrics0) -> adjust_synthetic_cluster_metrics(Metrics0) ->
DSSubs = maps:get(subscriptions_durable, Metrics0, 0), DSSubs = maps:get(subscriptions_durable, Metrics0, 0),
RamSubs = maps:get(subscriptions, Metrics0, 0), RamSubs = maps:get(subscriptions, Metrics0, 0),
@ -445,7 +448,7 @@ stats(connections) ->
emqx_stats:getstat('connections.count'); emqx_stats:getstat('connections.count');
stats(disconnected_durable_sessions) -> stats(disconnected_durable_sessions) ->
emqx_persistent_session_bookkeeper:get_disconnected_session_count(); emqx_persistent_session_bookkeeper:get_disconnected_session_count();
stats(durable_subscriptions) -> stats(subscriptions_durable) ->
emqx_stats:getstat('durable_subscriptions.count'); emqx_stats:getstat('durable_subscriptions.count');
stats(live_connections) -> stats(live_connections) ->
emqx_stats:getstat('live_connections.count'); emqx_stats:getstat('live_connections.count');

View File

@ -202,11 +202,10 @@ swagger_desc(persisted) ->
swagger_desc_format("Messages saved to the durable storage "); swagger_desc_format("Messages saved to the durable storage ");
swagger_desc(disconnected_durable_sessions) -> swagger_desc(disconnected_durable_sessions) ->
<<"Disconnected durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>; <<"Disconnected durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(durable_subscriptions) -> swagger_desc(subscriptions_durable) ->
<<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>; <<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(subscriptions) -> swagger_desc(subscriptions) ->
<<"Subscriptions at the time of sampling (not considering durable sessions).", <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
?APPROXIMATE_DESC>>;
swagger_desc(topics) -> swagger_desc(topics) ->
<<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>; <<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(connections) -> swagger_desc(connections) ->
@ -252,8 +251,4 @@ swagger_desc_format(Format, Type) ->
maybe_reject_cluster_only_metrics(<<"all">>, Rates) -> maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
Rates; Rates;
maybe_reject_cluster_only_metrics(_Node, Rates) -> maybe_reject_cluster_only_metrics(_Node, Rates) ->
ClusterOnlyMetrics = [ maps:without(?CLUSTERONLY_SAMPLER_LIST, Rates).
subscriptions_durable,
disconnected_durable_sessions
],
maps:without(ClusterOnlyMetrics, Rates).

View File

@ -224,13 +224,16 @@ t_monitor_current_api(_) ->
], ],
?assert(maps:is_key(<<"subscriptions_durable">>, Rate)), ?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)), ?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)),
ClusterOnlyMetrics = [durable_subscriptions, disconnected_durable_sessions],
{ok, NodeRate} = request(["monitor_current", "nodes", node()]), {ok, NodeRate} = request(["monitor_current", "nodes", node()]),
[ ExpectedKeys = lists:map(
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate), #{key => Key, rates => NodeRate}) fun atom_to_binary/1,
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST, (?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)) -- ?CLUSTERONLY_SAMPLER_LIST
not lists:member(Key, ClusterOnlyMetrics) ),
], ?assertEqual(
[],
ExpectedKeys -- maps:keys(NodeRate),
NodeRate
),
?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)), ?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)),
?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)), ?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)),
?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)), ?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)),
@ -426,6 +429,21 @@ t_persistent_session_stats(Config) ->
?ON(N1, request(["monitor_current"])) ?ON(N1, request(["monitor_current"]))
) )
end), end),
%% Verify that historical metrics are in line with the current ones.
?assertMatch(
{ok, [
#{
<<"time_stamp">> := _,
<<"connections">> := 3,
<<"disconnected_durable_sessions">> := 1,
<<"topics">> := 8,
<<"subscriptions">> := 8,
<<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4
}
]},
?ON(N1, request(["monitor"], "latest=1"))
),
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqtt:disconnect(PSClient2), emqtt:disconnect(PSClient2),

View File

@ -566,7 +566,11 @@ list_nodes() ->
EXPR EXPR
catch catch
error:RPCError__ = {erpc, _} -> error:RPCError__ = {erpc, _} ->
{error, recoverable, RPCError__} {error, recoverable, RPCError__};
%% Note: remote node never _throws_ unrecoverable errors, so
%% we can assume that all exceptions are transient.
EC__:RPCError__:Stack__ ->
{error, recoverable, #{EC__ => RPCError__, stacktrace => Stack__}}
end end
). ).
@ -605,13 +609,7 @@ ra_add_generation(DB, Shard) ->
?tag => add_generation, ?tag => add_generation,
?since => emqx_ds:timestamp_us() ?since => emqx_ds:timestamp_us()
}, },
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), ra_command(DB, Shard, Command, 10).
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_update_config(DB, Shard, Opts) -> ra_update_config(DB, Shard, Opts) ->
Command = #{ Command = #{
@ -619,20 +617,20 @@ ra_update_config(DB, Shard, Opts) ->
?config => Opts, ?config => Opts,
?since => emqx_ds:timestamp_us() ?since => emqx_ds:timestamp_us()
}, },
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), ra_command(DB, Shard, Command, 10).
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_drop_generation(DB, Shard, GenId) -> ra_drop_generation(DB, Shard, GenId) ->
Command = #{?tag => drop_generation, ?generation => GenId}, Command = #{?tag => drop_generation, ?generation => GenId},
ra_command(DB, Shard, Command, 10).
ra_command(DB, Shard, Command, Retries) ->
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} -> {ok, Result, _Leader} ->
Result; Result;
_Error when Retries > 0 ->
timer:sleep(?RA_TIMEOUT),
ra_command(DB, Shard, Command, Retries - 1);
Error -> Error ->
error(Error, [DB, Shard]) error(Error, [DB, Shard])
end. end.

View File

@ -591,6 +591,7 @@ init({ShardId, Options}) ->
shard = Shard shard = Shard
}, },
commit_metadata(S), commit_metadata(S),
?tp(debug, ds_storage_init_state, #{shard => ShardId, s => S}),
{ok, S}. {ok, S}.
format_status(Status) -> format_status(Status) ->
@ -625,7 +626,6 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
{reply, Generations, S}; {reply, Generations, S};
handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
{Reply, S} = handle_drop_generation(S0, GenId), {Reply, S} = handle_drop_generation(S0, GenId),
commit_metadata(S),
{reply, Reply, S}; {reply, Reply, S};
handle_call(#call_take_snapshot{}, _From, S) -> handle_call(#call_take_snapshot{}, _From, S) ->
Snapshot = handle_take_snapshot(S), Snapshot = handle_take_snapshot(S),
@ -774,6 +774,21 @@ handle_drop_generation(S0, GenId) ->
shard = OldShard, shard = OldShard,
cf_refs = OldCFRefs cf_refs = OldCFRefs
} = S0, } = S0,
%% 1. Commit the metadata first, so other functions are less
%% likely to see stale data, and replicas don't end up
%% inconsistent state, where generation's column families are
%% absent, but its metadata is still present.
%%
%% Note: in theory, this operation may be interrupted in the
%% middle. This will leave column families hanging.
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
S1 = S0#s{
shard = Shard,
schema = Schema
},
commit_metadata(S1),
%% 2. Now, actually drop the data from RocksDB:
#{module := Mod, cf_refs := GenCFRefs} = GenSchema, #{module := Mod, cf_refs := GenCFRefs} = GenSchema,
#{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
try try
@ -793,13 +808,7 @@ handle_drop_generation(S0, GenId) ->
) )
end, end,
CFRefs = OldCFRefs -- GenCFRefs, CFRefs = OldCFRefs -- GenCFRefs,
Shard = maps:remove(?GEN_KEY(GenId), OldShard), S = S1#s{cf_refs = CFRefs},
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
S = S0#s{
cf_refs = CFRefs,
shard = Shard,
schema = Schema
},
{ok, S}. {ok, S}.
-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->

View File

@ -25,8 +25,8 @@
-define(DB, testdb). -define(DB, testdb).
-define(ON(NODE, BODY), -define(ON(NODES, BODY),
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) emqx_ds_test_helpers:on(NODES, fun() -> BODY end)
). ).
opts() -> opts() ->
@ -476,6 +476,84 @@ t_rebalance_offline_restarts(Config) ->
?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
t_drop_generation(Config) ->
Apps = [appspec(emqx_durable_storage)],
[_, _, NS3] =
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
[
{t_drop_generation1, #{apps => Apps}},
{t_drop_generation2, #{apps => Apps}},
{t_drop_generation3, #{apps => Apps}}
],
#{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
}
),
Nodes = [N1, _, N3] = emqx_cth_cluster:start(NodeSpecs),
?check_trace(
try
%% Initialize DB on all 3 nodes.
Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
),
timer:sleep(1000),
%% Create a generation while all nodes are online:
?ON(N1, ?assertMatch(ok, emqx_ds:add_generation(?DB))),
?ON(
Nodes,
?assertEqual(
[{<<"0">>, 1}, {<<"0">>, 2}],
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
)
),
%% Drop generation while all nodes are online:
?ON(N1, ?assertMatch(ok, emqx_ds:drop_generation(?DB, {<<"0">>, 1}))),
?ON(
Nodes,
?assertEqual(
[{<<"0">>, 2}],
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
)
),
%% Ston N3, then create and drop generation when it's offline:
ok = emqx_cth_cluster:stop_node(N3),
?ON(
N1,
begin
ok = emqx_ds:add_generation(?DB),
ok = emqx_ds:drop_generation(?DB, {<<"0">>, 2})
end
),
%% Restart N3 and verify that it reached the consistent state:
emqx_cth_cluster:restart(NS3),
ok = ?ON(N3, emqx_ds:open_db(?DB, Opts)),
%% N3 can be in unstalbe state right now, but it still
%% must successfully return streams:
?ON(
Nodes,
?assertEqual([], emqx_ds:get_streams(?DB, ['#'], 0))
),
timer:sleep(1000),
?ON(
Nodes,
?assertEqual(
[{<<"0">>, 3}],
maps:keys(emqx_ds:list_generations_with_lifetimes(?DB))
)
)
after
emqx_cth_cluster:stop(Nodes)
end,
fun(Trace) ->
%% TODO: some idempotency errors still happen
%% ?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)),
true
end
).
%% %%
shard_server_info(Node, DB, Shard, Site, Info) -> shard_server_info(Node, DB, Shard, Site, Info) ->

View File

@ -23,7 +23,29 @@
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-define(ON(NODE, BODY), -define(ON(NODE, BODY),
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) emqx_ds_test_helpers:on(NODE, fun() -> BODY end)
).
-spec on([node()] | node(), fun(() -> A)) -> A | [A].
on(Node, Fun) when is_atom(Node) ->
[Ret] = on([Node], Fun),
Ret;
on(Nodes, Fun) ->
Results = erpc:multicall(Nodes, erlang, apply, [Fun, []]),
lists:map(
fun
({_Node, {ok, Result}}) ->
Result;
({Node, Error}) ->
ct:pal("Error on node ~p", [Node]),
case Error of
{error, {exception, Reason, Stack}} ->
erlang:raise(error, Reason, Stack);
_ ->
error(Error)
end
end,
lists:zip(Nodes, Results)
). ).
%% RPC mocking %% RPC mocking

View File

@ -32,6 +32,7 @@
-define(MEDIUM_CUSTOMER, 1). -define(MEDIUM_CUSTOMER, 1).
-define(LARGE_CUSTOMER, 2). -define(LARGE_CUSTOMER, 2).
-define(BUSINESS_CRITICAL_CUSTOMER, 3). -define(BUSINESS_CRITICAL_CUSTOMER, 3).
-define(BYOC_CUSTOMER, 4).
-define(EVALUATION_CUSTOMER, 10). -define(EVALUATION_CUSTOMER, 10).
-define(EXPIRED_DAY, -90). -define(EXPIRED_DAY, -90).

View File

@ -29,6 +29,7 @@
| ?MEDIUM_CUSTOMER | ?MEDIUM_CUSTOMER
| ?LARGE_CUSTOMER | ?LARGE_CUSTOMER
| ?BUSINESS_CRITICAL_CUSTOMER | ?BUSINESS_CRITICAL_CUSTOMER
| ?BYOC_CUSTOMER
| ?EVALUATION_CUSTOMER. | ?EVALUATION_CUSTOMER.
-type license_type() :: ?OFFICIAL | ?TRIAL. -type license_type() :: ?OFFICIAL | ?TRIAL.

View File

@ -141,14 +141,16 @@ schema("/clients_v2") ->
parameters => fields(list_clients_v2_inputs), parameters => fields(list_clients_v2_inputs),
responses => #{ responses => #{
200 => 200 =>
emqx_dashboard_swagger:schema_with_example(?R_REF(list_clients_v2_response), #{ %% TODO: unhide after API is ready
<<"data">> => [client_example()], %% emqx_dashboard_swagger:schema_with_example(?R_REF(list_clients_v2_response), #{
<<"meta">> => #{ %% <<"data">> => [client_example()],
<<"count">> => 1, %% <<"meta">> => #{
<<"cursor">> => <<"g2wAAAADYQFhAm0AAAACYzJq">>, %% <<"count">> => 1,
<<"hasnext">> => true %% <<"cursor">> => <<"g2wAAAADYQFhAm0AAAACYzJq">>,
} %% <<"hasnext">> => true
}), %% }
%% }),
emqx_dashboard_swagger:schema_with_example(map(), #{}),
400 => 400 =>
emqx_dashboard_swagger:error_codes( emqx_dashboard_swagger:error_codes(
['INVALID_PARAMETER'], <<"Invalid parameters">> ['INVALID_PARAMETER'], <<"Invalid parameters">>

View File

@ -307,7 +307,7 @@ get_site(get, #{bindings := #{site := Site}}) ->
?NOT_FOUND(<<"Site not found: ", Site/binary>>); ?NOT_FOUND(<<"Site not found: ", Site/binary>>);
true -> true ->
Node = emqx_ds_replication_layer_meta:node(Site), Node = emqx_ds_replication_layer_meta:node(Site),
IsUp = lists:member(Node, [node() | nodes()]), IsUp = mria:cluster_status(Node) =:= running,
Shards = shards_of_site(Site), Shards = shards_of_site(Site),
?OK(#{ ?OK(#{
node => Node, node => Node,

View File

@ -20,6 +20,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_plugins/include/emqx_plugins.hrl"). -include_lib("emqx_plugins/include/emqx_plugins.hrl").
-include_lib("erlavro/include/erlavro.hrl").
-dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}). -dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}).
@ -506,14 +507,20 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) ->
{404, plugin_not_found_msg()} {404, plugin_not_found_msg()}
end; end;
plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) -> plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
Nodes = emqx:running_nodes(),
case emqx_plugins:describe(NameVsn) of case emqx_plugins:describe(NameVsn) of
{ok, _} -> {ok, _} ->
case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of
{ok, AvroValueConfig} -> {ok, ?plugin_without_config_schema} ->
Nodes = emqx:running_nodes(), %% no plugin avro schema, just put the json map as-is
_Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
Nodes, NameVsn, AvroJsonMap, ?plugin_without_config_schema
),
{204};
{ok, AvroValue} ->
%% cluster call with config in map (binary key-value) %% cluster call with config in map (binary key-value)
_Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config( _Res = emqx_mgmt_api_plugins_proto_v3:update_plugin_config(
Nodes, NameVsn, AvroJsonMap, AvroValueConfig Nodes, NameVsn, AvroJsonMap, AvroValue
), ),
{204}; {204};
{error, Reason} -> {error, Reason} ->
@ -604,9 +611,13 @@ ensure_action(Name, restart) ->
ok. ok.
%% for RPC plugin avro encoded config update %% for RPC plugin avro encoded config update
do_update_plugin_config(NameVsn, AvroJsonMap, PluginConfigMap) -> -spec do_update_plugin_config(
%% TODO: maybe use `PluginConfigMap` to validate config name_vsn(), map(), avro_value() | ?plugin_without_config_schema
emqx_plugins:put_config(NameVsn, AvroJsonMap, PluginConfigMap). ) ->
ok.
do_update_plugin_config(NameVsn, AvroJsonMap, AvroValue) ->
%% TODO: maybe use `AvroValue` to validate config
emqx_plugins:put_config(NameVsn, AvroJsonMap, AvroValue).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions

View File

@ -28,6 +28,7 @@
]). ]).
-include_lib("emqx/include/bpapi.hrl"). -include_lib("emqx/include/bpapi.hrl").
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
introduced_in() -> introduced_in() ->
"5.7.0". "5.7.0".
@ -56,14 +57,14 @@ ensure_action(Name, Action) ->
[node()], [node()],
binary() | string(), binary() | string(),
binary(), binary(),
map() map() | ?plugin_without_config_schema
) -> ) ->
emqx_rpc:multicall_result(). emqx_rpc:multicall_result().
update_plugin_config(Nodes, NameVsn, AvroJsonMap, PluginConfig) -> update_plugin_config(Nodes, NameVsn, AvroJsonMap, MaybeAvroValue) ->
rpc:multicall( rpc:multicall(
Nodes, Nodes,
emqx_mgmt_api_plugins, emqx_mgmt_api_plugins,
do_update_plugin_config, do_update_plugin_config,
[NameVsn, AvroJsonMap, PluginConfig], [NameVsn, AvroJsonMap, MaybeAvroValue],
10000 10000
). ).

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(ACT_ALARM, test_act_alarm). -define(ACT_ALARM, test_act_alarm).
-define(DE_ACT_ALARM, test_de_act_alarm). -define(DE_ACT_ALARM, test_de_act_alarm).
@ -27,11 +28,18 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(), Apps = emqx_cth_suite:start(
Config. [
emqx,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(). ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
t_alarms_api(_) -> t_alarms_api(_) ->
ok = emqx_alarm:activate(?ACT_ALARM), ok = emqx_alarm:activate(?ACT_ALARM),

View File

@ -56,11 +56,18 @@ groups() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]), Apps = emqx_cth_suite:start(
Config. [
emqx_conf,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_management]). ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
t_bootstrap_file(_) -> t_bootstrap_file(_) ->
TestPath = <<"/api/v5/status">>, TestPath = <<"/api/v5/status">>,

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(EXPIRATION_TIME, 31536000). -define(EXPIRATION_TIME, 31536000).
@ -26,11 +27,18 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(), Apps = emqx_cth_suite:start(
Config. [
emqx,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(). ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
t_create(_Config) -> t_create(_Config) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),

View File

@ -41,9 +41,12 @@ init_per_testcase(TC = t_cluster_invite_api_timeout, Config0) ->
init_per_testcase(TC = t_cluster_invite_async, Config0) -> init_per_testcase(TC = t_cluster_invite_async, Config0) ->
Config = [{tc_name, TC} | Config0], Config = [{tc_name, TC} | Config0],
[{cluster, cluster(Config)} | setup(Config)]; [{cluster, cluster(Config)} | setup(Config)];
init_per_testcase(_TC, Config) -> init_per_testcase(TC, Config) ->
emqx_mgmt_api_test_util:init_suite(?APPS), Apps = emqx_cth_suite:start(
Config. ?APPS ++ [emqx_mgmt_api_test_util:emqx_dashboard()],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
[{tc_apps, Apps} | Config].
end_per_testcase(t_cluster_topology_api_replicants, Config) -> end_per_testcase(t_cluster_topology_api_replicants, Config) ->
emqx_cth_cluster:stop(?config(cluster, Config)), emqx_cth_cluster:stop(?config(cluster, Config)),
@ -54,8 +57,8 @@ end_per_testcase(t_cluster_invite_api_timeout, Config) ->
end_per_testcase(t_cluster_invite_async, Config) -> end_per_testcase(t_cluster_invite_async, Config) ->
emqx_cth_cluster:stop(?config(cluster, Config)), emqx_cth_cluster:stop(?config(cluster, Config)),
cleanup(Config); cleanup(Config);
end_per_testcase(_TC, _Config) -> end_per_testcase(_TC, Config) ->
emqx_mgmt_api_test_util:end_suite(?APPS). ok = emqx_cth_suite:stop(?config(tc_apps, Config)).
t_cluster_topology_api_empty_resp(_) -> t_cluster_topology_api_empty_resp(_) ->
ClusterTopologyPath = emqx_mgmt_api_test_util:api_path(["cluster", "topology"]), ClusterTopologyPath = emqx_mgmt_api_test_util:api_path(["cluster", "topology"]),

View File

@ -25,11 +25,18 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf]), Apps = emqx_cth_suite:start(
Config. [
emqx_conf,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]). ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_testcase(TestCase = t_configs_node, Config) -> init_per_testcase(TestCase = t_configs_node, Config) ->
?MODULE:TestCase({'init', Config}); ?MODULE:TestCase({'init', Config});

View File

@ -74,13 +74,10 @@ init_group_apps(Config, CTConfig) ->
[ [
{emqx_conf, Config}, {emqx_conf, Config},
emqx_management, emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} emqx_mgmt_api_test_util:emqx_dashboard()
], ],
#{ #{work_dir => emqx_cth_suite:work_dir(CTConfig)}
work_dir => emqx_cth_suite:work_dir(CTConfig)
}
), ),
{ok, _} = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | CTConfig]. [{suite_apps, Apps} | CTConfig].
end_per_group(_Group, Config) -> end_per_group(_Group, Config) ->

View File

@ -19,16 +19,24 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(), Apps = emqx_cth_suite:start(
Config. [
emqx,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(_) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(). ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
t_metrics_api(_) -> t_metrics_api(_) ->
{ok, MetricsResponse} = request_helper("metrics?aggregate=true"), {ok, MetricsResponse} = request_helper("metrics?aggregate=true"),

View File

@ -29,11 +29,10 @@ init_per_suite(Config) ->
[ [
emqx_conf, emqx_conf,
emqx_management, emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} emqx_mgmt_api_test_util:emqx_dashboard()
], ],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
{ok, _} = emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config]. [{suite_apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->

View File

@ -34,24 +34,23 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
WorkDir = proplists:get_value(data_dir, Config), WorkDir = proplists:get_value(data_dir, Config),
ok = filelib:ensure_dir(WorkDir),
DemoShDir1 = string:replace(WorkDir, "emqx_mgmt_api_plugins", "emqx_plugins"), DemoShDir1 = string:replace(WorkDir, "emqx_mgmt_api_plugins", "emqx_plugins"),
DemoShDir = lists:flatten(string:replace(DemoShDir1, "emqx_management", "emqx_plugins")), DemoShDir = lists:flatten(string:replace(DemoShDir1, "emqx_management", "emqx_plugins")),
OrigInstallDir = emqx_plugins:get_config_interal(install_dir, undefined), Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx_plugins,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
ok = filelib:ensure_dir(DemoShDir), ok = filelib:ensure_dir(DemoShDir),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_plugins]),
emqx_plugins:put_config_internal(install_dir, DemoShDir), emqx_plugins:put_config_internal(install_dir, DemoShDir),
[{demo_sh_dir, DemoShDir}, {orig_install_dir, OrigInstallDir} | Config]. [{apps, Apps}, {demo_sh_dir, DemoShDir} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), ok = emqx_cth_suite:stop(?config(apps, Config)).
%% restore config
case proplists:get_value(orig_install_dir, Config) of
undefined -> ok;
OrigInstallDir -> emqx_plugins:put_config_internal(install_dir, OrigInstallDir)
end,
emqx_mgmt_api_test_util:end_suite([emqx_plugins, emqx_conf]),
ok.
init_per_testcase(t_cluster_update_order = TestCase, Config0) -> init_per_testcase(t_cluster_update_order = TestCase, Config0) ->
Config = [{api_port, 18085} | Config0], Config = [{api_port, 18085} | Config0],

View File

@ -24,6 +24,7 @@
-include_lib("stdlib/include/zip.hrl"). -include_lib("stdlib/include/zip.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("common_test/include/ct.hrl").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
@ -33,11 +34,18 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(), Apps = emqx_cth_suite:start(
Config. [
emqx,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(). ok = emqx_cth_suite:stop(?config(apps, Config)).
t_http_test(_Config) -> t_http_test(_Config) ->
emqx_trace:clear(), emqx_trace:clear(),

View File

@ -25,6 +25,7 @@
-define(CONFIG_FORMAT_MAP, config_format_map). -define(CONFIG_FORMAT_MAP, config_format_map).
-define(plugin_conf_not_found, plugin_conf_not_found). -define(plugin_conf_not_found, plugin_conf_not_found).
-define(plugin_without_config_schema, plugin_without_config_schema).
-type schema_name() :: binary(). -type schema_name() :: binary().
-type avsc_path() :: string(). -type avsc_path() :: string().

View File

@ -161,7 +161,9 @@ ensure_installed(NameVsn) ->
ok = purge(NameVsn), ok = purge(NameVsn),
case ensure_exists_and_installed(NameVsn) of case ensure_exists_and_installed(NameVsn) of
ok -> ok ->
maybe_post_op_after_installed(NameVsn); maybe_post_op_after_installed(NameVsn),
_ = maybe_ensure_plugin_config(NameVsn),
ok;
{error, _Reason} = Err -> {error, _Reason} = Err ->
Err Err
end end
@ -328,10 +330,11 @@ get_config_bin(NameVsn) ->
%% @doc Update plugin's config. %% @doc Update plugin's config.
%% RPC call from Management API or CLI. %% RPC call from Management API or CLI.
%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function. %% The plugin config Json Map was valid by avro schema
put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) -> %% Or: if no and plugin config ALWAYS be valid before calling this function.
put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig); put_config(NameVsn, ConfigJsonMap, AvroValue) when not is_binary(NameVsn) ->
put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) -> put_config(bin(NameVsn), ConfigJsonMap, AvroValue);
put_config(NameVsn, ConfigJsonMap, _AvroValue) ->
HoconBin = hocon_pp:do(ConfigJsonMap, #{}), HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
ok = backup_and_write_hocon_bin(NameVsn, HoconBin), ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
%% TODO: callback in plugin's on_config_changed (config update by mgmt API) %% TODO: callback in plugin's on_config_changed (config update by mgmt API)
@ -369,10 +372,30 @@ list() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Package utils %% Package utils
-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}. -spec decode_plugin_config_map(name_vsn(), map() | binary()) ->
decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) -> {ok, map() | ?plugin_without_config_schema}
decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap)); | {error, any()}.
decode_plugin_config_map(NameVsn, AvroJsonBin) -> decode_plugin_config_map(NameVsn, AvroJsonMap) ->
case with_plugin_avsc(NameVsn) of
true ->
case emqx_plugins_serde:lookup_serde(NameVsn) of
{error, not_found} ->
Reason = "plugin_config_schema_serde_not_found",
?SLOG(error, #{
msg => Reason, name_vsn => NameVsn, plugin_with_avro_schema => true
}),
{error, Reason};
{ok, _Serde} ->
do_decode_plugin_config_map(NameVsn, AvroJsonMap)
end;
false ->
?SLOG(debug, #{name_vsn => NameVsn, plugin_with_avro_schema => false}),
{ok, ?plugin_without_config_schema}
end.
do_decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
do_decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap));
do_decode_plugin_config_map(NameVsn, AvroJsonBin) ->
case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
{ok, Config} -> {ok, Config}; {ok, Config} -> {ok, Config};
{error, ReasonMap} -> {error, ReasonMap} {error, ReasonMap} -> {error, ReasonMap}
@ -533,6 +556,7 @@ ensure_state(NameVsn, Position, State, ConfLocation) ->
fun() -> ensure_configured(Item, Position, ConfLocation) end fun() -> ensure_configured(Item, Position, ConfLocation) end
); );
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "ensure_plugin_states_failed", reason => Reason}),
{error, Reason} {error, Reason}
end. end.
@ -1100,9 +1124,31 @@ for_plugins(ActionFun) ->
ok ok
end. end.
maybe_post_op_after_installed(NameVsn) -> maybe_post_op_after_installed(NameVsn0) ->
NameVsn = wrap_to_list(NameVsn0),
_ = maybe_load_config_schema(NameVsn), _ = maybe_load_config_schema(NameVsn),
ok = maybe_ensure_state(NameVsn).
maybe_ensure_state(NameVsn) ->
EnsureStateFun = fun(#{name_vsn := NV, enable := Bool}, AccIn) ->
case NV of
NameVsn ->
%% Configured, using existed cluster config
_ = ensure_state(NV, no_move, Bool, global),
AccIn#{ensured => true};
_ ->
AccIn
end
end,
case lists:foldl(EnsureStateFun, #{ensured => false}, configured()) of
#{ensured := true} ->
ok;
#{ensured := false} ->
?SLOG(info, #{msg => "plugin_not_configured", name_vsn => NameVsn}),
%% Clean installation, no config, ensure with `Enable = false`
_ = ensure_state(NameVsn, no_move, false, global), _ = ensure_state(NameVsn, no_move, false, global),
ok
end,
ok. ok.
maybe_load_config_schema(NameVsn) -> maybe_load_config_schema(NameVsn) ->
@ -1114,7 +1160,7 @@ maybe_load_config_schema(NameVsn) ->
_ = maybe_create_config_dir(NameVsn). _ = maybe_create_config_dir(NameVsn).
do_load_config_schema(NameVsn, AvscPath) -> do_load_config_schema(NameVsn, AvscPath) ->
case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of case emqx_plugins_serde:add_schema(bin(NameVsn), AvscPath) of
ok -> ok; ok -> ok;
{error, already_exists} -> ok; {error, already_exists} -> ok;
{error, _Reason} -> ok {error, _Reason} -> ok
@ -1144,6 +1190,7 @@ do_create_config_dir(NameVsn) ->
end end
end. end.
-spec maybe_ensure_plugin_config(name_vsn()) -> ok.
maybe_ensure_plugin_config(NameVsn) -> maybe_ensure_plugin_config(NameVsn) ->
maybe maybe
true ?= with_plugin_avsc(NameVsn), true ?= with_plugin_avsc(NameVsn),
@ -1152,10 +1199,13 @@ maybe_ensure_plugin_config(NameVsn) ->
_ -> ok _ -> ok
end. end.
-spec ensure_plugin_config(name_vsn()) -> ok.
ensure_plugin_config(NameVsn) -> ensure_plugin_config(NameVsn) ->
%% fetch plugin hocon config from cluster %% fetch plugin hocon config from cluster
Nodes = [N || N <- mria:running_nodes(), N /= node()], Nodes = [N || N <- mria:running_nodes(), N /= node()],
ensure_plugin_config(NameVsn, Nodes). ensure_plugin_config(NameVsn, Nodes).
-spec ensure_plugin_config(name_vsn(), list()) -> ok.
ensure_plugin_config(NameVsn, []) -> ensure_plugin_config(NameVsn, []) ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "default_plugin_config_used", msg => "default_plugin_config_used",
@ -1167,7 +1217,9 @@ ensure_plugin_config(NameVsn, Nodes) ->
case get_plugin_config_from_any_node(Nodes, NameVsn, []) of case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
{ok, ConfigMap} when is_map(ConfigMap) -> {ok, ConfigMap} when is_map(ConfigMap) ->
HoconBin = hocon_pp:do(ConfigMap, #{}), HoconBin = hocon_pp:do(ConfigMap, #{}),
ok = file:write_file(plugin_config_file(NameVsn), HoconBin), Path = plugin_config_file(NameVsn),
ok = filelib:ensure_dir(Path),
ok = file:write_file(Path, HoconBin),
ensure_config_map(NameVsn); ensure_config_map(NameVsn);
_ -> _ ->
?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}), ?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
@ -1176,6 +1228,7 @@ ensure_plugin_config(NameVsn, Nodes) ->
cp_default_config_file(NameVsn) cp_default_config_file(NameVsn)
end. end.
-spec cp_default_config_file(name_vsn()) -> ok.
cp_default_config_file(NameVsn) -> cp_default_config_file(NameVsn) ->
%% always copy default hocon file into config dir when can not get config from other nodes %% always copy default hocon file into config dir when can not get config from other nodes
Source = default_plugin_config_file(NameVsn), Source = default_plugin_config_file(NameVsn),
@ -1184,9 +1237,10 @@ cp_default_config_file(NameVsn) ->
true ?= filelib:is_regular(Source), true ?= filelib:is_regular(Source),
%% destination path not existed (not configured) %% destination path not existed (not configured)
true ?= (not filelib:is_regular(Destination)), true ?= (not filelib:is_regular(Destination)),
ok = filelib:ensure_dir(Destination),
case file:copy(Source, Destination) of case file:copy(Source, Destination) of
{ok, _} -> {ok, _} ->
ok; ensure_config_map(NameVsn);
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "failed_to_copy_plugin_default_hocon_config", msg => "failed_to_copy_plugin_default_hocon_config",
@ -1200,19 +1254,32 @@ cp_default_config_file(NameVsn) ->
end. end.
ensure_config_map(NameVsn) -> ensure_config_map(NameVsn) ->
with_plugin_avsc(NameVsn) andalso
do_ensure_config_map(NameVsn).
do_ensure_config_map(NameVsn) ->
case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
{ok, ConfigJsonMap} -> {ok, ConfigJsonMap} ->
{ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap), case with_plugin_avsc(NameVsn) of
put_config(NameVsn, ConfigJsonMap, Config); true ->
do_ensure_config_map(NameVsn, ConfigJsonMap);
false ->
put_config(NameVsn, ConfigJsonMap, ?plugin_without_config_schema)
end;
_ -> _ ->
?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}), ?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
ok ok
end. end.
do_ensure_config_map(NameVsn, ConfigJsonMap) ->
case decode_plugin_config_map(NameVsn, ConfigJsonMap) of
{ok, AvroValue} ->
put_config(NameVsn, ConfigJsonMap, AvroValue);
{error, Reason} ->
?SLOG(error, #{
msg => "plugin_config_validation_failed",
name_vsn => NameVsn,
reason => Reason
}),
ok
end.
%% @private Backup the current config to a file with a timestamp suffix and %% @private Backup the current config to a file with a timestamp suffix and
%% then save the new config to the config file. %% then save the new config to the config file.
backup_and_write_hocon_bin(NameVsn, HoconBin) -> backup_and_write_hocon_bin(NameVsn, HoconBin) ->
@ -1301,23 +1368,23 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
%% Directorys %% Directorys
-spec plugin_dir(name_vsn()) -> string(). -spec plugin_dir(name_vsn()) -> string().
plugin_dir(NameVsn) -> plugin_dir(NameVsn) ->
wrap_list_path(filename:join([install_dir(), NameVsn])). wrap_to_list(filename:join([install_dir(), NameVsn])).
-spec plugin_priv_dir(name_vsn()) -> string(). -spec plugin_priv_dir(name_vsn()) -> string().
plugin_priv_dir(NameVsn) -> plugin_priv_dir(NameVsn) ->
case read_plugin_info(NameVsn, #{fill_readme => false}) of case read_plugin_info(NameVsn, #{fill_readme => false}) of
{ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} ->
AppDir = make_name_vsn_string(Name, Vsn), AppDir = make_name_vsn_string(Name, Vsn),
wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
_ -> _ ->
wrap_list_path(filename:join([install_dir(), NameVsn, "priv"])) wrap_to_list(filename:join([install_dir(), NameVsn, "priv"]))
end. end.
-spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.
plugin_config_dir(NameVsn) -> plugin_config_dir(NameVsn) ->
case parse_name_vsn(NameVsn) of case parse_name_vsn(NameVsn) of
{ok, NameAtom, _Vsn} -> {ok, NameAtom, _Vsn} ->
wrap_list_path(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)])); wrap_to_list(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)]));
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "failed_to_generate_plugin_config_dir_for_plugin", msg => "failed_to_generate_plugin_config_dir_for_plugin",
@ -1330,32 +1397,32 @@ plugin_config_dir(NameVsn) ->
%% Files %% Files
-spec pkg_file_path(name_vsn()) -> string(). -spec pkg_file_path(name_vsn()) -> string().
pkg_file_path(NameVsn) -> pkg_file_path(NameVsn) ->
wrap_list_path(filename:join([install_dir(), bin([NameVsn, ".tar.gz"])])). wrap_to_list(filename:join([install_dir(), bin([NameVsn, ".tar.gz"])])).
-spec info_file_path(name_vsn()) -> string(). -spec info_file_path(name_vsn()) -> string().
info_file_path(NameVsn) -> info_file_path(NameVsn) ->
wrap_list_path(filename:join([plugin_dir(NameVsn), "release.json"])). wrap_to_list(filename:join([plugin_dir(NameVsn), "release.json"])).
-spec avsc_file_path(name_vsn()) -> string(). -spec avsc_file_path(name_vsn()) -> string().
avsc_file_path(NameVsn) -> avsc_file_path(NameVsn) ->
wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"])). wrap_to_list(filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"])).
-spec plugin_config_file(name_vsn()) -> string(). -spec plugin_config_file(name_vsn()) -> string().
plugin_config_file(NameVsn) -> plugin_config_file(NameVsn) ->
wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.hocon"])). wrap_to_list(filename:join([plugin_config_dir(NameVsn), "config.hocon"])).
%% should only used when plugin installing %% should only used when plugin installing
-spec default_plugin_config_file(name_vsn()) -> string(). -spec default_plugin_config_file(name_vsn()) -> string().
default_plugin_config_file(NameVsn) -> default_plugin_config_file(NameVsn) ->
wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config.hocon"])). wrap_to_list(filename:join([plugin_priv_dir(NameVsn), "config.hocon"])).
-spec i18n_file_path(name_vsn()) -> string(). -spec i18n_file_path(name_vsn()) -> string().
i18n_file_path(NameVsn) -> i18n_file_path(NameVsn) ->
wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"])). wrap_to_list(filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"])).
-spec readme_file(name_vsn()) -> string(). -spec readme_file(name_vsn()) -> string().
readme_file(NameVsn) -> readme_file(NameVsn) ->
wrap_list_path(filename:join([plugin_dir(NameVsn), "README.md"])). wrap_to_list(filename:join([plugin_dir(NameVsn), "README.md"])).
running_apps() -> running_apps() ->
lists:map( lists:map(
@ -1387,5 +1454,5 @@ bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
bin(B) when is_binary(B) -> B. bin(B) when is_binary(B) -> B.
wrap_list_path(Path) -> wrap_to_list(Path) ->
binary_to_list(iolist_to_binary(Path)). binary_to_list(iolist_to_binary(Path)).

View File

@ -265,11 +265,16 @@ lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
Value; Value;
lookup_var([Prop | Rest], Data0) -> lookup_var([Prop | Rest], Data0) ->
Data = Data =
case is_map(Data0) of
true ->
Data0;
false ->
case emqx_utils_json:safe_decode(Data0, [return_maps]) of case emqx_utils_json:safe_decode(Data0, [return_maps]) of
{ok, Data1} -> {ok, Data1} ->
Data1; Data1;
{error, _} -> {error, _} ->
Data0 Data0
end
end, end,
case lookup(Prop, Data) of case lookup(Prop, Data) of
{ok, Value} -> {ok, Value} ->
@ -293,12 +298,10 @@ lookup(Prop, Data) when is_binary(Prop) ->
end. end.
do_one_lookup(Key, Data) -> do_one_lookup(Key, Data) ->
try case Data of
{ok, maps:get(Key, Data)} #{Key := Value} ->
catch {ok, Value};
error:{badkey, _} -> _ ->
{error, undefined};
error:{badmap, _} ->
{error, undefined} {error, undefined}
end. end.

View File

@ -0,0 +1 @@
Improve performance of template rendering in bridges.

182
changes/e5.7.0.en.md Normal file
View File

@ -0,0 +1,182 @@
# e5.7.0
## Enhancements
#### MQTT
Implemented Durable Sessions, which persists MQTT Persistent Sessions and their messages to disk, and continuously replicates session metadata and MQTT messages among multiple nodes in the EMQX cluster. This achieves effective failover and recovery mechanisms, ensuring service continuity and high availability, thereby enhancing system reliability.
Added metrics related to EMQX durable storage to Prometheus:
- `emqx_ds_egress_batches`
- `emqx_ds_egress_batches_retry`
- `emqx_ds_egress_batches_failed`
- `emqx_ds_egress_messages`
- `emqx_ds_egress_bytes`
- `emqx_ds_egress_flush_time`
- `emqx_ds_store_batch_time`
- `emqx_ds_builtin_next_time`
- `emqx_ds_storage_bitfield_lts_counter_seek`
- `emqx_ds_storage_bitfield_lts_counter_next`
- `emqx_ds_storage_bitfield_lts_counter_collision`
Note: these metrics are only visible when session persistence is enabled.
The number of persisted messages has also been added to the Dashboard.
#### Security
- [#12947](https://github.com/emqx/emqx/pull/12947) For JWT authentication, support new `disconnect_after_expire` option. When enabled, the client will be disconnected after the JWT token expires.
Note: This is a breaking change. This option is enabled by default, so the default behavior is changed. Previously, the clients with actual JWTs could connect to the broker and stay connected even after the JWT token expired. Now, the client will be disconnected after the JWT token expires. To preserve the previous behavior, set `disconnect_after_expire` to `false`.
#### Data Processing and Integration
- [#12711](https://github.com/emqx/emqx/pull/12711) Added schema validation feature. With this feature, once validations are configured for certain topic filters, the configured checks are run against published messages. If the checking results are not accepted by validation, the message is dropped and the client may be disconnected, depending on the configuration.
- [#12899](https://github.com/emqx/emqx/pull/12899) For RocketMQ data integration, added support for namespace and key dispatch strategy.
- [#12671](https://github.com/emqx/emqx/pull/12671) An `unescape` function has been added to the rule engine SQL language to handle the expansion of escape sequences in strings. This addition has been done because string literals in the SQL language don't support any escape codes (e.g., `\n` and `\t`). This enhancement allows for more flexible string manipulation within SQL expressions.
- [#12898](https://github.com/emqx/emqx/pull/12898) IoTDB bridge support for iotdb 1.3.0 and batch insert(batch_size/batch_time) options.
- [#12934](https://github.com/emqx/emqx/pull/12934) Added CSV format file aggregation for AWS S3 action.
#### Observability
- [#12827](https://github.com/emqx/emqx/pull/12827) It is now possible to trace rules with a new Rule ID trace filter as well as with the Client ID filter. For testing purposes, it is now also possible to use a new HTTP API endpoint (rules/:id/test) to artificially apply a rule and optionally stop its actions after they have been rendered.
- [#12863](https://github.com/emqx/emqx/pull/12863) You can now format trace log entries as JSON objects by setting the formatter parameter to "json" when creating the trace pattern.
#### Extensibility
- [#12872](https://github.com/emqx/emqx/pull/12872) Implemented Client Attributes feature. It allows setting additional properties for each client using key-value pairs. Property values can be generated from MQTT client connection information (such as username, client ID, TLS certificate) or set from data accompanying successful authentication returns. Properties can be used in EMQX for authentication, authorization, data integration, and MQTT extension functions. Compared to using static properties like client ID directly, client properties offer greater flexibility in various business scenarios, simplifying the development process and enhancing adaptability and efficiency in development work.
**Initialization of `client_attrs`**
The `client_attrs` fields can be initially populated from one of the following `clientinfo` fields:
- `cn`: The common name from the TLS client's certificate.
- `dn`: The distinguished name from the TLS client's certificate, that is, the certificate "Subject".
- `clientid`: The MQTT client ID provided by the client.
- `username`: The username provided by the client.
- `user_property`: Extract a property value from 'User-Property' of the MQTT CONNECT packet.
**Extension through Authentication Responses**
Additional attributes may be merged into `client_attrs` from authentication responses. Supported
authentication backends include:
- **HTTP**: Attributes can be included in the JSON object of the HTTP response body through a
`client_attrs` field.
- **JWT**: Attributes can be included via a `client_attrs` claim within the JWT.
**Usage in Authentication and Authorization**
If `client_attrs` is initialized before authentication, it can be used in external authentication
requests. For instance, `${client_attrs.property1}` can be used within request templates
directed at an HTTP server for authenticity validation.
- The `client_attrs` can be utilized in authorization configurations or request templates, enhancing
flexibility and control. Examples include: In `acl.conf`, use `{allow, all, all, ["${client_attrs.namespace}/#"]}` to apply permissions based on the `namespace` attribute.
- In other authorization backends, `${client_attrs.namespace}` can be used within request templates to dynamically include client attributes.
- [#12910](https://github.com/emqx/emqx/pull/12910) Added plugin configuration management and schema validation. It is also possible to annotate the schema with metadata to facilitate UI rendering in the Dashboard. See more details in the [plugin template](https://github.com/emqx/emqx-plugin-template/pull/126) and [plugin documentation](../extensions/plugins.md).
#### Operations and Management
- [#12923](https://github.com/emqx/emqx/pull/12923) Provided more specific error when importing wrong format into builtin authenticate database.
- [#12940](https://github.com/emqx/emqx/pull/12940) Added `ignore_readonly` argument to `PUT /configs` API.
Before this change, EMQX would return 400 (BAD_REQUEST) if the raw config included read-only root keys (`cluster`, `rpc`, and `node`).
After this enhancement it can be called as `PUT /configs?ignore_readonly=true`, EMQX will in this case ignore readonly root config keys, and apply the rest. For observability purposes, an info level message is logged if any readonly keys are dropped.
Also fixed an exception when config has bad HOCON syntax (returns 500). Now bad syntax will cause the API to return 400 (BAD_REQUEST).
- [#12957](https://github.com/emqx/emqx/pull/12957) Started building packages for macOS 14 (Apple Silicon) and Ubuntu 24.04 Noble Numbat (LTS).
## Bug Fixes
#### Security
- [#12887](https://github.com/emqx/emqx/pull/12887) Fixed MQTT enhanced auth with sasl scram.
- [#12962](https://github.com/emqx/emqx/pull/12962) TLS clients can now verify server hostname against wildcard certificate. For example, if a certificate is issued for host `*.example.com`, TLS clients is able to verify server hostnames like `srv1.example.com`.
#### MQTT
- [#12996](https://github.com/emqx/emqx/pull/12996) Fixed process leak in `emqx_retainer` application. Previously, client disconnection while receiving retained messages could cause a process leak.
#### Data Processing and Integration
- [#12653](https://github.com/emqx/emqx/pull/12653) The rule engine function `bin2hexstr` now supports bitstring inputs with a bit size that is not divisible by 8. Such bitstrings can be returned by the rule engine function `subbits`.
- [#12657](https://github.com/emqx/emqx/pull/12657) The rule engine SQL-based language previously did not allow putting any expressions as array elements in array literals (only constants and variable references were allowed). This has now been fixed so that one can use any expressions as array elements.
The following is now permitted, for example:
```bash
select
[21 + 21, abs(-abs(-2)), [1 + 1], 4] as my_array
from "t/#"
```
- [#12932](https://github.com/emqx/emqx/pull/12932) Previously, if a HTTP action request received a 503 (Service Unavailable) status, it was marked as a failure and the request was not retried. This has now been fixed so that the request is retried a configurable number of times.
- [#12948](https://github.com/emqx/emqx/pull/12948) Fixed an issue where sensitive HTTP header values like `Authorization` would be substituted by `******` after updating a connector.
- [#12895](https://github.com/emqx/emqx/pull/12895) Added some missing config keys for the DynamoDB connector and the action.
- [#13018](https://github.com/emqx/emqx/pull/13018) Reduced log spamming when connection goes down in a Postgres/Timescale/Matrix connector.
- [#13118](https://github.com/emqx/emqx/pull/13118) Fix a performance issue in the rule engine template rendering.
#### Observability
- [#12765](https://github.com/emqx/emqx/pull/12765) Make sure stats `subscribers.count` `subscribers.max` contains shared-subscribers. It only contains non-shared subscribers previously.
#### Operations and Management
- [#12812](https://github.com/emqx/emqx/pull/12812) Made resource health checks non-blocking operations. This means that operations such as updating or removing a resource won't be blocked by a lengthy running health check.
- [#12830](https://github.com/emqx/emqx/pull/12830) Made channel (action/source) health checks non-blocking operations. This means that operations such as updating or removing an action/source data integration won't be blocked by a lengthy running health check.
- [#12993](https://github.com/emqx/emqx/pull/12993) Fixed listener config update API when handling an unknown zone.
Before this fix, when a listener config is updated with an unknown zone, for example `{"zone": "unknown"}`, the change would be accepted, causing all clients to crash when connected.
After this fix, updating the listener with an unknown zone name will get a "Bad request" response.
- [#13012](https://github.com/emqx/emqx/pull/13012) The MQTT listerners config option `access_rules` has been improved in the following ways:
* The listener no longer crash with an incomprehensible error message if a non-valid access rule is configured. Instead a configuration error is generated.
* One can now add several rules in a single string by separating them by comma (for example, "allow 10.0.1.0/24, deny all").
- [#13041](https://github.com/emqx/emqx/pull/13041) Improved HTTP authentication error log message. If HTTP content-type header is missing for POST method, it now emits a meaningful error message instead of a less readable exception with stack trace.
- [#13077](https://github.com/emqx/emqx/pull/13077) This fix makes EMQX only read action configurations from the global configuration when the connector starts or restarts, and instead stores the latest configurations for the actions in the connector. Previously, updates to action configurations would sometimes not take effect without disabling and enabling the action. This means that an action could sometimes run with the old (previous) configuration even though it would look like the action configuration has been updated successfully.
- [#13090](https://github.com/emqx/emqx/pull/13090) Attempting to start an action or source whose connector is disabled will no longer attempt to start the connector itself.
- [#12871](https://github.com/emqx/emqx/pull/12871) Fixed startup process of evacuated node. Previously, if a node was evacuated and stoped without stopping evacuation, it would not start back.
- [#12888](https://github.com/emqx/emqx/pull/12888) Fixed License related configuration loss after importing backup data.
#### Gateways
- [#12909](https://github.com/emqx/emqx/pull/12909) Fixed UDP listener process handling on errors or closure, The fix ensures the UDP listener is cleanly stopped and restarted as needed if these error conditions occur.
- [#13001](https://github.com/emqx/emqx/pull/13001) Fixed an issue where the syskeeper forwarder would never reconnect when the connection was lost.
- [#13010](https://github.com/emqx/emqx/pull/13010) Fixed the issue where the JT/T 808 gateway could not correctly reply to the REGISTER_ACK message when requesting authentication from the registration service failed.
## Breaking Changes
- [#12947](https://github.com/emqx/emqx/pull/12947) For JWT authentication, a new boolean option `disconnect_after_expire` has been added with default value set to `true`. When enabled, the client will be disconnected after the JWT token expires.
Previously, the clients with actual JWTs could connect to the broker and stay connected even after the JWT token expired. Now, the client will be disconnected after the JWT token expires. To preserve the previous behavior, set `disconnect_after_expire` to `false`.
- [#12957](https://github.com/emqx/emqx/pull/12957) Stopped building packages for macOS 12.
- [#12895](https://github.com/emqx/emqx/pull/12895) Complemented some necessary but missed keys for the DynamoDB connector and the action. The old configuration is obsolete, as it didn't function properly before this fix. Specifically, for the DynamoDB connector, the addition of a new key, `region`, is required. Additionally, `hash_key` and `range_key` are now supported in the DynamoDB action, with `hash_key` being mandatory.

145
changes/v5.7.0.en.md Normal file
View File

@ -0,0 +1,145 @@
# v5.7.0
## Enhancements
#### MQTT
Implemented Durable Sessions, which persists MQTT Persistent Sessions and their messages to disk, and continuously replicates session metadata and MQTT messages among multiple nodes in the EMQX cluster. This achieves effective failover and recovery mechanisms, ensuring service continuity and high availability, thereby enhancing system reliability.
Added metrics related to EMQX durable storage to Prometheus:
- `emqx_ds_egress_batches`
- `emqx_ds_egress_batches_retry`
- `emqx_ds_egress_batches_failed`
- `emqx_ds_egress_messages`
- `emqx_ds_egress_bytes`
- `emqx_ds_egress_flush_time`
- `emqx_ds_store_batch_time`
- `emqx_ds_builtin_next_time`
- `emqx_ds_storage_bitfield_lts_counter_seek`
- `emqx_ds_storage_bitfield_lts_counter_next`
- `emqx_ds_storage_bitfield_lts_counter_collision`
Note: these metrics are only visible when session persistence is enabled.
The number of persisted messages has also been added to the Dashboard.
#### Security
- [#12947](https://github.com/emqx/emqx/pull/12947) For JWT authentication, support new `disconnect_after_expire` option. When enabled, the client will be disconnected after the JWT token expires.
Note: This is a breaking change. This option is enabled by default, so the default behavior is changed. Previously, the clients with actual JWTs could connect to the broker and stay connected even after the JWT token expired. Now, the client will be disconnected after the JWT token expires. To preserve the previous behavior, set `disconnect_after_expire` to `false`.
#### Data Processing and Integration
- [#12671](https://github.com/emqx/emqx/pull/12671) An `unescape` function has been added to the rule engine SQL language to handle the expansion of escape sequences in strings. This addition has been done because string literals in the SQL language don't support any escape codes (e.g., `\n` and `\t`). This enhancement allows for more flexible string manipulation within SQL expressions.
#### Extensibility
- [#12872](https://github.com/emqx/emqx/pull/12872) Implemented Client Attributes feature. It allows setting additional properties for each client using key-value pairs. Property values can be generated from MQTT client connection information (such as username, client ID, TLS certificate) or set from data accompanying successful authentication returns. Properties can be used in EMQX for authentication, authorization, data integration, and MQTT extension functions. Compared to using static properties like client ID directly, client properties offer greater flexibility in various business scenarios, simplifying the development process and enhancing adaptability and efficiency in development work.
**Initialization of `client_attrs`**
The `client_attrs` fields can be initially populated from one of the following `clientinfo` fields:
- `cn`: The common name from the TLS client's certificate.
- `dn`: The distinguished name from the TLS client's certificate, that is, the certificate "Subject".
- `clientid`: The MQTT client ID provided by the client.
- `username`: The username provided by the client.
- `user_property`: Extract a property value from 'User-Property' of the MQTT CONNECT packet.
**Extension through Authentication Responses**
Additional attributes may be merged into `client_attrs` from authentication responses. Supported
authentication backends include:
- **HTTP**: Attributes can be included in the JSON object of the HTTP response body through a
`client_attrs` field.
- **JWT**: Attributes can be included via a `client_attrs` claim within the JWT.
**Usage in Authentication and Authorization**
If `client_attrs` is initialized before authentication, it can be used in external authentication
requests. For instance, `${client_attrs.property1}` can be used within request templates
directed at an HTTP server for authenticity validation.
- The `client_attrs` can be utilized in authorization configurations or request templates, enhancing
flexibility and control. Examples include: In `acl.conf`, use `{allow, all, all, ["${client_attrs.namespace}/#"]}` to apply permissions based on the `namespace` attribute.
- In other authorization backends, `${client_attrs.namespace}` can be used within request templates to dynamically include client attributes.
- [#12910](https://github.com/emqx/emqx/pull/12910) Added plugin configuration management and schema validation. For EMQX enterprise edition, one can also annotate the schema with metadata to facilitate UI rendering in the Dashboard. See more details in the [plugin template](https://github.com/emqx/emqx-plugin-template/pull/126) and plugin [documentation](../extensions/plugins.md).
#### Operations and Management
- [#12923](https://github.com/emqx/emqx/pull/12923) Provided more specific error when importing wrong format into builtin authenticate database.
- [#12940](https://github.com/emqx/emqx/pull/12940) Added `ignore_readonly` argument to `PUT /configs` API.
Before this change, EMQX would return 400 (BAD_REQUEST) if the raw config included read-only root keys (`cluster`, `rpc`, and `node`).
After this enhancement it can be called as `PUT /configs?ignore_readonly=true`, EMQX will in this case ignore readonly root config keys, and apply the rest. For observability purposes, an info level message is logged if any readonly keys are dropped.
Also fixed an exception when config has bad HOCON syntax (returns 500). Now bad syntax will cause the API to return 400 (BAD_REQUEST).
- [#12957](https://github.com/emqx/emqx/pull/12957) Started building packages for macOS 14 (Apple Silicon) and Ubuntu 24.04 Noble Numbat (LTS).
## Bug Fixes
#### Security
- [#12887](https://github.com/emqx/emqx/pull/12887) Fixed MQTT enhanced auth with sasl scram.
- [#12962](https://github.com/emqx/emqx/pull/12962) TLS clients can now verify server hostname against wildcard certificate. For example, if a certificate is issued for host `*.example.com`, TLS clients is able to verify server hostnames like `srv1.example.com`.
#### MQTT
- [#12996](https://github.com/emqx/emqx/pull/12996) Fixed process leak in `emqx_retainer` application. Previously, client disconnection while receiving retained messages could cause a process leak.
#### Data Processing and Integration
- [#12653](https://github.com/emqx/emqx/pull/12653) The rule engine function `bin2hexstr` now supports bitstring inputs with a bit size that is not divisible by 8. Such bitstrings can be returned by the rule engine function `subbits`.
- [#12657](https://github.com/emqx/emqx/pull/12657) The rule engine SQL-based language previously did not allow putting any expressions as array elements in array literals (only constants and variable references were allowed). This has now been fixed so that one can use any expressions as array elements.
The following is now permitted, for example:
```bash
select
[21 + 21, abs(-abs(-2)), [1 + 1], 4] as my_array
from "t/#"
```
- [#12932](https://github.com/emqx/emqx/pull/12932) Previously, if a HTTP action request received a 503 (Service Unavailable) status, it was marked as a failure and the request was not retried. This has now been fixed so that the request is retried a configurable number of times.
- [#12948](https://github.com/emqx/emqx/pull/12948) Fixed an issue where sensitive HTTP header values like `Authorization` would be substituted by `******` after updating a connector.
- [#13118](https://github.com/emqx/emqx/pull/13118) Fix a performance issue in the rule engine template rendering.
#### Observability
- [#12765](https://github.com/emqx/emqx/pull/12765) Make sure stats `subscribers.count` `subscribers.max` contains shared-subscribers. It only contains non-shared subscribers previously.
#### Operations and Management
- [#12812](https://github.com/emqx/emqx/pull/12812) Made resource health checks non-blocking operations. This means that operations such as updating or removing a resource won't be blocked by a lengthy running health check.
- [#12830](https://github.com/emqx/emqx/pull/12830) Made channel (action/source) health checks non-blocking operations. This means that operations such as updating or removing an action/source data integration won't be blocked by a lengthy running health check.
- [#12993](https://github.com/emqx/emqx/pull/12993) Fixed listener config update API when handling an unknown zone.
Before this fix, when a listener config is updated with an unknown zone, for example `{"zone": "unknown"}`, the change would be accepted, causing all clients to crash whens connected.
After this fix, updating the listener with an unknown zone name will get a "Bad request" response.
- [#13012](https://github.com/emqx/emqx/pull/13012) The MQTT listerners config option `access_rules` has been improved in the following ways:
* The listener no longer crash with an incomprehensible error message if a non-valid access rule is configured. Instead a configuration error is generated.
* One can now add several rules in a single string by separating them by comma (for example, "allow 10.0.1.0/24, deny all").
- [#13041](https://github.com/emqx/emqx/pull/13041) Improved HTTP authentication error log message. If HTTP content-type header is missing for POST method, it now emits a meaningful error message instead of a less readable exception with stack trace.
- [#13077](https://github.com/emqx/emqx/pull/13077) This fix makes EMQX only read action configurations from the global configuration when the connector starts or restarts, and instead stores the latest configurations for the actions in the connector. Previously, updates to action configurations would sometimes not take effect without disabling and enabling the action. This means that an action could sometimes run with the old (previous) configuration even though it would look like the action configuration has been updated successfully.
- [#13090](https://github.com/emqx/emqx/pull/13090) Attempting to start an action or source whose connector is disabled will no longer attempt to start the connector itself.
#### Gateways
- [#12909](https://github.com/emqx/emqx/pull/12909) Fixed UDP listener process handling on errors or closure, The fix ensures the UDP listener is cleanly stopped and restarted as needed if these error conditions occur.
- [#13001](https://github.com/emqx/emqx/pull/13001) Fixed an issue where the syskeeper forwarder would never reconnect when the connection was lost.
- [#13010](https://github.com/emqx/emqx/pull/13010) Fixed the issue where the JT/T 808 gateway could not correctly reply to the REGISTER_ACK message when requesting authentication from the registration service failed.
## Breaking Changes
- [#12947](https://github.com/emqx/emqx/pull/12947) For JWT authentication, a new boolean option `disconnect_after_expire` has been added with default value set to `true`. When enabled, the client will be disconnected after the JWT token expires.
Previously, the clients with actual JWTs could connect to the broker and stay connected even after the JWT token expired. Now, the client will be disconnected after the JWT token expires. To preserve the previous behavior, set `disconnect_after_expire` to `false`.
- [#12957](https://github.com/emqx/emqx/pull/12957) Stopped building packages for macOS 12.

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes # This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version. # to the chart and its templates, including the app version.
version: 5.7.0-alpha.1 version: 5.7.0
# This is the version number of the application being deployed. This version number should be # This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. # incremented each time you make changes to the application.
appVersion: 5.7.0-alpha.1 appVersion: 5.7.0

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes # This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version. # to the chart and its templates, including the app version.
version: 5.6.1 version: 5.7.0
# This is the version number of the application being deployed. This version number should be # This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. # incremented each time you make changes to the application.
appVersion: 5.6.1 appVersion: 5.7.0

View File

@ -2,6 +2,7 @@ import os
import time import time
import unittest import unittest
import pytest import pytest
import requests
from urllib.parse import urljoin from urllib.parse import urljoin
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.common.by import By from selenium.webdriver.common.by import By
@ -77,6 +78,23 @@ def test_log(driver, login, dashboard_url):
label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[contains(., 'Time Offset')]") label = driver.find_element(By.XPATH, "//div[@id='app']//form//label[contains(., 'Time Offset')]")
assert driver.find_elements(By.ID, label.get_attribute("for")) assert driver.find_elements(By.ID, label.get_attribute("for"))
def fetch_version_info(dashboard_url):
status_url = urljoin(dashboard_url, "/status?format=json")
response = requests.get(status_url)
response.raise_for_status()
return response.json()
def parse_version(version_str):
prefix_major, minor, _ = version_str.split('.', 2)
prefix = prefix_major[:1]
major = prefix_major[1:]
return prefix, major + '.' + minor
def fetch_version(url):
info = fetch_version_info(url)
version_str = info['rel_vsn']
return parse_version(version_str)
def test_docs_link(driver, login, dashboard_url): def test_docs_link(driver, login, dashboard_url):
dest_url = urljoin(dashboard_url, "/#/dashboard/overview") dest_url = urljoin(dashboard_url, "/#/dashboard/overview")
driver.get(dest_url) driver.get(dest_url)
@ -85,21 +103,19 @@ def test_docs_link(driver, login, dashboard_url):
link_help = driver.find_element(By.XPATH, xpath_link_help) link_help = driver.find_element(By.XPATH, xpath_link_help)
driver.execute_script("arguments[0].click();", link_help) driver.execute_script("arguments[0].click();", link_help)
emqx_name = os.getenv("EMQX_NAME") prefix, emqx_version = fetch_version(dashboard_url)
emqx_community_version = os.getenv("EMQX_COMMUNITY_VERSION") # it's v5.x in the url
emqx_enterprise_version = os.getenv("EMQX_ENTERPRISE_VERSION") emqx_version = 'v' + emqx_version
if emqx_name == 'emqx-enterprise':
emqx_version = f"v{emqx_enterprise_version}" if prefix == 'e':
docs_base_url = "https://docs.emqx.com/en/enterprise" docs_base_url = "https://docs.emqx.com/en/enterprise"
else: else:
emqx_version = f"v{emqx_community_version}"
docs_base_url = "https://www.emqx.io/docs/en" docs_base_url = "https://www.emqx.io/docs/en"
emqx_version = ".".join(emqx_version.split(".")[:2])
docs_url = f"{docs_base_url}/{emqx_version}" docs_url = f"{docs_base_url}/{emqx_version}"
xpath = f"//div[@id='app']//div[@class='nav-header']//a[@href[starts-with(.,'{docs_url}')]]" xpath = f"//div[@id='app']//div[@class='nav-header']//a[@href[starts-with(.,'{docs_url}')]]"
try: try:
driver.find_element(By.XPATH, xpath) driver.find_element(By.XPATH, xpath)
except NoSuchElementException: except NoSuchElementException:
raise AssertionError(f"Cannot find the doc URL for {emqx_name} version {emqx_version}, please make sure the dashboard package is up to date.") raise AssertionError(f"Cannot find the doc URL for version {emqx_version}, please make sure the dashboard package is up to date.")

View File

@ -9,10 +9,6 @@ services:
selenium: selenium:
shm_size: '2gb' shm_size: '2gb'
image: ghcr.io/emqx/selenium-chrome:latest image: ghcr.io/emqx/selenium-chrome:latest
environment:
EMQX_NAME: ${EMQX_NAME}
EMQX_COMMUNITY_VERSION: ${EMQX_VERSION}
EMQX_ENTERPRISE_VERSION: ${EMQX_ENTERPRISE_VERSION}
volumes: volumes:
- ./:/app - ./:/app
depends_on: depends_on: