feat(ds): Add a flag that forces all sessions to become durable
This commit is contained in:
parent
ee191803ea
commit
82e74d0201
|
@ -19,7 +19,7 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
-export([init/0]).
|
-export([init/0]).
|
||||||
-export([is_persistence_enabled/0]).
|
-export([is_persistence_enabled/0, force_ds/0]).
|
||||||
|
|
||||||
%% Message persistence
|
%% Message persistence
|
||||||
-export([
|
-export([
|
||||||
|
@ -54,6 +54,12 @@ is_persistence_enabled() ->
|
||||||
storage_backend() ->
|
storage_backend() ->
|
||||||
storage_backend(emqx_config:get([session_persistence, storage])).
|
storage_backend(emqx_config:get([session_persistence, storage])).
|
||||||
|
|
||||||
|
%% Dev-only option: force all messages to go through
|
||||||
|
%% `emqx_persistent_session_ds':
|
||||||
|
-spec force_ds() -> boolean().
|
||||||
|
force_ds() ->
|
||||||
|
emqx_config:get([session_persistence, force_ds]).
|
||||||
|
|
||||||
storage_backend(#{
|
storage_backend(#{
|
||||||
builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor}
|
builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor}
|
||||||
}) ->
|
}) ->
|
||||||
|
|
|
@ -626,12 +626,18 @@ choose_impl_candidates(#{expiry_interval := EI}) ->
|
||||||
choose_impl_candidates(_, _IsPSStoreEnabled = false) ->
|
choose_impl_candidates(_, _IsPSStoreEnabled = false) ->
|
||||||
[emqx_session_mem];
|
[emqx_session_mem];
|
||||||
choose_impl_candidates(0, _IsPSStoreEnabled = true) ->
|
choose_impl_candidates(0, _IsPSStoreEnabled = true) ->
|
||||||
%% NOTE
|
case emqx_persistent_message:force_ds() of
|
||||||
%% If ExpiryInterval is 0, the natural choice is `emqx_session_mem`. Yet we still
|
false ->
|
||||||
%% need to look the existing session up in the `emqx_persistent_session_ds` store
|
%% NOTE
|
||||||
%% first, because previous connection may have set ExpiryInterval to a non-zero
|
%% If ExpiryInterval is 0, the natural choice is
|
||||||
%% value.
|
%% `emqx_session_mem'. Yet we still need to look the
|
||||||
[emqx_session_mem, emqx_persistent_session_ds];
|
%% existing session up in the `emqx_persistent_session_ds'
|
||||||
|
%% store first, because previous connection may have set
|
||||||
|
%% ExpiryInterval to a non-zero value.
|
||||||
|
[emqx_session_mem, emqx_persistent_session_ds];
|
||||||
|
true ->
|
||||||
|
[emqx_persistent_session_ds]
|
||||||
|
end;
|
||||||
choose_impl_candidates(EI, _IsPSStoreEnabled = true) when EI > 0 ->
|
choose_impl_candidates(EI, _IsPSStoreEnabled = true) when EI > 0 ->
|
||||||
[emqx_persistent_session_ds].
|
[emqx_persistent_session_ds].
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue