From c0258e1be6db3c1a47b1299b4c8e57e0667a5b3a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 17 Sep 2021 18:56:33 +0800 Subject: [PATCH] feat(bridge): let mqtt bridge work with rules --- .../src/emqx_connector_mqtt.erl | 40 +++++++++++------- .../src/mqtt/emqx_connector_mqtt_mod.erl | 10 ++++- .../src/mqtt/emqx_connector_mqtt_msg.erl | 14 +++---- .../src/mqtt/emqx_connector_mqtt_schema.erl | 8 ++-- .../src/mqtt/emqx_connector_mqtt_worker.erl | 41 ++++++++----------- 5 files changed, 63 insertions(+), 50 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 36ffd5706..a03c888d3 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -28,6 +28,8 @@ , bridges/0 ]). +-export([on_message_received/2]). + %% callbacks of behaviour emqx_resource -export([ on_start/2 , on_stop/2 @@ -83,6 +85,12 @@ drop_bridge(Name) -> {error, Error} end. +%% =================================================================== +%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called +%% if the bridge received msgs from the remote broker. +on_message_received(Msg, ChannelName) -> + emqx:run_hook(ChannelName, [Msg]). + %% =================================================================== on_start(InstId, Conf) -> logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), @@ -112,10 +120,9 @@ on_stop(InstId, #{channels := NameList}) -> on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, baisc_conf := BasicConf}) -> create_channel(Conf, Prefix, BasicConf); -on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) -> - logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]); -on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) -> - logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]). +on_query(_InstId, {send_to_remote, ChannelName, Msg}, _AfterQuery, _State) -> + logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelName, Msg]), + emqx_connector_mqtt_worker:send_to_remote(ChannelName, Msg). on_health_check(_InstId, #{channels := NameList} = State) -> Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList], @@ -124,25 +131,30 @@ on_health_check(_InstId, #{channels := NameList} = State) -> false -> {error, {some_channel_down, Results}, State} end. -create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT, - local_topic := LocalT} = Conf}, NamePrefix, BasicConf) -> +create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf}, + NamePrefix, BasicConf) -> + LocalT = maps:get(local_topic, Conf, undefined), Name = ingress_channel_name(NamePrefix, Id), - logger:info("creating ingress channel ~p, remote ~s -> local ~s", - [Name, RemoteT, LocalT]), + logger:info("creating ingress channel ~p, remote ~s -> local ~s", [Name, RemoteT, LocalT]), do_create_channel(BasicConf#{ name => Name, clientid => clientid(Name), - subscriptions => Conf, forwards => undefined}); + subscriptions => Conf#{ + local_topic => LocalT, + on_message_received => {fun ?MODULE:on_message_received/2, [Name]} + }, + forwards => undefined}); -create_channel({{egress_channels, Id}, #{subscribe_local_topic := LocalT, - remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) -> +create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf}, + NamePrefix, BasicConf) -> + LocalT = maps:get(subscribe_local_topic, Conf, undefined), Name = egress_channel_name(NamePrefix, Id), - logger:info("creating egress channel ~p, local ~s -> remote ~s", - [Name, LocalT, RemoteT]), + logger:info("creating egress channel ~p, local ~s -> remote ~s", [Name, LocalT, RemoteT]), do_create_channel(BasicConf#{ name => Name, clientid => clientid(Name), - subscriptions => undefined, forwards => Conf}). + subscriptions => undefined, + forwards => Conf#{subscribe_local_topic => LocalT}}). remove_channel(ChannelName) -> logger:info("removing channel ~p", [ChannelName]), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 3de7feac4..8b0aa5051 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -159,9 +159,15 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> handle_publish(Msg, undefined) -> ?LOG(error, "cannot publish to local broker as 'bridge.mqtt..in' not configured, msg: ~p", [Msg]); -handle_publish(Msg, Vars) -> +handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) -> ?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), - emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)). + emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), + _ = erlang:apply(OnMsgRcvdFunc, [Msg, Args]), + case maps:get(local_topic, Vars, undefined) of + undefined -> ok; + _Topic -> + emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) + end. handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 5f076ed9e..6009cc084 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -36,17 +36,15 @@ -type variables() :: #{ mountpoint := undefined | binary(), - topic := binary(), + remote_topic := binary(), qos := original | integer(), retain := original | boolean(), payload := binary() }. make_pub_vars(_, undefined) -> undefined; -make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, remote_topic := Topic} = Conf) -> - Conf#{topic => Topic, mountpoint => Mountpoint}; -make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, local_topic := Topic} = Conf) -> - Conf#{topic => Topic, mountpoint => Mountpoint}. +make_pub_vars(Mountpoint, Conf) when is_map(Conf) -> + Conf#{mountpoint => Mountpoint}. %% @doc Make export format: %% 1. Mount topic to a prefix @@ -61,7 +59,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> Retain0 = maps:get(retain, Flags0, false), MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), to_remote_msg(MapMsg, Vars); -to_remote_msg(MapMsg, #{topic := TopicToken, payload := PayloadToken, +to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg), @@ -77,7 +75,7 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> %% published from remote node over a MQTT connection to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, - #{topic := TopicToken, payload := PayloadToken, + #{local_topic := TopicToken, payload := PayloadToken, qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg), @@ -115,6 +113,8 @@ from_binary(Bin) -> binary_to_term(Bin). %% Count only the topic length + payload size -spec estimate_size(msg()) -> integer(). estimate_size(#message{topic = Topic, payload = Payload}) -> + size(Topic) + size(Payload); +estimate_size(#{topic := Topic, payload := Payload}) -> size(Topic) + size(Payload). set_headers(undefined, Msg) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 89fe5f581..a00b76b97 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -43,13 +43,15 @@ fields("config") -> ] ++ emqx_connector_schema_lib:ssl_fields(); fields("ingress_channels") -> - [ {subscribe_remote_topic, #{type => binary(), nullable => false}} - , {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})} + %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary + [ {subscribe_remote_topic, hoconsc:mk(binary(), #{nullable => false})} + , {local_topic, hoconsc:mk(binary())} , {subscribe_qos, hoconsc:mk(qos(), #{default => 1})} ] ++ common_inout_confs(); fields("egress_channels") -> - [ {subscribe_local_topic, #{type => binary(), nullable => false}} + %% the message maybe sent from rules, in this case 'subscribe_local_topic' is not necessary + [ {subscribe_local_topic, hoconsc:mk(binary())} , {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})} ] ++ common_inout_confs(); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index e5a1a807f..c98efd322 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -87,6 +87,7 @@ , ensure_stopped/1 , status/1 , ping/1 + , send_to_remote/2 ]). -export([ get_forwards/1 @@ -171,6 +172,11 @@ ping(Pid) when is_pid(Pid) -> ping(Name) -> gen_statem:call(name(Name), ping). +send_to_remote(Pid, Msg) when is_pid(Pid) -> + gen_statem:cast(Pid, {send_to_remote, Msg}); +send_to_remote(Name, Msg) -> + gen_statem:cast(name(Name), {send_to_remote, Msg}). + %% @doc Return all forwards (local subscriptions). -spec get_forwards(id()) -> [topic()]. get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)). @@ -194,7 +200,6 @@ init(#{name := Name} = ConnectOpts) -> }}. init_state(Opts) -> - IfRecordMetrics = maps:get(if_record_metrics, Opts, true), ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS), StartType = maps:get(start_type, Opts, manual), Mountpoint = maps:get(forward_mountpoint, Opts, undefined), @@ -208,7 +213,6 @@ init_state(Opts) -> inflight => [], max_inflight => MaxInflightSize, connection => undefined, - if_record_metrics => IfRecordMetrics, name => Name}. open_replayq(Name, QCfg) -> @@ -321,17 +325,15 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F {keep_state_and_data, [{reply, From, Forwards}]}; common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; -common(_StateName, info, {deliver, _, Msg}, - State = #{replayq := Q, if_record_metrics := IfRecordMetric}) -> +common(_StateName, info, {deliver, _, Msg}, State = #{replayq := Q}) -> Msgs = collect([Msg]), - bridges_metrics_inc(IfRecordMetric, - 'bridge.mqtt.message_received', - length(Msgs) - ), NewQ = replayq:append(Q, Msgs), {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; +common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) -> + NewQ = replayq:append(Q, [Msg]), + {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(StateName, Type, Content, #{name := Name} = State) -> ?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:~p", [Name, Type, StateName, Content]), @@ -401,11 +403,10 @@ do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) -> do_send(#{inflight := Inflight, connection := Connection, mountpoint := Mountpoint, - connect_opts := #{forwards := Forwards}, - if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) -> + connect_opts := #{forwards := Forwards}} = State, QAckRef, [_ | _] = Batch) -> Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), ExportMsg = fun(Message) -> - bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'), + emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'), emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) end, ?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]), @@ -464,6 +465,8 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs, All end. +subscribe_local_topic(undefined, _Name) -> + ok; subscribe_local_topic(Topic, Name) -> do_subscribe(Topic, Name). @@ -487,7 +490,7 @@ disconnect(#{connection := Conn} = State) when Conn =/= undefined -> emqx_connector_mqtt_mod:stop(Conn), State#{connection => undefined}; disconnect(State) -> - State. + State. %% Called only when replayq needs to dump it to disk. msg_marshaller(Bin) when is_binary(Bin) -> emqx_connector_mqtt_msg:from_binary(Bin); @@ -502,20 +505,10 @@ name(Id) -> list_to_atom(str(Id)). register_metrics() -> lists:foreach(fun emqx_metrics:ensure/1, - ['bridge.mqtt.message_sent', - 'bridge.mqtt.message_received' + ['bridge.mqtt.message_sent_to_remote', + 'bridge.mqtt.message_received_from_remote' ]). -bridges_metrics_inc(true, Metric) -> - emqx_metrics:inc(Metric); -bridges_metrics_inc(_IsRecordMetric, _Metric) -> - ok. - -bridges_metrics_inc(true, Metric, Value) -> - emqx_metrics:inc(Metric, Value); -bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) -> - ok. - obfuscate(Map) -> maps:fold(fun(K, V, Acc) -> case is_sensitive(K) of