Compare commits

...

19 Commits

Author SHA1 Message Date
Thales Macedo Garitezi 9d6954cf60
Merge pull request #13593 from thalesmg/20240808-r58-builtin-local-preconditions
feat(ds builtin local): add basic support for atomic batches + preconditions
2024-08-09 08:59:28 -03:00
lafirest 9a2f878017
Merge pull request #13573 from lafirest/feat/cattrs
feat(ruleengine): expose client_attrs to rule-engine
2024-08-09 14:43:46 +08:00
firest c35661f484 chore: update changes 2024-08-09 13:36:17 +08:00
firest 9abdff60a1 feat(ruleengine): expose client_attrs to rule-engine 2024-08-09 13:36:10 +08:00
Thales Macedo Garitezi a849e6df4c feat(ds builtin local): add basic support for atomic batches + preconditions 2024-08-08 16:37:21 -03:00
Thales Macedo Garitezi c00b178b57
Merge pull request #13589 from thalesmg/20240807-r58-rule-id-null
fix(rule engine api): check that user provided id is valid
2024-08-08 13:51:56 -03:00
Thales Macedo Garitezi f085973778 fix(rule engine api): check that user provided id is valid
Fixes https://emqx.atlassian.net/browse/EMQX-12838
2024-08-08 10:36:07 -03:00
Thales Macedo Garitezi a8882bd7fd
Merge pull request #13588 from thalesmg/20240807-r58-cluster-join-singleton
fix: use ekka when performing `emqx ctl cluster *`
2024-08-08 09:51:44 -03:00
Thales Macedo Garitezi 7711307909
Merge pull request #13590 from thalesmg/20240807-r58-test-flaky-serde
test: reduce inter-case flakiness
2024-08-08 08:56:33 -03:00
Andrew Mayorov 6849801293
Merge pull request #13561 from keynslug/fix/raft/bootstrap-wait-log
fix(dsraft): use shard readiness as criterion for reads availability
2024-08-08 10:29:48 +03:00
Thales Macedo Garitezi 6c4cfeed92 test: reduce inter-case flakiness 2024-08-07 15:58:45 -03:00
Thales Macedo Garitezi de9e619c96 fix: use ekka when performing `emqx ctl cluster *`
Fixes https://emqx.atlassian.net/browse/EMQX-12824
2024-08-07 10:41:48 -03:00
Andrew Mayorov 8d88d14f0a
test(dsraft): use bootstrap as readiness criterion
In another attempt to stabilize the rest of flaky testcases.
2024-08-07 10:38:22 +02:00
Andrew Mayorov ff72d55491
fix(dsraft): replace unused clause with catch-all one
Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
2024-08-07 09:14:13 +02:00
Andrew Mayorov 42e4a635e0
chore(dsraft): sprinkle shard bootstrap process with tracepoints 2024-08-06 21:26:54 +02:00
Andrew Mayorov 26ddc403c8
fix(dsraft): avoid tight loop in shard bootstrap 2024-08-06 21:26:54 +02:00
Andrew Mayorov 4971fd3eaf
chore(dsraft): make shard info pterms saner and more visible 2024-08-06 21:26:54 +02:00
Andrew Mayorov 5b15886836
fix(dsraft): use shard readiness as criterion for reads availability 2024-08-06 21:26:54 +02:00
Andrew Mayorov 10dadbad3b
fix(dsraft): add more involved shard bootstrapping
Namely, attempt to make sure log is sufficiently replayed on the shard
server, before announcing it is "ready".
2024-08-06 21:26:50 +02:00
21 changed files with 548 additions and 110 deletions

View File

@ -31,7 +31,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.3"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

View File

@ -621,16 +621,16 @@ save_to_config_map(Conf, RawConf) ->
?MODULE:put_raw(RawConf).
-spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}.
save_to_override_conf(_, undefined, _) ->
save_to_override_conf(_HasDeprecatedFile, undefined, _) ->
ok;
save_to_override_conf(true, RawConf, Opts) ->
save_to_override_conf(true = _HasDeprecatedFile, RawConf, Opts) ->
case deprecated_conf_file(Opts) of
undefined ->
ok;
FileName ->
backup_and_write(FileName, hocon_pp:do(RawConf, Opts))
end;
save_to_override_conf(false, RawConf, Opts) ->
save_to_override_conf(false = _HasDeprecatedFile, RawConf, Opts) ->
case cluster_hocon_file() of
undefined ->
ok;

View File

@ -689,18 +689,9 @@ all() ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
%% TODO: Remove once builtin-local supports preconditions + atomic batches.
BuiltinLocalTCs =
TCs --
[
t_09_atomic_store_batch,
t_11_batch_preconditions,
t_12_batch_precondition_conflicts
],
BuiltinRaftTCs = TCs,
[
{builtin_local, BuiltinLocalTCs},
{builtin_raft, BuiltinRaftTCs}
{builtin_local, TCs},
{builtin_raft, TCs}
].
init_per_group(builtin_local, Config) ->

