Compare commits: master...release-58
19 commits: 9d6954cf60, 9a2f878017, c35661f484, 9abdff60a1, a849e6df4c, c00b178b57, f085973778, a8882bd7fd, 7711307909, 6849801293, 6c4cfeed92, de9e619c96, 8d88d14f0a, ff72d55491, 42e4a635e0, 26ddc403c8, 4971fd3eaf, 5b15886836, 10dadbad3b
@@ -31,7 +31,7 @@
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.3"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
@@ -621,16 +621,16 @@ save_to_config_map(Conf, RawConf) ->
     ?MODULE:put_raw(RawConf).
 
 -spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}.
-save_to_override_conf(_, undefined, _) ->
+save_to_override_conf(_HasDeprecatedFile, undefined, _) ->
     ok;
-save_to_override_conf(true, RawConf, Opts) ->
+save_to_override_conf(true = _HasDeprecatedFile, RawConf, Opts) ->
     case deprecated_conf_file(Opts) of
         undefined ->
             ok;
         FileName ->
             backup_and_write(FileName, hocon_pp:do(RawConf, Opts))
     end;
-save_to_override_conf(false, RawConf, Opts) ->
+save_to_override_conf(false = _HasDeprecatedFile, RawConf, Opts) ->
     case cluster_hocon_file() of
         undefined ->
             ok;
@@ -689,18 +689,9 @@ all() ->
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
-    %% TODO: Remove once builtin-local supports preconditions + atomic batches.
-    BuiltinLocalTCs =
-        TCs --
-            [
-                t_09_atomic_store_batch,
-                t_11_batch_preconditions,
-                t_12_batch_precondition_conflicts
-            ],
-    BuiltinRaftTCs = TCs,
     [
-        {builtin_local, BuiltinLocalTCs},
-        {builtin_raft, BuiltinRaftTCs}
+        {builtin_local, TCs},
+        {builtin_raft, TCs}
     ].
 
 init_per_group(builtin_local, Config) ->
@@ -49,7 +49,9 @@
 %% Internal exports:
 -export([
     do_next/3,
-    do_delete_next/4
+    do_delete_next/4,
+    %% Used by batch serializer
+    make_batch/3
 ]).
 
 -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).

@@ -88,7 +90,10 @@
 #{
     backend := builtin_local,
     storage := emqx_ds_storage_layer:prototype(),
-    n_shards := pos_integer()
+    n_shards := pos_integer(),
+    %% Inherited from `emqx_ds:generic_db_opts()`.
+    force_monotonic_timestamps => boolean(),
+    atomic_batches => boolean()
 }.
 
 -type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.

@@ -193,9 +198,17 @@ drop_db(DB) ->
     ),
     emqx_ds_builtin_local_meta:drop_db(DB).
 
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(DB, Messages, Opts) ->
+store_batch(DB, Batch, Opts) ->
+    case emqx_ds_builtin_local_meta:db_config(DB) of
+        #{atomic_batches := true} ->
+            store_batch_atomic(DB, Batch, Opts);
+        _ ->
+            store_batch_buffered(DB, Batch, Opts)
+    end.
+
+store_batch_buffered(DB, Messages, Opts) ->
     try
         emqx_ds_buffer:store_batch(DB, Messages, Opts)
     catch

@@ -203,6 +216,34 @@ store_batch(DB, Messages, Opts) ->
             {error, recoverable, Reason}
     end.
 
+store_batch_atomic(DB, Batch, Opts) ->
+    Shards = shards_of_batch(DB, Batch),
+    case Shards of
+        [Shard] ->
+            emqx_ds_builtin_local_batch_serializer:store_batch_atomic(DB, Shard, Batch, Opts);
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, unrecoverable, atomic_batch_spans_multiple_shards}
+    end.
+
+shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
+    shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
+shards_of_batch(DB, Operations) ->
+    shards_of_batch(DB, Operations, []).
+
+shards_of_batch(DB, [Operation | Rest], Acc) ->
+    case shard_of_operation(DB, Operation, clientid, #{}) of
+        Shard when Shard =:= hd(Acc) ->
+            shards_of_batch(DB, Rest, Acc);
+        Shard when Acc =:= [] ->
+            shards_of_batch(DB, Rest, [Shard]);
+        ShardAnother ->
+            [ShardAnother | Acc]
+    end;
+shards_of_batch(_DB, [], Acc) ->
+    Acc.
+
 -record(bs, {options :: emqx_ds:create_db_opts()}).
 -type buffer_state() :: #bs{}.
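Taken together, the `emqx_ds_builtin_local` changes above mean that a DB opened with `atomic_batches => true` routes each batch through a single per-shard serializer instead of the asynchronous buffer. A minimal usage sketch, assuming the public `emqx_ds` API and the `emqx_ds_storage_bitfield_lts` storage prototype; the DB name and message are placeholders:

    %% Open a builtin-local DB whose batches are applied atomically per shard.
    ok = emqx_ds:open_db(my_db, #{
        backend => builtin_local,
        storage => {emqx_ds_storage_bitfield_lts, #{}},
        n_shards => 4,
        atomic_batches => true
    }),
    %% All operations of one batch must map to the same shard (same clientid);
    %% otherwise store_batch/3 returns
    %% {error, unrecoverable, atomic_batch_spans_multiple_shards}.
    Msgs = [emqx_message:make(<<"client-1">>, <<"t/1">>, <<"hello">>)],
    ok = emqx_ds:store_batch(my_db, Msgs, #{}).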
@@ -0,0 +1,122 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_builtin_local_batch_serializer).
+
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
+
+%% API
+-export([
+    start_link/3,
+
+    store_batch_atomic/4
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}).
+-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}).
+
+-record(store_batch_atomic, {batch :: emqx_ds:batch(), opts :: emqx_ds:message_store_opts()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link(DB, Shard, _Opts) ->
+    gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
+
+store_batch_atomic(DB, Shard, Batch, Opts) ->
+    gen_server:call(?via(DB, Shard), #store_batch_atomic{batch = Batch, opts = Opts}, infinity).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init([DB, Shard]) ->
+    process_flag(message_queue_data, off_heap),
+    State = #{
+        db => DB,
+        shard => Shard
+    },
+    {ok, State}.
+
+handle_call(#store_batch_atomic{batch = Batch, opts = StoreOpts}, _From, State) ->
+    ShardId = shard_id(State),
+    DBOpts = db_config(State),
+    Result = do_store_batch_atomic(ShardId, Batch, DBOpts, StoreOpts),
+    {reply, Result, State};
+handle_call(Call, _From, State) ->
+    {reply, {error, {unknown_call, Call}}, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+shard_id(#{db := DB, shard := Shard}) ->
+    {DB, Shard}.
+
+db_config(#{db := DB}) ->
+    emqx_ds_builtin_local_meta:db_config(DB).
+
+-spec do_store_batch_atomic(
+    emqx_ds_storage_layer:shard_id(),
+    emqx_ds:dsbatch(),
+    emqx_ds_builtin_local:db_opts(),
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+do_store_batch_atomic(ShardId, #dsbatch{} = Batch, DBOpts, StoreOpts) ->
+    #dsbatch{
+        operations = Operations0,
+        preconditions = Preconditions
+    } = Batch,
+    case emqx_ds_precondition:verify(emqx_ds_storage_layer, ShardId, Preconditions) of
+        ok ->
+            do_store_operations(ShardId, Operations0, DBOpts, StoreOpts);
+        {precondition_failed, _} = PreconditionFailed ->
+            {error, unrecoverable, PreconditionFailed};
+        Error ->
+            Error
+    end;
+do_store_batch_atomic(ShardId, Operations, DBOpts, StoreOpts) ->
+    do_store_operations(ShardId, Operations, DBOpts, StoreOpts).
+
+do_store_operations(ShardId, Operations0, DBOpts, _StoreOpts) ->
+    ForceMonotonic = maps:get(force_monotonic_timestamps, DBOpts),
+    {Latest, Operations} =
+        emqx_ds_builtin_local:make_batch(
+            ForceMonotonic,
+            current_timestamp(ShardId),
+            Operations0
+        ),
+    Result = emqx_ds_storage_layer:store_batch(ShardId, Operations, _Options = #{}),
+    emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest),
+    Result.
+
+current_timestamp(ShardId) ->
+    emqx_ds_builtin_local_meta:current_timestamp(ShardId).
@@ -158,7 +158,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
     Opts = emqx_ds_builtin_local_meta:db_config(DB),
     Children = [
         shard_storage_spec(DB, Shard, Opts),
-        shard_buffer_spec(DB, Shard, Opts)
+        shard_buffer_spec(DB, Shard, Opts),
+        shard_batch_serializer_spec(DB, Shard, Opts)
     ],
     {ok, {SupFlags, Children}}.

@@ -208,6 +209,15 @@ shard_buffer_spec(DB, Shard, Options) ->
         type => worker
     }.
 
+shard_batch_serializer_spec(DB, Shard, Opts) ->
+    #{
+        id => {Shard, batch_serializer},
+        start => {emqx_ds_builtin_local_batch_serializer, start_link, [DB, Shard, Opts]},
+        shutdown => 5_000,
+        restart => permanent,
+        type => worker
+    }.
+
 ensure_started(Res) ->
     case Res of
         {ok, _Pid} ->
@@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) ->
 %% TODO
 %% There's a possibility of race condition: storage may shut down right after we
 %% ask for its status.
--define(IF_STORAGE_RUNNING(SHARDID, EXPR),
-    case emqx_ds_storage_layer:shard_info(SHARDID, status) of
-        running -> EXPR;
-        down -> {error, recoverable, storage_down}
+-define(IF_SHARD_READY(DB, SHARD, EXPR),
+    case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
+        true -> EXPR;
+        false -> {error, recoverable, shard_unavailable}
     end
 ).

@@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
     [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
 do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
     ).

@@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
     emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
     ).

@@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
     emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:next(
             ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
         )

@@ -620,8 +623,9 @@ do_add_generation_v2(_DB) ->
     | emqx_ds:error(storage_down).
 do_list_generations_with_lifetimes_v3(DB, Shard) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
     ).
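The macro swap above changes what gates a shard-local request: instead of asking the storage layer whether it is `running`, the RPC target now consults the replication shard's `ready` flag. Expanded by hand, a call site such as `do_get_streams_v2/4` behaves roughly like the sketch below (the function name is illustrative; the modules are the ones named in the diff):

    %% Rough expansion of ?IF_SHARD_READY(DB, Shard, ...) at a call site:
    %% serve the request only once the shard has announced readiness.
    get_streams_if_ready(DB, Shard, TopicFilter, StartTime) ->
        ShardId = {DB, Shard},
        case emqx_ds_replication_layer_shard:shard_info(DB, Shard, ready) of
            true ->
                emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime);
            false ->
                {error, recoverable, shard_unavailable}
        end.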
@@ -18,7 +18,8 @@
 
 %% Dynamic server location API
 -export([
-    servers/3
+    servers/3,
+    shard_info/3
 ]).
 
 %% Safe Process Command API

@@ -38,8 +39,10 @@
 -behaviour(gen_server).
 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
+    handle_info/2,
     terminate/2
 ]).

@@ -52,6 +55,9 @@
     | {error, servers_unreachable}.
 
 -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
+-define(MAX_BOOSTRAP_RETRY_TIMEOUT, 1_000).
+
+-define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}).
 
 %%

@@ -160,6 +166,12 @@ local_site() ->
 
 %%
 
+-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
+shard_info(DB, Shard, ready) ->
+    get_shard_info(DB, Shard, ready, false).
+
+%%
+
 -spec process_command([server()], _Command, timeout()) ->
     {ok, _Result, _Leader :: server()} | server_error().
 process_command(Servers, Command, Timeout) ->
@@ -324,10 +336,45 @@ ra_overview(Server) ->
 
 %%
 
+-record(st, {
+    db :: emqx_ds:db(),
+    shard :: emqx_ds_replication_layer:shard_id(),
+    server :: server(),
+    bootstrapped :: boolean(),
+    stage :: term()
+}).
+
 init({DB, Shard, Opts}) ->
     _ = process_flag(trap_exit, true),
-    ok = start_server(DB, Shard, Opts),
-    {ok, {DB, Shard}}.
+    case start_server(DB, Shard, Opts) of
+        {_New = true, Server} ->
+            NextStage = trigger_election;
+        {_New = false, Server} ->
+            NextStage = wait_leader
+    end,
+    St = #st{
+        db = DB,
+        shard = Shard,
+        server = Server,
+        bootstrapped = false,
+        stage = NextStage
+    },
+    {ok, St, {continue, bootstrap}}.
+
+handle_continue(bootstrap, St = #st{bootstrapped = true}) ->
+    {noreply, St};
+handle_continue(bootstrap, St0 = #st{db = DB, shard = Shard, stage = Stage}) ->
+    ?tp(emqx_ds_replshard_bootstrapping, #{db => DB, shard => Shard, stage => Stage}),
+    case bootstrap(St0) of
+        St = #st{bootstrapped = true} ->
+            ?tp(emqx_ds_replshard_bootstrapped, #{db => DB, shard => Shard}),
+            {noreply, St};
+        St = #st{bootstrapped = false} ->
+            {noreply, St, {continue, bootstrap}};
+        {retry, Timeout, St} ->
+            _TRef = erlang:start_timer(Timeout, self(), bootstrap),
+            {noreply, St}
+    end.
+
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
@@ -335,7 +382,14 @@ handle_call(_Call, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-terminate(_Reason, {DB, Shard}) ->
+handle_info({timeout, _TRef, bootstrap}, St) ->
+    {noreply, St, {continue, bootstrap}};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #st{db = DB, shard = Shard}) ->
+    %% NOTE: Mark as not ready right away.
+    ok = erase_shard_info(DB, Shard),
     %% NOTE: Timeouts are ignored, it's a best effort attempt.
     catch prep_stop_server(DB, Shard),
     LocalServer = get_local_server(DB, Shard),

@@ -343,6 +397,40 @@ terminate(_Reason, {DB, Shard}) ->
 
 %%
 
+bootstrap(St = #st{stage = trigger_election, server = Server}) ->
+    ok = trigger_election(Server),
+    St#st{stage = wait_leader};
+bootstrap(St = #st{stage = wait_leader, server = Server}) ->
+    case current_leader(Server) of
+        Leader = {_, _} ->
+            St#st{stage = {wait_log, Leader}};
+        unknown ->
+            St
+    end;
+bootstrap(St = #st{stage = {wait_log, Leader}}) ->
+    case ra_overview(Leader) of
+        #{commit_index := RaftIdx} ->
+            St#st{stage = {wait_log_index, RaftIdx}};
+        #{} ->
+            St#st{stage = wait_leader}
+    end;
+bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
+    Overview = ra_overview(Server),
+    case maps:get(last_applied, Overview, 0) of
+        LastApplied when LastApplied >= RaftIdx ->
+            ok = announce_shard_ready(DB, Shard),
+            St#st{bootstrapped = true, stage = undefined};
+        LastApplied ->
+            %% NOTE
+            %% Blunt estimate of time shard needs to catch up. If this proves to be too long in
+            %% practice, it's could be augmented with handling `recover` -> `follower` Ra
+            %% member state transition.
+            Timeout = min(RaftIdx - LastApplied, ?MAX_BOOSTRAP_RETRY_TIMEOUT),
+            {retry, Timeout, St}
+    end.
+
+%%
+
 start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
@@ -350,7 +438,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     MutableConfig = #{tick_timeout => 100},
     case ra:restart_server(DB, LocalServer, MutableConfig) of
         {error, name_not_registered} ->
-            Bootstrap = true,
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
             LogOpts = maps:with(
                 [

@@ -366,30 +453,34 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
                 initial_members => Servers,
                 machine => Machine,
                 log_init_args => LogOpts
-            });
+            }),
+            {_NewServer = true, LocalServer};
         ok ->
-            Bootstrap = false;
+            {_NewServer = false, LocalServer};
         {error, {already_started, _}} ->
-            Bootstrap = false
-    end,
+            {_NewServer = false, LocalServer}
+    end.
 
+trigger_election(Server) ->
     %% NOTE
     %% Triggering election is necessary when a new consensus group is being brought up.
     %% TODO
     %% It's probably a good idea to rebalance leaders across the cluster from time to
     %% time. There's `ra:transfer_leadership/2` for that.
-    try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
-        false ->
-            ok;
-        ok ->
-            ok
+    try ra:trigger_election(Server) of
+        ok -> ok
     catch
-        %% TODO
+        %% NOTE
         %% Tolerating exceptions because server might be occupied with log replay for
         %% a while.
-        exit:{timeout, _} when not Bootstrap ->
+        exit:{timeout, _} ->
+            ?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}),
            ok
    end.
 
+announce_shard_ready(DB, Shard) ->
+    set_shard_info(DB, Shard, ready, true).
+
 server_uid(_DB, Shard) ->
     %% NOTE
     %% Each new "instance" of a server should have a unique identifier. Otherwise,

@@ -402,6 +493,22 @@ server_uid(_DB, Shard) ->
 
 %%
 
+get_shard_info(DB, Shard, K, Default) ->
+    persistent_term:get(?PTERM(DB, Shard, K), Default).
+
+set_shard_info(DB, Shard, K, V) ->
+    persistent_term:put(?PTERM(DB, Shard, K), V).
+
+erase_shard_info(DB, Shard) ->
+    lists:foreach(fun(K) -> erase_shard_info(DB, Shard, K) end, [
+        ready
+    ]).
+
+erase_shard_info(DB, Shard, K) ->
+    persistent_term:erase(?PTERM(DB, Shard, K)).
+
+%%
+
 prep_stop_server(DB, Shard) ->
     prep_stop_server(DB, Shard, 5_000).
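The `ready` flag itself lives in `persistent_term` under the `?PTERM(DB, Shard, ready)` key: it is erased in `terminate/2` and set to `true` only after the local Ra member has caught up with the leader's commit index. A caller that wants to block until a shard is usable could poll it; a sketch, with the helper name and retry policy being illustrative:

    %% Illustrative helper: poll the readiness flag exposed by
    %% emqx_ds_replication_layer_shard:shard_info/3 until the shard has
    %% finished its initial log replay (or the retry budget runs out).
    wait_shard_ready(_DB, _Shard, 0) ->
        {error, timeout};
    wait_shard_ready(DB, Shard, Retries) ->
        case emqx_ds_replication_layer_shard:shard_info(DB, Shard, ready) of
            true ->
                ok;
            false ->
                timer:sleep(100),
                wait_shard_ready(DB, Shard, Retries - 1)
        end.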
@@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) ->
    %% Initialize DB on all nodes and wait for it to be online.
    Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
    assert_db_open(Nodes, ?DB, Opts),
    assert_db_stable(Nodes, ?DB),

    %% Stop the DB on the "offline" node.
    ?wait_async_action(

@@ -207,7 +206,6 @@ t_rebalance(Config) ->
    %% 1. Initialize DB on the first node.
    Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
    assert_db_open(Nodes, ?DB, Opts),
    assert_db_stable(Nodes, ?DB),

    %% 1.1 Kick all sites except S1 from the replica set as
    %% the initial condition:

@@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) ->

    %% Open DB:
    assert_db_open(Nodes, ?DB, Opts),
    assert_db_stable(Nodes, ?DB),

    %% Kick N3 from the replica set as the initial condition:
    ?assertMatch(

@@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) ->
    %% Initialize DB on all 3 nodes.
    Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
    assert_db_open(Nodes, ?DB, Opts),
    assert_db_stable(Nodes, ?DB),

    ?retry(
        1000,
@@ -845,13 +841,11 @@ t_crash_restart_recover(Config) ->
     ?check_trace(
         begin
             %% Initialize DB on all nodes.
-            ?assertEqual(
-                [{ok, ok} || _ <- Nodes],
-                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
-            ),
+            assert_db_open(Nodes, ?DB, DBOpts),
 
             %% Apply the test events, including simulated node crashes.
             NodeStream = emqx_utils_stream:const(N1),
+            StartedAt = erlang:monotonic_time(millisecond),
             emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
 
             %% It's expected to lose few messages when leaders are abruptly killed.

@@ -865,6 +859,10 @@ t_crash_restart_recover(Config) ->
             ct:pal("Some messages were lost: ~p", [LostMessages]),
             ?assert(length(LostMessages) < NMsgs div 20),
 
+            %% Wait until crashed nodes are ready.
+            SinceStarted = erlang:monotonic_time(millisecond) - StartedAt,
+            wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted),
+
             %% Verify that all the successfully persisted messages are there.
             VerifyClient = fun({ClientId, ExpectedStream}) ->
                 Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
@@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) ->
     ?assertEqual(
         [{ok, ok} || _ <- Nodes],
         erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
-    ).
+    ),
+    wait_db_bootstrapped(Nodes, ?DB).
 
 assert_db_stable([Node | _], DB) ->
     Shards = ds_repl_meta(Node, shards, [DB]),
@@ -935,6 +934,32 @@ assert_db_stable([Node | _], DB) ->
         db_leadership(Node, DB, Shards)
     ).
 
+wait_db_bootstrapped(Nodes, DB) ->
+    wait_db_bootstrapped(Nodes, DB, infinity, infinity).
+
+wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) ->
+    SRefs = [
+        snabbkaffe:subscribe(
+            ?match_event(#{
+                ?snk_kind := emqx_ds_replshard_bootstrapped,
+                ?snk_meta := #{node := Node},
+                db := DB,
+                shard := Shard
+            }),
+            1,
+            Timeout,
+            BackInTime
+        )
+     || Node <- Nodes,
+        Shard <- ds_repl_meta(Node, my_shards, [DB])
+    ],
+    lists:foreach(
+        fun({ok, SRef}) ->
+            ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef))
+        end,
+        SRefs
+    ).
+
 %%
 
 db_leadership(Node, DB, Shards) ->
@@ -55,6 +55,7 @@
     topic_filter/0,
     topic/0,
     batch/0,
+    dsbatch/0,
     operation/0,
     deletion/0,
     precondition/0,

@@ -104,7 +105,9 @@
 -type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
 
 %% A batch of storage operations.
--type batch() :: [operation()] | #dsbatch{}.
+-type batch() :: [operation()] | dsbatch().
+
+-type dsbatch() :: #dsbatch{}.
 
 -type operation() ::
     %% Store a message.
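The newly exported `dsbatch()` type is the record that carries both the operations and the preconditions the builtin backends verify before applying a batch. A sketch of constructing one (the record fields are exactly the ones matched in the diffs above; the message and precondition values are placeholders whose shapes are given by `emqx_ds:operation()` and `emqx_ds:precondition()`):

    -include_lib("emqx_durable_storage/include/emqx_ds.hrl").

    %% Build a #dsbatch{} that stores the given messages, guarded by a list of
    %% preconditions which the backend checks atomically with the write.
    make_guarded_batch(Messages, Preconditions) ->
        #dsbatch{
            operations = Messages,
            preconditions = Preconditions
        }.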
@@ -97,7 +97,7 @@ broker(_) ->
 %% @doc Cluster with other nodes
 
 cluster(["join", SNode]) ->
-    case mria:join(ekka_node:parse_name(SNode)) of
+    case ekka:join(ekka_node:parse_name(SNode)) of
         ok ->
             emqx_ctl:print("Join the cluster successfully.~n"),
             %% FIXME: running status on the replicant immediately

@@ -112,7 +112,7 @@ cluster(["join", SNode]) ->
     end;
 cluster(["leave"]) ->
     _ = maybe_disable_autocluster(),
-    case mria:leave() of
+    case ekka:leave() of
         ok ->
             emqx_ctl:print("Leave the cluster successfully.~n"),
             cluster(["status"]);

@@ -121,7 +121,7 @@ cluster(["leave"]) ->
     end;
 cluster(["force-leave", SNode]) ->
     Node = ekka_node:parse_name(SNode),
-    case mria:force_leave(Node) of
+    case ekka:force_leave(Node) of
         ok ->
             case emqx_cluster_rpc:force_leave_clean(Node) of
                 ok ->
@@ -20,6 +20,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("typerefl/include/types.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
 
 -behaviour(minirest_api).
 

@@ -385,16 +386,16 @@ param_path_id() ->
     case maps:get(<<"id">>, Params0, list_to_binary(emqx_utils:gen_id(8))) of
         <<>> ->
             {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
-        Id ->
+        Id when is_binary(Id) ->
             Params = filter_out_request_body(add_metadata(Params0)),
             case emqx_rule_engine:get_rule(Id) of
                 {ok, _Rule} ->
-                    {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
+                    ?BAD_REQUEST(<<"rule id already exists">>);
                 not_found ->
                     ConfPath = ?RULE_PATH(Id),
                     case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
                         {ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
-                            {201, format_rule_info_resp(Rule)};
+                            ?CREATED(format_rule_info_resp(Rule));
                         {error, Reason} ->
                             ?SLOG(
                                 info,

@@ -405,9 +406,11 @@ param_path_id() ->
                                 },
                                 #{tag => ?TAG}
                             ),
-                            {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
+                            ?BAD_REQUEST(?ERR_BADARGS(Reason))
                     end
-            end
+            end;
+        _BadId ->
+            ?BAD_REQUEST(<<"rule id must be a string">>)
     end.
 
 '/rule_test'(post, #{body := Params}) ->
@@ -334,13 +334,14 @@ eventmsg_publish(
         qos => QoS,
         flags => Flags,
         pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-        publish_received_at => Timestamp
+        publish_received_at => Timestamp,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{})
     },
     #{headers => Headers}
 ).
 
 eventmsg_connected(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         is_bridge := IsBridge,

@@ -375,13 +376,14 @@ eventmsg_connected(
         expiry_interval => ExpiryInterval div 1000,
         is_bridge => IsBridge,
         conn_props => printable_maps(ConnProps),
-        connected_at => ConnectedAt
+        connected_at => ConnectedAt,
+        client_attrs => maps:get(client_attrs, ClientInfo, #{})
     },
     #{}
 ).
 
 eventmsg_disconnected(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username
     },

@@ -405,7 +407,8 @@ eventmsg_disconnected(
         proto_name => ProtoName,
         proto_ver => ProtoVer,
         disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
-        disconnected_at => DisconnectedAt
+        disconnected_at => DisconnectedAt,
+        client_attrs => maps:get(client_attrs, ClientInfo, #{})
     },
     #{}
 ).

@@ -444,7 +447,7 @@ eventmsg_connack(
 ).
 
 eventmsg_check_authz_complete(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         peerhost := PeerHost,

@@ -465,13 +468,14 @@ eventmsg_check_authz_complete(
         topic => Topic,
         action => PubSub,
         authz_source => AuthzSource,
-        result => Result
+        result => Result,
+        client_attrs => maps:get(client_attrs, ClientInfo, #{})
     },
     #{}
 ).
 
 eventmsg_check_authn_complete(
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         peername := PeerName

@@ -493,14 +497,15 @@ eventmsg_check_authn_complete(
         peername => ntoa(PeerName),
         reason_code => force_to_bin(Reason),
         is_anonymous => IsAnonymous,
-        is_superuser => IsSuperuser
+        is_superuser => IsSuperuser,
+        client_attrs => maps:get(client_attrs, ClientInfo, #{})
     },
     #{}
 ).
 
 eventmsg_sub_or_unsub(
     Event,
-    _ClientInfo = #{
+    ClientInfo = #{
         clientid := ClientId,
         username := Username,
         peerhost := PeerHost,

@@ -519,7 +524,8 @@ eventmsg_sub_or_unsub(
         peername => ntoa(PeerName),
         PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
         topic => Topic,
-        qos => QoS
+        qos => QoS,
+        client_attrs => maps:get(client_attrs, ClientInfo, #{})
     },
     #{}
 ).

@@ -551,7 +557,8 @@ eventmsg_dropped(
         qos => QoS,
         flags => Flags,
         pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-        publish_received_at => Timestamp
+        publish_received_at => Timestamp,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{})
     },
     #{headers => Headers}
 ).

@@ -583,7 +590,8 @@ eventmsg_transformation_failed(
         qos => QoS,
         flags => Flags,
         pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-        publish_received_at => Timestamp
+        publish_received_at => Timestamp,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{})
     },
     #{headers => Headers}
 ).

@@ -616,7 +624,8 @@ eventmsg_validation_failed(
         qos => QoS,
         flags => Flags,
         pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-        publish_received_at => Timestamp
+        publish_received_at => Timestamp,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{})
     },
     #{headers => Headers}
 ).

@@ -654,7 +663,8 @@ eventmsg_delivered(
         qos => QoS,
         flags => Flags,
         pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-        publish_received_at => Timestamp
+        publish_received_at => Timestamp,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{})
     },
     #{headers => Headers}
 ).

@@ -693,7 +703,8 @@ eventmsg_acked(
         flags => Flags,
         pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
         puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
-        publish_received_at => Timestamp
+        publish_received_at => Timestamp,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{})
     },
     #{headers => Headers}
 ).

@@ -733,7 +744,8 @@ eventmsg_delivery_dropped(
         qos => QoS,
         flags => Flags,
         pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
-        publish_received_at => Timestamp
+        publish_received_at => Timestamp,
+        client_attrs => emqx_message:get_header(client_attrs, Message, #{})
     },
     #{headers => Headers}
 ).
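Because `client_attrs` is now part of every rule-event payload, rule SQL can reference the attribute map like any other event field. A small sketch of such a statement (the topic, attribute name and value are made up; the rule itself can be created through the `/rules` HTTP API or `emqx_rule_engine`, as the test added later in this diff does):

    %% Rule SQL that selects and filters on the new client_attrs field.
    SQL = <<"SELECT clientid, client_attrs.group AS tenant_group FROM \"t/#\" WHERE client_attrs.group = 'g1'">>.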
@@ -112,7 +112,8 @@ groups() ->
             t_sqlparse_undefined_variable,
             t_sqlparse_new_map,
             t_sqlparse_invalid_json,
-            t_sqlselect_as_put
+            t_sqlselect_as_put,
+            t_sqlselect_client_attr
         ]},
         {events, [], [
             t_events,

@@ -3891,6 +3892,57 @@ t_trace_rule_id(_Config) ->
     ?assertEqual([], emqx_trace_handler:running()),
     emqtt:disconnect(T).
 
+t_sqlselect_client_attr(_) ->
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    {ok, Compiled} = emqx_variform:compile("user_property.group"),
+    emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], [
+        #{
+            expression => Compiled,
+            set_as_attr => <<"group">>
+        },
+        #{
+            expression => Compiled,
+            set_as_attr => <<"group2">>
+        }
+    ]),
+
+    SQL =
+        "SELECT client_attrs as payload FROM \"t/1\" ",
+    Repub = republish_action(<<"t/2">>),
+    {ok, _TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [Repub]
+        }
+    ),
+
+    {ok, Client} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {properties, #{'User-Property' => [{<<"group">>, <<"g1">>}]}}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+
+    {ok, _, _} = emqtt:subscribe(Client, <<"t/2">>, 0),
+    ct:sleep(100),
+    emqtt:publish(Client, <<"t/1">>, <<"Hello">>),
+
+    receive
+        {publish, #{topic := Topic, payload := Payload}} ->
+            ?assertEqual(<<"t/2">>, Topic),
+            ?assertMatch(
+                #{<<"group">> := <<"g1">>, <<"group2">> := <<"g1">>},
+                emqx_utils_json:decode(Payload)
+            )
+    after 1000 ->
+        ct:fail(wait_for_t_2)
+    end,
+
+    emqtt:disconnect(Client),
+    emqx_rule_engine:delete_rule(?TMP_RULEID),
+    emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []).
+
 %%------------------------------------------------------------------------------
 %% Internal helpers
 %%------------------------------------------------------------------------------
@@ -3990,7 +4042,8 @@ verify_event_fields('message.publish', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,

@@ -4007,7 +4060,8 @@ verify_event_fields('message.publish', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.connected', Fields) ->
     #{
         clientid := ClientId,

@@ -4023,7 +4077,8 @@ verify_event_fields('client.connected', Fields) ->
         is_bridge := IsBridge,
         conn_props := Properties,
         timestamp := Timestamp,
-        connected_at := EventAt
+        connected_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,

@@ -4042,7 +4097,8 @@ verify_event_fields('client.connected', Fields) ->
     ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.disconnected', Fields) ->
     #{
         reason := Reason,

@@ -4052,7 +4108,8 @@ verify_event_fields('client.disconnected', Fields) ->
         sockname := SockName,
         disconn_props := Properties,
         timestamp := Timestamp,
-        disconnected_at := EventAt
+        disconnected_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,

@@ -4065,7 +4122,8 @@ verify_event_fields('client.disconnected', Fields) ->
     ?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields(SubUnsub, Fields) when
     SubUnsub == 'session.subscribed';
     SubUnsub == 'session.unsubscribed'

@@ -4077,7 +4135,8 @@ verify_event_fields(SubUnsub, Fields) when
         peername := PeerName,
         topic := Topic,
         qos := QoS,
-        timestamp := Timestamp
+        timestamp := Timestamp,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,

@@ -4097,7 +4156,8 @@ verify_event_fields(SubUnsub, Fields) when
         #{'User-Property' := #{<<"topic_name">> := <<"t1">>}},
         maps:get(PropKey, Fields)
     ),
-    ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000);
+    ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('delivery.dropped', Fields) ->
     #{
         event := 'delivery.dropped',

@@ -4117,7 +4177,8 @@ verify_event_fields('delivery.dropped', Fields) ->
         qos := QoS,
         flags := Flags,
         timestamp := Timestamp,
-        topic := Topic
+        topic := Topic,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,
@@ -4139,7 +4200,8 @@ verify_event_fields('delivery.dropped', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('message.dropped', Fields) ->
     #{
         id := ID,

@@ -4154,7 +4216,8 @@ verify_event_fields('message.dropped', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,

@@ -4172,7 +4235,8 @@ verify_event_fields('message.dropped', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('message.delivered', Fields) ->
     #{
         id := ID,

@@ -4188,7 +4252,8 @@ verify_event_fields('message.delivered', Fields) ->
         flags := Flags,
         pub_props := Properties,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,

@@ -4207,7 +4272,8 @@ verify_event_fields('message.delivered', Fields) ->
     ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('message.acked', Fields) ->
     #{
         id := ID,

@@ -4224,7 +4290,8 @@ verify_event_fields('message.acked', Fields) ->
         pub_props := PubProps,
         puback_props := PubAckProps,
         timestamp := Timestamp,
-        publish_received_at := EventAt
+        publish_received_at := EventAt,
+        client_attrs := ClientAttrs
     } = Fields,
     Now = erlang:system_time(millisecond),
     TimestampElapse = Now - Timestamp,

@@ -4244,7 +4311,8 @@ verify_event_fields('message.acked', Fields) ->
     ?assert(is_map(PubAckProps)),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
-    ?assert(EventAt =< Timestamp);
+    ?assert(EventAt =< Timestamp),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.connack', Fields) ->
     #{
         clientid := ClientId,

@@ -4282,7 +4350,8 @@ verify_event_fields('client.check_authz_complete', Fields) ->
         peername := PeerName,
         topic := Topic,
         authz_source := AuthzSource,
-        username := Username
+        username := Username,
+        client_attrs := ClientAttrs
     } = Fields,
     ?assertEqual(<<"t1">>, Topic),
     ?assert(lists:member(Action, [subscribe, publish])),

@@ -4302,20 +4371,23 @@ verify_event_fields('client.check_authz_complete', Fields) ->
         ])
     ),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
-    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>]));
+    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('client.check_authn_complete', Fields) ->
     #{
         clientid := ClientId,
         peername := PeerName,
         username := Username,
         is_anonymous := IsAnonymous,
-        is_superuser := IsSuperuser
+        is_superuser := IsSuperuser,
+        client_attrs := ClientAttrs
     } = Fields,
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
     ?assert(erlang:is_boolean(IsAnonymous)),
-    ?assert(erlang:is_boolean(IsSuperuser));
+    ?assert(erlang:is_boolean(IsSuperuser)),
+    ?assert(is_map(ClientAttrs));
 verify_event_fields('schema.validation_failed', Fields) ->
     #{
         validation := ValidationName,

@@ -4327,12 +4399,14 @@ verify_event_fields('schema.validation_failed', Fields) ->
         topic := _Topic,
         flags := _Flags,
         pub_props := _PubProps,
-        publish_received_at := _PublishReceivedAt
+        publish_received_at := _PublishReceivedAt,
+        client_attrs := ClientAttrs
     } = Fields,
     ?assertEqual(<<"v1">>, ValidationName),
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ?assert(is_map(ClientAttrs)),
     ok;
 verify_event_fields('message.transformation_failed', Fields) ->
     #{

@@ -4345,12 +4419,14 @@ verify_event_fields('message.transformation_failed', Fields) ->
         topic := _Topic,
         flags := _Flags,
         pub_props := _PubProps,
-        publish_received_at := _PublishReceivedAt
+        publish_received_at := _PublishReceivedAt,
+        client_attrs := ClientAttrs
     } = Fields,
     ?assertEqual(<<"t1">>, TransformationName),
     verify_peername(PeerName),
     ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
     ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    ?assert(is_map(ClientAttrs)),
     ok.
 
 verify_peername(PeerName) ->
@@ -21,6 +21,8 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %%------------------------------------------------------------------------------

@@ -48,6 +50,13 @@ app_specs() ->
         emqx_mgmt_api_test_util:emqx_dashboard()
     ].
 
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helper fns
 %%------------------------------------------------------------------------------

@@ -124,7 +133,13 @@ create_rule(Overrides) ->
     Method = post,
     Path = emqx_mgmt_api_test_util:api_path(["rules"]),
     Res = request(Method, Path, Params),
-    emqx_mgmt_api_test_util:simplify_result(Res).
+    case emqx_mgmt_api_test_util:simplify_result(Res) of
+        {201, #{<<"id">> := RuleId}} = SRes ->
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            SRes;
+        SRes ->
+            SRes
+    end.
 
 sources_sql(Sources) ->
     Froms = iolist_to_binary(lists:join(<<", ">>, lists:map(fun source_from/1, Sources))),
@@ -586,3 +601,21 @@ t_filter_by_source_and_action(_Config) ->
     ),
 
     ok.
+
+%% Checks that creating a rule with a `null' JSON value id is forbidden.
+t_create_rule_with_null_id(_Config) ->
+    ?assertMatch(
+        {400, #{<<"message">> := <<"rule id must be a string">>}},
+        create_rule(#{<<"id">> => null})
+    ),
+    %% The string `"null"' should be fine.
+    ?assertMatch(
+        {201, _},
+        create_rule(#{<<"id">> => <<"null">>})
+    ),
+    ?assertMatch({201, _}, create_rule(#{})),
+    ?assertMatch(
+        {200, #{<<"data">> := [_, _]}},
+        list_rules([])
+    ),
+    ok.

@@ -339,6 +339,7 @@ ensure_serde_absent(Name) ->
         {ok, Serde} ->
             ok = emqx_schema_registry_serde:destroy(Serde),
             _ = ets:delete(?SERDE_TAB, Name),
+            ?tp("schema_registry_serde_deleted", #{name => Name}),
             ok;
         {error, not_found} ->
             ok

@@ -57,7 +57,15 @@ end_per_testcase(_TestCase, _Config) ->
 clear_schemas() ->
     maps:foreach(
         fun(Name, _Schema) ->
-            ok = emqx_schema_registry:delete_schema(Name)
+            NameBin = emqx_utils_conv:bin(Name),
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_schema_registry:delete_schema(Name),
+                    #{
+                        ?snk_kind := "schema_registry_serde_deleted",
+                        name := NameBin
+                    }
+                )
         end,
         emqx_schema_registry:list_schemas()
     ).
@@ -0,0 +1 @@
+Expose `client_attrs` to rule engine and rule events.

@@ -0,0 +1 @@
+Fixed an issue where creating a rule with a null id via the HTTP API was allowed, which could lead to an inconsistent configuration.
mix.exs
@@ -184,7 +184,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
   def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.12.0", override: true}
   def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
-  def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.2", override: true}
+  def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.3", override: true}
   def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}
   # in conflict by ehttpc and emqtt
   def common_dep(:gun), do: {:gun, github: "emqx/gun", tag: "1.3.11", override: true}
@@ -98,7 +98,7 @@
     {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.5"}}},
     {getopt, "1.0.2"},
     {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.3"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}},
     {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}},