fix(bridges): add logs for creating/removing bridges

This commit is contained in:
Shawn 2021-09-15 10:46:20 +08:00
parent 8730a03ab8
commit c1ff8778e1
3 changed files with 26 additions and 14 deletions

View File

@ -137,6 +137,7 @@ restart_bridge(Type, Name) ->
emqx_resource:restart(resource_id(Type, Name)). emqx_resource:restart(resource_id(Type, Name)).
create_bridge(Type, Name, Conf) -> create_bridge(Type, Name, Conf) ->
logger:info("create ~p bridge ~p use config: ~p", [Type, Name, Conf]),
ResId = resource_id(Type, Name), ResId = resource_id(Type, Name),
case emqx_resource:create(ResId, case emqx_resource:create(ResId,
emqx_bridge:resource_type(Type), Conf) of emqx_bridge:resource_type(Type), Conf) of
@ -157,10 +158,12 @@ update_bridge(Type, Name, Conf) ->
%% `message_out` are changed, then we should not restart the bridge, we only restart/start %% `message_out` are changed, then we should not restart the bridge, we only restart/start
%% the channels. %% the channels.
%% %%
logger:info("update ~p bridge ~p use config: ~p", [Type, Name, Conf]),
emqx_resource:recreate(resource_id(Type, Name), emqx_resource:recreate(resource_id(Type, Name),
emqx_bridge:resource_type(Type), Conf). emqx_bridge:resource_type(Type), Conf).
remove_bridge(Type, Name, _Conf) -> remove_bridge(Type, Name, _Conf) ->
logger:info("remove ~p bridge ~p", [Type, Name]),
case emqx_resource:remove(resource_id(Type, Name)) of case emqx_resource:remove(resource_id(Type, Name)) of
ok -> ok; ok -> ok;
{error, not_found} -> ok; {error, not_found} -> ok;
@ -174,8 +177,9 @@ diff_confs(NewConfs, OldConfs) ->
flatten_confs(Conf0) -> flatten_confs(Conf0) ->
maps:from_list( maps:from_list(
lists:append([do_flatten_confs(Type, Conf) lists:flatmap(fun({Type, Conf}) ->
|| {Type, Conf} <- maps:to_list(Conf0)])). do_flatten_confs(Type, Conf)
end, maps:to_list(Conf0))).
do_flatten_confs(Type, Conf0) -> do_flatten_confs(Type, Conf0) ->
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].

View File

@ -104,12 +104,7 @@ on_start(InstId, Conf) ->
on_stop(InstId, #{sub_bridges := NameList}) -> on_stop(InstId, #{sub_bridges := NameList}) ->
logger:info("stopping mqtt connector: ~p", [InstId]), logger:info("stopping mqtt connector: ~p", [InstId]),
lists:foreach(fun(Name) -> lists:foreach(fun(Name) ->
case ?MODULE:drop_bridge(Name) of remove_channel(Name)
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
logger:error("stop channel ~p failed, error: ~p", [Name, Reason])
end
end, NameList). end, NameList).
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the %% TODO: let the emqx_resource trigger on_query/4 automatically according to the
@ -139,22 +134,35 @@ check_channel_id_dup(Confs) ->
Confs. Confs.
%% this is an `message_in` bridge %% this is an `message_in` bridge
create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) -> create_channel(#{subscribe_remote_topic := RemoteT, local_topic := LocalT, id := Id} = InConf,
NamePrefix, BasicConf) ->
Name = bridge_name(NamePrefix, Id), Name = bridge_name(NamePrefix, Id),
logger:info("creating 'message_in' channel ~p", [Name]), logger:info("creating 'message_in' channel ~p, remote ~s -> local ~s",
[Name, RemoteT, LocalT]),
create_sub_bridge(BasicConf#{ create_sub_bridge(BasicConf#{
name => Name, name => Name,
clientid => clientid(Id), clientid => clientid(Id),
subscriptions => InConf, forwards => undefined}); subscriptions => InConf, forwards => undefined});
%% this is an `message_out` bridge %% this is an `message_out` bridge
create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) -> create_channel(#{subscribe_local_topic := LocalT, remote_topic := RemoteT, id := Id} = OutConf,
NamePrefix, BasicConf) ->
Name = bridge_name(NamePrefix, Id), Name = bridge_name(NamePrefix, Id),
logger:info("creating 'message_out' channel ~p", [Name]), logger:info("creating 'message_out' channel ~p, local ~s -> remote ~s",
[Name, LocalT, RemoteT]),
create_sub_bridge(BasicConf#{ create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id), name => bridge_name(NamePrefix, Id),
clientid => clientid(Id), clientid => clientid(Id),
subscriptions => undefined, forwards => OutConf}). subscriptions => undefined, forwards => OutConf}).
remove_channel(BridgeName) ->
logger:info("removing channel ~p", [BridgeName]),
case ?MODULE:drop_bridge(BridgeName) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
logger:error("stop channel ~p failed, error: ~p", [BridgeName, Reason])
end.
create_sub_bridge(#{name := Name} = Conf) -> create_sub_bridge(#{name := Name} = Conf) ->
case ?MODULE:create_bridge(Conf) of case ?MODULE:create_bridge(Conf) of
{ok, _Pid} -> {ok, _Pid} ->
@ -206,7 +214,7 @@ bridge_name(Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ str(Id)). list_to_atom(str(Prefix) ++ ":" ++ str(Id)).
clientid(Id) -> clientid(Id) ->
list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(4)). list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(16)).
str(A) when is_atom(A) -> str(A) when is_atom(A) ->
atom_to_list(A); atom_to_list(A);

View File

@ -183,7 +183,7 @@ callback_mode() -> [state_functions].
%% @doc Config should be a map(). %% @doc Config should be a map().
init(#{name := Name} = ConnectOpts) -> init(#{name := Name} = ConnectOpts) ->
?LOG(info, "starting bridge worker for ~p", [Name]), ?LOG(debug, "starting bridge worker for ~p", [Name]),
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
State = init_state(ConnectOpts), State = init_state(ConnectOpts),