View File

@ -49,7 +49,9 @@
%% Internal exports:
-export([
do_next/3,
do_delete_next/4
do_delete_next/4,
%% Used by batch serializer
make_batch/3
]).
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
@ -88,7 +90,10 @@
#{
backend := builtin_local,
storage := emqx_ds_storage_layer:prototype(),
n_shards := pos_integer()
n_shards := pos_integer(),
%% Inherited from `emqx_ds:generic_db_opts()`.
force_monotonic_timestamps => boolean(),
atomic_batches => boolean()
}.
-type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
@ -193,9 +198,17 @@ drop_db(DB) ->
),
emqx_ds_builtin_local_meta:drop_db(DB).
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) ->
store_batch(DB, Batch, Opts) ->
case emqx_ds_builtin_local_meta:db_config(DB) of
#{atomic_batches := true} ->
store_batch_atomic(DB, Batch, Opts);
_ ->
store_batch_buffered(DB, Batch, Opts)
end.
store_batch_buffered(DB, Messages, Opts) ->
try
emqx_ds_buffer:store_batch(DB, Messages, Opts)
catch
@ -203,6 +216,34 @@ store_batch(DB, Messages, Opts) ->
{error, recoverable, Reason}
end.
store_batch_atomic(DB, Batch, Opts) ->
Shards = shards_of_batch(DB, Batch),
case Shards of
[Shard] ->
emqx_ds_builtin_local_batch_serializer:store_batch_atomic(DB, Shard, Batch, Opts);
[] ->
ok;
[_ | _] ->
{error, unrecoverable, atomic_batch_spans_multiple_shards}
end.
shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
shards_of_batch(DB, Operations) ->
shards_of_batch(DB, Operations, []).
shards_of_batch(DB, [Operation | Rest], Acc) ->
case shard_of_operation(DB, Operation, clientid, #{}) of
Shard when Shard =:= hd(Acc) ->
shards_of_batch(DB, Rest, Acc);
Shard when Acc =:= [] ->
shards_of_batch(DB, Rest, [Shard]);
ShardAnother ->
[ShardAnother | Acc]
end;
shards_of_batch(_DB, [], Acc) ->
Acc.
-record(bs, {options :: emqx_ds:create_db_opts()}).
-type buffer_state() :: #bs{}.

View File

@ -0,0 +1,122 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_builtin_local_batch_serializer).
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
%% API
-export([
start_link/3,
store_batch_atomic/4
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}).
-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}).
-record(store_batch_atomic, {batch :: emqx_ds:batch(), opts :: emqx_ds:message_store_opts()}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
start_link(DB, Shard, _Opts) ->
gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
store_batch_atomic(DB, Shard, Batch, Opts) ->
gen_server:call(?via(DB, Shard), #store_batch_atomic{batch = Batch, opts = Opts}, infinity).
%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------
init([DB, Shard]) ->
process_flag(message_queue_data, off_heap),
State = #{
db => DB,
shard => Shard
},
{ok, State}.
handle_call(#store_batch_atomic{batch = Batch, opts = StoreOpts}, _From, State) ->
ShardId = shard_id(State),
DBOpts = db_config(State),
Result = do_store_batch_atomic(ShardId, Batch, DBOpts, StoreOpts),
{reply, Result, State};
handle_call(Call, _From, State) ->
{reply, {error, {unknown_call, Call}}, State}.
handle_cast(_Cast, State) ->
{noreply, State}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
shard_id(#{db := DB, shard := Shard}) ->
{DB, Shard}.
db_config(#{db := DB}) ->
emqx_ds_builtin_local_meta:db_config(DB).
-spec do_store_batch_atomic(
emqx_ds_storage_layer:shard_id(),
emqx_ds:dsbatch(),
emqx_ds_builtin_local:db_opts(),
emqx_ds:message_store_opts()
) ->
emqx_ds:store_batch_result().
do_store_batch_atomic(ShardId, #dsbatch{} = Batch, DBOpts, StoreOpts) ->
#dsbatch{
operations = Operations0,
preconditions = Preconditions
} = Batch,
case emqx_ds_precondition:verify(emqx_ds_storage_layer, ShardId, Preconditions) of
ok ->
do_store_operations(ShardId, Operations0, DBOpts, StoreOpts);
{precondition_failed, _} = PreconditionFailed ->
{error, unrecoverable, PreconditionFailed};
Error ->
Error
end;
do_store_batch_atomic(ShardId, Operations, DBOpts, StoreOpts) ->
do_store_operations(ShardId, Operations, DBOpts, StoreOpts).
do_store_operations(ShardId, Operations0, DBOpts, _StoreOpts) ->
ForceMonotonic = maps:get(force_monotonic_timestamps, DBOpts),
{Latest, Operations} =
emqx_ds_builtin_local:make_batch(
ForceMonotonic,
current_timestamp(ShardId),
Operations0
),
Result = emqx_ds_storage_layer:store_batch(ShardId, Operations, _Options = #{}),
emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest),
Result.
current_timestamp(ShardId) ->
emqx_ds_builtin_local_meta:current_timestamp(ShardId).

View File

@ -158,7 +158,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
Opts = emqx_ds_builtin_local_meta:db_config(DB),
Children = [
shard_storage_spec(DB, Shard, Opts),
shard_buffer_spec(DB, Shard, Opts)
shard_buffer_spec(DB, Shard, Opts),
shard_batch_serializer_spec(DB, Shard, Opts)
],
{ok, {SupFlags, Children}}.
@ -208,6 +209,15 @@ shard_buffer_spec(DB, Shard, Options) ->
type => worker
}.
shard_batch_serializer_spec(DB, Shard, Opts) ->
#{
id => {Shard, batch_serializer},
start => {emqx_ds_builtin_local_batch_serializer, start_link, [DB, Shard, Opts]},
shutdown => 5_000,
restart => permanent,
type => worker
}.
ensure_started(Res) ->
case Res of
{ok, _Pid} ->

View File

@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) ->
%% TODO
%% There's a possibility of race condition: storage may shut down right after we
%% ask for its status.
-define(IF_STORAGE_RUNNING(SHARDID, EXPR),
case emqx_ds_storage_layer:shard_info(SHARDID, status) of
running -> EXPR;
down -> {error, recoverable, storage_down}
-define(IF_SHARD_READY(DB, SHARD, EXPR),
case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
true -> EXPR;
false -> {error, recoverable, shard_unavailable}
end
).
@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
[{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
).
@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
).
@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
do_next_v1(DB, Shard, Iter, BatchSize) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:next(
ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
)
@ -620,8 +623,9 @@ do_add_generation_v2(_DB) ->
| emqx_ds:error(storage_down).
do_list_generations_with_lifetimes_v3(DB, Shard) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
).

View File

@ -18,7 +18,8 @@
%% Dynamic server location API
-export([
servers/3
servers/3,
shard_info/3
]).
%% Safe Process Command API
@ -38,8 +39,10 @@
-behaviour(gen_server).
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
@ -52,6 +55,9 @@
| {error, servers_unreachable}.
-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
-define(MAX_BOOSTRAP_RETRY_TIMEOUT, 1_000).
-define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}).
%%
@ -160,6 +166,12 @@ local_site() ->
%%
-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
shard_info(DB, Shard, ready) ->
get_shard_info(DB, Shard, ready, false).
%%
-spec process_command([server()], _Command, timeout()) ->
{ok, _Result, _Leader :: server()} | server_error().
process_command(Servers, Command, Timeout) ->
@ -324,10 +336,45 @@ ra_overview(Server) ->
%%
-record(st, {
db :: emqx_ds:db(),
shard :: emqx_ds_replication_layer:shard_id(),
server :: server(),
bootstrapped :: boolean(),
stage :: term()
}).
init({DB, Shard, Opts}) ->
_ = process_flag(trap_exit, true),
ok = start_server(DB, Shard, Opts),
{ok, {DB, Shard}}.
case start_server(DB, Shard, Opts) of
{_New = true, Server} ->
NextStage = trigger_election;
{_New = false, Server} ->
NextStage = wait_leader
end,
St = #st{
db = DB,
shard = Shard,
server = Server,
bootstrapped = false,
stage = NextStage
},
{ok, St, {continue, bootstrap}}.
handle_continue(bootstrap, St = #st{bootstrapped = true}) ->
{noreply, St};
handle_continue(bootstrap, St0 = #st{db = DB, shard = Shard, stage = Stage}) ->
?tp(emqx_ds_replshard_bootstrapping, #{db => DB, shard => Shard, stage => Stage}),
case bootstrap(St0) of
St = #st{bootstrapped = true} ->
?tp(emqx_ds_replshard_bootstrapped, #{db => DB, shard => Shard}),
{noreply, St};
St = #st{bootstrapped = false} ->
{noreply, St, {continue, bootstrap}};
{retry, Timeout, St} ->
_TRef = erlang:start_timer(Timeout, self(), bootstrap),
{noreply, St}
end.
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
@ -335,7 +382,14 @@ handle_call(_Call, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
terminate(_Reason, {DB, Shard}) ->
handle_info({timeout, _TRef, bootstrap}, St) ->
{noreply, St, {continue, bootstrap}};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #st{db = DB, shard = Shard}) ->
%% NOTE: Mark as not ready right away.
ok = erase_shard_info(DB, Shard),
%% NOTE: Timeouts are ignored, it's a best effort attempt.
catch prep_stop_server(DB, Shard),
LocalServer = get_local_server(DB, Shard),
@ -343,6 +397,40 @@ terminate(_Reason, {DB, Shard}) ->
%%
bootstrap(St = #st{stage = trigger_election, server = Server}) ->
ok = trigger_election(Server),
St#st{stage = wait_leader};
bootstrap(St = #st{stage = wait_leader, server = Server}) ->
case current_leader(Server) of
Leader = {_, _} ->
St#st{stage = {wait_log, Leader}};
unknown ->
St
end;
bootstrap(St = #st{stage = {wait_log, Leader}}) ->
case ra_overview(Leader) of
#{commit_index := RaftIdx} ->
St#st{stage = {wait_log_index, RaftIdx}};
#{} ->
St#st{stage = wait_leader}
end;
bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
Overview = ra_overview(Server),
case maps:get(last_applied, Overview, 0) of
LastApplied when LastApplied >= RaftIdx ->
ok = announce_shard_ready(DB, Shard),
St#st{bootstrapped = true, stage = undefined};
LastApplied ->
%% NOTE
%% Blunt estimate of time shard needs to catch up. If this proves to be too long in
%% practice, it's could be augmented with handling `recover` -> `follower` Ra
%% member state transition.
Timeout = min(RaftIdx - LastApplied, ?MAX_BOOSTRAP_RETRY_TIMEOUT),
{retry, Timeout, St}
end.
%%
start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard),
@ -350,7 +438,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
MutableConfig = #{tick_timeout => 100},
case ra:restart_server(DB, LocalServer, MutableConfig) of
{error, name_not_registered} ->
Bootstrap = true,
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
LogOpts = maps:with(
[
@ -366,30 +453,34 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
initial_members => Servers,
machine => Machine,
log_init_args => LogOpts
});
}),
{_NewServer = true, LocalServer};
ok ->
Bootstrap = false;
{_NewServer = false, LocalServer};
{error, {already_started, _}} ->
Bootstrap = false
end,
{_NewServer = false, LocalServer}
end.
trigger_election(Server) ->
%% NOTE
%% Triggering election is necessary when a new consensus group is being brought up.
%% TODO
%% It's probably a good idea to rebalance leaders across the cluster from time to
%% time. There's `ra:transfer_leadership/2` for that.
try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
false ->
ok;
ok ->
ok
try ra:trigger_election(Server) of
ok -> ok
catch
%% TODO
%% NOTE
%% Tolerating exceptions because server might be occupied with log replay for
%% a while.
exit:{timeout, _} when not Bootstrap ->
exit:{timeout, _} ->
?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}),
ok
end.
announce_shard_ready(DB, Shard) ->
set_shard_info(DB, Shard, ready, true).
server_uid(_DB, Shard) ->
%% NOTE
%% Each new "instance" of a server should have a unique identifier. Otherwise,
@ -402,6 +493,22 @@ server_uid(_DB, Shard) ->
%%
get_shard_info(DB, Shard, K, Default) ->
persistent_term:get(?PTERM(DB, Shard, K), Default).
set_shard_info(DB, Shard, K, V) ->
persistent_term:put(?PTERM(DB, Shard, K), V).
erase_shard_info(DB, Shard) ->
lists:foreach(fun(K) -> erase_shard_info(DB, Shard, K) end, [
ready
]).
erase_shard_info(DB, Shard, K) ->
persistent_term:erase(?PTERM(DB, Shard, K)).
%%
prep_stop_server(DB, Shard) ->
prep_stop_server(DB, Shard, 5_000).

View File

@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) ->
%% Initialize DB on all nodes and wait for it to be online.
Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
%% Stop the DB on the "offline" node.
?wait_async_action(
@ -207,7 +206,6 @@ t_rebalance(Config) ->
%% 1. Initialize DB on the first node.
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
%% 1.1 Kick all sites except S1 from the replica set as
%% the initial condition:
@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) ->
%% Open DB:
assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
%% Kick N3 from the replica set as the initial condition:
?assertMatch(
@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) ->
%% Initialize DB on all 3 nodes.
Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
?retry(
1000,
@ -845,13 +841,11 @@ t_crash_restart_recover(Config) ->
?check_trace(
begin
%% Initialize DB on all nodes.
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
),
assert_db_open(Nodes, ?DB, DBOpts),
%% Apply the test events, including simulated node crashes.
NodeStream = emqx_utils_stream:const(N1),
StartedAt = erlang:monotonic_time(millisecond),
emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
%% It's expected to lose few messages when leaders are abruptly killed.
@ -865,6 +859,10 @@ t_crash_restart_recover(Config) ->
ct:pal("Some messages were lost: ~p", [LostMessages]),
?assert(length(LostMessages) < NMsgs div 20),
%% Wait until crashed nodes are ready.
SinceStarted = erlang:monotonic_time(millisecond) - StartedAt,
wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted),
%% Verify that all the successfully persisted messages are there.
VerifyClient = fun({ClientId, ExpectedStream}) ->
Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) ->
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
).
),
wait_db_bootstrapped(Nodes, ?DB).
assert_db_stable([Node | _], DB) ->
Shards = ds_repl_meta(Node, shards, [DB]),
@ -935,6 +934,32 @@ assert_db_stable([Node | _], DB) ->
db_leadership(Node, DB, Shards)
).
wait_db_bootstrapped(Nodes, DB) ->
wait_db_bootstrapped(Nodes, DB, infinity, infinity).
wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) ->
SRefs = [
snabbkaffe:subscribe(
?match_event(#{
?snk_kind := emqx_ds_replshard_bootstrapped,
?snk_meta := #{node := Node},
db := DB,
shard := Shard
}),
1,
Timeout,
BackInTime
)
|| Node <- Nodes,
Shard <- ds_repl_meta(Node, my_shards, [DB])
],
lists:foreach(
fun({ok, SRef}) ->
?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef))
end,
SRefs
).
%%
db_leadership(Node, DB, Shards) ->

