Merge remote-tracking branch 'origin/release-57'
This commit is contained in:
commit
1bc9600c58
|
@ -78,7 +78,8 @@ jobs:
|
||||||
MIX_ENV: emqx-enterprise
|
MIX_ENV: emqx-enterprise
|
||||||
PROFILE: emqx-enterprise
|
PROFILE: emqx-enterprise
|
||||||
run: |
|
run: |
|
||||||
mix local.hex --force --if-missing && mix local.rebar --force --if-missing
|
# mix local.hex --force --if-missing && mix local.rebar --force --if-missing
|
||||||
|
mix local.hex 2.0.6 --force --if-missing && mix local.rebar --force --if-missing
|
||||||
- name: Check formatting
|
- name: Check formatting
|
||||||
env:
|
env:
|
||||||
MIX_ENV: emqx-enterprise
|
MIX_ENV: emqx-enterprise
|
||||||
|
|
|
@ -28,7 +28,8 @@ jobs:
|
||||||
- run: ./scripts/check-deps-integrity.escript
|
- run: ./scripts/check-deps-integrity.escript
|
||||||
- name: Setup mix
|
- name: Setup mix
|
||||||
run: |
|
run: |
|
||||||
mix local.hex --force
|
# mix local.hex --force
|
||||||
|
mix local.hex 2.0.6 --force
|
||||||
mix local.rebar --force
|
mix local.rebar --force
|
||||||
mix deps.get
|
mix deps.get
|
||||||
- name: print mix dependency tree
|
- name: print mix dependency tree
|
||||||
|
|
3
Makefile
3
Makefile
|
@ -53,7 +53,8 @@ $(REBAR): .prepare ensure-rebar3
|
||||||
|
|
||||||
.PHONY: ensure-hex
|
.PHONY: ensure-hex
|
||||||
ensure-hex:
|
ensure-hex:
|
||||||
@mix local.hex --if-missing --force
|
# @mix local.hex --if-missing --force
|
||||||
|
@mix local.hex 2.0.6 --if-missing --force
|
||||||
|
|
||||||
.PHONY: ensure-mix-rebar3
|
.PHONY: ensure-mix-rebar3
|
||||||
ensure-mix-rebar3: $(REBAR)
|
ensure-mix-rebar3: $(REBAR)
|
||||||
|
|
|
@ -21,7 +21,8 @@
|
||||||
%% API
|
%% API
|
||||||
-export([
|
-export([
|
||||||
start_link/0,
|
start_link/0,
|
||||||
get_subscription_count/0
|
get_subscription_count/0,
|
||||||
|
get_disconnected_session_count/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% `gen_server' API
|
%% `gen_server' API
|
||||||
|
@ -39,7 +40,9 @@
|
||||||
|
|
||||||
%% call/cast/info events
|
%% call/cast/info events
|
||||||
-record(tally_subs, {}).
|
-record(tally_subs, {}).
|
||||||
|
-record(tally_disconnected_sessions, {}).
|
||||||
-record(get_subscription_count, {}).
|
-record(get_subscription_count, {}).
|
||||||
|
-record(get_disconnected_session_count, {}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -59,6 +62,16 @@ get_subscription_count() ->
|
||||||
0
|
0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Gets a cached view of the cluster-global count of disconnected persistent sessions.
|
||||||
|
-spec get_disconnected_session_count() -> non_neg_integer().
|
||||||
|
get_disconnected_session_count() ->
|
||||||
|
case emqx_persistent_message:is_persistence_enabled() of
|
||||||
|
true ->
|
||||||
|
gen_server:call(?MODULE, #get_disconnected_session_count{}, infinity);
|
||||||
|
false ->
|
||||||
|
0
|
||||||
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% `gen_server' API
|
%% `gen_server' API
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -66,7 +79,10 @@ get_subscription_count() ->
|
||||||
init(_Opts) ->
|
init(_Opts) ->
|
||||||
case emqx_persistent_message:is_persistence_enabled() of
|
case emqx_persistent_message:is_persistence_enabled() of
|
||||||
true ->
|
true ->
|
||||||
State = #{subs_count => 0},
|
State = #{
|
||||||
|
subs_count => 0,
|
||||||
|
disconnected_session_count => 0
|
||||||
|
},
|
||||||
{ok, State, {continue, #tally_subs{}}};
|
{ok, State, {continue, #tally_subs{}}};
|
||||||
false ->
|
false ->
|
||||||
ignore
|
ignore
|
||||||
|
@ -75,11 +91,18 @@ init(_Opts) ->
|
||||||
handle_continue(#tally_subs{}, State0) ->
|
handle_continue(#tally_subs{}, State0) ->
|
||||||
State = tally_persistent_subscriptions(State0),
|
State = tally_persistent_subscriptions(State0),
|
||||||
ensure_subs_tally_timer(),
|
ensure_subs_tally_timer(),
|
||||||
|
{noreply, State, {continue, #tally_disconnected_sessions{}}};
|
||||||
|
handle_continue(#tally_disconnected_sessions{}, State0) ->
|
||||||
|
State = tally_disconnected_persistent_sessions(State0),
|
||||||
|
ensure_disconnected_sessions_tally_timer(),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_call(#get_subscription_count{}, _From, State) ->
|
handle_call(#get_subscription_count{}, _From, State) ->
|
||||||
#{subs_count := N} = State,
|
#{subs_count := N} = State,
|
||||||
{reply, N, State};
|
{reply, N, State};
|
||||||
|
handle_call(#get_disconnected_session_count{}, _From, State) ->
|
||||||
|
#{disconnected_session_count := N} = State,
|
||||||
|
{reply, N, State};
|
||||||
handle_call(_Call, _From, State) ->
|
handle_call(_Call, _From, State) ->
|
||||||
{reply, {error, bad_call}, State}.
|
{reply, {error, bad_call}, State}.
|
||||||
|
|
||||||
|
@ -90,6 +113,10 @@ handle_info(#tally_subs{}, State0) ->
|
||||||
State = tally_persistent_subscriptions(State0),
|
State = tally_persistent_subscriptions(State0),
|
||||||
ensure_subs_tally_timer(),
|
ensure_subs_tally_timer(),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
handle_info(#tally_disconnected_sessions{}, State0) ->
|
||||||
|
State = tally_disconnected_persistent_sessions(State0),
|
||||||
|
ensure_disconnected_sessions_tally_timer(),
|
||||||
|
{noreply, State};
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
@ -101,7 +128,38 @@ tally_persistent_subscriptions(State0) ->
|
||||||
N = emqx_persistent_session_ds_state:total_subscription_count(),
|
N = emqx_persistent_session_ds_state:total_subscription_count(),
|
||||||
State0#{subs_count := N}.
|
State0#{subs_count := N}.
|
||||||
|
|
||||||
|
tally_disconnected_persistent_sessions(State0) ->
|
||||||
|
N = do_tally_disconnected_persistent_sessions(),
|
||||||
|
State0#{disconnected_session_count := N}.
|
||||||
|
|
||||||
ensure_subs_tally_timer() ->
|
ensure_subs_tally_timer() ->
|
||||||
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
|
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
|
||||||
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
|
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
ensure_disconnected_sessions_tally_timer() ->
|
||||||
|
Timeout = emqx_config:get([durable_sessions, disconnected_session_count_refresh_interval]),
|
||||||
|
_ = erlang:send_after(Timeout, self(), #tally_disconnected_sessions{}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
do_tally_disconnected_persistent_sessions() ->
|
||||||
|
Iter = emqx_persistent_session_ds_state:make_session_iterator(),
|
||||||
|
do_tally_disconnected_persistent_sessions(Iter, 0).
|
||||||
|
|
||||||
|
do_tally_disconnected_persistent_sessions('$end_of_table', N) ->
|
||||||
|
N;
|
||||||
|
do_tally_disconnected_persistent_sessions(Iter0, N) ->
|
||||||
|
case emqx_persistent_session_ds_state:session_iterator_next(Iter0, 1) of
|
||||||
|
{[], _} ->
|
||||||
|
N;
|
||||||
|
{[{Id, _Meta}], Iter} ->
|
||||||
|
case is_live_session(Id) of
|
||||||
|
true ->
|
||||||
|
do_tally_disconnected_persistent_sessions(Iter, N);
|
||||||
|
false ->
|
||||||
|
do_tally_disconnected_persistent_sessions(Iter, N + 1)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_live_session(Id) ->
|
||||||
|
[] =/= emqx_cm_registry:lookup_channels(Id).
|
||||||
|
|
|
@ -1719,6 +1719,14 @@ fields("durable_sessions") ->
|
||||||
importance => ?IMPORTANCE_HIDDEN
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{"disconnected_session_count_refresh_interval",
|
||||||
|
sc(
|
||||||
|
timeout_duration(),
|
||||||
|
#{
|
||||||
|
default => <<"5s">>,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
}
|
||||||
|
)},
|
||||||
{"message_retention_period",
|
{"message_retention_period",
|
||||||
sc(
|
sc(
|
||||||
timeout_duration(),
|
timeout_duration(),
|
||||||
|
|
|
@ -476,6 +476,7 @@ zone_global_defaults() ->
|
||||||
renew_streams_interval => 5000,
|
renew_streams_interval => 5000,
|
||||||
session_gc_batch_size => 100,
|
session_gc_batch_size => 100,
|
||||||
session_gc_interval => 600000,
|
session_gc_interval => 600000,
|
||||||
subscription_count_refresh_interval => 5000
|
subscription_count_refresh_interval => 5000,
|
||||||
|
disconnected_session_count_refresh_interval => 5000
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -69,7 +69,11 @@ fields(?ACTION) ->
|
||||||
}),
|
}),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC(s3_upload)
|
desc => ?DESC(s3_upload),
|
||||||
|
%% NOTE
|
||||||
|
%% There seems to be no way to attach validators to union types, thus we
|
||||||
|
%% have to attach a "common denominator" validator here.
|
||||||
|
validator => validators(s3_upload_parameters)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{
|
#{
|
||||||
|
@ -211,6 +215,9 @@ desc(s3_upload_resource_opts) ->
|
||||||
desc(_Name) ->
|
desc(_Name) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
|
validators(s3_upload_parameters) ->
|
||||||
|
emqx_s3_schema:validators(s3_uploader).
|
||||||
|
|
||||||
convert_actions(Conf = #{}, Opts) ->
|
convert_actions(Conf = #{}, Opts) ->
|
||||||
maps:map(fun(_Name, ConfAction) -> convert_action(ConfAction, Opts) end, Conf);
|
maps:map(fun(_Name, ConfAction) -> convert_action(ConfAction, Opts) end, Conf);
|
||||||
convert_actions(undefined, _) ->
|
convert_actions(undefined, _) ->
|
||||||
|
|
|
@ -156,6 +156,24 @@ t_create_via_http(Config) ->
|
||||||
t_on_get_status(Config) ->
|
t_on_get_status(Config) ->
|
||||||
emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
|
emqx_bridge_v2_testlib:t_on_get_status(Config, #{}).
|
||||||
|
|
||||||
|
t_invalid_config(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{error,
|
||||||
|
{_Status, _, #{
|
||||||
|
<<"code">> := <<"BAD_REQUEST">>,
|
||||||
|
<<"message">> := #{<<"kind">> := <<"validation_error">>}
|
||||||
|
}}},
|
||||||
|
emqx_bridge_v2_testlib:create_bridge_api(
|
||||||
|
Config,
|
||||||
|
_Overrides = #{
|
||||||
|
<<"parameters">> => #{
|
||||||
|
<<"min_part_size">> => <<"5GB">>,
|
||||||
|
<<"max_part_size">> => <<"100MB">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
t_aggreg_upload(Config) ->
|
t_aggreg_upload(Config) ->
|
||||||
Bucket = ?config(s3_bucket, Config),
|
Bucket = ?config(s3_bucket, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
|
|
@ -72,6 +72,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(GAUGE_SAMPLER_LIST, [
|
-define(GAUGE_SAMPLER_LIST, [
|
||||||
|
disconnected_durable_sessions,
|
||||||
durable_subscriptions,
|
durable_subscriptions,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
topics,
|
topics,
|
||||||
|
|
|
@ -262,6 +262,8 @@ merge_cluster_rate(Node, Cluster) ->
|
||||||
Fun =
|
Fun =
|
||||||
fun
|
fun
|
||||||
%% cluster-synced values
|
%% cluster-synced values
|
||||||
|
(disconnected_durable_sessions, V, NCluster) ->
|
||||||
|
NCluster#{disconnected_durable_sessions => V};
|
||||||
(durable_subscriptions, V, NCluster) ->
|
(durable_subscriptions, V, NCluster) ->
|
||||||
NCluster#{durable_subscriptions => V};
|
NCluster#{durable_subscriptions => V};
|
||||||
(topics, V, NCluster) ->
|
(topics, V, NCluster) ->
|
||||||
|
@ -417,22 +419,40 @@ getstats(Key) ->
|
||||||
_:_ -> 0
|
_:_ -> 0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
stats(connections) -> emqx_stats:getstat('connections.count');
|
stats(connections) ->
|
||||||
stats(durable_subscriptions) -> emqx_stats:getstat('durable_subscriptions.count');
|
emqx_stats:getstat('connections.count');
|
||||||
stats(live_connections) -> emqx_stats:getstat('live_connections.count');
|
stats(disconnected_durable_sessions) ->
|
||||||
stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
|
emqx_persistent_session_bookkeeper:get_disconnected_session_count();
|
||||||
stats(topics) -> emqx_stats:getstat('topics.count');
|
stats(durable_subscriptions) ->
|
||||||
stats(subscriptions) -> emqx_stats:getstat('subscriptions.count');
|
emqx_stats:getstat('durable_subscriptions.count');
|
||||||
stats(shared_subscriptions) -> emqx_stats:getstat('subscriptions.shared.count');
|
stats(live_connections) ->
|
||||||
stats(retained_msg_count) -> emqx_stats:getstat('retained.count');
|
emqx_stats:getstat('live_connections.count');
|
||||||
stats(received) -> emqx_metrics:val('messages.received');
|
stats(cluster_sessions) ->
|
||||||
stats(received_bytes) -> emqx_metrics:val('bytes.received');
|
emqx_stats:getstat('cluster_sessions.count');
|
||||||
stats(sent) -> emqx_metrics:val('messages.sent');
|
stats(topics) ->
|
||||||
stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
|
emqx_stats:getstat('topics.count');
|
||||||
stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded');
|
stats(subscriptions) ->
|
||||||
stats(validation_failed) -> emqx_metrics:val('messages.validation_failed');
|
emqx_stats:getstat('subscriptions.count');
|
||||||
stats(dropped) -> emqx_metrics:val('messages.dropped');
|
stats(shared_subscriptions) ->
|
||||||
stats(persisted) -> emqx_metrics:val('messages.persisted').
|
emqx_stats:getstat('subscriptions.shared.count');
|
||||||
|
stats(retained_msg_count) ->
|
||||||
|
emqx_stats:getstat('retained.count');
|
||||||
|
stats(received) ->
|
||||||
|
emqx_metrics:val('messages.received');
|
||||||
|
stats(received_bytes) ->
|
||||||
|
emqx_metrics:val('bytes.received');
|
||||||
|
stats(sent) ->
|
||||||
|
emqx_metrics:val('messages.sent');
|
||||||
|
stats(sent_bytes) ->
|
||||||
|
emqx_metrics:val('bytes.sent');
|
||||||
|
stats(validation_succeeded) ->
|
||||||
|
emqx_metrics:val('messages.validation_succeeded');
|
||||||
|
stats(validation_failed) ->
|
||||||
|
emqx_metrics:val('messages.validation_failed');
|
||||||
|
stats(dropped) ->
|
||||||
|
emqx_metrics:val('messages.dropped');
|
||||||
|
stats(persisted) ->
|
||||||
|
emqx_metrics:val('messages.persisted').
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% Retained && License Quota
|
%% Retained && License Quota
|
||||||
|
|
|
@ -194,6 +194,8 @@ swagger_desc(validation_failed) ->
|
||||||
swagger_desc_format("Schema validations failed ");
|
swagger_desc_format("Schema validations failed ");
|
||||||
swagger_desc(persisted) ->
|
swagger_desc(persisted) ->
|
||||||
swagger_desc_format("Messages saved to the durable storage ");
|
swagger_desc_format("Messages saved to the durable storage ");
|
||||||
|
swagger_desc(disconnected_durable_sessions) ->
|
||||||
|
<<"Disconnected durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
|
||||||
swagger_desc(durable_subscriptions) ->
|
swagger_desc(durable_subscriptions) ->
|
||||||
<<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
|
<<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
|
||||||
swagger_desc(subscriptions) ->
|
swagger_desc(subscriptions) ->
|
||||||
|
|
|
@ -341,6 +341,8 @@ t_persistent_session_stats(_Config) ->
|
||||||
?retry(1_000, 10, begin
|
?retry(1_000, 10, begin
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, #{
|
{ok, #{
|
||||||
|
<<"connections">> := 2,
|
||||||
|
<<"disconnected_durable_sessions">> := 0,
|
||||||
%% N.B.: we currently don't perform any deduplication between persistent
|
%% N.B.: we currently don't perform any deduplication between persistent
|
||||||
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
||||||
%% instead of 6 here.
|
%% instead of 6 here.
|
||||||
|
@ -356,6 +358,29 @@ t_persistent_session_stats(_Config) ->
|
||||||
?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
|
?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
|
||||||
PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
|
PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
|
||||||
?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),
|
?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),
|
||||||
|
|
||||||
|
%% Now with disconnected but alive persistent sessions
|
||||||
|
{ok, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
emqtt:disconnect(PSClient),
|
||||||
|
#{?snk_kind := dashboard_monitor_flushed}
|
||||||
|
),
|
||||||
|
?retry(1_000, 10, begin
|
||||||
|
?assertMatch(
|
||||||
|
{ok, #{
|
||||||
|
<<"connections">> := 1,
|
||||||
|
<<"disconnected_durable_sessions">> := 1,
|
||||||
|
%% N.B.: we currently don't perform any deduplication between persistent
|
||||||
|
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
||||||
|
%% instead of 6 here.
|
||||||
|
<<"topics">> := 8,
|
||||||
|
<<"durable_subscriptions">> := 4,
|
||||||
|
<<"subscriptions">> := 4
|
||||||
|
}},
|
||||||
|
request(["monitor_current"])
|
||||||
|
)
|
||||||
|
end),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
request(Path) ->
|
request(Path) ->
|
||||||
|
|
|
@ -1384,7 +1384,6 @@ do_list_clients_cluster_query(
|
||||||
{Rows, QueryState1 = #{complete := Complete0}} ->
|
{Rows, QueryState1 = #{complete := Complete0}} ->
|
||||||
case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of
|
case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of
|
||||||
{enough, NResultAcc} ->
|
{enough, NResultAcc} ->
|
||||||
%% TODO: add persistent session count?
|
|
||||||
%% TODO: this may return `{error, _, _}'...
|
%% TODO: this may return `{error, _, _}'...
|
||||||
QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes(
|
QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes(
|
||||||
Tail, QueryState1
|
Tail, QueryState1
|
||||||
|
@ -1428,8 +1427,9 @@ add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
|
||||||
%% to traverse the whole table), but also hard to deduplicate live connections
|
%% to traverse the whole table), but also hard to deduplicate live connections
|
||||||
%% from it... So this count will possibly overshoot the true count of
|
%% from it... So this count will possibly overshoot the true count of
|
||||||
%% sessions.
|
%% sessions.
|
||||||
SessionCount = persistent_session_count(),
|
DisconnectedSessionCount =
|
||||||
Totals = Totals0#{undefined => SessionCount},
|
emqx_persistent_session_bookkeeper:get_disconnected_session_count(),
|
||||||
|
Totals = Totals0#{undefined => DisconnectedSessionCount},
|
||||||
QueryState0#{total := Totals};
|
QueryState0#{total := Totals};
|
||||||
false ->
|
false ->
|
||||||
QueryState0
|
QueryState0
|
||||||
|
@ -1477,27 +1477,6 @@ no_persistent_sessions() ->
|
||||||
true
|
true
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec persistent_session_count() -> non_neg_integer().
|
|
||||||
persistent_session_count() ->
|
|
||||||
%% N.B.: this is potentially costly. Should not be called in hot paths.
|
|
||||||
%% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse...
|
|
||||||
do_persistent_session_count(init_persistent_session_iterator(), 0).
|
|
||||||
|
|
||||||
do_persistent_session_count('$end_of_table', N) ->
|
|
||||||
N;
|
|
||||||
do_persistent_session_count(Cursor, N) ->
|
|
||||||
case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of
|
|
||||||
{[], _} ->
|
|
||||||
N;
|
|
||||||
{[{_Id, Meta}], NextCursor} ->
|
|
||||||
case is_expired(Meta) of
|
|
||||||
true ->
|
|
||||||
do_persistent_session_count(NextCursor, N);
|
|
||||||
false ->
|
|
||||||
do_persistent_session_count(NextCursor, N + 1)
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) ->
|
is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) ->
|
||||||
LastAliveAt + ExpiryInterval < erlang:system_time(millisecond).
|
LastAliveAt + ExpiryInterval < erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,9 @@ end_per_suite(Config) ->
|
||||||
|
|
||||||
init_per_group(persistent_sessions, Config) ->
|
init_per_group(persistent_sessions, Config) ->
|
||||||
AppSpecs = [
|
AppSpecs = [
|
||||||
{emqx, "durable_sessions.enable = true"},
|
{emqx,
|
||||||
|
"durable_sessions.enable = true\n"
|
||||||
|
"durable_sessions.disconnected_session_count_refresh_interval = 100ms"},
|
||||||
emqx_management
|
emqx_management
|
||||||
],
|
],
|
||||||
Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
|
Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
|
||||||
|
@ -457,9 +459,7 @@ t_persistent_sessions5(Config) ->
|
||||||
{{_, 200, _}, _, #{
|
{{_, 200, _}, _, #{
|
||||||
<<"data">> := [_, _, _],
|
<<"data">> := [_, _, _],
|
||||||
<<"meta">> := #{
|
<<"meta">> := #{
|
||||||
%% TODO: if/when we fix the persistent session count, this
|
<<"count">> := 4,
|
||||||
%% should be 4.
|
|
||||||
<<"count">> := 6,
|
|
||||||
<<"hasnext">> := true
|
<<"hasnext">> := true
|
||||||
}
|
}
|
||||||
}}},
|
}}},
|
||||||
|
@ -470,9 +470,7 @@ t_persistent_sessions5(Config) ->
|
||||||
{{_, 200, _}, _, #{
|
{{_, 200, _}, _, #{
|
||||||
<<"data">> := [_],
|
<<"data">> := [_],
|
||||||
<<"meta">> := #{
|
<<"meta">> := #{
|
||||||
%% TODO: if/when we fix the persistent session count, this
|
<<"count">> := 4,
|
||||||
%% should be 4.
|
|
||||||
<<"count">> := 6,
|
|
||||||
<<"hasnext">> := false
|
<<"hasnext">> := false
|
||||||
}
|
}
|
||||||
}}},
|
}}},
|
||||||
|
@ -489,9 +487,7 @@ t_persistent_sessions5(Config) ->
|
||||||
{{_, 200, _}, _, #{
|
{{_, 200, _}, _, #{
|
||||||
<<"data">> := [_, _],
|
<<"data">> := [_, _],
|
||||||
<<"meta">> := #{
|
<<"meta">> := #{
|
||||||
%% TODO: if/when we fix the persistent session count, this
|
<<"count">> := 4,
|
||||||
%% should be 4.
|
|
||||||
<<"count">> := 6,
|
|
||||||
<<"hasnext">> := true
|
<<"hasnext">> := true
|
||||||
}
|
}
|
||||||
}}},
|
}}},
|
||||||
|
@ -1996,7 +1992,11 @@ assert_single_client(Opts) ->
|
||||||
100,
|
100,
|
||||||
20,
|
20,
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
|
{ok,
|
||||||
|
{{_, 200, _}, _, #{
|
||||||
|
<<"data">> := [#{<<"connected">> := IsConnected}],
|
||||||
|
<<"meta">> := #{<<"count">> := 1}
|
||||||
|
}}},
|
||||||
list_request(APIPort)
|
list_request(APIPort)
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
-import(hoconsc, [mk/2, ref/2]).
|
-import(hoconsc, [mk/2, ref/2]).
|
||||||
|
|
||||||
-export([roots/0, fields/1, namespace/0, tags/0, desc/1]).
|
-export([roots/0, fields/1, namespace/0, tags/0, desc/1]).
|
||||||
|
-export([validators/1]).
|
||||||
|
|
||||||
-export([translate/1]).
|
-export([translate/1]).
|
||||||
-export([translate/2]).
|
-export([translate/2]).
|
||||||
|
@ -177,6 +178,14 @@ desc(s3_upload) ->
|
||||||
desc(transport_options) ->
|
desc(transport_options) ->
|
||||||
"Options for the HTTP transport layer used by the S3 client".
|
"Options for the HTTP transport layer used by the S3 client".
|
||||||
|
|
||||||
|
validators(s3_uploader) ->
|
||||||
|
[fun validate_part_size/1].
|
||||||
|
|
||||||
|
validate_part_size(Conf) ->
|
||||||
|
Min = hocon_maps:get(<<"min_part_size">>, Conf),
|
||||||
|
Max = hocon_maps:get(<<"max_part_size">>, Conf),
|
||||||
|
Min =< Max orelse {error, <<"Inconsistent 'min_part_size': cannot exceed 'max_part_size'">>}.
|
||||||
|
|
||||||
translate(Conf) ->
|
translate(Conf) ->
|
||||||
translate(Conf, #{}).
|
translate(Conf, #{}).
|
||||||
|
|
||||||
|
|
6
build
6
build
|
@ -185,7 +185,8 @@ just_compile_elixir() {
|
||||||
rm -f rebar.lock
|
rm -f rebar.lock
|
||||||
env MIX_ENV="$PROFILE" mix local.rebar --if-missing --force
|
env MIX_ENV="$PROFILE" mix local.rebar --if-missing --force
|
||||||
env MIX_ENV="$PROFILE" mix local.rebar rebar3 "${PWD}/rebar3" --if-missing --force
|
env MIX_ENV="$PROFILE" mix local.rebar rebar3 "${PWD}/rebar3" --if-missing --force
|
||||||
env MIX_ENV="$PROFILE" mix local.hex --if-missing --force
|
# env MIX_ENV="$PROFILE" mix local.hex --if-missing --force
|
||||||
|
env MIX_ENV="$PROFILE" mix local.hex 2.0.6 --if-missing --force
|
||||||
env MIX_ENV="$PROFILE" mix deps.get
|
env MIX_ENV="$PROFILE" mix deps.get
|
||||||
env MIX_ENV="$PROFILE" mix compile
|
env MIX_ENV="$PROFILE" mix compile
|
||||||
}
|
}
|
||||||
|
@ -203,7 +204,8 @@ make_elixir_rel() {
|
||||||
export_elixir_release_vars "$PROFILE"
|
export_elixir_release_vars "$PROFILE"
|
||||||
env MIX_ENV="$PROFILE" mix local.rebar --if-missing --force
|
env MIX_ENV="$PROFILE" mix local.rebar --if-missing --force
|
||||||
env MIX_ENV="$PROFILE" mix local.rebar rebar3 "${PWD}/rebar3" --if-missing --force
|
env MIX_ENV="$PROFILE" mix local.rebar rebar3 "${PWD}/rebar3" --if-missing --force
|
||||||
env MIX_ENV="$PROFILE" mix local.hex --if-missing --force
|
# env MIX_ENV="$PROFILE" mix local.hex --if-missing --force
|
||||||
|
env MIX_ENV="$PROFILE" mix local.hex 2.0.6 --if-missing --force
|
||||||
env MIX_ENV="$PROFILE" mix deps.get
|
env MIX_ENV="$PROFILE" mix deps.get
|
||||||
env MIX_ENV="$PROFILE" mix release --overwrite
|
env MIX_ENV="$PROFILE" mix release --overwrite
|
||||||
assert_no_excluded_deps emqx-enterprise emqx_telemetry
|
assert_no_excluded_deps emqx-enterprise emqx_telemetry
|
||||||
|
|
4
mix.exs
4
mix.exs
|
@ -102,7 +102,8 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
|
{:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
|
||||||
{:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
|
{:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
|
||||||
{:ra, "2.7.3", override: true},
|
{:ra, "2.7.3", override: true},
|
||||||
{:mimerl, "1.2.0", override: true}
|
{:mimerl, "1.2.0", override: true},
|
||||||
|
{:supervisor3, "1.1.12", override: true}
|
||||||
] ++
|
] ++
|
||||||
emqx_apps(profile_info, version) ++
|
emqx_apps(profile_info, version) ++
|
||||||
enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep()
|
enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep()
|
||||||
|
@ -215,7 +216,6 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},
|
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},
|
||||||
{:snappyer, "1.2.9", override: true},
|
{:snappyer, "1.2.9", override: true},
|
||||||
{:crc32cer, "0.1.8", override: true},
|
{:crc32cer, "0.1.8", override: true},
|
||||||
{:supervisor3, "1.1.12", override: true},
|
|
||||||
{:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},
|
{:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},
|
||||||
{:greptimedb,
|
{:greptimedb,
|
||||||
github: "GreptimeTeam/greptimedb-ingester-erl", tag: "v0.1.8", override: true},
|
github: "GreptimeTeam/greptimedb-ingester-erl", tag: "v0.1.8", override: true},
|
||||||
|
|
Loading…
Reference in New Issue