Merge branch 'master' into sync-r55-m-20240124
This commit is contained in:
commit
4f8accc31b
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_persistent_session_ds_SUITE).
|
-module(emqx_persistent_session_ds_SUITE).
|
||||||
|
|
||||||
|
@ -18,6 +18,9 @@
|
||||||
%% CT boilerplate
|
%% CT boilerplate
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
suite() ->
|
||||||
|
[{timetrap, {seconds, 60}}].
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
@ -191,6 +194,7 @@ t_non_persistent_session_subscription(_Config) ->
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
SubTopicFilter = <<"t/#">>,
|
SubTopicFilter = <<"t/#">>,
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
#{timetrap => 30_000},
|
||||||
begin
|
begin
|
||||||
?tp(notice, "starting", #{}),
|
?tp(notice, "starting", #{}),
|
||||||
Client = start_client(#{
|
Client = start_client(#{
|
||||||
|
@ -220,6 +224,7 @@ t_session_subscription_idempotency(Config) ->
|
||||||
SubTopicFilter = <<"t/+">>,
|
SubTopicFilter = <<"t/+">>,
|
||||||
ClientId = <<"myclientid">>,
|
ClientId = <<"myclientid">>,
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
#{timetrap => 30_000},
|
||||||
begin
|
begin
|
||||||
?force_ordering(
|
?force_ordering(
|
||||||
#{?snk_kind := persistent_session_ds_subscription_added},
|
#{?snk_kind := persistent_session_ds_subscription_added},
|
||||||
|
@ -281,6 +286,7 @@ t_session_unsubscription_idempotency(Config) ->
|
||||||
SubTopicFilter = <<"t/+">>,
|
SubTopicFilter = <<"t/+">>,
|
||||||
ClientId = <<"myclientid">>,
|
ClientId = <<"myclientid">>,
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
#{timetrap => 30_000},
|
||||||
begin
|
begin
|
||||||
?force_ordering(
|
?force_ordering(
|
||||||
#{
|
#{
|
||||||
|
@ -385,6 +391,7 @@ do_t_session_discard(Params) ->
|
||||||
ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
|
ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
|
||||||
SubTopicFilter = <<"t/+">>,
|
SubTopicFilter = <<"t/+">>,
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
#{timetrap => 30_000},
|
||||||
begin
|
begin
|
||||||
?tp(notice, "starting", #{}),
|
?tp(notice, "starting", #{}),
|
||||||
Client0 = start_client(#{
|
Client0 = start_client(#{
|
||||||
|
@ -472,6 +479,7 @@ do_t_session_expiration(_Config, Opts) ->
|
||||||
} = Opts,
|
} = Opts,
|
||||||
CommonParams = #{proto_ver => v5, clientid => ClientId},
|
CommonParams = #{proto_ver => v5, clientid => ClientId},
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
#{timetrap => 30_000},
|
||||||
begin
|
begin
|
||||||
Topic = <<"some/topic">>,
|
Topic = <<"some/topic">>,
|
||||||
Params0 = maps:merge(CommonParams, FirstConn),
|
Params0 = maps:merge(CommonParams, FirstConn),
|
||||||
|
@ -539,6 +547,7 @@ t_session_gc(Config) ->
|
||||||
end,
|
end,
|
||||||
|
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
#{timetrap => 30_000},
|
||||||
begin
|
begin
|
||||||
ClientId0 = <<"session_gc0">>,
|
ClientId0 = <<"session_gc0">>,
|
||||||
Client0 = StartClient(ClientId0, Port1, 30),
|
Client0 = StartClient(ClientId0, Port1, 30),
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -99,7 +99,7 @@ needs_persistence(Msg) ->
|
||||||
|
|
||||||
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
|
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
|
||||||
store_message(Msg) ->
|
store_message(Msg) ->
|
||||||
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg]).
|
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}).
|
||||||
|
|
||||||
has_subscribers(#message{topic = Topic}) ->
|
has_subscribers(#message{topic = Topic}) ->
|
||||||
emqx_persistent_session_ds_router:has_any_route(Topic).
|
emqx_persistent_session_ds_router:has_any_route(Topic).
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -1915,6 +1915,24 @@ fields("session_storage_backend_builtin") ->
|
||||||
default => 3,
|
default => 3,
|
||||||
importance => ?IMPORTANCE_HIDDEN
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
|
)},
|
||||||
|
{"egress_batch_size",
|
||||||
|
sc(
|
||||||
|
pos_integer(),
|
||||||
|
#{
|
||||||
|
default => 1000,
|
||||||
|
mapping => "emqx_durable_storage.egress_batch_size",
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{"egress_flush_interval",
|
||||||
|
sc(
|
||||||
|
timeout_duration_ms(),
|
||||||
|
#{
|
||||||
|
default => 100,
|
||||||
|
mapping => "emqx_durable_storage.egress_flush_interval",
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -376,7 +376,7 @@ t_publish_empty_topic_levels(_Config) ->
|
||||||
{<<"t/3/bar">>, <<"6">>}
|
{<<"t/3/bar">>, <<"6">>}
|
||||||
],
|
],
|
||||||
[emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages],
|
[emqtt:publish(Pub, Topic, Payload, ?QOS_1) || {Topic, Payload} <- Messages],
|
||||||
Received = receive_messages(length(Messages), 1_500),
|
Received = receive_messages(length(Messages)),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[
|
[
|
||||||
#{topic := <<"t//1/">>, payload := <<"2">>},
|
#{topic := <<"t//1/">>, payload := <<"2">>},
|
||||||
|
@ -475,8 +475,7 @@ t_metrics_not_dropped(_Config) ->
|
||||||
|
|
||||||
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1),
|
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1),
|
||||||
emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1),
|
emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1),
|
||||||
?assertMatch([_], receive_messages(1, 1_500)),
|
?assertMatch([_], receive_messages(1)),
|
||||||
|
|
||||||
DroppedAfter = emqx_metrics:val('messages.dropped'),
|
DroppedAfter = emqx_metrics:val('messages.dropped'),
|
||||||
DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),
|
DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -111,11 +111,15 @@
|
||||||
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
|
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
|
||||||
-type time() :: non_neg_integer().
|
-type time() :: non_neg_integer().
|
||||||
|
|
||||||
-type message_store_opts() :: #{}.
|
-type message_store_opts() ::
|
||||||
|
#{
|
||||||
|
sync => boolean()
|
||||||
|
}.
|
||||||
|
|
||||||
-type generic_db_opts() ::
|
-type generic_db_opts() ::
|
||||||
#{
|
#{
|
||||||
backend := atom(),
|
backend := atom(),
|
||||||
|
serialize_by => clientid | topic,
|
||||||
_ => _
|
_ => _
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_ds_app).
|
-module(emqx_ds_app).
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Supervisor that contains all the processes that belong to a
|
||||||
|
%% given builtin DS database.
|
||||||
|
-module(emqx_ds_builtin_db_sup).
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
%% API:
|
||||||
|
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1]).
|
||||||
|
|
||||||
|
%% behaviour callbacks:
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
%% internal exports:
|
||||||
|
-export([start_link_sup/2]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Type declarations
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-define(via(REC), {via, gproc, {n, l, REC}}).
|
||||||
|
|
||||||
|
-define(db_sup, ?MODULE).
|
||||||
|
-define(shard_sup, emqx_ds_builtin_db_shard_sup).
|
||||||
|
-define(egress_sup, emqx_ds_builtin_db_egress_sup).
|
||||||
|
|
||||||
|
-record(?db_sup, {db}).
|
||||||
|
-record(?shard_sup, {db}).
|
||||||
|
-record(?egress_sup, {db}).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% API funcions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> {ok, pid()}.
|
||||||
|
start_db(DB, Opts) ->
|
||||||
|
start_link_sup(#?db_sup{db = DB}, Opts).
|
||||||
|
|
||||||
|
-spec start_shard(emqx_ds_storage_layer:shard_id()) ->
|
||||||
|
supervisor:startchild_ret().
|
||||||
|
start_shard(Shard = {DB, _}) ->
|
||||||
|
supervisor:start_child(?via(#?shard_sup{db = DB}), shard_spec(DB, Shard)).
|
||||||
|
|
||||||
|
-spec start_egress(emqx_ds_storage_layer:shard_id()) ->
|
||||||
|
supervisor:startchild_ret().
|
||||||
|
start_egress({DB, Shard}) ->
|
||||||
|
supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
|
||||||
|
|
||||||
|
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
|
||||||
|
stop_shard(Shard = {DB, _}) ->
|
||||||
|
Sup = ?via(#?shard_sup{db = DB}),
|
||||||
|
ok = supervisor:terminate_child(Sup, Shard),
|
||||||
|
ok = supervisor:delete_child(Sup, Shard).
|
||||||
|
|
||||||
|
-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
|
||||||
|
ok | {error, _Reason}.
|
||||||
|
ensure_shard(Shard) ->
|
||||||
|
case start_shard(Shard) of
|
||||||
|
{ok, _Pid} ->
|
||||||
|
ok;
|
||||||
|
{error, {already_started, _Pid}} ->
|
||||||
|
ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% behaviour callbacks
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
init({#?db_sup{db = DB}, DefaultOpts}) ->
|
||||||
|
%% Spec for the top-level supervisor for the database:
|
||||||
|
logger:notice("Starting DS DB ~p", [DB]),
|
||||||
|
_ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
|
||||||
|
%% TODO: before the leader election is implemented, we set ourselves as the leader for all shards:
|
||||||
|
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
||||||
|
lists:foreach(
|
||||||
|
fun(Shard) ->
|
||||||
|
emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard)
|
||||||
|
end,
|
||||||
|
MyShards
|
||||||
|
),
|
||||||
|
Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])],
|
||||||
|
SupFlags = #{
|
||||||
|
strategy => one_for_all,
|
||||||
|
intensity => 0,
|
||||||
|
period => 1
|
||||||
|
},
|
||||||
|
{ok, {SupFlags, Children}};
|
||||||
|
init({#?shard_sup{db = DB}, _}) ->
|
||||||
|
%% Spec for the supervisor that manages the worker processes for
|
||||||
|
%% each local shard of the DB:
|
||||||
|
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
||||||
|
Children = [shard_spec(DB, Shard) || Shard <- MyShards],
|
||||||
|
SupFlags = #{
|
||||||
|
strategy => one_for_one,
|
||||||
|
intensity => 10,
|
||||||
|
period => 1
|
||||||
|
},
|
||||||
|
{ok, {SupFlags, Children}};
|
||||||
|
init({#?egress_sup{db = DB}, _}) ->
|
||||||
|
%% Spec for the supervisor that manages the egress proxy processes
|
||||||
|
%% managing traffic towards each of the shards of the DB:
|
||||||
|
Shards = emqx_ds_replication_layer_meta:shards(DB),
|
||||||
|
Children = [egress_spec(DB, Shard) || Shard <- Shards],
|
||||||
|
SupFlags = #{
|
||||||
|
strategy => one_for_one,
|
||||||
|
intensity => 0,
|
||||||
|
period => 1
|
||||||
|
},
|
||||||
|
{ok, {SupFlags, Children}}.
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal exports
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
start_link_sup(Id, Options) ->
|
||||||
|
supervisor:start_link(?via(Id), ?MODULE, {Id, Options}).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
sup_spec(Id, Options) ->
|
||||||
|
#{
|
||||||
|
id => element(1, Id),
|
||||||
|
start => {?MODULE, start_link_sup, [Id, Options]},
|
||||||
|
type => supervisor,
|
||||||
|
shutdown => infinity
|
||||||
|
}.
|
||||||
|
|
||||||
|
shard_spec(DB, Shard) ->
|
||||||
|
Options = emqx_ds_replication_layer_meta:get_options(DB),
|
||||||
|
#{
|
||||||
|
id => Shard,
|
||||||
|
start => {emqx_ds_storage_layer, start_link, [{DB, Shard}, Options]},
|
||||||
|
shutdown => 5_000,
|
||||||
|
restart => permanent,
|
||||||
|
type => worker
|
||||||
|
}.
|
||||||
|
|
||||||
|
egress_spec(DB, Shard) ->
|
||||||
|
#{
|
||||||
|
id => Shard,
|
||||||
|
start => {emqx_ds_replication_layer_egress, start_link, [DB, Shard]},
|
||||||
|
shutdown => 5_000,
|
||||||
|
restart => permanent,
|
||||||
|
type => worker
|
||||||
|
}.
|
|
@ -0,0 +1,132 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc This supervisor manages the global worker processes needed for
|
||||||
|
%% the functioning of builtin databases, and all builtin database
|
||||||
|
%% attach to it.
|
||||||
|
-module(emqx_ds_builtin_sup).
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
%% API:
|
||||||
|
-export([start_db/2, stop_db/1]).
|
||||||
|
|
||||||
|
%% behavior callbacks:
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
%% internal exports:
|
||||||
|
-export([start_top/0, start_databases_sup/0]).
|
||||||
|
|
||||||
|
-export_type([]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Type declarations
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-define(top, ?MODULE).
|
||||||
|
-define(databases, emqx_ds_builtin_databases_sup).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% API functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec start_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||||
|
supervisor:startchild_ret().
|
||||||
|
start_db(DB, Opts) ->
|
||||||
|
ensure_top(),
|
||||||
|
ChildSpec = #{
|
||||||
|
id => DB,
|
||||||
|
start => {emqx_ds_builtin_db_sup, start_db, [DB, Opts]},
|
||||||
|
type => supervisor,
|
||||||
|
shutdown => infinity
|
||||||
|
},
|
||||||
|
supervisor:start_child(?databases, ChildSpec).
|
||||||
|
|
||||||
|
-spec stop_db(emqx_ds:db()) -> ok.
|
||||||
|
stop_db(DB) ->
|
||||||
|
case whereis(?databases) of
|
||||||
|
Pid when is_pid(Pid) ->
|
||||||
|
_ = supervisor:terminate_child(?databases, DB),
|
||||||
|
_ = supervisor:delete_child(?databases, DB),
|
||||||
|
ok;
|
||||||
|
undefined ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% behavior callbacks
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
%% There are two layers of supervision:
|
||||||
|
%%
|
||||||
|
%% 1. top supervisor for the builtin backend. It contains the global
|
||||||
|
%% worker processes (like the metadata server), and `?databases'
|
||||||
|
%% supervisior.
|
||||||
|
%%
|
||||||
|
%% 2. `?databases': a `one_for_one' supervisor where each child is a
|
||||||
|
%% `db' supervisor that contains processes that represent the DB.
|
||||||
|
%% Chidren are attached dynamically to this one.
|
||||||
|
init(?top) ->
|
||||||
|
%% Children:
|
||||||
|
MetadataServer = #{
|
||||||
|
id => metadata_server,
|
||||||
|
start => {emqx_ds_replication_layer_meta, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
type => worker,
|
||||||
|
shutdown => 5000
|
||||||
|
},
|
||||||
|
DBsSup = #{
|
||||||
|
id => ?databases,
|
||||||
|
start => {?MODULE, start_databases_sup, []},
|
||||||
|
restart => permanent,
|
||||||
|
type => supervisor,
|
||||||
|
shutdown => infinity
|
||||||
|
},
|
||||||
|
%%
|
||||||
|
SupFlags = #{
|
||||||
|
strategy => one_for_all,
|
||||||
|
intensity => 1,
|
||||||
|
period => 1,
|
||||||
|
auto_shutdown => never
|
||||||
|
},
|
||||||
|
{ok, {SupFlags, [MetadataServer, DBsSup]}};
|
||||||
|
init(?databases) ->
|
||||||
|
%% Children are added dynamically:
|
||||||
|
SupFlags = #{
|
||||||
|
strategy => one_for_one,
|
||||||
|
intensity => 10,
|
||||||
|
period => 1
|
||||||
|
},
|
||||||
|
{ok, {SupFlags, []}}.
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal exports
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec start_top() -> {ok, pid()}.
|
||||||
|
start_top() ->
|
||||||
|
supervisor:start_link({local, ?top}, ?MODULE, ?top).
|
||||||
|
|
||||||
|
start_databases_sup() ->
|
||||||
|
supervisor:start_link({local, ?databases}, ?MODULE, ?databases).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
ensure_top() ->
|
||||||
|
{ok, _} = emqx_ds_sup:attach_backend(builtin, {?MODULE, start_top, []}),
|
||||||
|
ok.
|
|
@ -32,7 +32,10 @@
|
||||||
get_streams/3,
|
get_streams/3,
|
||||||
make_iterator/4,
|
make_iterator/4,
|
||||||
update_iterator/3,
|
update_iterator/3,
|
||||||
next/3
|
next/3,
|
||||||
|
node_of_shard/2,
|
||||||
|
shard_of_message/3,
|
||||||
|
maybe_set_myself_as_leader/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
|
@ -51,24 +54,12 @@
|
||||||
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
|
-export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
|
||||||
|
|
||||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||||
|
-include("emqx_ds_replication_layer.hrl").
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
|
|
||||||
%% records over the wire.
|
|
||||||
|
|
||||||
%% tags:
|
|
||||||
-define(STREAM, 1).
|
|
||||||
-define(IT, 2).
|
|
||||||
-define(BATCH, 3).
|
|
||||||
|
|
||||||
%% keys:
|
|
||||||
-define(tag, 1).
|
|
||||||
-define(shard, 2).
|
|
||||||
-define(enc, 3).
|
|
||||||
|
|
||||||
-type shard_id() :: binary().
|
-type shard_id() :: binary().
|
||||||
|
|
||||||
-type builtin_db_opts() ::
|
-type builtin_db_opts() ::
|
||||||
|
@ -101,8 +92,6 @@
|
||||||
|
|
||||||
-type message_id() :: emqx_ds:message_id().
|
-type message_id() :: emqx_ds:message_id().
|
||||||
|
|
||||||
-define(batch_messages, 2).
|
|
||||||
|
|
||||||
-type batch() :: #{
|
-type batch() :: #{
|
||||||
?tag := ?BATCH,
|
?tag := ?BATCH,
|
||||||
?batch_messages := [emqx_types:message()]
|
?batch_messages := [emqx_types:message()]
|
||||||
|
@ -120,21 +109,19 @@ list_shards(DB) ->
|
||||||
|
|
||||||
-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
-spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
||||||
open_db(DB, CreateOpts) ->
|
open_db(DB, CreateOpts) ->
|
||||||
ok = emqx_ds_sup:ensure_workers(),
|
case emqx_ds_builtin_sup:start_db(DB, CreateOpts) of
|
||||||
Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts),
|
{ok, _} ->
|
||||||
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
ok;
|
||||||
lists:foreach(
|
{error, {already_started, _}} ->
|
||||||
fun(Shard) ->
|
ok;
|
||||||
emqx_ds_storage_layer:open_shard({DB, Shard}, Opts),
|
{error, Err} ->
|
||||||
maybe_set_myself_as_leader(DB, Shard)
|
{error, Err}
|
||||||
end,
|
end.
|
||||||
MyShards
|
|
||||||
).
|
|
||||||
|
|
||||||
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
|
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
|
||||||
add_generation(DB) ->
|
add_generation(DB) ->
|
||||||
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
|
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
|
||||||
_ = emqx_ds_proto_v2:add_generation(Nodes, DB),
|
_ = emqx_ds_proto_v3:add_generation(Nodes, DB),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
|
||||||
|
@ -170,17 +157,15 @@ drop_generation(DB, {Shard, GenId}) ->
|
||||||
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
|
||||||
drop_db(DB) ->
|
drop_db(DB) ->
|
||||||
Nodes = list_nodes(),
|
Nodes = list_nodes(),
|
||||||
_ = emqx_ds_proto_v1:drop_db(Nodes, DB),
|
_ = emqx_ds_proto_v3:drop_db(Nodes, DB),
|
||||||
_ = emqx_ds_replication_layer_meta:drop_db(DB),
|
_ = emqx_ds_replication_layer_meta:drop_db(DB),
|
||||||
|
emqx_ds_builtin_sup:stop_db(DB),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
|
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
|
||||||
emqx_ds:store_batch_result().
|
emqx_ds:store_batch_result().
|
||||||
store_batch(DB, Messages, Opts) ->
|
store_batch(DB, Messages, Opts) ->
|
||||||
Shard = shard_of_messages(DB, Messages),
|
emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts).
|
||||||
Node = node_of_shard(DB, Shard),
|
|
||||||
Batch = #{?tag => ?BATCH, ?batch_messages => Messages},
|
|
||||||
emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts).
|
|
||||||
|
|
||||||
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||||
[{emqx_ds:stream_rank(), stream()}].
|
[{emqx_ds:stream_rank(), stream()}].
|
||||||
|
@ -189,7 +174,7 @@ get_streams(DB, TopicFilter, StartTime) ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Shard) ->
|
fun(Shard) ->
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime),
|
Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime),
|
||||||
lists:map(
|
lists:map(
|
||||||
fun({RankY, Stream}) ->
|
fun({RankY, Stream}) ->
|
||||||
RankX = Shard,
|
RankX = Shard,
|
||||||
|
@ -211,7 +196,7 @@ get_streams(DB, TopicFilter, StartTime) ->
|
||||||
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
make_iterator(DB, Stream, TopicFilter, StartTime) ->
|
||||||
#{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
|
#{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
|
case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
|
||||||
{ok, Iter} ->
|
{ok, Iter} ->
|
||||||
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
|
||||||
Err = {error, _} ->
|
Err = {error, _} ->
|
||||||
|
@ -228,7 +213,7 @@ update_iterator(DB, OldIter, DSKey) ->
|
||||||
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
|
||||||
Node = node_of_shard(DB, Shard),
|
Node = node_of_shard(DB, Shard),
|
||||||
case
|
case
|
||||||
emqx_ds_proto_v2:update_iterator(
|
emqx_ds_proto_v3:update_iterator(
|
||||||
Node,
|
Node,
|
||||||
DB,
|
DB,
|
||||||
Shard,
|
Shard,
|
||||||
|
@ -254,7 +239,7 @@ next(DB, Iter0, BatchSize) ->
|
||||||
%%
|
%%
|
||||||
%% This kind of trickery should be probably done here in the
|
%% This kind of trickery should be probably done here in the
|
||||||
%% replication layer. Or, perhaps, in the logic layer.
|
%% replication layer. Or, perhaps, in the logic layer.
|
||||||
case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of
|
case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of
|
||||||
{ok, StorageIter, Batch} ->
|
{ok, StorageIter, Batch} ->
|
||||||
Iter = Iter0#{?enc := StorageIter},
|
Iter = Iter0#{?enc := StorageIter},
|
||||||
{ok, Iter, Batch};
|
{ok, Iter, Batch};
|
||||||
|
@ -262,6 +247,41 @@ next(DB, Iter0, BatchSize) ->
|
||||||
Other
|
Other
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
|
||||||
|
node_of_shard(DB, Shard) ->
|
||||||
|
case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
|
||||||
|
{ok, Leader} ->
|
||||||
|
Leader;
|
||||||
|
{error, no_leader_for_shard} ->
|
||||||
|
%% TODO: use optvar
|
||||||
|
timer:sleep(500),
|
||||||
|
node_of_shard(DB, Shard)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
|
||||||
|
emqx_ds_replication_layer:shard_id().
|
||||||
|
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
|
||||||
|
N = emqx_ds_replication_layer_meta:n_shards(DB),
|
||||||
|
Hash =
|
||||||
|
case SerializeBy of
|
||||||
|
clientid -> erlang:phash2(From, N);
|
||||||
|
topic -> erlang:phash2(Topic, N)
|
||||||
|
end,
|
||||||
|
integer_to_binary(Hash).
|
||||||
|
|
||||||
|
%% TODO: there's no real leader election right now
|
||||||
|
-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
|
||||||
|
maybe_set_myself_as_leader(DB, Shard) ->
|
||||||
|
Site = emqx_ds_replication_layer_meta:this_site(),
|
||||||
|
case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
|
||||||
|
[Site | _] ->
|
||||||
|
%% Currently the first in-sync replica always becomes the
|
||||||
|
%% leader
|
||||||
|
ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
|
||||||
|
_Sites ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behavior callbacks
|
%% behavior callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -273,6 +293,7 @@ next(DB, Iter0, BatchSize) ->
|
||||||
-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
|
-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
|
||||||
do_drop_db_v1(DB) ->
|
do_drop_db_v1(DB) ->
|
||||||
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
|
||||||
|
emqx_ds_builtin_sup:stop_db(DB),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Shard) ->
|
fun(Shard) ->
|
||||||
emqx_ds_storage_layer:drop_shard({DB, Shard})
|
emqx_ds_storage_layer:drop_shard({DB, Shard})
|
||||||
|
@ -354,34 +375,5 @@ do_drop_generation_v3(DB, ShardId, GenId) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% TODO: there's no real leader election right now
|
|
||||||
-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
|
|
||||||
maybe_set_myself_as_leader(DB, Shard) ->
|
|
||||||
Site = emqx_ds_replication_layer_meta:this_site(),
|
|
||||||
case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
|
|
||||||
[Site | _] ->
|
|
||||||
%% Currently the first in-sync replica always becomes the
|
|
||||||
%% leader
|
|
||||||
ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
|
|
||||||
_Sites ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
|
|
||||||
node_of_shard(DB, Shard) ->
|
|
||||||
case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
|
|
||||||
{ok, Leader} ->
|
|
||||||
Leader;
|
|
||||||
{error, no_leader_for_shard} ->
|
|
||||||
%% TODO: use optvar
|
|
||||||
timer:sleep(500),
|
|
||||||
node_of_shard(DB, Shard)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Here we assume that all messages in the batch come from the same client
|
|
||||||
shard_of_messages(DB, [#message{from = From} | _]) ->
|
|
||||||
N = emqx_ds_replication_layer_meta:n_shards(DB),
|
|
||||||
integer_to_binary(erlang:phash2(From, N)).
|
|
||||||
|
|
||||||
list_nodes() ->
|
list_nodes() ->
|
||||||
mria:running_nodes().
|
mria:running_nodes().
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022, 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-ifndef(EMQX_DS_REPLICATION_LAYER_HRL).
|
||||||
|
-define(EMQX_DS_REPLICATION_LAYER_HRL, true).
|
||||||
|
|
||||||
|
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
|
||||||
|
%% records over the wire.
|
||||||
|
|
||||||
|
%% tags:
|
||||||
|
-define(STREAM, 1).
|
||||||
|
-define(IT, 2).
|
||||||
|
-define(BATCH, 3).
|
||||||
|
|
||||||
|
%% keys:
|
||||||
|
-define(tag, 1).
|
||||||
|
-define(shard, 2).
|
||||||
|
-define(enc, 3).
|
||||||
|
-define(batch_messages, 2).
|
||||||
|
|
||||||
|
-endif.
|
|
@ -0,0 +1,175 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Egress servers are responsible for proxing the outcoming
|
||||||
|
%% `store_batch' requests towards EMQX DS shards.
|
||||||
|
%%
|
||||||
|
%% They re-assemble messages from different local processes into
|
||||||
|
%% fixed-sized batches, and introduce centralized channels between the
|
||||||
|
%% nodes. They are also responsible for maintaining backpressure
|
||||||
|
%% towards the local publishers.
|
||||||
|
%%
|
||||||
|
%% There is (currently) one egress process for each shard running on
|
||||||
|
%% each node, but it should be possible to have a pool of egress
|
||||||
|
%% servers, if needed.
|
||||||
|
-module(emqx_ds_replication_layer_egress).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% API:
|
||||||
|
-export([start_link/2, store_batch/3]).
|
||||||
|
|
||||||
|
%% behavior callbacks:
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
|
||||||
|
|
||||||
|
%% internal exports:
|
||||||
|
-export([]).
|
||||||
|
|
||||||
|
-export_type([]).
|
||||||
|
|
||||||
|
-include("emqx_ds_replication_layer.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/trace.hrl").
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Type declarations
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
|
||||||
|
-define(flush, flush).
|
||||||
|
|
||||||
|
-record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% API functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}.
|
||||||
|
start_link(DB, Shard) ->
|
||||||
|
gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
|
||||||
|
|
||||||
|
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
||||||
|
ok.
|
||||||
|
store_batch(DB, Messages, Opts) ->
|
||||||
|
Sync = maps:get(sync, Opts, true),
|
||||||
|
lists:foreach(
|
||||||
|
fun(Message) ->
|
||||||
|
Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
|
||||||
|
gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
|
||||||
|
end,
|
||||||
|
Messages
|
||||||
|
).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% behavior callbacks
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-record(s, {
|
||||||
|
db :: emqx_ds:db(),
|
||||||
|
shard :: emqx_ds_replication_layer:shard_id(),
|
||||||
|
leader :: node(),
|
||||||
|
n = 0 :: non_neg_integer(),
|
||||||
|
tref :: reference(),
|
||||||
|
batch = [] :: [emqx_types:message()],
|
||||||
|
pending_replies = [] :: [gen_server:from()]
|
||||||
|
}).
|
||||||
|
|
||||||
|
init([DB, Shard]) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
process_flag(message_queue_data, off_heap),
|
||||||
|
%% TODO: adjust leader dynamically
|
||||||
|
{ok, Leader} = emqx_ds_replication_layer_meta:shard_leader(DB, Shard),
|
||||||
|
S = #s{
|
||||||
|
db = DB,
|
||||||
|
shard = Shard,
|
||||||
|
leader = Leader,
|
||||||
|
tref = start_timer()
|
||||||
|
},
|
||||||
|
{ok, S}.
|
||||||
|
|
||||||
|
handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
|
||||||
|
do_enqueue(From, Sync, Msg, S);
|
||||||
|
handle_call(_Call, _From, S) ->
|
||||||
|
{reply, {error, unknown_call}, S}.
|
||||||
|
|
||||||
|
handle_cast(_Cast, S) ->
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
handle_info(?flush, S) ->
|
||||||
|
{noreply, do_flush(S)};
|
||||||
|
handle_info(_Info, S) ->
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
terminate(_Reason, _S) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal exports
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
do_flush(S = #s{batch = []}) ->
|
||||||
|
S#s{tref = start_timer()};
|
||||||
|
do_flush(
|
||||||
|
S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader}
|
||||||
|
) ->
|
||||||
|
Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
|
||||||
|
ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
|
||||||
|
[gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
|
||||||
|
?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
|
||||||
|
erlang:garbage_collect(),
|
||||||
|
S#s{
|
||||||
|
n = 0,
|
||||||
|
batch = [],
|
||||||
|
pending_replies = [],
|
||||||
|
tref = start_timer()
|
||||||
|
}.
|
||||||
|
|
||||||
|
do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
|
||||||
|
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
|
||||||
|
S1 = S0#s{n = N + 1, batch = [Msg | Batch]},
|
||||||
|
S2 =
|
||||||
|
case N >= NMax of
|
||||||
|
true ->
|
||||||
|
_ = erlang:cancel_timer(S0#s.tref),
|
||||||
|
do_flush(S1);
|
||||||
|
false ->
|
||||||
|
S1
|
||||||
|
end,
|
||||||
|
%% TODO: later we may want to delay the reply until the message is
|
||||||
|
%% replicated, but it requies changes to the PUBACK/PUBREC flow to
|
||||||
|
%% allow for async replies. For now, we ack when the message is
|
||||||
|
%% _buffered_ rather than stored.
|
||||||
|
%%
|
||||||
|
%% Otherwise, the client would freeze for at least flush interval,
|
||||||
|
%% or until the buffer is filled.
|
||||||
|
S =
|
||||||
|
case Sync of
|
||||||
|
true ->
|
||||||
|
S2#s{pending_replies = [From | Replies]};
|
||||||
|
false ->
|
||||||
|
gen_server:reply(From, ok),
|
||||||
|
S2
|
||||||
|
end,
|
||||||
|
%% TODO: add a backpressure mechanism for the server to avoid
|
||||||
|
%% building a long message queue.
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
start_timer() ->
|
||||||
|
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
|
||||||
|
erlang:send_after(Interval, self(), ?flush).
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -35,6 +35,7 @@
|
||||||
in_sync_replicas/2,
|
in_sync_replicas/2,
|
||||||
sites/0,
|
sites/0,
|
||||||
open_db/2,
|
open_db/2,
|
||||||
|
get_options/1,
|
||||||
update_db_config/2,
|
update_db_config/2,
|
||||||
drop_db/1,
|
drop_db/1,
|
||||||
shard_leader/2,
|
shard_leader/2,
|
||||||
|
@ -230,6 +231,11 @@ is_leader(Node) ->
|
||||||
{atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
|
{atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
|
-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
|
||||||
|
get_options(DB) ->
|
||||||
|
{atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]),
|
||||||
|
Opts.
|
||||||
|
|
||||||
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
||||||
emqx_ds_replication_layer:builtin_db_opts().
|
emqx_ds_replication_layer:builtin_db_opts().
|
||||||
open_db(DB, DefaultOpts) ->
|
open_db(DB, DefaultOpts) ->
|
||||||
|
@ -293,11 +299,11 @@ terminate(_Reason, #s{}) ->
|
||||||
%% Internal exports
|
%% Internal exports
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
|
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts() | undefined) ->
|
||||||
emqx_ds_replication_layer:builtin_db_opts().
|
emqx_ds_replication_layer:builtin_db_opts().
|
||||||
open_db_trans(DB, CreateOpts) ->
|
open_db_trans(DB, CreateOpts) ->
|
||||||
case mnesia:wread({?META_TAB, DB}) of
|
case mnesia:wread({?META_TAB, DB}) of
|
||||||
[] ->
|
[] when is_map(CreateOpts) ->
|
||||||
NShards = maps:get(n_shards, CreateOpts),
|
NShards = maps:get(n_shards, CreateOpts),
|
||||||
ReplicationFactor = maps:get(replication_factor, CreateOpts),
|
ReplicationFactor = maps:get(replication_factor, CreateOpts),
|
||||||
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
|
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -199,7 +199,6 @@ open_shard(Shard, Options) ->
|
||||||
|
|
||||||
-spec drop_shard(shard_id()) -> ok.
|
-spec drop_shard(shard_id()) -> ok.
|
||||||
drop_shard(Shard) ->
|
drop_shard(Shard) ->
|
||||||
catch emqx_ds_storage_layer_sup:stop_shard(Shard),
|
|
||||||
case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of
|
case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -586,7 +585,8 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB
|
||||||
rocksdb_open(Shard, Options) ->
|
rocksdb_open(Shard, Options) ->
|
||||||
DBOptions = [
|
DBOptions = [
|
||||||
{create_if_missing, true},
|
{create_if_missing, true},
|
||||||
{create_missing_column_families, true}
|
{create_missing_column_families, true},
|
||||||
|
{enable_write_thread_adaptive_yield, false}
|
||||||
| maps:get(db_options, Options, [])
|
| maps:get(db_options, Options, [])
|
||||||
],
|
],
|
||||||
DataDir = maps:get(data_dir, Options, emqx:data_dir()),
|
DataDir = maps:get(data_dir, Options, emqx:data_dir()),
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ds_storage_layer_sup).
|
-module(emqx_ds_storage_layer_sup).
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@
|
||||||
|
|
||||||
-spec start_link() -> {ok, pid()}.
|
-spec start_link() -> {ok, pid()}.
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?SUP}, ?MODULE, []).
|
supervisor:start_link(?MODULE, []).
|
||||||
|
|
||||||
-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
|
-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
|
||||||
supervisor:startchild_ret().
|
supervisor:startchild_ret().
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
-behaviour(supervisor).
|
-behaviour(supervisor).
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([start_link/0, ensure_workers/0]).
|
-export([start_link/0, attach_backend/2]).
|
||||||
|
|
||||||
%% behaviour callbacks:
|
%% behaviour callbacks:
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
@ -25,64 +25,40 @@
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?SUP}, ?MODULE, top).
|
supervisor:start_link({local, ?SUP}, ?MODULE, top).
|
||||||
|
|
||||||
-spec ensure_workers() -> ok.
|
%% @doc Attach a child backend-specific supervisor to the top
|
||||||
ensure_workers() ->
|
%% application supervisor, if not yet present
|
||||||
ChildSpec = #{
|
-spec attach_backend(_BackendId, {module(), atom(), list()}) ->
|
||||||
id => workers_sup,
|
{ok, pid()} | {error, _}.
|
||||||
restart => temporary,
|
attach_backend(Backend, Start) ->
|
||||||
type => supervisor,
|
Spec = #{
|
||||||
start => {supervisor, start_link, [?MODULE, workers]}
|
id => Backend,
|
||||||
|
start => Start,
|
||||||
|
significant => false,
|
||||||
|
shutdown => infinity,
|
||||||
|
type => supervisor
|
||||||
},
|
},
|
||||||
case supervisor:start_child(?SUP, ChildSpec) of
|
case supervisor:start_child(?SUP, Spec) of
|
||||||
{ok, _} ->
|
{ok, Pid} ->
|
||||||
ok;
|
{ok, Pid};
|
||||||
{error, already_present} ->
|
{error, {already_started, Pid}} ->
|
||||||
ok;
|
{ok, Pid};
|
||||||
{error, {already_started, _}} ->
|
{error, Err} ->
|
||||||
ok
|
{error, Err}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behaviour callbacks
|
%% behaviour callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-dialyzer({nowarn_function, init/1}).
|
|
||||||
init(top) ->
|
init(top) ->
|
||||||
|
Children = [],
|
||||||
SupFlags = #{
|
SupFlags = #{
|
||||||
strategy => one_for_all,
|
strategy => one_for_one,
|
||||||
intensity => 10,
|
intensity => 10,
|
||||||
period => 1
|
period => 1
|
||||||
},
|
},
|
||||||
{ok, {SupFlags, []}};
|
|
||||||
init(workers) ->
|
|
||||||
%% TODO: technically, we don't need rocksDB for the alternative
|
|
||||||
%% backends. But right now we have any:
|
|
||||||
Children = [meta(), storage_layer_sup()],
|
|
||||||
SupFlags = #{
|
|
||||||
strategy => one_for_all,
|
|
||||||
intensity => 0,
|
|
||||||
period => 1
|
|
||||||
},
|
|
||||||
{ok, {SupFlags, Children}}.
|
{ok, {SupFlags, Children}}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
meta() ->
|
|
||||||
#{
|
|
||||||
id => emqx_ds_replication_layer_meta,
|
|
||||||
start => {emqx_ds_replication_layer_meta, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
type => worker,
|
|
||||||
shutdown => 5000
|
|
||||||
}.
|
|
||||||
|
|
||||||
storage_layer_sup() ->
|
|
||||||
#{
|
|
||||||
id => local_store_shard_sup,
|
|
||||||
start => {emqx_ds_storage_layer_sup, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
type => supervisor,
|
|
||||||
shutdown => infinity
|
|
||||||
}.
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
|
|
@ -16,8 +16,8 @@
|
||||||
-define(DEFAULT_CONFIG, #{
|
-define(DEFAULT_CONFIG, #{
|
||||||
backend => builtin,
|
backend => builtin,
|
||||||
storage => {emqx_ds_storage_bitfield_lts, #{}},
|
storage => {emqx_ds_storage_bitfield_lts, #{}},
|
||||||
n_shards => 16,
|
n_shards => 1,
|
||||||
replication_factor => 3
|
replication_factor => 1
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(COMPACT_CONFIG, #{
|
-define(COMPACT_CONFIG, #{
|
||||||
|
@ -26,15 +26,10 @@
|
||||||
{emqx_ds_storage_bitfield_lts, #{
|
{emqx_ds_storage_bitfield_lts, #{
|
||||||
bits_per_wildcard_level => 8
|
bits_per_wildcard_level => 8
|
||||||
}},
|
}},
|
||||||
n_shards => 16,
|
n_shards => 1,
|
||||||
replication_factor => 3
|
replication_factor => 1
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% Smoke test for opening and reopening the database
|
|
||||||
t_open(_Config) ->
|
|
||||||
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
|
|
||||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
|
|
||||||
|
|
||||||
%% Smoke test of store function
|
%% Smoke test of store function
|
||||||
t_store(_Config) ->
|
t_store(_Config) ->
|
||||||
MessageID = emqx_guid:gen(),
|
MessageID = emqx_guid:gen(),
|
||||||
|
@ -98,8 +93,8 @@ t_get_streams(_Config) ->
|
||||||
[FooBarBaz] = GetStream(<<"foo/bar/baz">>),
|
[FooBarBaz] = GetStream(<<"foo/bar/baz">>),
|
||||||
[A] = GetStream(<<"a">>),
|
[A] = GetStream(<<"a">>),
|
||||||
%% Restart shard to make sure trie is persisted and restored:
|
%% Restart shard to make sure trie is persisted and restored:
|
||||||
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
|
ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME),
|
||||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
|
{ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}),
|
||||||
%% Verify that there are no "ghost streams" for topics that don't
|
%% Verify that there are no "ghost streams" for topics that don't
|
||||||
%% have any messages:
|
%% have any messages:
|
||||||
[] = GetStream(<<"bar/foo">>),
|
[] = GetStream(<<"bar/foo">>),
|
||||||
|
@ -196,9 +191,9 @@ t_replay(_Config) ->
|
||||||
?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)),
|
?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)),
|
||||||
?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
|
?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)),
|
||||||
?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
|
?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
|
||||||
%% Restart shard to make sure trie is persisted and restored:
|
%% Restart the DB to make sure trie is persisted and restored:
|
||||||
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
|
ok = emqx_ds_builtin_sup:stop_db(?FUNCTION_NAME),
|
||||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}),
|
{ok, _} = emqx_ds_builtin_sup:start_db(?FUNCTION_NAME, #{}),
|
||||||
%% Learned wildcard topics:
|
%% Learned wildcard topics:
|
||||||
?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
|
?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
|
||||||
?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
|
?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
|
||||||
|
@ -412,21 +407,21 @@ suite() -> [{timetrap, {seconds, 20}}].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
{ok, _} = application:ensure_all_started(emqx_durable_storage),
|
{ok, _} = application:ensure_all_started(emqx_durable_storage),
|
||||||
emqx_ds_sup:ensure_workers(),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = application:stop(emqx_durable_storage).
|
ok = application:stop(emqx_durable_storage).
|
||||||
|
|
||||||
init_per_testcase(TC, Config) ->
|
init_per_testcase(TC, Config) ->
|
||||||
{ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), ?DEFAULT_CONFIG),
|
ok = emqx_ds:open_db(TC, ?DEFAULT_CONFIG),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(TC, _Config) ->
|
end_per_testcase(TC, _Config) ->
|
||||||
ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)).
|
emqx_ds:drop_db(TC),
|
||||||
|
ok.
|
||||||
|
|
||||||
shard(TC) ->
|
shard(TC) ->
|
||||||
{?MODULE, atom_to_binary(TC)}.
|
{TC, <<"0">>}.
|
||||||
|
|
||||||
keyspace(TC) ->
|
keyspace(TC) ->
|
||||||
TC.
|
TC.
|
||||||
|
|
|
@ -116,7 +116,9 @@
|
||||||
%% Data Type Validation Funcs
|
%% Data Type Validation Funcs
|
||||||
-export([
|
-export([
|
||||||
is_null/1,
|
is_null/1,
|
||||||
|
is_null_var/1,
|
||||||
is_not_null/1,
|
is_not_null/1,
|
||||||
|
is_not_null_var/1,
|
||||||
is_str/1,
|
is_str/1,
|
||||||
is_bool/1,
|
is_bool/1,
|
||||||
is_int/1,
|
is_int/1,
|
||||||
|
@ -153,6 +155,9 @@
|
||||||
ascii/1,
|
ascii/1,
|
||||||
find/2,
|
find/2,
|
||||||
find/3,
|
find/3,
|
||||||
|
join_to_string/1,
|
||||||
|
join_to_string/2,
|
||||||
|
join_to_sql_values_string/1,
|
||||||
jq/2,
|
jq/2,
|
||||||
jq/3
|
jq/3
|
||||||
]).
|
]).
|
||||||
|
@ -163,7 +168,10 @@
|
||||||
-export([
|
-export([
|
||||||
map_get/2,
|
map_get/2,
|
||||||
map_get/3,
|
map_get/3,
|
||||||
map_put/3
|
map_put/3,
|
||||||
|
map_keys/1,
|
||||||
|
map_values/1,
|
||||||
|
map_to_entries/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% For backward compatibility
|
%% For backward compatibility
|
||||||
|
@ -699,9 +707,16 @@ hexstr2bin(Str) when is_binary(Str) ->
|
||||||
is_null(undefined) -> true;
|
is_null(undefined) -> true;
|
||||||
is_null(_Data) -> false.
|
is_null(_Data) -> false.
|
||||||
|
|
||||||
|
%% Similar to is_null/1, but also works for the JSON value 'null'
|
||||||
|
is_null_var(null) -> true;
|
||||||
|
is_null_var(Data) -> is_null(Data).
|
||||||
|
|
||||||
is_not_null(Data) ->
|
is_not_null(Data) ->
|
||||||
not is_null(Data).
|
not is_null(Data).
|
||||||
|
|
||||||
|
is_not_null_var(Data) ->
|
||||||
|
not is_null_var(Data).
|
||||||
|
|
||||||
is_str(T) when is_binary(T) -> true;
|
is_str(T) when is_binary(T) -> true;
|
||||||
is_str(_) -> false.
|
is_str(_) -> false.
|
||||||
|
|
||||||
|
@ -847,6 +862,23 @@ find_s(S, P, Dir) ->
|
||||||
SubStr -> SubStr
|
SubStr -> SubStr
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
join_to_string(List) when is_list(List) ->
|
||||||
|
join_to_string(<<", ">>, List).
|
||||||
|
join_to_string(Sep, List) when is_list(List), is_binary(Sep) ->
|
||||||
|
iolist_to_binary(lists:join(Sep, [str(Item) || Item <- List])).
|
||||||
|
join_to_sql_values_string(List) ->
|
||||||
|
QuotedList =
|
||||||
|
[
|
||||||
|
case is_list(Item) of
|
||||||
|
true ->
|
||||||
|
emqx_placeholder:quote_sql(emqx_utils_json:encode(Item));
|
||||||
|
false ->
|
||||||
|
emqx_placeholder:quote_sql(Item)
|
||||||
|
end
|
||||||
|
|| Item <- List
|
||||||
|
],
|
||||||
|
iolist_to_binary(lists:join(<<", ">>, QuotedList)).
|
||||||
|
|
||||||
-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when
|
-spec jq(FilterProgram, JSON, TimeoutMS) -> Result when
|
||||||
FilterProgram :: binary(),
|
FilterProgram :: binary(),
|
||||||
JSON :: binary() | term(),
|
JSON :: binary() | term(),
|
||||||
|
@ -920,7 +952,8 @@ map_put(Key, Val, Map) ->
|
||||||
mget(Key, Map) ->
|
mget(Key, Map) ->
|
||||||
mget(Key, Map, undefined).
|
mget(Key, Map, undefined).
|
||||||
|
|
||||||
mget(Key, Map, Default) ->
|
mget(Key, Map0, Default) ->
|
||||||
|
Map = map(Map0),
|
||||||
case maps:find(Key, Map) of
|
case maps:find(Key, Map) of
|
||||||
{ok, Val} ->
|
{ok, Val} ->
|
||||||
Val;
|
Val;
|
||||||
|
@ -947,7 +980,8 @@ mget(Key, Map, Default) ->
|
||||||
Default
|
Default
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mput(Key, Val, Map) ->
|
mput(Key, Val, Map0) ->
|
||||||
|
Map = map(Map0),
|
||||||
case maps:find(Key, Map) of
|
case maps:find(Key, Map) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
maps:put(Key, Val, Map);
|
maps:put(Key, Val, Map);
|
||||||
|
@ -974,6 +1008,13 @@ mput(Key, Val, Map) ->
|
||||||
maps:put(Key, Val, Map)
|
maps:put(Key, Val, Map)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
map_keys(Map) ->
|
||||||
|
maps:keys(map(Map)).
|
||||||
|
map_values(Map) ->
|
||||||
|
maps:values(map(Map)).
|
||||||
|
map_to_entries(Map) ->
|
||||||
|
[#{key => K, value => V} || {K, V} <- maps:to_list(map(Map))].
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Hash Funcs
|
%% Hash Funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -1168,16 +1209,18 @@ map_path(Key) ->
|
||||||
{path, [{key, P} || P <- string:split(Key, ".", all)]}.
|
{path, [{key, P} || P <- string:split(Key, ".", all)]}.
|
||||||
|
|
||||||
function_literal(Fun, []) when is_atom(Fun) ->
|
function_literal(Fun, []) when is_atom(Fun) ->
|
||||||
atom_to_list(Fun) ++ "()";
|
iolist_to_binary(atom_to_list(Fun) ++ "()");
|
||||||
function_literal(Fun, [FArg | Args]) when is_atom(Fun), is_list(Args) ->
|
function_literal(Fun, [FArg | Args]) when is_atom(Fun), is_list(Args) ->
|
||||||
WithFirstArg = io_lib:format("~ts(~0p", [atom_to_list(Fun), FArg]),
|
WithFirstArg = io_lib:format("~ts(~0p", [atom_to_list(Fun), FArg]),
|
||||||
lists:foldl(
|
FuncLiteral =
|
||||||
fun(Arg, Literal) ->
|
lists:foldl(
|
||||||
io_lib:format("~ts, ~0p", [Literal, Arg])
|
fun(Arg, Literal) ->
|
||||||
end,
|
io_lib:format("~ts, ~0p", [Literal, Arg])
|
||||||
WithFirstArg,
|
end,
|
||||||
Args
|
WithFirstArg,
|
||||||
) ++ ")";
|
Args
|
||||||
|
) ++ ")",
|
||||||
|
iolist_to_binary(FuncLiteral);
|
||||||
function_literal(Fun, Args) ->
|
function_literal(Fun, Args) ->
|
||||||
{invalid_func, {Fun, Args}}.
|
{invalid_func, {Fun, Args}}.
|
||||||
|
|
||||||
|
|
|
@ -215,15 +215,32 @@ hex_convert() ->
|
||||||
).
|
).
|
||||||
|
|
||||||
t_is_null(_) ->
|
t_is_null(_) ->
|
||||||
|
?assertEqual(false, emqx_rule_funcs:is_null(null)),
|
||||||
?assertEqual(true, emqx_rule_funcs:is_null(undefined)),
|
?assertEqual(true, emqx_rule_funcs:is_null(undefined)),
|
||||||
|
?assertEqual(false, emqx_rule_funcs:is_null(<<"undefined">>)),
|
||||||
?assertEqual(false, emqx_rule_funcs:is_null(a)),
|
?assertEqual(false, emqx_rule_funcs:is_null(a)),
|
||||||
?assertEqual(false, emqx_rule_funcs:is_null(<<>>)),
|
?assertEqual(false, emqx_rule_funcs:is_null(<<>>)),
|
||||||
?assertEqual(false, emqx_rule_funcs:is_null(<<"a">>)).
|
?assertEqual(false, emqx_rule_funcs:is_null(<<"a">>)).
|
||||||
|
|
||||||
|
t_is_null_var(_) ->
|
||||||
|
?assertEqual(true, emqx_rule_funcs:is_null_var(null)),
|
||||||
|
?assertEqual(false, emqx_rule_funcs:is_null_var(<<"null">>)),
|
||||||
|
?assertEqual(true, emqx_rule_funcs:is_null_var(undefined)),
|
||||||
|
?assertEqual(false, emqx_rule_funcs:is_null_var(<<"undefined">>)),
|
||||||
|
?assertEqual(false, emqx_rule_funcs:is_null_var(a)),
|
||||||
|
?assertEqual(false, emqx_rule_funcs:is_null_var(<<>>)),
|
||||||
|
?assertEqual(false, emqx_rule_funcs:is_null_var(<<"a">>)).
|
||||||
|
|
||||||
t_is_not_null(_) ->
|
t_is_not_null(_) ->
|
||||||
[
|
[
|
||||||
?assertEqual(emqx_rule_funcs:is_not_null(T), not emqx_rule_funcs:is_null(T))
|
?assertEqual(emqx_rule_funcs:is_not_null(T), not emqx_rule_funcs:is_null(T))
|
||||||
|| T <- [undefined, a, <<"a">>, <<>>]
|
|| T <- [undefined, <<"undefined">>, null, <<"null">>, a, <<"a">>, <<>>]
|
||||||
|
].
|
||||||
|
|
||||||
|
t_is_not_null_var(_) ->
|
||||||
|
[
|
||||||
|
?assertEqual(emqx_rule_funcs:is_not_null_var(T), not emqx_rule_funcs:is_null_var(T))
|
||||||
|
|| T <- [undefined, <<"undefined">>, null, <<"null">>, a, <<"a">>, <<>>]
|
||||||
].
|
].
|
||||||
|
|
||||||
t_is_str(_) ->
|
t_is_str(_) ->
|
||||||
|
@ -622,6 +639,63 @@ t_ascii(_) ->
|
||||||
?assertEqual(97, apply_func(ascii, [<<"a">>])),
|
?assertEqual(97, apply_func(ascii, [<<"a">>])),
|
||||||
?assertEqual(97, apply_func(ascii, [<<"ab">>])).
|
?assertEqual(97, apply_func(ascii, [<<"ab">>])).
|
||||||
|
|
||||||
|
t_join_to_string(_) ->
|
||||||
|
A = 1,
|
||||||
|
B = a,
|
||||||
|
C = <<"c">>,
|
||||||
|
D = #{a => 1},
|
||||||
|
E = [1, 2, 3],
|
||||||
|
F = [#{<<"key">> => 1, <<"value">> => 2}],
|
||||||
|
M = #{<<"a">> => a, <<"b">> => 1, <<"c">> => <<"c">>},
|
||||||
|
J = <<"{\"a\":\"a\",\"b\":1,\"c\":\"c\"}">>,
|
||||||
|
?assertEqual(<<"a,b,c">>, apply_func(join_to_string, [<<",">>, [<<"a">>, <<"b">>, <<"c">>]])),
|
||||||
|
?assertEqual(<<"a b c">>, apply_func(join_to_string, [<<" ">>, [<<"a">>, <<"b">>, <<"c">>]])),
|
||||||
|
?assertEqual(
|
||||||
|
<<"a, b, c">>, apply_func(join_to_string, [<<", ">>, [<<"a">>, <<"b">>, <<"c">>]])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
<<"1, a, c, {\"a\":1}, [1,2,3], [{\"value\":2,\"key\":1}]">>,
|
||||||
|
apply_func(join_to_string, [<<", ">>, [A, B, C, D, E, F]])
|
||||||
|
),
|
||||||
|
?assertEqual(<<"a">>, apply_func(join_to_string, [<<",">>, [<<"a">>]])),
|
||||||
|
?assertEqual(<<"">>, apply_func(join_to_string, [<<",">>, []])),
|
||||||
|
?assertEqual(<<"a, b, c">>, apply_func(join_to_string, [emqx_rule_funcs:map_keys(M)])),
|
||||||
|
?assertEqual(<<"a, b, c">>, apply_func(join_to_string, [emqx_rule_funcs:map_keys(J)])),
|
||||||
|
?assertEqual(<<"a, 1, c">>, apply_func(join_to_string, [emqx_rule_funcs:map_values(M)])),
|
||||||
|
?assertEqual(<<"a, 1, c">>, apply_func(join_to_string, [emqx_rule_funcs:map_values(J)])).
|
||||||
|
|
||||||
|
t_join_to_sql_values_string(_) ->
|
||||||
|
A = 1,
|
||||||
|
B = a,
|
||||||
|
C = <<"c">>,
|
||||||
|
D = #{a => 1},
|
||||||
|
E = [1, 2, 3],
|
||||||
|
E1 = [97, 98],
|
||||||
|
F = [#{<<"key">> => 1, <<"value">> => 2}],
|
||||||
|
M = #{<<"a">> => a, <<"b">> => 1, <<"c">> => <<"c">>},
|
||||||
|
J = <<"{\"a\":\"a\",\"b\":1,\"c\":\"c\"}">>,
|
||||||
|
?assertEqual(
|
||||||
|
<<"'a', 'b', 'c'">>, apply_func(join_to_sql_values_string, [[<<"a">>, <<"b">>, <<"c">>]])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
<<"1, 'a', 'c', '{\"a\":1}', '[1,2,3]', '[97,98]', '[{\"value\":2,\"key\":1}]'">>,
|
||||||
|
apply_func(join_to_sql_values_string, [[A, B, C, D, E, E1, F]])
|
||||||
|
),
|
||||||
|
?assertEqual(<<"'a'">>, apply_func(join_to_sql_values_string, [[<<"a">>]])),
|
||||||
|
?assertEqual(<<"">>, apply_func(join_to_sql_values_string, [[]])),
|
||||||
|
?assertEqual(
|
||||||
|
<<"'a', 'b', 'c'">>, apply_func(join_to_sql_values_string, [emqx_rule_funcs:map_keys(M)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
<<"'a', 'b', 'c'">>, apply_func(join_to_sql_values_string, [emqx_rule_funcs:map_keys(J)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
<<"'a', 1, 'c'">>, apply_func(join_to_sql_values_string, [emqx_rule_funcs:map_values(M)])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
<<"'a', 1, 'c'">>, apply_func(join_to_sql_values_string, [emqx_rule_funcs:map_values(J)])
|
||||||
|
).
|
||||||
|
|
||||||
t_find(_) ->
|
t_find(_) ->
|
||||||
?assertEqual(<<"cbcd">>, apply_func(find, [<<"acbcd">>, <<"c">>])),
|
?assertEqual(<<"cbcd">>, apply_func(find, [<<"acbcd">>, <<"c">>])),
|
||||||
?assertEqual(<<"cbcd">>, apply_func(find, [<<"acbcd">>, <<"c">>, <<"leading">>])),
|
?assertEqual(<<"cbcd">>, apply_func(find, [<<"acbcd">>, <<"c">>, <<"leading">>])),
|
||||||
|
@ -746,14 +820,37 @@ t_map_put(_) ->
|
||||||
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
|
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
|
||||||
|
|
||||||
t_mget(_) ->
|
t_mget(_) ->
|
||||||
?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])),
|
?assertEqual(1, apply_func(mget, [<<"a">>, #{a => 1}])),
|
||||||
?assertEqual(1, apply_func(map_get, [<<"a">>, #{<<"a">> => 1}])),
|
?assertEqual(1, apply_func(mget, [<<"a">>, <<"{\"a\" : 1}">>])),
|
||||||
?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])).
|
?assertEqual(1, apply_func(mget, [<<"a">>, #{<<"a">> => 1}])),
|
||||||
|
?assertEqual(1, apply_func(mget, [<<"a.b">>, #{<<"a.b">> => 1}])),
|
||||||
|
?assertEqual(undefined, apply_func(mget, [<<"a">>, #{}])).
|
||||||
|
|
||||||
t_mput(_) ->
|
t_mput(_) ->
|
||||||
?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])),
|
?assertEqual(#{<<"a">> => 1}, apply_func(mput, [<<"a">>, 1, #{}])),
|
||||||
?assertEqual(#{<<"a">> => 2}, apply_func(map_put, [<<"a">>, 2, #{<<"a">> => 1}])),
|
?assertEqual(#{<<"a">> => 2}, apply_func(mput, [<<"a">>, 2, #{<<"a">> => 1}])),
|
||||||
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
|
?assertEqual(#{<<"a">> => 2}, apply_func(mput, [<<"a">>, 2, <<"{\"a\" : 1}">>])),
|
||||||
|
?assertEqual(#{<<"a.b">> => 2}, apply_func(mput, [<<"a.b">>, 2, #{<<"a.b">> => 1}])),
|
||||||
|
?assertEqual(#{a => 2}, apply_func(mput, [<<"a">>, 2, #{a => 1}])).
|
||||||
|
|
||||||
|
t_map_to_entries(_) ->
|
||||||
|
?assertEqual([], apply_func(map_to_entries, [#{}])),
|
||||||
|
M = #{a => 1, b => <<"b">>},
|
||||||
|
J = <<"{\"a\":1,\"b\":\"b\"}">>,
|
||||||
|
?assertEqual(
|
||||||
|
[
|
||||||
|
#{key => a, value => 1},
|
||||||
|
#{key => b, value => <<"b">>}
|
||||||
|
],
|
||||||
|
apply_func(map_to_entries, [M])
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
[
|
||||||
|
#{key => <<"a">>, value => 1},
|
||||||
|
#{key => <<"b">>, value => <<"b">>}
|
||||||
|
],
|
||||||
|
apply_func(map_to_entries, [J])
|
||||||
|
).
|
||||||
|
|
||||||
t_bitsize(_) ->
|
t_bitsize(_) ->
|
||||||
?assertEqual(8, apply_func(bitsize, [<<"a">>])),
|
?assertEqual(8, apply_func(bitsize, [<<"a">>])),
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Added new SQL functions: map_keys(), map_values(), map_to_entries(), join_to_string(), join_to_string(), join_to_sql_values_string(), is_null_var(), is_not_null_var().
|
||||||
|
|
||||||
|
For more information on the functions and their usage, refer to the documentation.
|
Loading…
Reference in New Issue