View File

@ -55,6 +55,7 @@
topic_filter/0,
topic/0,
batch/0,
dsbatch/0,
operation/0,
deletion/0,
precondition/0,
@ -104,7 +105,9 @@
-type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
%% A batch of storage operations.
-type batch() :: [operation()] | #dsbatch{}.
-type batch() :: [operation()] | dsbatch().
-type dsbatch() :: #dsbatch{}.
-type operation() ::
%% Store a message.

View File

@ -97,7 +97,7 @@ broker(_) ->
%% @doc Cluster with other nodes
cluster(["join", SNode]) ->
case mria:join(ekka_node:parse_name(SNode)) of
case ekka:join(ekka_node:parse_name(SNode)) of
ok ->
emqx_ctl:print("Join the cluster successfully.~n"),
%% FIXME: running status on the replicant immediately
@ -112,7 +112,7 @@ cluster(["join", SNode]) ->
end;
cluster(["leave"]) ->
_ = maybe_disable_autocluster(),
case mria:leave() of
case ekka:leave() of
ok ->
emqx_ctl:print("Leave the cluster successfully.~n"),
cluster(["status"]);
@ -121,7 +121,7 @@ cluster(["leave"]) ->
end;
cluster(["force-leave", SNode]) ->
Node = ekka_node:parse_name(SNode),
case mria:force_leave(Node) of
case ekka:force_leave(Node) of
ok ->
case emqx_cluster_rpc:force_leave_clean(Node) of
ok ->

