diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index 7a2a3512e..f3e0e4dc5 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -27,15 +27,19 @@ bridges.mqtt.my_mqtt_bridge { cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" } in: [{ - from_remote_topic = "msg/#" - to_local_topic = "from_aws/${topic}" - payload_template = "${message}" - qos = 1 + subscribe_remote_topic = "msg/#" + subscribe_qos = 1 + publish_local_topic = "from_aws/${topic}" + publish_payload = "${payload}" + publish_qos = "${qos}" + publish_retain = "${retain}" }] out: [{ - from_local_topic = "msg/#" - to_remote_topic = "from_emqx/${topic}" - payload_template = "${message}" + subscribe_local_topic = "msg/#" + publish_remote_topic = "from_emqx/${topic}" + publish_payload = "${payload}" + publish_qos = 1 + publish_retain = false }] } diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index 5678c73ef..4e6224801 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -34,9 +34,6 @@ , handle_disconnected/2 ]). --export([ check_subscriptions/1 - ]). - -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -54,7 +51,6 @@ start(Config) -> {Host, Port} = maps:get(server, Config), Mountpoint = maps:get(receive_mountpoint, Config, undefined), Subscriptions = maps:get(subscriptions, Config, []), - Subscriptions1 = check_subscriptions(Subscriptions), Handlers = make_hdlr(Parent, Mountpoint), Config1 = Config#{ msg_handler => Handlers, @@ -68,8 +64,8 @@ start(Config) -> case emqtt:connect(Pid) of {ok, _} -> try - Subscriptions2 = subscribe_remote_topics(Pid, Subscriptions1), - {ok, #{client_pid => Pid, subscriptions => Subscriptions2}} + ok = subscribe_remote_topics(Pid, Subscriptions), + {ok, #{client_pid => Pid, subscriptions => Subscriptions}} catch throw : Reason -> ok = stop(#{client_pid => Pid}), @@ -173,18 +169,12 @@ make_hdlr(Parent, Mountpoint) -> }. subscribe_remote_topics(ClientPid, Subscriptions) -> - lists:map(fun({Topic, Qos}) -> - case emqtt:subscribe(ClientPid, Topic, Qos) of - {ok, _, _} -> {Topic, Qos}; + lists:foreach(fun(#{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) -> + case emqtt:subscribe(ClientPid, FromTopic, QoS) of + {ok, _, _} -> ok; Error -> throw(Error) end end, Subscriptions). without_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions], Config). - -check_subscriptions(Subscriptions) -> - lists:map(fun(#{qos := QoS, topic := Topic}) -> - true = emqx_topic:validate({filter, Topic}), - {Topic, QoS} - end, Subscriptions). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl index 91fd18bf4..18a4a74f9 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl @@ -36,6 +36,14 @@ -type msg() :: emqx_types:message(). -type exp_msg() :: emqx_types:message() | #mqtt_msg{}. +-type variables() :: #{ + mountpoint := undefined | binary(), + topic := binary(), + qos := original | integer(), + retain := original | boolean(), + payload := binary() +}. + %% @doc Make export format: %% 1. Mount topic to a prefix %% 2. Fix QoS to 1 @@ -43,24 +51,39 @@ %% Shame that we have to know the callback module here %% would be great if we can get rid of #mqtt_msg{} record %% and use #message{} in all places. --spec to_export(emqx_bridge_rpc | emqx_bridge_worker, - undefined | binary(), msg()) -> exp_msg(). -to_export(emqx_bridge_mqtt, Mountpoint, - #message{topic = Topic, - payload = Payload, - flags = Flags, - qos = QoS - }) -> - Retain = maps:get(retain, Flags, false), +-spec to_export(emqx_bridge_rpc | emqx_bridge_worker, variables(), msg()) + -> exp_msg(). +to_export(emqx_bridge_mqtt, Vars, #message{flags = Flags0} = Msg) -> + Retain0 = maps:get(retain, Flags0, false), + MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), + to_export(emqx_bridge_mqtt, Vars, MapMsg); +to_export(emqx_bridge_mqtt, #{topic := TopicToken, payload := PayloadToken, + qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}, + MapMsg) when is_map(MapMsg) -> + Topic = replace_vars_in_str(TopicToken, MapMsg), + Payload = replace_vars_in_str(PayloadToken, MapMsg), + QoS = replace_vars(QoSToken, MapMsg), + Retain = replace_vars(RetainToken, MapMsg), #mqtt_msg{qos = QoS, retain = Retain, topic = topic(Mountpoint, Topic), props = #{}, payload = Payload}; -to_export(_Module, Mountpoint, +to_export(_Module, #{mountpoint := Mountpoint}, #message{topic = Topic} = Msg) -> Msg#message{topic = topic(Mountpoint, Topic)}. +replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> + emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary}); +replace_vars_in_str(Val, _Data) -> + Val. + +replace_vars(Tokens, Data) when is_list(Tokens) -> + [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + Var; +replace_vars(Val, _Data) -> + Val. + %% @doc Make `binary()' in order to make iodata to be persisted on disk. -spec to_binary(msg()) -> binary(). to_binary(Msg) -> term_to_binary(Msg). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 319d7054e..3d6bf115f 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -215,15 +215,15 @@ init(Opts) -> ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)), Forwards = maps:get(forwards, Opts, []), Queue = open_replayq(maps:get(replayq, Opts, #{})), - State = init_opts(Opts), + State = init_state(Opts), self() ! idle, {ok, idle, State#{connect_module => ConnectModule, - connect_opts => ConnectOpts, + connect_opts => pre_process_opts(ConnectOpts), forwards => Forwards, replayq => Queue }}. -init_opts(Opts) -> +init_state(Opts) -> IfRecordMetrics = maps:get(if_record_metrics, Opts, true), ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS), StartType = maps:get(start_type, Opts, manual), @@ -252,6 +252,26 @@ open_replayq(QCfg) -> replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1, marshaller => fun ?MODULE:msg_marshaller/1}). +pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> + ConnectOpts#{subscriptions => [pre_process_in_out(In) || In <- InConf], + forwards => [pre_process_in_out(Out) || Out <- OutConf]}. + +pre_process_in_out(Conf) -> + Conf1 = pre_process_conf(publish_local_topic, Conf), + Conf2 = pre_process_conf(publish_remote_topic, Conf1), + Conf3 = pre_process_conf(publish_payload, Conf2), + Conf4 = pre_process_conf(publish_qos, Conf3), + pre_process_conf(publish_retain, Conf4). + +pre_process_conf(Key, Conf) -> + case maps:find(Key, Conf) of + error -> Conf; + {ok, Val} when is_binary(Val) -> + Conf#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; + {ok, Val} -> + Conf#{Key => Val} + end. + code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. @@ -360,7 +380,7 @@ common(StateName, Type, Content, #{name := Name} = State) -> {keep_state, State}. do_ensure_forward_present(Topic, #{forwards := Forwards, name := Name} = State) -> - case is_topic_present(Topic, Forwards) of + case is_local_sub_present(Topic, Forwards) of true -> {ok, State}; false -> @@ -374,7 +394,7 @@ do_ensure_subscription_present(_Topic, _QoS, #{connect_module := emqx_bridge_rpc {{error, no_remote_subscription_support}, State}; do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule, connection := Conn} = State) -> - case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of + case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of true -> {ok, State}; false -> @@ -387,7 +407,7 @@ do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule, end. do_ensure_forward_absent(Topic, #{forwards := Forwards} = State) -> - case is_topic_present(Topic, Forwards) of + case is_local_sub_present(Topic, Forwards) of true -> R = do_unsubscribe(Topic), {R, State#{forwards => lists:delete(Topic, Forwards)}}; @@ -400,7 +420,7 @@ do_ensure_subscription_absent(_Topic, #{connect_module := emqx_bridge_rpc} = Sta {{error, no_remote_subscription_support}, State}; do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule, connection := Conn} = State) -> - case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of + case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of true -> case ConnectModule:ensure_unsubscribed(Conn, Topic) of {error, Error} -> @@ -412,8 +432,15 @@ do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule, {ok, State} end. -is_topic_present(Topic, Topics) -> - lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). +is_local_sub_present(Topic, Configs) -> + is_topic_present(subscribe_local_topic, Topic, Configs). +is_remote_sub_present(Topic, Configs) -> + is_topic_present(subscribe_remote_topic, Topic, Configs). + +is_topic_present(Type, Topic, Configs) -> + lists:any(fun(Conf) -> + Topic == maps:get(Type, Conf, undefined) + end, Configs). do_connect(#{forwards := Forwards, connect_module := ConnectModule, @@ -451,7 +478,7 @@ retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf) {error, State1#{inflight := NewInf ++ OldInf}} end. -pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) -> +pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) -> pop_and_send_loop(State, Max - length(Inflight)). pop_and_send_loop(State, 0) -> @@ -480,10 +507,12 @@ do_send(#{inflight := Inflight, connect_module := Module, connection := Connection, mountpoint := Mountpoint, + forwards := Forwards, if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) -> + Vars = make_export_variables(Mountpoint, Forwards), ExportMsg = fun(Message) -> bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'), - emqx_bridge_msg:to_export(Module, Mountpoint, Message) + emqx_bridge_msg:to_export(Module, Vars, Message) end, case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of {ok, Refs} -> @@ -495,6 +524,15 @@ do_send(#{inflight := Inflight, {error, State} end. +make_export_variables(Mountpoint, #{ + publish_remote_topic := PubTopic, + publish_payload := PayloadTmpl, + publish_qos := PubQoS, + publish_retain := PubRetain}) -> + #{topic => PubTopic, payload => PayloadTmpl, + qos => PubQoS, retain => PubRetain, + mountpoint => Mountpoint}. + %% map as set, ack-reference -> 1 map_set(Ref) when is_reference(Ref) -> %% QoS-0 or RPC call returns a reference diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 4210b66cf..475d83ac7 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -53,17 +53,15 @@ fields("config") -> ] ++ emqx_connector_schema_lib:ssl_fields(); fields("in") -> - [ {from_remote_topic, #{type => binary(), nullable => false}} - , {to_local_topic, #{type => binary(), nullable => false}} - , {qos, emqx_schema:t(integer(), undefined, 1)} - , {payload_template, emqx_schema:t(binary(), undefined, <<"${message}">>)} - ]; + [ {subscribe_remote_topic, #{type => binary(), nullable => false}} + , {publish_local_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)} + , {subscribe_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, 1)} + ] ++ publish_confs(); fields("out") -> - [ {to_remote_topic, #{type => binary(), nullable => false}} - , {from_local_topic, #{type => binary(), nullable => false}} - , {payload_template, emqx_schema:t(binary(), undefined, <<"${payload}">>)} - ]; + [ {subscribe_local_topic, #{type => binary(), nullable => false}} + , {publish_remote_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)} + ] ++ publish_confs(); fields("replayq") -> [ {dir, hoconsc:union([boolean(), string()])} @@ -72,6 +70,12 @@ fields("replayq") -> , {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")} ]. +publish_confs() -> + [ {publish_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, <<"${qos}">>)} + , {publish_retain, emqx_schema:t(hoconsc:union([boolean(), binary()]), undefined, <<"${retain}">>)} + , {publish_payload, emqx_schema:t(binary(), undefined, <<"${payload}">>)} + ]. + proto_ver(type) -> hoconsc:enum([v3, v4, v5]); proto_ver(default) -> v4; proto_ver(_) -> undefined. @@ -138,8 +142,8 @@ on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) -> on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) -> logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]). -on_health_check(_InstId, #{bridge_worker := Worker}) -> - {ok, emqx_bridge_worker:ping(Worker)}. +on_health_check(_InstId, #{bridge_name := Name}) -> + {ok, emqx_bridge_worker:ping(Name)}. start_bridge(Name) -> case emqx_bridge_worker:ensure_started(Name) of