Compare commits
19 Commits
master
...
release-58
Author | SHA1 | Date |
---|---|---|
![]() |
9d6954cf60 | |
![]() |
9a2f878017 | |
![]() |
c35661f484 | |
![]() |
9abdff60a1 | |
![]() |
a849e6df4c | |
![]() |
c00b178b57 | |
![]() |
f085973778 | |
![]() |
a8882bd7fd | |
![]() |
7711307909 | |
![]() |
6849801293 | |
![]() |
6c4cfeed92 | |
![]() |
de9e619c96 | |
![]() |
8d88d14f0a | |
![]() |
ff72d55491 | |
![]() |
42e4a635e0 | |
![]() |
26ddc403c8 | |
![]() |
4971fd3eaf | |
![]() |
5b15886836 | |
![]() |
10dadbad3b |
|
@ -31,7 +31,7 @@
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
|
||||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
|
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.3"}}},
|
||||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
||||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||||
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
|
||||||
|
|
|
@ -621,16 +621,16 @@ save_to_config_map(Conf, RawConf) ->
|
||||||
?MODULE:put_raw(RawConf).
|
?MODULE:put_raw(RawConf).
|
||||||
|
|
||||||
-spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}.
|
-spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}.
|
||||||
save_to_override_conf(_, undefined, _) ->
|
save_to_override_conf(_HasDeprecatedFile, undefined, _) ->
|
||||||
ok;
|
ok;
|
||||||
save_to_override_conf(true, RawConf, Opts) ->
|
save_to_override_conf(true = _HasDeprecatedFile, RawConf, Opts) ->
|
||||||
case deprecated_conf_file(Opts) of
|
case deprecated_conf_file(Opts) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
FileName ->
|
FileName ->
|
||||||
backup_and_write(FileName, hocon_pp:do(RawConf, Opts))
|
backup_and_write(FileName, hocon_pp:do(RawConf, Opts))
|
||||||
end;
|
end;
|
||||||
save_to_override_conf(false, RawConf, Opts) ->
|
save_to_override_conf(false = _HasDeprecatedFile, RawConf, Opts) ->
|
||||||
case cluster_hocon_file() of
|
case cluster_hocon_file() of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
|
|
|
@ -689,18 +689,9 @@ all() ->
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
%% TODO: Remove once builtin-local supports preconditions + atomic batches.
|
|
||||||
BuiltinLocalTCs =
|
|
||||||
TCs --
|
|
||||||
[
|
[
|
||||||
t_09_atomic_store_batch,
|
{builtin_local, TCs},
|
||||||
t_11_batch_preconditions,
|
{builtin_raft, TCs}
|
||||||
t_12_batch_precondition_conflicts
|
|
||||||
],
|
|
||||||
BuiltinRaftTCs = TCs,
|
|
||||||
[
|
|
||||||
{builtin_local, BuiltinLocalTCs},
|
|
||||||
{builtin_raft, BuiltinRaftTCs}
|
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_group(builtin_local, Config) ->
|
init_per_group(builtin_local, Config) ->
|
||||||
|
|
|
@ -49,7 +49,9 @@
|
||||||
%% Internal exports:
|
%% Internal exports:
|
||||||
-export([
|
-export([
|
||||||
do_next/3,
|
do_next/3,
|
||||||
do_delete_next/4
|
do_delete_next/4,
|
||||||
|
%% Used by batch serializer
|
||||||
|
make_batch/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
|
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
|
||||||
|
@ -88,7 +90,10 @@
|
||||||
#{
|
#{
|
||||||
backend := builtin_local,
|
backend := builtin_local,
|
||||||
storage := emqx_ds_storage_layer:prototype(),
|
storage := emqx_ds_storage_layer:prototype(),
|
||||||
n_shards := pos_integer()
|
n_shards := pos_integer(),
|
||||||
|
%% Inherited from `emqx_ds:generic_db_opts()`.
|
||||||
|
force_monotonic_timestamps => boolean(),
|
||||||
|
atomic_batches => boolean()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
|
-type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
|
||||||
|
@ -193,9 +198,17 @@ drop_db(DB) ->
|
||||||
),
|
),
|
||||||
emqx_ds_builtin_local_meta:drop_db(DB).
|
emqx_ds_builtin_local_meta:drop_db(DB).
|
||||||
|
|
||||||
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
|
||||||
emqx_ds:store_batch_result().
|
emqx_ds:store_batch_result().
|
||||||
store_batch(DB, Messages, Opts) ->
|
store_batch(DB, Batch, Opts) ->
|
||||||
|
case emqx_ds_builtin_local_meta:db_config(DB) of
|
||||||
|
#{atomic_batches := true} ->
|
||||||
|
store_batch_atomic(DB, Batch, Opts);
|
||||||
|
_ ->
|
||||||
|
store_batch_buffered(DB, Batch, Opts)
|
||||||
|
end.
|
||||||
|
|
||||||
|
store_batch_buffered(DB, Messages, Opts) ->
|
||||||
try
|
try
|
||||||
emqx_ds_buffer:store_batch(DB, Messages, Opts)
|
emqx_ds_buffer:store_batch(DB, Messages, Opts)
|
||||||
catch
|
catch
|
||||||
|
@ -203,6 +216,34 @@ store_batch(DB, Messages, Opts) ->
|
||||||
{error, recoverable, Reason}
|
{error, recoverable, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
store_batch_atomic(DB, Batch, Opts) ->
|
||||||
|
Shards = shards_of_batch(DB, Batch),
|
||||||
|
case Shards of
|
||||||
|
[Shard] ->
|
||||||
|
emqx_ds_builtin_local_batch_serializer:store_batch_atomic(DB, Shard, Batch, Opts);
|
||||||
|
[] ->
|
||||||
|
ok;
|
||||||
|
[_ | _] ->
|
||||||
|
{error, unrecoverable, atomic_batch_spans_multiple_shards}
|
||||||
|
end.
|
||||||
|
|
||||||
|
shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
|
||||||
|
shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
|
||||||
|
shards_of_batch(DB, Operations) ->
|
||||||
|
shards_of_batch(DB, Operations, []).
|
||||||
|
|
||||||
|
shards_of_batch(DB, [Operation | Rest], Acc) ->
|
||||||
|
case shard_of_operation(DB, Operation, clientid, #{}) of
|
||||||
|
Shard when Shard =:= hd(Acc) ->
|
||||||
|
shards_of_batch(DB, Rest, Acc);
|
||||||
|
Shard when Acc =:= [] ->
|
||||||
|
shards_of_batch(DB, Rest, [Shard]);
|
||||||
|
ShardAnother ->
|
||||||
|
[ShardAnother | Acc]
|
||||||
|
end;
|
||||||
|
shards_of_batch(_DB, [], Acc) ->
|
||||||
|
Acc.
|
||||||
|
|
||||||
-record(bs, {options :: emqx_ds:create_db_opts()}).
|
-record(bs, {options :: emqx_ds:create_db_opts()}).
|
||||||
-type buffer_state() :: #bs{}.
|
-type buffer_state() :: #bs{}.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,122 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ds_builtin_local_batch_serializer).
|
||||||
|
|
||||||
|
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([
|
||||||
|
start_link/3,
|
||||||
|
|
||||||
|
store_batch_atomic/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% `gen_server' API
|
||||||
|
-export([
|
||||||
|
init/1,
|
||||||
|
handle_call/3,
|
||||||
|
handle_cast/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Type declarations
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}).
|
||||||
|
-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}).
|
||||||
|
|
||||||
|
-record(store_batch_atomic, {batch :: emqx_ds:batch(), opts :: emqx_ds:message_store_opts()}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
start_link(DB, Shard, _Opts) ->
|
||||||
|
gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
|
||||||
|
|
||||||
|
store_batch_atomic(DB, Shard, Batch, Opts) ->
|
||||||
|
gen_server:call(?via(DB, Shard), #store_batch_atomic{batch = Batch, opts = Opts}, infinity).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% `gen_server' API
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([DB, Shard]) ->
|
||||||
|
process_flag(message_queue_data, off_heap),
|
||||||
|
State = #{
|
||||||
|
db => DB,
|
||||||
|
shard => Shard
|
||||||
|
},
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
handle_call(#store_batch_atomic{batch = Batch, opts = StoreOpts}, _From, State) ->
|
||||||
|
ShardId = shard_id(State),
|
||||||
|
DBOpts = db_config(State),
|
||||||
|
Result = do_store_batch_atomic(ShardId, Batch, DBOpts, StoreOpts),
|
||||||
|
{reply, Result, State};
|
||||||
|
handle_call(Call, _From, State) ->
|
||||||
|
{reply, {error, {unknown_call, Call}}, State}.
|
||||||
|
|
||||||
|
handle_cast(_Cast, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Internal fns
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
shard_id(#{db := DB, shard := Shard}) ->
|
||||||
|
{DB, Shard}.
|
||||||
|
|
||||||
|
db_config(#{db := DB}) ->
|
||||||
|
emqx_ds_builtin_local_meta:db_config(DB).
|
||||||
|
|
||||||
|
-spec do_store_batch_atomic(
|
||||||
|
emqx_ds_storage_layer:shard_id(),
|
||||||
|
emqx_ds:dsbatch(),
|
||||||
|
emqx_ds_builtin_local:db_opts(),
|
||||||
|
emqx_ds:message_store_opts()
|
||||||
|
) ->
|
||||||
|
emqx_ds:store_batch_result().
|
||||||
|
do_store_batch_atomic(ShardId, #dsbatch{} = Batch, DBOpts, StoreOpts) ->
|
||||||
|
#dsbatch{
|
||||||
|
operations = Operations0,
|
||||||
|
preconditions = Preconditions
|
||||||
|
} = Batch,
|
||||||
|
case emqx_ds_precondition:verify(emqx_ds_storage_layer, ShardId, Preconditions) of
|
||||||
|
ok ->
|
||||||
|
do_store_operations(ShardId, Operations0, DBOpts, StoreOpts);
|
||||||
|
{precondition_failed, _} = PreconditionFailed ->
|
||||||
|
{error, unrecoverable, PreconditionFailed};
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
|
do_store_batch_atomic(ShardId, Operations, DBOpts, StoreOpts) ->
|
||||||
|
do_store_operations(ShardId, Operations, DBOpts, StoreOpts).
|
||||||
|
|
||||||
|
do_store_operations(ShardId, Operations0, DBOpts, _StoreOpts) ->
|
||||||
|
ForceMonotonic = maps:get(force_monotonic_timestamps, DBOpts),
|
||||||
|
{Latest, Operations} =
|
||||||
|
emqx_ds_builtin_local:make_batch(
|
||||||
|
ForceMonotonic,
|
||||||
|
current_timestamp(ShardId),
|
||||||
|
Operations0
|
||||||
|
),
|
||||||
|
Result = emqx_ds_storage_layer:store_batch(ShardId, Operations, _Options = #{}),
|
||||||
|
emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest),
|
||||||
|
Result.
|
||||||
|
|
||||||
|
current_timestamp(ShardId) ->
|
||||||
|
emqx_ds_builtin_local_meta:current_timestamp(ShardId).
|
|
@ -158,7 +158,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
|
||||||
Opts = emqx_ds_builtin_local_meta:db_config(DB),
|
Opts = emqx_ds_builtin_local_meta:db_config(DB),
|
||||||
Children = [
|
Children = [
|
||||||
shard_storage_spec(DB, Shard, Opts),
|
shard_storage_spec(DB, Shard, Opts),
|
||||||
shard_buffer_spec(DB, Shard, Opts)
|
shard_buffer_spec(DB, Shard, Opts),
|
||||||
|
shard_batch_serializer_spec(DB, Shard, Opts)
|
||||||
],
|
],
|
||||||
{ok, {SupFlags, Children}}.
|
{ok, {SupFlags, Children}}.
|
||||||
|
|
||||||
|
@ -208,6 +209,15 @@ shard_buffer_spec(DB, Shard, Options) ->
|
||||||
type => worker
|
type => worker
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
shard_batch_serializer_spec(DB, Shard, Opts) ->
|
||||||
|
#{
|
||||||
|
id => {Shard, batch_serializer},
|
||||||
|
start => {emqx_ds_builtin_local_batch_serializer, start_link, [DB, Shard, Opts]},
|
||||||
|
shutdown => 5_000,
|
||||||
|
restart => permanent,
|
||||||
|
type => worker
|
||||||
|
}.
|
||||||
|
|
||||||
ensure_started(Res) ->
|
ensure_started(Res) ->
|
||||||
case Res of
|
case Res of
|
||||||
{ok, _Pid} ->
|
{ok, _Pid} ->
|
||||||
|
|
|
@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) ->
|
||||||
%% TODO
|
%% TODO
|
||||||
%% There's a possibility of race condition: storage may shut down right after we
|
%% There's a possibility of race condition: storage may shut down right after we
|
||||||
%% ask for its status.
|
%% ask for its status.
|
||||||
-define(IF_STORAGE_RUNNING(SHARDID, EXPR),
|
-define(IF_SHARD_READY(DB, SHARD, EXPR),
|
||||||
case emqx_ds_storage_layer:shard_info(SHARDID, status) of
|
case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
|
||||||
running -> EXPR;
|
true -> EXPR;
|
||||||
down -> {error, recoverable, storage_down}
|
false -> {error, recoverable, shard_unavailable}
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
|
||||||
[{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
|
[{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
|
||||||
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
|
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
|
||||||
ShardId = {DB, Shard},
|
ShardId = {DB, Shard},
|
||||||
?IF_STORAGE_RUNNING(
|
?IF_SHARD_READY(
|
||||||
ShardId,
|
DB,
|
||||||
|
Shard,
|
||||||
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
|
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
|
||||||
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
|
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
|
||||||
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
|
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
ShardId = {DB, Shard},
|
ShardId = {DB, Shard},
|
||||||
?IF_STORAGE_RUNNING(
|
?IF_SHARD_READY(
|
||||||
ShardId,
|
DB,
|
||||||
|
Shard,
|
||||||
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
|
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
|
||||||
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
|
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
|
||||||
do_next_v1(DB, Shard, Iter, BatchSize) ->
|
do_next_v1(DB, Shard, Iter, BatchSize) ->
|
||||||
ShardId = {DB, Shard},
|
ShardId = {DB, Shard},
|
||||||
?IF_STORAGE_RUNNING(
|
?IF_SHARD_READY(
|
||||||
ShardId,
|
DB,
|
||||||
|
Shard,
|
||||||
emqx_ds_storage_layer:next(
|
emqx_ds_storage_layer:next(
|
||||||
ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
|
ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
|
||||||
)
|
)
|
||||||
|
@ -620,8 +623,9 @@ do_add_generation_v2(_DB) ->
|
||||||
| emqx_ds:error(storage_down).
|
| emqx_ds:error(storage_down).
|
||||||
do_list_generations_with_lifetimes_v3(DB, Shard) ->
|
do_list_generations_with_lifetimes_v3(DB, Shard) ->
|
||||||
ShardId = {DB, Shard},
|
ShardId = {DB, Shard},
|
||||||
?IF_STORAGE_RUNNING(
|
?IF_SHARD_READY(
|
||||||
ShardId,
|
DB,
|
||||||
|
Shard,
|
||||||
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
|
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,8 @@
|
||||||
|
|
||||||
%% Dynamic server location API
|
%% Dynamic server location API
|
||||||
-export([
|
-export([
|
||||||
servers/3
|
servers/3,
|
||||||
|
shard_info/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Safe Process Command API
|
%% Safe Process Command API
|
||||||
|
@ -38,8 +39,10 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
-export([
|
-export([
|
||||||
init/1,
|
init/1,
|
||||||
|
handle_continue/2,
|
||||||
handle_call/3,
|
handle_call/3,
|
||||||
handle_cast/2,
|
handle_cast/2,
|
||||||
|
handle_info/2,
|
||||||
terminate/2
|
terminate/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -52,6 +55,9 @@
|
||||||
| {error, servers_unreachable}.
|
| {error, servers_unreachable}.
|
||||||
|
|
||||||
-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
|
-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
|
||||||
|
-define(MAX_BOOSTRAP_RETRY_TIMEOUT, 1_000).
|
||||||
|
|
||||||
|
-define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
@ -160,6 +166,12 @@ local_site() ->
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
|
||||||
|
shard_info(DB, Shard, ready) ->
|
||||||
|
get_shard_info(DB, Shard, ready, false).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
-spec process_command([server()], _Command, timeout()) ->
|
-spec process_command([server()], _Command, timeout()) ->
|
||||||
{ok, _Result, _Leader :: server()} | server_error().
|
{ok, _Result, _Leader :: server()} | server_error().
|
||||||
process_command(Servers, Command, Timeout) ->
|
process_command(Servers, Command, Timeout) ->
|
||||||
|
@ -324,10 +336,45 @@ ra_overview(Server) ->
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
-record(st, {
|
||||||
|
db :: emqx_ds:db(),
|
||||||
|
shard :: emqx_ds_replication_layer:shard_id(),
|
||||||
|
server :: server(),
|
||||||
|
bootstrapped :: boolean(),
|
||||||
|
stage :: term()
|
||||||
|
}).
|
||||||
|
|
||||||
init({DB, Shard, Opts}) ->
|
init({DB, Shard, Opts}) ->
|
||||||
_ = process_flag(trap_exit, true),
|
_ = process_flag(trap_exit, true),
|
||||||
ok = start_server(DB, Shard, Opts),
|
case start_server(DB, Shard, Opts) of
|
||||||
{ok, {DB, Shard}}.
|
{_New = true, Server} ->
|
||||||
|
NextStage = trigger_election;
|
||||||
|
{_New = false, Server} ->
|
||||||
|
NextStage = wait_leader
|
||||||
|
end,
|
||||||
|
St = #st{
|
||||||
|
db = DB,
|
||||||
|
shard = Shard,
|
||||||
|
server = Server,
|
||||||
|
bootstrapped = false,
|
||||||
|
stage = NextStage
|
||||||
|
},
|
||||||
|
{ok, St, {continue, bootstrap}}.
|
||||||
|
|
||||||
|
handle_continue(bootstrap, St = #st{bootstrapped = true}) ->
|
||||||
|
{noreply, St};
|
||||||
|
handle_continue(bootstrap, St0 = #st{db = DB, shard = Shard, stage = Stage}) ->
|
||||||
|
?tp(emqx_ds_replshard_bootstrapping, #{db => DB, shard => Shard, stage => Stage}),
|
||||||
|
case bootstrap(St0) of
|
||||||
|
St = #st{bootstrapped = true} ->
|
||||||
|
?tp(emqx_ds_replshard_bootstrapped, #{db => DB, shard => Shard}),
|
||||||
|
{noreply, St};
|
||||||
|
St = #st{bootstrapped = false} ->
|
||||||
|
{noreply, St, {continue, bootstrap}};
|
||||||
|
{retry, Timeout, St} ->
|
||||||
|
_TRef = erlang:start_timer(Timeout, self(), bootstrap),
|
||||||
|
{noreply, St}
|
||||||
|
end.
|
||||||
|
|
||||||
handle_call(_Call, _From, State) ->
|
handle_call(_Call, _From, State) ->
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
@ -335,7 +382,14 @@ handle_call(_Call, _From, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, {DB, Shard}) ->
|
handle_info({timeout, _TRef, bootstrap}, St) ->
|
||||||
|
{noreply, St, {continue, bootstrap}};
|
||||||
|
handle_info(_Info, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, #st{db = DB, shard = Shard}) ->
|
||||||
|
%% NOTE: Mark as not ready right away.
|
||||||
|
ok = erase_shard_info(DB, Shard),
|
||||||
%% NOTE: Timeouts are ignored, it's a best effort attempt.
|
%% NOTE: Timeouts are ignored, it's a best effort attempt.
|
||||||
catch prep_stop_server(DB, Shard),
|
catch prep_stop_server(DB, Shard),
|
||||||
LocalServer = get_local_server(DB, Shard),
|
LocalServer = get_local_server(DB, Shard),
|
||||||
|
@ -343,6 +397,40 @@ terminate(_Reason, {DB, Shard}) ->
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
bootstrap(St = #st{stage = trigger_election, server = Server}) ->
|
||||||
|
ok = trigger_election(Server),
|
||||||
|
St#st{stage = wait_leader};
|
||||||
|
bootstrap(St = #st{stage = wait_leader, server = Server}) ->
|
||||||
|
case current_leader(Server) of
|
||||||
|
Leader = {_, _} ->
|
||||||
|
St#st{stage = {wait_log, Leader}};
|
||||||
|
unknown ->
|
||||||
|
St
|
||||||
|
end;
|
||||||
|
bootstrap(St = #st{stage = {wait_log, Leader}}) ->
|
||||||
|
case ra_overview(Leader) of
|
||||||
|
#{commit_index := RaftIdx} ->
|
||||||
|
St#st{stage = {wait_log_index, RaftIdx}};
|
||||||
|
#{} ->
|
||||||
|
St#st{stage = wait_leader}
|
||||||
|
end;
|
||||||
|
bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
|
||||||
|
Overview = ra_overview(Server),
|
||||||
|
case maps:get(last_applied, Overview, 0) of
|
||||||
|
LastApplied when LastApplied >= RaftIdx ->
|
||||||
|
ok = announce_shard_ready(DB, Shard),
|
||||||
|
St#st{bootstrapped = true, stage = undefined};
|
||||||
|
LastApplied ->
|
||||||
|
%% NOTE
|
||||||
|
%% Blunt estimate of time shard needs to catch up. If this proves to be too long in
|
||||||
|
%% practice, it's could be augmented with handling `recover` -> `follower` Ra
|
||||||
|
%% member state transition.
|
||||||
|
Timeout = min(RaftIdx - LastApplied, ?MAX_BOOSTRAP_RETRY_TIMEOUT),
|
||||||
|
{retry, Timeout, St}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
||||||
ClusterName = cluster_name(DB, Shard),
|
ClusterName = cluster_name(DB, Shard),
|
||||||
LocalServer = local_server(DB, Shard),
|
LocalServer = local_server(DB, Shard),
|
||||||
|
@ -350,7 +438,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
||||||
MutableConfig = #{tick_timeout => 100},
|
MutableConfig = #{tick_timeout => 100},
|
||||||
case ra:restart_server(DB, LocalServer, MutableConfig) of
|
case ra:restart_server(DB, LocalServer, MutableConfig) of
|
||||||
{error, name_not_registered} ->
|
{error, name_not_registered} ->
|
||||||
Bootstrap = true,
|
|
||||||
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
|
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
|
||||||
LogOpts = maps:with(
|
LogOpts = maps:with(
|
||||||
[
|
[
|
||||||
|
@ -366,30 +453,34 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
|
||||||
initial_members => Servers,
|
initial_members => Servers,
|
||||||
machine => Machine,
|
machine => Machine,
|
||||||
log_init_args => LogOpts
|
log_init_args => LogOpts
|
||||||
});
|
}),
|
||||||
|
{_NewServer = true, LocalServer};
|
||||||
ok ->
|
ok ->
|
||||||
Bootstrap = false;
|
{_NewServer = false, LocalServer};
|
||||||
{error, {already_started, _}} ->
|
{error, {already_started, _}} ->
|
||||||
Bootstrap = false
|
{_NewServer = false, LocalServer}
|
||||||
end,
|
end.
|
||||||
|
|
||||||
|
trigger_election(Server) ->
|
||||||
%% NOTE
|
%% NOTE
|
||||||
%% Triggering election is necessary when a new consensus group is being brought up.
|
%% Triggering election is necessary when a new consensus group is being brought up.
|
||||||
%% TODO
|
%% TODO
|
||||||
%% It's probably a good idea to rebalance leaders across the cluster from time to
|
%% It's probably a good idea to rebalance leaders across the cluster from time to
|
||||||
%% time. There's `ra:transfer_leadership/2` for that.
|
%% time. There's `ra:transfer_leadership/2` for that.
|
||||||
try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
|
try ra:trigger_election(Server) of
|
||||||
false ->
|
ok -> ok
|
||||||
ok;
|
|
||||||
ok ->
|
|
||||||
ok
|
|
||||||
catch
|
catch
|
||||||
%% TODO
|
%% NOTE
|
||||||
%% Tolerating exceptions because server might be occupied with log replay for
|
%% Tolerating exceptions because server might be occupied with log replay for
|
||||||
%% a while.
|
%% a while.
|
||||||
exit:{timeout, _} when not Bootstrap ->
|
exit:{timeout, _} ->
|
||||||
|
?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}),
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
announce_shard_ready(DB, Shard) ->
|
||||||
|
set_shard_info(DB, Shard, ready, true).
|
||||||
|
|
||||||
server_uid(_DB, Shard) ->
|
server_uid(_DB, Shard) ->
|
||||||
%% NOTE
|
%% NOTE
|
||||||
%% Each new "instance" of a server should have a unique identifier. Otherwise,
|
%% Each new "instance" of a server should have a unique identifier. Otherwise,
|
||||||
|
@ -402,6 +493,22 @@ server_uid(_DB, Shard) ->
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
get_shard_info(DB, Shard, K, Default) ->
|
||||||
|
persistent_term:get(?PTERM(DB, Shard, K), Default).
|
||||||
|
|
||||||
|
set_shard_info(DB, Shard, K, V) ->
|
||||||
|
persistent_term:put(?PTERM(DB, Shard, K), V).
|
||||||
|
|
||||||
|
erase_shard_info(DB, Shard) ->
|
||||||
|
lists:foreach(fun(K) -> erase_shard_info(DB, Shard, K) end, [
|
||||||
|
ready
|
||||||
|
]).
|
||||||
|
|
||||||
|
erase_shard_info(DB, Shard, K) ->
|
||||||
|
persistent_term:erase(?PTERM(DB, Shard, K)).
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
prep_stop_server(DB, Shard) ->
|
prep_stop_server(DB, Shard) ->
|
||||||
prep_stop_server(DB, Shard, 5_000).
|
prep_stop_server(DB, Shard, 5_000).
|
||||||
|
|
||||||
|
|
|
@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) ->
|
||||||
%% Initialize DB on all nodes and wait for it to be online.
|
%% Initialize DB on all nodes and wait for it to be online.
|
||||||
Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
|
Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
|
||||||
assert_db_open(Nodes, ?DB, Opts),
|
assert_db_open(Nodes, ?DB, Opts),
|
||||||
assert_db_stable(Nodes, ?DB),
|
|
||||||
|
|
||||||
%% Stop the DB on the "offline" node.
|
%% Stop the DB on the "offline" node.
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
|
@ -207,7 +206,6 @@ t_rebalance(Config) ->
|
||||||
%% 1. Initialize DB on the first node.
|
%% 1. Initialize DB on the first node.
|
||||||
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
|
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
|
||||||
assert_db_open(Nodes, ?DB, Opts),
|
assert_db_open(Nodes, ?DB, Opts),
|
||||||
assert_db_stable(Nodes, ?DB),
|
|
||||||
|
|
||||||
%% 1.1 Kick all sites except S1 from the replica set as
|
%% 1.1 Kick all sites except S1 from the replica set as
|
||||||
%% the initial condition:
|
%% the initial condition:
|
||||||
|
@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) ->
|
||||||
|
|
||||||
%% Open DB:
|
%% Open DB:
|
||||||
assert_db_open(Nodes, ?DB, Opts),
|
assert_db_open(Nodes, ?DB, Opts),
|
||||||
assert_db_stable(Nodes, ?DB),
|
|
||||||
|
|
||||||
%% Kick N3 from the replica set as the initial condition:
|
%% Kick N3 from the replica set as the initial condition:
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) ->
|
||||||
%% Initialize DB on all 3 nodes.
|
%% Initialize DB on all 3 nodes.
|
||||||
Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
|
Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
|
||||||
assert_db_open(Nodes, ?DB, Opts),
|
assert_db_open(Nodes, ?DB, Opts),
|
||||||
assert_db_stable(Nodes, ?DB),
|
|
||||||
|
|
||||||
?retry(
|
?retry(
|
||||||
1000,
|
1000,
|
||||||
|
@ -845,13 +841,11 @@ t_crash_restart_recover(Config) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
%% Initialize DB on all nodes.
|
%% Initialize DB on all nodes.
|
||||||
?assertEqual(
|
assert_db_open(Nodes, ?DB, DBOpts),
|
||||||
[{ok, ok} || _ <- Nodes],
|
|
||||||
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
|
|
||||||
),
|
|
||||||
|
|
||||||
%% Apply the test events, including simulated node crashes.
|
%% Apply the test events, including simulated node crashes.
|
||||||
NodeStream = emqx_utils_stream:const(N1),
|
NodeStream = emqx_utils_stream:const(N1),
|
||||||
|
StartedAt = erlang:monotonic_time(millisecond),
|
||||||
emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
|
emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
|
||||||
|
|
||||||
%% It's expected to lose few messages when leaders are abruptly killed.
|
%% It's expected to lose few messages when leaders are abruptly killed.
|
||||||
|
@ -865,6 +859,10 @@ t_crash_restart_recover(Config) ->
|
||||||
ct:pal("Some messages were lost: ~p", [LostMessages]),
|
ct:pal("Some messages were lost: ~p", [LostMessages]),
|
||||||
?assert(length(LostMessages) < NMsgs div 20),
|
?assert(length(LostMessages) < NMsgs div 20),
|
||||||
|
|
||||||
|
%% Wait until crashed nodes are ready.
|
||||||
|
SinceStarted = erlang:monotonic_time(millisecond) - StartedAt,
|
||||||
|
wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted),
|
||||||
|
|
||||||
%% Verify that all the successfully persisted messages are there.
|
%% Verify that all the successfully persisted messages are there.
|
||||||
VerifyClient = fun({ClientId, ExpectedStream}) ->
|
VerifyClient = fun({ClientId, ExpectedStream}) ->
|
||||||
Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
|
Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
|
||||||
|
@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[{ok, ok} || _ <- Nodes],
|
[{ok, ok} || _ <- Nodes],
|
||||||
erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
|
erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
|
||||||
).
|
),
|
||||||
|
wait_db_bootstrapped(Nodes, ?DB).
|
||||||
|
|
||||||
assert_db_stable([Node | _], DB) ->
|
assert_db_stable([Node | _], DB) ->
|
||||||
Shards = ds_repl_meta(Node, shards, [DB]),
|
Shards = ds_repl_meta(Node, shards, [DB]),
|
||||||
|
@ -935,6 +934,32 @@ assert_db_stable([Node | _], DB) ->
|
||||||
db_leadership(Node, DB, Shards)
|
db_leadership(Node, DB, Shards)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
wait_db_bootstrapped(Nodes, DB) ->
|
||||||
|
wait_db_bootstrapped(Nodes, DB, infinity, infinity).
|
||||||
|
|
||||||
|
wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) ->
|
||||||
|
SRefs = [
|
||||||
|
snabbkaffe:subscribe(
|
||||||
|
?match_event(#{
|
||||||
|
?snk_kind := emqx_ds_replshard_bootstrapped,
|
||||||
|
?snk_meta := #{node := Node},
|
||||||
|
db := DB,
|
||||||
|
shard := Shard
|
||||||
|
}),
|
||||||
|
1,
|
||||||
|
Timeout,
|
||||||
|
BackInTime
|
||||||
|
)
|
||||||
|
|| Node <- Nodes,
|
||||||
|
Shard <- ds_repl_meta(Node, my_shards, [DB])
|
||||||
|
],
|
||||||
|
lists:foreach(
|
||||||
|
fun({ok, SRef}) ->
|
||||||
|
?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef))
|
||||||
|
end,
|
||||||
|
SRefs
|
||||||
|
).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
db_leadership(Node, DB, Shards) ->
|
db_leadership(Node, DB, Shards) ->
|
||||||
|
|
|
@ -55,6 +55,7 @@
|
||||||
topic_filter/0,
|
topic_filter/0,
|
||||||
topic/0,
|
topic/0,
|
||||||
batch/0,
|
batch/0,
|
||||||
|
dsbatch/0,
|
||||||
operation/0,
|
operation/0,
|
||||||
deletion/0,
|
deletion/0,
|
||||||
precondition/0,
|
precondition/0,
|
||||||
|
@ -104,7 +105,9 @@
|
||||||
-type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
|
-type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
|
||||||
|
|
||||||
%% A batch of storage operations.
|
%% A batch of storage operations.
|
||||||
-type batch() :: [operation()] | #dsbatch{}.
|
-type batch() :: [operation()] | dsbatch().
|
||||||
|
|
||||||
|
-type dsbatch() :: #dsbatch{}.
|
||||||
|
|
||||||
-type operation() ::
|
-type operation() ::
|
||||||
%% Store a message.
|
%% Store a message.
|
||||||
|
|
|
@ -97,7 +97,7 @@ broker(_) ->
|
||||||
%% @doc Cluster with other nodes
|
%% @doc Cluster with other nodes
|
||||||
|
|
||||||
cluster(["join", SNode]) ->
|
cluster(["join", SNode]) ->
|
||||||
case mria:join(ekka_node:parse_name(SNode)) of
|
case ekka:join(ekka_node:parse_name(SNode)) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("Join the cluster successfully.~n"),
|
emqx_ctl:print("Join the cluster successfully.~n"),
|
||||||
%% FIXME: running status on the replicant immediately
|
%% FIXME: running status on the replicant immediately
|
||||||
|
@ -112,7 +112,7 @@ cluster(["join", SNode]) ->
|
||||||
end;
|
end;
|
||||||
cluster(["leave"]) ->
|
cluster(["leave"]) ->
|
||||||
_ = maybe_disable_autocluster(),
|
_ = maybe_disable_autocluster(),
|
||||||
case mria:leave() of
|
case ekka:leave() of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("Leave the cluster successfully.~n"),
|
emqx_ctl:print("Leave the cluster successfully.~n"),
|
||||||
cluster(["status"]);
|
cluster(["status"]);
|
||||||
|
@ -121,7 +121,7 @@ cluster(["leave"]) ->
|
||||||
end;
|
end;
|
||||||
cluster(["force-leave", SNode]) ->
|
cluster(["force-leave", SNode]) ->
|
||||||
Node = ekka_node:parse_name(SNode),
|
Node = ekka_node:parse_name(SNode),
|
||||||
case mria:force_leave(Node) of
|
case ekka:force_leave(Node) of
|
||||||
ok ->
|
ok ->
|
||||||
case emqx_cluster_rpc:force_leave_clean(Node) of
|
case emqx_cluster_rpc:force_leave_clean(Node) of
|
||||||
ok ->
|
ok ->
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
|
@ -385,16 +386,16 @@ param_path_id() ->
|
||||||
case maps:get(<<"id">>, Params0, list_to_binary(emqx_utils:gen_id(8))) of
|
case maps:get(<<"id">>, Params0, list_to_binary(emqx_utils:gen_id(8))) of
|
||||||
<<>> ->
|
<<>> ->
|
||||||
{400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
|
{400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
|
||||||
Id ->
|
Id when is_binary(Id) ->
|
||||||
Params = filter_out_request_body(add_metadata(Params0)),
|
Params = filter_out_request_body(add_metadata(Params0)),
|
||||||
case emqx_rule_engine:get_rule(Id) of
|
case emqx_rule_engine:get_rule(Id) of
|
||||||
{ok, _Rule} ->
|
{ok, _Rule} ->
|
||||||
{400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
|
?BAD_REQUEST(<<"rule id already exists">>);
|
||||||
not_found ->
|
not_found ->
|
||||||
ConfPath = ?RULE_PATH(Id),
|
ConfPath = ?RULE_PATH(Id),
|
||||||
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
|
case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
|
||||||
{ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
|
{ok, #{post_config_update := #{emqx_rule_engine := Rule}}} ->
|
||||||
{201, format_rule_info_resp(Rule)};
|
?CREATED(format_rule_info_resp(Rule));
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(
|
?SLOG(
|
||||||
info,
|
info,
|
||||||
|
@ -405,9 +406,11 @@ param_path_id() ->
|
||||||
},
|
},
|
||||||
#{tag => ?TAG}
|
#{tag => ?TAG}
|
||||||
),
|
),
|
||||||
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
|
?BAD_REQUEST(?ERR_BADARGS(Reason))
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
end;
|
||||||
|
_BadId ->
|
||||||
|
?BAD_REQUEST(<<"rule id must be a string">>)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
'/rule_test'(post, #{body := Params}) ->
|
'/rule_test'(post, #{body := Params}) ->
|
||||||
|
|
|
@ -334,13 +334,14 @@ eventmsg_publish(
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp,
|
||||||
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
|
||||||
},
|
},
|
||||||
#{headers => Headers}
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_connected(
|
eventmsg_connected(
|
||||||
_ClientInfo = #{
|
ClientInfo = #{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
is_bridge := IsBridge,
|
is_bridge := IsBridge,
|
||||||
|
@ -375,13 +376,14 @@ eventmsg_connected(
|
||||||
expiry_interval => ExpiryInterval div 1000,
|
expiry_interval => ExpiryInterval div 1000,
|
||||||
is_bridge => IsBridge,
|
is_bridge => IsBridge,
|
||||||
conn_props => printable_maps(ConnProps),
|
conn_props => printable_maps(ConnProps),
|
||||||
connected_at => ConnectedAt
|
connected_at => ConnectedAt,
|
||||||
|
client_attrs => maps:get(client_attrs, ClientInfo, #{})
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_disconnected(
|
eventmsg_disconnected(
|
||||||
_ClientInfo = #{
|
ClientInfo = #{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
username := Username
|
username := Username
|
||||||
},
|
},
|
||||||
|
@ -405,7 +407,8 @@ eventmsg_disconnected(
|
||||||
proto_name => ProtoName,
|
proto_name => ProtoName,
|
||||||
proto_ver => ProtoVer,
|
proto_ver => ProtoVer,
|
||||||
disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
|
disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
|
||||||
disconnected_at => DisconnectedAt
|
disconnected_at => DisconnectedAt,
|
||||||
|
client_attrs => maps:get(client_attrs, ClientInfo, #{})
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
).
|
).
|
||||||
|
@ -444,7 +447,7 @@ eventmsg_connack(
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_check_authz_complete(
|
eventmsg_check_authz_complete(
|
||||||
_ClientInfo = #{
|
ClientInfo = #{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
peerhost := PeerHost,
|
peerhost := PeerHost,
|
||||||
|
@ -465,13 +468,14 @@ eventmsg_check_authz_complete(
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
action => PubSub,
|
action => PubSub,
|
||||||
authz_source => AuthzSource,
|
authz_source => AuthzSource,
|
||||||
result => Result
|
result => Result,
|
||||||
|
client_attrs => maps:get(client_attrs, ClientInfo, #{})
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_check_authn_complete(
|
eventmsg_check_authn_complete(
|
||||||
_ClientInfo = #{
|
ClientInfo = #{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
peername := PeerName
|
peername := PeerName
|
||||||
|
@ -493,14 +497,15 @@ eventmsg_check_authn_complete(
|
||||||
peername => ntoa(PeerName),
|
peername => ntoa(PeerName),
|
||||||
reason_code => force_to_bin(Reason),
|
reason_code => force_to_bin(Reason),
|
||||||
is_anonymous => IsAnonymous,
|
is_anonymous => IsAnonymous,
|
||||||
is_superuser => IsSuperuser
|
is_superuser => IsSuperuser,
|
||||||
|
client_attrs => maps:get(client_attrs, ClientInfo, #{})
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
).
|
).
|
||||||
|
|
||||||
eventmsg_sub_or_unsub(
|
eventmsg_sub_or_unsub(
|
||||||
Event,
|
Event,
|
||||||
_ClientInfo = #{
|
ClientInfo = #{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
peerhost := PeerHost,
|
peerhost := PeerHost,
|
||||||
|
@ -519,7 +524,8 @@ eventmsg_sub_or_unsub(
|
||||||
peername => ntoa(PeerName),
|
peername => ntoa(PeerName),
|
||||||
PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
|
PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
qos => QoS
|
qos => QoS,
|
||||||
|
client_attrs => maps:get(client_attrs, ClientInfo, #{})
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
).
|
).
|
||||||
|
@ -551,7 +557,8 @@ eventmsg_dropped(
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp,
|
||||||
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
|
||||||
},
|
},
|
||||||
#{headers => Headers}
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
@ -583,7 +590,8 @@ eventmsg_transformation_failed(
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp,
|
||||||
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
|
||||||
},
|
},
|
||||||
#{headers => Headers}
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
@ -616,7 +624,8 @@ eventmsg_validation_failed(
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp,
|
||||||
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
|
||||||
},
|
},
|
||||||
#{headers => Headers}
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
@ -654,7 +663,8 @@ eventmsg_delivered(
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp,
|
||||||
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
|
||||||
},
|
},
|
||||||
#{headers => Headers}
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
@ -693,7 +703,8 @@ eventmsg_acked(
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
|
puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp,
|
||||||
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
|
||||||
},
|
},
|
||||||
#{headers => Headers}
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
@ -733,7 +744,8 @@ eventmsg_delivery_dropped(
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
flags => Flags,
|
flags => Flags,
|
||||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp,
|
||||||
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{})
|
||||||
},
|
},
|
||||||
#{headers => Headers}
|
#{headers => Headers}
|
||||||
).
|
).
|
||||||
|
|
|
@ -112,7 +112,8 @@ groups() ->
|
||||||
t_sqlparse_undefined_variable,
|
t_sqlparse_undefined_variable,
|
||||||
t_sqlparse_new_map,
|
t_sqlparse_new_map,
|
||||||
t_sqlparse_invalid_json,
|
t_sqlparse_invalid_json,
|
||||||
t_sqlselect_as_put
|
t_sqlselect_as_put,
|
||||||
|
t_sqlselect_client_attr
|
||||||
]},
|
]},
|
||||||
{events, [], [
|
{events, [], [
|
||||||
t_events,
|
t_events,
|
||||||
|
@ -3891,6 +3892,57 @@ t_trace_rule_id(_Config) ->
|
||||||
?assertEqual([], emqx_trace_handler:running()),
|
?assertEqual([], emqx_trace_handler:running()),
|
||||||
emqtt:disconnect(T).
|
emqtt:disconnect(T).
|
||||||
|
|
||||||
|
t_sqlselect_client_attr(_) ->
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
{ok, Compiled} = emqx_variform:compile("user_property.group"),
|
||||||
|
emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], [
|
||||||
|
#{
|
||||||
|
expression => Compiled,
|
||||||
|
set_as_attr => <<"group">>
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
expression => Compiled,
|
||||||
|
set_as_attr => <<"group2">>
|
||||||
|
}
|
||||||
|
]),
|
||||||
|
|
||||||
|
SQL =
|
||||||
|
"SELECT client_attrs as payload FROM \"t/1\" ",
|
||||||
|
Repub = republish_action(<<"t/2">>),
|
||||||
|
{ok, _TopicRule} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => SQL,
|
||||||
|
id => ?TMP_RULEID,
|
||||||
|
actions => [Repub]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, Client} = emqtt:start_link([
|
||||||
|
{clientid, ClientId},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{properties, #{'User-Property' => [{<<"group">>, <<"g1">>}]}}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, <<"t/2">>, 0),
|
||||||
|
ct:sleep(100),
|
||||||
|
emqtt:publish(Client, <<"t/1">>, <<"Hello">>),
|
||||||
|
|
||||||
|
receive
|
||||||
|
{publish, #{topic := Topic, payload := Payload}} ->
|
||||||
|
?assertEqual(<<"t/2">>, Topic),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"group">> := <<"g1">>, <<"group2">> := <<"g1">>},
|
||||||
|
emqx_utils_json:decode(Payload)
|
||||||
|
)
|
||||||
|
after 1000 ->
|
||||||
|
ct:fail(wait_for_t_2)
|
||||||
|
end,
|
||||||
|
|
||||||
|
emqtt:disconnect(Client),
|
||||||
|
emqx_rule_engine:delete_rule(?TMP_RULEID),
|
||||||
|
emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal helpers
|
%% Internal helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -3990,7 +4042,8 @@ verify_event_fields('message.publish', Fields) ->
|
||||||
flags := Flags,
|
flags := Flags,
|
||||||
pub_props := Properties,
|
pub_props := Properties,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
publish_received_at := EventAt
|
publish_received_at := EventAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4007,7 +4060,8 @@ verify_event_fields('message.publish', Fields) ->
|
||||||
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
?assert(EventAt =< Timestamp);
|
?assert(EventAt =< Timestamp),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('client.connected', Fields) ->
|
verify_event_fields('client.connected', Fields) ->
|
||||||
#{
|
#{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
|
@ -4023,7 +4077,8 @@ verify_event_fields('client.connected', Fields) ->
|
||||||
is_bridge := IsBridge,
|
is_bridge := IsBridge,
|
||||||
conn_props := Properties,
|
conn_props := Properties,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
connected_at := EventAt
|
connected_at := EventAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4042,7 +4097,8 @@ verify_event_fields('client.connected', Fields) ->
|
||||||
?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
|
?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
?assert(EventAt =< Timestamp);
|
?assert(EventAt =< Timestamp),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('client.disconnected', Fields) ->
|
verify_event_fields('client.disconnected', Fields) ->
|
||||||
#{
|
#{
|
||||||
reason := Reason,
|
reason := Reason,
|
||||||
|
@ -4052,7 +4108,8 @@ verify_event_fields('client.disconnected', Fields) ->
|
||||||
sockname := SockName,
|
sockname := SockName,
|
||||||
disconn_props := Properties,
|
disconn_props := Properties,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
disconnected_at := EventAt
|
disconnected_at := EventAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4065,7 +4122,8 @@ verify_event_fields('client.disconnected', Fields) ->
|
||||||
?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties),
|
?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
?assert(EventAt =< Timestamp);
|
?assert(EventAt =< Timestamp),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields(SubUnsub, Fields) when
|
verify_event_fields(SubUnsub, Fields) when
|
||||||
SubUnsub == 'session.subscribed';
|
SubUnsub == 'session.subscribed';
|
||||||
SubUnsub == 'session.unsubscribed'
|
SubUnsub == 'session.unsubscribed'
|
||||||
|
@ -4077,7 +4135,8 @@ verify_event_fields(SubUnsub, Fields) when
|
||||||
peername := PeerName,
|
peername := PeerName,
|
||||||
topic := Topic,
|
topic := Topic,
|
||||||
qos := QoS,
|
qos := QoS,
|
||||||
timestamp := Timestamp
|
timestamp := Timestamp,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4097,7 +4156,8 @@ verify_event_fields(SubUnsub, Fields) when
|
||||||
#{'User-Property' := #{<<"topic_name">> := <<"t1">>}},
|
#{'User-Property' := #{<<"topic_name">> := <<"t1">>}},
|
||||||
maps:get(PropKey, Fields)
|
maps:get(PropKey, Fields)
|
||||||
),
|
),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000);
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('delivery.dropped', Fields) ->
|
verify_event_fields('delivery.dropped', Fields) ->
|
||||||
#{
|
#{
|
||||||
event := 'delivery.dropped',
|
event := 'delivery.dropped',
|
||||||
|
@ -4117,7 +4177,8 @@ verify_event_fields('delivery.dropped', Fields) ->
|
||||||
qos := QoS,
|
qos := QoS,
|
||||||
flags := Flags,
|
flags := Flags,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
topic := Topic
|
topic := Topic,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4139,7 +4200,8 @@ verify_event_fields('delivery.dropped', Fields) ->
|
||||||
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
?assert(EventAt =< Timestamp);
|
?assert(EventAt =< Timestamp),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('message.dropped', Fields) ->
|
verify_event_fields('message.dropped', Fields) ->
|
||||||
#{
|
#{
|
||||||
id := ID,
|
id := ID,
|
||||||
|
@ -4154,7 +4216,8 @@ verify_event_fields('message.dropped', Fields) ->
|
||||||
flags := Flags,
|
flags := Flags,
|
||||||
pub_props := Properties,
|
pub_props := Properties,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
publish_received_at := EventAt
|
publish_received_at := EventAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4172,7 +4235,8 @@ verify_event_fields('message.dropped', Fields) ->
|
||||||
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
?assert(EventAt =< Timestamp);
|
?assert(EventAt =< Timestamp),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('message.delivered', Fields) ->
|
verify_event_fields('message.delivered', Fields) ->
|
||||||
#{
|
#{
|
||||||
id := ID,
|
id := ID,
|
||||||
|
@ -4188,7 +4252,8 @@ verify_event_fields('message.delivered', Fields) ->
|
||||||
flags := Flags,
|
flags := Flags,
|
||||||
pub_props := Properties,
|
pub_props := Properties,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
publish_received_at := EventAt
|
publish_received_at := EventAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4207,7 +4272,8 @@ verify_event_fields('message.delivered', Fields) ->
|
||||||
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
?assert(EventAt =< Timestamp);
|
?assert(EventAt =< Timestamp),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('message.acked', Fields) ->
|
verify_event_fields('message.acked', Fields) ->
|
||||||
#{
|
#{
|
||||||
id := ID,
|
id := ID,
|
||||||
|
@ -4224,7 +4290,8 @@ verify_event_fields('message.acked', Fields) ->
|
||||||
pub_props := PubProps,
|
pub_props := PubProps,
|
||||||
puback_props := PubAckProps,
|
puback_props := PubAckProps,
|
||||||
timestamp := Timestamp,
|
timestamp := Timestamp,
|
||||||
publish_received_at := EventAt
|
publish_received_at := EventAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
TimestampElapse = Now - Timestamp,
|
TimestampElapse = Now - Timestamp,
|
||||||
|
@ -4244,7 +4311,8 @@ verify_event_fields('message.acked', Fields) ->
|
||||||
?assert(is_map(PubAckProps)),
|
?assert(is_map(PubAckProps)),
|
||||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
|
||||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
|
||||||
?assert(EventAt =< Timestamp);
|
?assert(EventAt =< Timestamp),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('client.connack', Fields) ->
|
verify_event_fields('client.connack', Fields) ->
|
||||||
#{
|
#{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
|
@ -4282,7 +4350,8 @@ verify_event_fields('client.check_authz_complete', Fields) ->
|
||||||
peername := PeerName,
|
peername := PeerName,
|
||||||
topic := Topic,
|
topic := Topic,
|
||||||
authz_source := AuthzSource,
|
authz_source := AuthzSource,
|
||||||
username := Username
|
username := Username,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
?assertEqual(<<"t1">>, Topic),
|
?assertEqual(<<"t1">>, Topic),
|
||||||
?assert(lists:member(Action, [subscribe, publish])),
|
?assert(lists:member(Action, [subscribe, publish])),
|
||||||
|
@ -4302,20 +4371,23 @@ verify_event_fields('client.check_authz_complete', Fields) ->
|
||||||
])
|
])
|
||||||
),
|
),
|
||||||
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
||||||
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>]));
|
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('client.check_authn_complete', Fields) ->
|
verify_event_fields('client.check_authn_complete', Fields) ->
|
||||||
#{
|
#{
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
peername := PeerName,
|
peername := PeerName,
|
||||||
username := Username,
|
username := Username,
|
||||||
is_anonymous := IsAnonymous,
|
is_anonymous := IsAnonymous,
|
||||||
is_superuser := IsSuperuser
|
is_superuser := IsSuperuser,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
verify_peername(PeerName),
|
verify_peername(PeerName),
|
||||||
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
||||||
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
||||||
?assert(erlang:is_boolean(IsAnonymous)),
|
?assert(erlang:is_boolean(IsAnonymous)),
|
||||||
?assert(erlang:is_boolean(IsSuperuser));
|
?assert(erlang:is_boolean(IsSuperuser)),
|
||||||
|
?assert(is_map(ClientAttrs));
|
||||||
verify_event_fields('schema.validation_failed', Fields) ->
|
verify_event_fields('schema.validation_failed', Fields) ->
|
||||||
#{
|
#{
|
||||||
validation := ValidationName,
|
validation := ValidationName,
|
||||||
|
@ -4327,12 +4399,14 @@ verify_event_fields('schema.validation_failed', Fields) ->
|
||||||
topic := _Topic,
|
topic := _Topic,
|
||||||
flags := _Flags,
|
flags := _Flags,
|
||||||
pub_props := _PubProps,
|
pub_props := _PubProps,
|
||||||
publish_received_at := _PublishReceivedAt
|
publish_received_at := _PublishReceivedAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
?assertEqual(<<"v1">>, ValidationName),
|
?assertEqual(<<"v1">>, ValidationName),
|
||||||
verify_peername(PeerName),
|
verify_peername(PeerName),
|
||||||
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
||||||
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
||||||
|
?assert(is_map(ClientAttrs)),
|
||||||
ok;
|
ok;
|
||||||
verify_event_fields('message.transformation_failed', Fields) ->
|
verify_event_fields('message.transformation_failed', Fields) ->
|
||||||
#{
|
#{
|
||||||
|
@ -4345,12 +4419,14 @@ verify_event_fields('message.transformation_failed', Fields) ->
|
||||||
topic := _Topic,
|
topic := _Topic,
|
||||||
flags := _Flags,
|
flags := _Flags,
|
||||||
pub_props := _PubProps,
|
pub_props := _PubProps,
|
||||||
publish_received_at := _PublishReceivedAt
|
publish_received_at := _PublishReceivedAt,
|
||||||
|
client_attrs := ClientAttrs
|
||||||
} = Fields,
|
} = Fields,
|
||||||
?assertEqual(<<"t1">>, TransformationName),
|
?assertEqual(<<"t1">>, TransformationName),
|
||||||
verify_peername(PeerName),
|
verify_peername(PeerName),
|
||||||
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
||||||
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
||||||
|
?assert(is_map(ClientAttrs)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
verify_peername(PeerName) ->
|
verify_peername(PeerName) ->
|
||||||
|
|
|
@ -21,6 +21,8 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% CT boilerplate
|
%% CT boilerplate
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -48,6 +50,13 @@ app_specs() ->
|
||||||
emqx_mgmt_api_test_util:emqx_dashboard()
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||||
].
|
].
|
||||||
|
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
emqx_common_test_helpers:call_janitor(),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helper fns
|
%% Helper fns
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -124,7 +133,13 @@ create_rule(Overrides) ->
|
||||||
Method = post,
|
Method = post,
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
|
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
|
||||||
Res = request(Method, Path, Params),
|
Res = request(Method, Path, Params),
|
||||||
emqx_mgmt_api_test_util:simplify_result(Res).
|
case emqx_mgmt_api_test_util:simplify_result(Res) of
|
||||||
|
{201, #{<<"id">> := RuleId}} = SRes ->
|
||||||
|
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||||
|
SRes;
|
||||||
|
SRes ->
|
||||||
|
SRes
|
||||||
|
end.
|
||||||
|
|
||||||
sources_sql(Sources) ->
|
sources_sql(Sources) ->
|
||||||
Froms = iolist_to_binary(lists:join(<<", ">>, lists:map(fun source_from/1, Sources))),
|
Froms = iolist_to_binary(lists:join(<<", ">>, lists:map(fun source_from/1, Sources))),
|
||||||
|
@ -586,3 +601,21 @@ t_filter_by_source_and_action(_Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Checks that creating a rule with a `null' JSON value id is forbidden.
|
||||||
|
t_create_rule_with_null_id(_Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{400, #{<<"message">> := <<"rule id must be a string">>}},
|
||||||
|
create_rule(#{<<"id">> => null})
|
||||||
|
),
|
||||||
|
%% The string `"null"' should be fine.
|
||||||
|
?assertMatch(
|
||||||
|
{201, _},
|
||||||
|
create_rule(#{<<"id">> => <<"null">>})
|
||||||
|
),
|
||||||
|
?assertMatch({201, _}, create_rule(#{})),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{<<"data">> := [_, _]}},
|
||||||
|
list_rules([])
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -339,6 +339,7 @@ ensure_serde_absent(Name) ->
|
||||||
{ok, Serde} ->
|
{ok, Serde} ->
|
||||||
ok = emqx_schema_registry_serde:destroy(Serde),
|
ok = emqx_schema_registry_serde:destroy(Serde),
|
||||||
_ = ets:delete(?SERDE_TAB, Name),
|
_ = ets:delete(?SERDE_TAB, Name),
|
||||||
|
?tp("schema_registry_serde_deleted", #{name => Name}),
|
||||||
ok;
|
ok;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -57,7 +57,15 @@ end_per_testcase(_TestCase, _Config) ->
|
||||||
clear_schemas() ->
|
clear_schemas() ->
|
||||||
maps:foreach(
|
maps:foreach(
|
||||||
fun(Name, _Schema) ->
|
fun(Name, _Schema) ->
|
||||||
ok = emqx_schema_registry:delete_schema(Name)
|
NameBin = emqx_utils_conv:bin(Name),
|
||||||
|
{ok, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
emqx_schema_registry:delete_schema(Name),
|
||||||
|
#{
|
||||||
|
?snk_kind := "schema_registry_serde_deleted",
|
||||||
|
name := NameBin
|
||||||
|
}
|
||||||
|
)
|
||||||
end,
|
end,
|
||||||
emqx_schema_registry:list_schemas()
|
emqx_schema_registry:list_schemas()
|
||||||
).
|
).
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Expose `client_attrs` to rule engine and rule events.
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where creating a rule with a null id via the HTTP API was allowed, which could lead to an inconsistent configuration.
|
2
mix.exs
2
mix.exs
|
@ -184,7 +184,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
|
def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
|
||||||
def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.12.0", override: true}
|
def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.12.0", override: true}
|
||||||
def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
|
def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
|
||||||
def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.2", override: true}
|
def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.3", override: true}
|
||||||
def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}
|
def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}
|
||||||
# in conflict by ehttpc and emqtt
|
# in conflict by ehttpc and emqtt
|
||||||
def common_dep(:gun), do: {:gun, github: "emqx/gun", tag: "1.3.11", override: true}
|
def common_dep(:gun), do: {:gun, github: "emqx/gun", tag: "1.3.11", override: true}
|
||||||
|
|
|
@ -98,7 +98,7 @@
|
||||||
{system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.5"}}},
|
{system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.5"}}},
|
||||||
{getopt, "1.0.2"},
|
{getopt, "1.0.2"},
|
||||||
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
|
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
|
||||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}},
|
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.3"}}},
|
||||||
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
|
||||||
{esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}},
|
{esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}},
|
||||||
{jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}},
|
{jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}},
|
||||||
|
|
Loading…
Reference in New Issue