View File

@ -20,6 +20,7 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-behaviour(minirest_api).
@ -385,16 +386,16 @@ param_path_id() ->
case maps:get(<<"id">>, Params0, list_to_binary(emqx_utils:gen_id(8))) of
<<>> ->
{400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
Id ->
Id when is_binary(Id) ->
Params = filter_out_request_body(add_metadata(Params0)),
case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} ->
{400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
?BAD_REQUEST(<<"rule id already exists">>);
not_found ->
ConfPath = ?RULE_PATH(Id),
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
{ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
{201, format_rule_info_resp(Rule)};
?CREATED(format_rule_info_resp(Rule));
{error, Reason} ->
?SLOG(
info,
@ -405,9 +406,11 @@ param_path_id() ->
},
#{tag => ?TAG}
),
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
?BAD_REQUEST(?ERR_BADARGS(Reason))
end
end
end;
_BadId ->
?BAD_REQUEST(<<"rule id must be a string">>)
end.
'/rule_test'(post, #{body := Params}) ->

View File

@ -334,13 +334,14 @@ eventmsg_publish(
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
publish_received_at => Timestamp,
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
},
#{headers => Headers}
).
eventmsg_connected(
_ClientInfo = #{
ClientInfo = #{
clientid := ClientId,
username := Username,
is_bridge := IsBridge,
@ -375,13 +376,14 @@ eventmsg_connected(
expiry_interval => ExpiryInterval div 1000,
is_bridge => IsBridge,
conn_props => printable_maps(ConnProps),
connected_at => ConnectedAt
connected_at => ConnectedAt,
client_attrs => maps:get(client_attrs, ClientInfo, #{})
},
#{}
).
eventmsg_disconnected(
_ClientInfo = #{
ClientInfo = #{
clientid := ClientId,
username := Username
},
@ -405,7 +407,8 @@ eventmsg_disconnected(
proto_name => ProtoName,
proto_ver => ProtoVer,
disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
disconnected_at => DisconnectedAt
disconnected_at => DisconnectedAt,
client_attrs => maps:get(client_attrs, ClientInfo, #{})
},
#{}
).
@ -444,7 +447,7 @@ eventmsg_connack(
).
eventmsg_check_authz_complete(
_ClientInfo = #{
ClientInfo = #{
clientid := ClientId,
username := Username,
peerhost := PeerHost,
@ -465,13 +468,14 @@ eventmsg_check_authz_complete(
topic => Topic,
action => PubSub,
authz_source => AuthzSource,
result => Result
result => Result,
client_attrs => maps:get(client_attrs, ClientInfo, #{})
},
#{}
).
eventmsg_check_authn_complete(
_ClientInfo = #{
ClientInfo = #{
clientid := ClientId,
username := Username,
peername := PeerName
@ -493,14 +497,15 @@ eventmsg_check_authn_complete(
peername => ntoa(PeerName),
reason_code => force_to_bin(Reason),
is_anonymous => IsAnonymous,
is_superuser => IsSuperuser
is_superuser => IsSuperuser,
client_attrs => maps:get(client_attrs, ClientInfo, #{})
},
#{}
).
eventmsg_sub_or_unsub(
Event,
_ClientInfo = #{
ClientInfo = #{
clientid := ClientId,
username := Username,
peerhost := PeerHost,
@ -519,7 +524,8 @@ eventmsg_sub_or_unsub(
peername => ntoa(PeerName),
PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
topic => Topic,
qos => QoS
qos => QoS,
client_attrs => maps:get(client_attrs, ClientInfo, #{})
},
#{}
).
@ -551,7 +557,8 @@ eventmsg_dropped(
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
publish_received_at => Timestamp,
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
},
#{headers => Headers}
).
@ -583,7 +590,8 @@ eventmsg_transformation_failed(
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
publish_received_at => Timestamp,
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
},
#{headers => Headers}
).
@ -616,7 +624,8 @@ eventmsg_validation_failed(
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
publish_received_at => Timestamp,
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
},
#{headers => Headers}
).
@ -654,7 +663,8 @@ eventmsg_delivered(
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
publish_received_at => Timestamp,
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
},
#{headers => Headers}
).
@ -693,7 +703,8 @@ eventmsg_acked(
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
publish_received_at => Timestamp
publish_received_at => Timestamp,
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
},
#{headers => Headers}
).
@ -733,7 +744,8 @@ eventmsg_delivery_dropped(
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
publish_received_at => Timestamp,
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
},
#{headers => Headers}
).

