From c1ff8778e1e8622d0ae05a3d22a528038fb97ed9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 15 Sep 2021 10:46:20 +0800 Subject: [PATCH] fix(bridges): add logs for creating/removing bridges --- apps/emqx_bridge/src/emqx_bridge.erl | 8 +++-- .../src/emqx_connector_mqtt.erl | 30 ++++++++++++------- .../src/mqtt/emqx_connector_mqtt_worker.erl | 2 +- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 4d2b80ba7..badf97515 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -137,6 +137,7 @@ restart_bridge(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). create_bridge(Type, Name, Conf) -> + logger:info("create ~p bridge ~p use config: ~p", [Type, Name, Conf]), ResId = resource_id(Type, Name), case emqx_resource:create(ResId, emqx_bridge:resource_type(Type), Conf) of @@ -157,10 +158,12 @@ update_bridge(Type, Name, Conf) -> %% `message_out` are changed, then we should not restart the bridge, we only restart/start %% the channels. %% + logger:info("update ~p bridge ~p use config: ~p", [Type, Name, Conf]), emqx_resource:recreate(resource_id(Type, Name), emqx_bridge:resource_type(Type), Conf). remove_bridge(Type, Name, _Conf) -> + logger:info("remove ~p bridge ~p", [Type, Name]), case emqx_resource:remove(resource_id(Type, Name)) of ok -> ok; {error, not_found} -> ok; @@ -174,8 +177,9 @@ diff_confs(NewConfs, OldConfs) -> flatten_confs(Conf0) -> maps:from_list( - lists:append([do_flatten_confs(Type, Conf) - || {Type, Conf} <- maps:to_list(Conf0)])). + lists:flatmap(fun({Type, Conf}) -> + do_flatten_confs(Type, Conf) + end, maps:to_list(Conf0))). do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 55a4dde25..4f4d3e5c8 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -104,12 +104,7 @@ on_start(InstId, Conf) -> on_stop(InstId, #{sub_bridges := NameList}) -> logger:info("stopping mqtt connector: ~p", [InstId]), lists:foreach(fun(Name) -> - case ?MODULE:drop_bridge(Name) of - ok -> ok; - {error, not_found} -> ok; - {error, Reason} -> - logger:error("stop channel ~p failed, error: ~p", [Name, Reason]) - end + remove_channel(Name) end, NameList). %% TODO: let the emqx_resource trigger on_query/4 automatically according to the @@ -139,22 +134,35 @@ check_channel_id_dup(Confs) -> Confs. %% this is an `message_in` bridge -create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) -> +create_channel(#{subscribe_remote_topic := RemoteT, local_topic := LocalT, id := Id} = InConf, + NamePrefix, BasicConf) -> Name = bridge_name(NamePrefix, Id), - logger:info("creating 'message_in' channel ~p", [Name]), + logger:info("creating 'message_in' channel ~p, remote ~s -> local ~s", + [Name, RemoteT, LocalT]), create_sub_bridge(BasicConf#{ name => Name, clientid => clientid(Id), subscriptions => InConf, forwards => undefined}); %% this is an `message_out` bridge -create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) -> +create_channel(#{subscribe_local_topic := LocalT, remote_topic := RemoteT, id := Id} = OutConf, + NamePrefix, BasicConf) -> Name = bridge_name(NamePrefix, Id), - logger:info("creating 'message_out' channel ~p", [Name]), + logger:info("creating 'message_out' channel ~p, local ~s -> remote ~s", + [Name, LocalT, RemoteT]), create_sub_bridge(BasicConf#{ name => bridge_name(NamePrefix, Id), clientid => clientid(Id), subscriptions => undefined, forwards => OutConf}). +remove_channel(BridgeName) -> + logger:info("removing channel ~p", [BridgeName]), + case ?MODULE:drop_bridge(BridgeName) of + ok -> ok; + {error, not_found} -> ok; + {error, Reason} -> + logger:error("stop channel ~p failed, error: ~p", [BridgeName, Reason]) + end. + create_sub_bridge(#{name := Name} = Conf) -> case ?MODULE:create_bridge(Conf) of {ok, _Pid} -> @@ -206,7 +214,7 @@ bridge_name(Prefix, Id) -> list_to_atom(str(Prefix) ++ ":" ++ str(Id)). clientid(Id) -> - list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(4)). + list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(16)). str(A) when is_atom(A) -> atom_to_list(A); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 6ced719df..bb505f98d 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -183,7 +183,7 @@ callback_mode() -> [state_functions]. %% @doc Config should be a map(). init(#{name := Name} = ConnectOpts) -> - ?LOG(info, "starting bridge worker for ~p", [Name]), + ?LOG(debug, "starting bridge worker for ~p", [Name]), erlang:process_flag(trap_exit, true), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), State = init_state(ConnectOpts),