Merge pull request #7171 from lafirest/fix/auto_subscribe

fix(emqx_auto_subscribe): fix config update not work in cluster
This commit is contained in:
JianBo He 2022-03-02 10:16:46 +08:00 committed by GitHub
commit 8d837f88de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 20 deletions

View File

@ -20,20 +20,24 @@
-define(MAX_AUTO_SUBSCRIBE, 20). -define(MAX_AUTO_SUBSCRIBE, 20).
-export([load/0]). -export([load/0, unload/0]). %
-export([ max_limit/0 -export([ max_limit/0
, list/0 , list/0
, update/1 , update/1
, test/1 , post_config_update/5
]). ]).
%% hook callback %% hook callback
-export([on_client_connected/3]). -export([on_client_connected/3]).
load() -> load() ->
emqx_conf:add_handler([auto_subscribe, topics], ?MODULE),
update_hook(). update_hook().
unload() ->
emqx_conf:remove_handler([auto_subscribe, topics]).
max_limit() -> max_limit() ->
?MAX_AUTO_SUBSCRIBE. ?MAX_AUTO_SUBSCRIBE.
@ -43,18 +47,9 @@ list() ->
update(Topics) -> update(Topics) ->
update_(Topics). update_(Topics).
test(_) -> post_config_update(_KeyPath, _Req, NewTopics, _OldConf, _AppEnvs) ->
%% TODO: test rule with info map Config = emqx_conf:get([auto_subscribe], #{}),
ok. update_hook(Config#{topics => NewTopics}).
% test(Topic) when is_map(Topic) ->
% test([Topic]);
% test(Topics) when is_list(Topics) ->
% PlaceHolders = emqx_auto_subscribe_placeholder:generate(Topics),
% ClientInfo = #{},
% ConnInfo = #{},
% emqx_auto_subscribe_placeholder:to_topic_table([PlaceHolders], ClientInfo, ConnInfo).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% hook %% hook
@ -88,7 +83,6 @@ update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
Topics, Topics,
#{rawconf_with_defaults => true, override_to => cluster}) of #{rawconf_with_defaults => true, override_to => cluster}) of
{ok, #{raw_config := NewTopics}} -> {ok, #{raw_config := NewTopics}} ->
ok = update_hook(),
{ok, NewTopics}; {ok, NewTopics};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -97,6 +91,9 @@ update_(_Topics) ->
{error, quota_exceeded}. {error, quota_exceeded}.
update_hook() -> update_hook() ->
{TopicHandler, Options} = emqx_auto_subscribe_handler:init(), update_hook(emqx_conf:get([auto_subscribe], #{})).
update_hook(Config) ->
{TopicHandler, Options} = emqx_auto_subscribe_handler:init(Config),
emqx_hooks:put(?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}), emqx_hooks:put(?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}),
ok. ok.

View File

@ -26,6 +26,7 @@ start(_StartType, _StartArgs) ->
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->
ok = emqx_auto_subscribe:unload(),
ok. ok.
%% internal functions %% internal functions

View File

@ -15,11 +15,11 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_auto_subscribe_handler). -module(emqx_auto_subscribe_handler).
-export([init/0]). -export([init/1]).
-spec(init() -> {Module :: atom(), Config :: term()}). -spec(init(hocons:config()) -> {Module :: atom(), Config :: term()}).
init() -> init(Config) ->
do_init(emqx_conf:get([auto_subscribe], #{})). do_init(Config).
do_init(Config = #{topics := _Topics}) -> do_init(Config = #{topics := _Topics}) ->
Options = emqx_auto_subscribe_internal:init(Config), Options = emqx_auto_subscribe_internal:init(Config),