View File

@ -112,7 +112,8 @@ groups() ->
t_sqlparse_undefined_variable,
t_sqlparse_new_map,
t_sqlparse_invalid_json,
t_sqlselect_as_put
t_sqlselect_as_put,
t_sqlselect_client_attr
]},
{events, [], [
t_events,
@ -3891,6 +3892,57 @@ t_trace_rule_id(_Config) ->
?assertEqual([], emqx_trace_handler:running()),
emqtt:disconnect(T).
t_sqlselect_client_attr(_) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
{ok, Compiled} = emqx_variform:compile("user_property.group"),
emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], [
#{
expression => Compiled,
set_as_attr => <<"group">>
},
#{
expression => Compiled,
set_as_attr => <<"group2">>
}
]),
SQL =
"SELECT client_attrs as payload FROM \"t/1\" ",
Repub = republish_action(<<"t/2">>),
{ok, _TopicRule} = emqx_rule_engine:create_rule(
#{
sql => SQL,
id => ?TMP_RULEID,
actions => [Repub]
}
),
{ok, Client} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'User-Property' => [{<<"group">>, <<"g1">>}]}}
]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t/2">>, 0),
ct:sleep(100),
emqtt:publish(Client, <<"t/1">>, <<"Hello">>),
receive
{publish, #{topic := Topic, payload := Payload}} ->
?assertEqual(<<"t/2">>, Topic),
?assertMatch(
#{<<"group">> := <<"g1">>, <<"group2">> := <<"g1">>},
emqx_utils_json:decode(Payload)
)
after 1000 ->
ct:fail(wait_for_t_2)
end,
emqtt:disconnect(Client),
emqx_rule_engine:delete_rule(?TMP_RULEID),
emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []).
%%------------------------------------------------------------------------------
%% Internal helpers
%%------------------------------------------------------------------------------
@ -3990,7 +4042,8 @@ verify_event_fields('message.publish', Fields) ->
flags := Flags,
pub_props := Properties,
timestamp := Timestamp,
publish_received_at := EventAt
publish_received_at := EventAt,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4007,7 +4060,8 @@ verify_event_fields('message.publish', Fields) ->
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
?assert(EventAt =< Timestamp);
?assert(EventAt =< Timestamp),
?assert(is_map(ClientAttrs));
verify_event_fields('client.connected', Fields) ->
#{
clientid := ClientId,
@ -4023,7 +4077,8 @@ verify_event_fields('client.connected', Fields) ->
is_bridge := IsBridge,
conn_props := Properties,
timestamp := Timestamp,
connected_at := EventAt
connected_at := EventAt,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4042,7 +4097,8 @@ verify_event_fields('client.connected', Fields) ->
?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
?assert(EventAt =< Timestamp);
?assert(EventAt =< Timestamp),
?assert(is_map(ClientAttrs));
verify_event_fields('client.disconnected', Fields) ->
#{
reason := Reason,
@ -4052,7 +4108,8 @@ verify_event_fields('client.disconnected', Fields) ->
sockname := SockName,
disconn_props := Properties,
timestamp := Timestamp,
disconnected_at := EventAt
disconnected_at := EventAt,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4065,7 +4122,8 @@ verify_event_fields('client.disconnected', Fields) ->
?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
?assert(EventAt =< Timestamp);
?assert(EventAt =< Timestamp),
?assert(is_map(ClientAttrs));
verify_event_fields(SubUnsub, Fields) when
SubUnsub == 'session.subscribed';
SubUnsub == 'session.unsubscribed'
@ -4077,7 +4135,8 @@ verify_event_fields(SubUnsub, Fields) when
peername := PeerName,
topic := Topic,
qos := QoS,
timestamp := Timestamp
timestamp := Timestamp,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4097,7 +4156,8 @@ verify_event_fields(SubUnsub, Fields) when
#{'User-Property' := #{<<"topic_name">> := <<"t1">>}},
maps:get(PropKey, Fields)
),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000);
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(is_map(ClientAttrs));
verify_event_fields('delivery.dropped', Fields) ->
#{
event := 'delivery.dropped',
@ -4117,7 +4177,8 @@ verify_event_fields('delivery.dropped', Fields) ->
qos := QoS,
flags := Flags,
timestamp := Timestamp,
topic := Topic
topic := Topic,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4139,7 +4200,8 @@ verify_event_fields('delivery.dropped', Fields) ->
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
?assert(EventAt =< Timestamp);
?assert(EventAt =< Timestamp),
?assert(is_map(ClientAttrs));
verify_event_fields('message.dropped', Fields) ->
#{
id := ID,
@ -4154,7 +4216,8 @@ verify_event_fields('message.dropped', Fields) ->
flags := Flags,
pub_props := Properties,
timestamp := Timestamp,
publish_received_at := EventAt
publish_received_at := EventAt,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4172,7 +4235,8 @@ verify_event_fields('message.dropped', Fields) ->
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
?assert(EventAt =< Timestamp);
?assert(EventAt =< Timestamp),
?assert(is_map(ClientAttrs));
verify_event_fields('message.delivered', Fields) ->
#{
id := ID,
@ -4188,7 +4252,8 @@ verify_event_fields('message.delivered', Fields) ->
flags := Flags,
pub_props := Properties,
timestamp := Timestamp,
publish_received_at := EventAt
publish_received_at := EventAt,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4207,7 +4272,8 @@ verify_event_fields('message.delivered', Fields) ->
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
?assert(EventAt =< Timestamp);
?assert(EventAt =< Timestamp),
?assert(is_map(ClientAttrs));
verify_event_fields('message.acked', Fields) ->
#{
id := ID,
@ -4224,7 +4290,8 @@ verify_event_fields('message.acked', Fields) ->
pub_props := PubProps,
puback_props := PubAckProps,
timestamp := Timestamp,
publish_received_at := EventAt
publish_received_at := EventAt,
client_attrs := ClientAttrs
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
@ -4244,7 +4311,8 @@ verify_event_fields('message.acked', Fields) ->
?assert(is_map(PubAckProps)),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
?assert(EventAt =< Timestamp);
?assert(EventAt =< Timestamp),
?assert(is_map(ClientAttrs));
verify_event_fields('client.connack', Fields) ->
#{
clientid := ClientId,
@ -4282,7 +4350,8 @@ verify_event_fields('client.check_authz_complete', Fields) ->
peername := PeerName,
topic := Topic,
authz_source := AuthzSource,
username := Username
username := Username,
client_attrs := ClientAttrs
} = Fields,
?assertEqual(<<"t1">>, Topic),
?assert(lists:member(Action, [subscribe, publish])),
@ -4302,20 +4371,23 @@ verify_event_fields('client.check_authz_complete', Fields) ->
])
),
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>]));
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
?assert(is_map(ClientAttrs));
verify_event_fields('client.check_authn_complete', Fields) ->
#{
clientid := ClientId,
peername := PeerName,
username := Username,
is_anonymous := IsAnonymous,
is_superuser := IsSuperuser
is_superuser := IsSuperuser,
client_attrs := ClientAttrs
} = Fields,
verify_peername(PeerName),
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
?assert(erlang:is_boolean(IsAnonymous)),
?assert(erlang:is_boolean(IsSuperuser));
?assert(erlang:is_boolean(IsSuperuser)),
?assert(is_map(ClientAttrs));
verify_event_fields('schema.validation_failed', Fields) ->
#{
validation := ValidationName,
@ -4327,12 +4399,14 @@ verify_event_fields('schema.validation_failed', Fields) ->
topic := _Topic,
flags := _Flags,
pub_props := _PubProps,
publish_received_at := _PublishReceivedAt
publish_received_at := _PublishReceivedAt,
client_attrs := ClientAttrs
} = Fields,
?assertEqual(<<"v1">>, ValidationName),
verify_peername(PeerName),
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
?assert(is_map(ClientAttrs)),
ok;
verify_event_fields('message.transformation_failed', Fields) ->
#{
@ -4345,12 +4419,14 @@ verify_event_fields('message.transformation_failed', Fields) ->
topic := _Topic,
flags := _Flags,
pub_props := _PubProps,
publish_received_at := _PublishReceivedAt
publish_received_at := _PublishReceivedAt,
client_attrs := ClientAttrs
} = Fields,
?assertEqual(<<"t1">>, TransformationName),
verify_peername(PeerName),
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
?assert(is_map(ClientAttrs)),
ok.
verify_peername(PeerName) ->

