From 298cf24f0000eaa72e78f5f3fda29ce3e8eb661e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 9 Sep 2021 10:49:26 +0800 Subject: [PATCH] fix(bridges): mqtt bridge cannot forward msgs using payload template --- apps/emqx_bridge/etc/emqx_bridge.conf | 2 +- apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl | 18 +++++------------- .../src/emqx_bridge_worker.erl | 14 ++++++-------- .../emqx_connector/src/emqx_connector_mqtt.erl | 13 ++++++++++--- 4 files changed, 22 insertions(+), 25 deletions(-) diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index 3ad1e95fc..2844af3bf 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -6,7 +6,7 @@ bridges.mqtt.my_mqtt_bridge { server = "127.0.0.1:1883" proto_ver = "v4" ## the clientid will be the concatenation of `clientid_prefix` and ids in `in` and `out`. - clientid_prefix = "emqx_bridge_" + clientid_prefix = "bridge_client:" username = "username1" password = "" clean_start = true diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index c11e147f6..2609e8bea 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -155,12 +155,12 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) RC =:= ?RC_NO_MATCHING_SUBSCRIBERS -> Parent ! {batch_ack, PktId}, ok; handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> - ?LOG(warning, "Publish ~p to remote node falied, reason_code: ~p", [PktId, RC]). + ?LOG(warning, "publish ~p to remote node falied, reason_code: ~p", [PktId, RC]). handle_publish(Msg, undefined) -> - ?LOG(error, "Cannot publish to local broker as 'bridge.mqtt..in' not configured, msg: ~p", [Msg]); + ?LOG(error, "cannot publish to local broker as 'bridge.mqtt..in' not configured, msg: ~p", [Msg]); handle_publish(Msg, Vars) -> - ?LOG(debug, "Publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), + ?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Vars)). handle_disconnected(Reason, Parent) -> @@ -179,13 +179,5 @@ subscribe_remote_topics(ClientPid, #{subscribe_remote_topic := FromTopic, subscr Error -> throw(Error) end. -process_config(#{name := Name, clientid_prefix := Prefix} = Config) -> - Conf0 = maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config), - Conf0#{clientid => iolist_to_binary([str(Prefix), str(Name)])}. - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. \ No newline at end of file +process_config(Config) -> + maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 552ebe9a1..ac861d08f 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -186,14 +186,12 @@ init(#{name := Name} = ConnectOpts) -> ?LOG(info, "starting bridge worker for ~p", [Name]), erlang:process_flag(trap_exit, true), ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)), - Forwards = maps:get(forwards, ConnectOpts, #{}), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), State = init_state(ConnectOpts), self() ! idle, {ok, idle, State#{ connect_module => ConnectModule, connect_opts => pre_process_opts(ConnectOpts), - forwards => Forwards, replayq => Queue }}. @@ -323,7 +321,7 @@ common(_StateName, {call, From}, ensure_stopped, #{connection := Conn, connect_module := ConnectModule} = State) -> Reply = ConnectModule:stop(Conn), {next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]}; -common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> +common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) -> {keep_state_and_data, [{reply, From, Forwards}]}; common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; @@ -343,9 +341,8 @@ common(StateName, Type, Content, #{name := Name} = State) -> [Name, Type, StateName, Content]), {keep_state, State}. -do_connect(#{forwards := Forwards, - connect_module := ConnectModule, - connect_opts := ConnectOpts, +do_connect(#{connect_module := ConnectModule, + connect_opts := ConnectOpts = #{forwards := Forwards}, inflight := Inflight, name := Name} = State) -> case Forwards of @@ -407,19 +404,20 @@ pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) -> end. %% Assert non-empty batch because we have a is_empty check earlier. -do_send(#{forwards := undefined}, _QAckRef, Batch) -> +do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) -> ?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt..in' not configured, msg: ~p", [Batch]); do_send(#{inflight := Inflight, connect_module := Module, connection := Connection, mountpoint := Mountpoint, - forwards := Forwards, + connect_opts := #{forwards := Forwards}, if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) -> Vars = emqx_bridge_msg:make_pub_vars(Mountpoint, Forwards), ExportMsg = fun(Message) -> bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'), emqx_bridge_msg:to_remote_msg(Module, Message, Vars) end, + ?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]), case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of {ok, Refs} -> {ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef, diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index e3a8f143e..ba20d2926 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -40,7 +40,7 @@ fields("config") -> , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} , {proto_ver, fun proto_ver/1} , {bridge_mode, emqx_schema:t(boolean(), undefined, true)} - , {clientid_prefix, emqx_schema:t(string())} + , {clientid_prefix, emqx_schema:t(string(), undefined, "")} , {username, emqx_schema:t(string())} , {password, emqx_schema:t(string())} , {clean_start, emqx_schema:t(boolean(), undefined, true)} @@ -137,14 +137,18 @@ check_channel_id_dup(Confs) -> Confs. %% this is an `in` bridge -create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix, BasicConf) -> +create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix, + #{clientid_prefix := ClientPrefix} = BasicConf) -> logger:info("creating 'in' channel for: ~p", [BridgeId]), create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId), + clientid => clientid(ClientPrefix, BridgeId), subscriptions => InConf, forwards => undefined}); %% this is an `out` bridge -create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix, BasicConf) -> +create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix, + #{clientid_prefix := ClientPrefix} = BasicConf) -> logger:info("creating 'out' channel for: ~p", [BridgeId]), create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId), + clientid => clientid(ClientPrefix, BridgeId), subscriptions => undefined, forwards => OutConf}). create_sub_bridge(#{name := Name} = Conf) -> @@ -200,6 +204,9 @@ basic_config(#{ bridge_name(Prefix, Id) -> list_to_atom(str(Prefix) ++ ":" ++ str(Id)). +clientid(Prefix, Id) -> + list_to_binary(str(Prefix) ++ str(Id)). + str(A) when is_atom(A) -> atom_to_list(A); str(B) when is_binary(B) ->