View File

@ -21,6 +21,8 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
@ -48,6 +50,13 @@ app_specs() ->
emqx_mgmt_api_test_util:emqx_dashboard()
].
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
@ -124,7 +133,13 @@ create_rule(Overrides) ->
Method = post,
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
Res = request(Method, Path, Params),
emqx_mgmt_api_test_util:simplify_result(Res).
case emqx_mgmt_api_test_util:simplify_result(Res) of
{201, #{<<"id">> := RuleId}} = SRes ->
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
SRes;
SRes ->
SRes
end.
sources_sql(Sources) ->
Froms = iolist_to_binary(lists:join(<<", ">>, lists:map(fun source_from/1, Sources))),
@ -586,3 +601,21 @@ t_filter_by_source_and_action(_Config) ->
),
ok.
%% Checks that creating a rule with a `null' JSON value id is forbidden.
t_create_rule_with_null_id(_Config) ->
?assertMatch(
{400, #{<<"message">> := <<"rule id must be a string">>}},
create_rule(#{<<"id">> => null})
),
%% The string `"null"' should be fine.
?assertMatch(
{201, _},
create_rule(#{<<"id">> => <<"null">>})
),
?assertMatch({201, _}, create_rule(#{})),
?assertMatch(
{200, #{<<"data">> := [_, _]}},
list_rules([])
),
ok.

View File

@ -339,6 +339,7 @@ ensure_serde_absent(Name) ->
{ok, Serde} ->
ok = emqx_schema_registry_serde:destroy(Serde),
_ = ets:delete(?SERDE_TAB, Name),
?tp("schema_registry_serde_deleted", #{name => Name}),
ok;
{error, not_found} ->
ok

View File

@ -57,7 +57,15 @@ end_per_testcase(_TestCase, _Config) ->
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_schema_registry:delete_schema(Name)
NameBin = emqx_utils_conv:bin(Name),
{ok, {ok, _}} =
?wait_async_action(
emqx_schema_registry:delete_schema(Name),
#{
?snk_kind := "schema_registry_serde_deleted",
name := NameBin
}
)
end,
emqx_schema_registry:list_schemas()
).

View File

@ -0,0 +1 @@
Expose `client_attrs` to rule engine and rule events.

View File

@ -0,0 +1 @@
Fixed an issue where creating a rule with a null id via the HTTP API was allowed, which could lead to an inconsistent configuration.

View File

@ -184,7 +184,7 @@ defmodule EMQXUmbrella.MixProject do
def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.12.0", override: true}
def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.2", override: true}
def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.3", override: true}
def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}
# in conflict by ehttpc and emqtt
def common_dep(:gun), do: {:gun, github: "emqx/gun", tag: "1.3.11", override: true}

View File

@ -98,7 +98,7 @@
{system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.5"}}},
{getopt, "1.0.2"},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.3"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}},
{jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}},