diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index f3e0e4dc5..3ad1e95fc 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -5,7 +5,8 @@ bridges.mqtt.my_mqtt_bridge { server = "127.0.0.1:1883" proto_ver = "v4" - clientid = "client1" + ## the clientid will be the concatenation of `clientid_prefix` and ids in `in` and `out`. + clientid_prefix = "emqx_bridge_" username = "username1" password = "" clean_start = true @@ -26,20 +27,24 @@ bridges.mqtt.my_mqtt_bridge { certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" } + ## we will create one MQTT connection for each element of the `in` in: [{ - subscribe_remote_topic = "msg/#" + id = "pull_msgs_from_aws" + subscribe_remote_topic = "aws/#" subscribe_qos = 1 - publish_local_topic = "from_aws/${topic}" - publish_payload = "${payload}" - publish_qos = "${qos}" - publish_retain = "${retain}" + local_topic = "from_aws/${topic}" + payload = "${payload}" + qos = "${qos}" + retain = "${retain}" }] + ## we will create one MQTT connection for each element of the `out` out: [{ - subscribe_local_topic = "msg/#" - publish_remote_topic = "from_emqx/${topic}" - publish_payload = "${payload}" - publish_qos = 1 - publish_retain = false + id = "push_msgs_to_aws" + subscribe_local_topic = "emqx/#" + remote_topic = "from_emqx/${topic}" + payload = "${payload}" + qos = 1 + retain = false }] } diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index 4e6224801..c11e147f6 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -50,8 +50,9 @@ start(Config) -> Parent = self(), {Host, Port} = maps:get(server, Config), Mountpoint = maps:get(receive_mountpoint, Config, undefined), - Subscriptions = maps:get(subscriptions, Config, []), - Handlers = make_hdlr(Parent, Mountpoint), + Subscriptions = maps:get(subscriptions, Config), + Vars = emqx_bridge_msg:make_pub_vars(Mountpoint, Subscriptions), + Handlers = make_hdlr(Parent, Vars), Config1 = Config#{ msg_handler => Handlers, host => Host, @@ -59,7 +60,7 @@ start(Config) -> force_ping => true, proto_ver => maps:get(proto_ver, Config, v4) }, - case emqtt:start_link(without_config(Config1)) of + case emqtt:start_link(process_config(Config1)) of {ok, Pid} -> case emqtt:connect(Pid) of {ok, _} -> @@ -156,25 +157,35 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> ?LOG(warning, "Publish ~p to remote node falied, reason_code: ~p", [PktId, RC]). -handle_publish(Msg, Mountpoint) -> - emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Mountpoint)). +handle_publish(Msg, undefined) -> + ?LOG(error, "Cannot publish to local broker as 'bridge.mqtt..in' not configured, msg: ~p", [Msg]); +handle_publish(Msg, Vars) -> + ?LOG(debug, "Publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), + emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Vars)). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. -make_hdlr(Parent, Mountpoint) -> +make_hdlr(Parent, Vars) -> #{puback => {fun ?MODULE:handle_puback/2, [Parent]}, - publish => {fun ?MODULE:handle_publish/2, [Mountpoint]}, + publish => {fun ?MODULE:handle_publish/2, [Vars]}, disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} }. -subscribe_remote_topics(ClientPid, Subscriptions) -> - lists:foreach(fun(#{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) -> - case emqtt:subscribe(ClientPid, FromTopic, QoS) of - {ok, _, _} -> ok; - Error -> throw(Error) - end - end, Subscriptions). +subscribe_remote_topics(_ClientPid, undefined) -> ok; +subscribe_remote_topics(ClientPid, #{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) -> + case emqtt:subscribe(ClientPid, FromTopic, QoS) of + {ok, _, _} -> ok; + Error -> throw(Error) + end. -without_config(Config) -> - maps:without([conn_type, address, receive_mountpoint, subscriptions], Config). +process_config(#{name := Name, clientid_prefix := Prefix} = Config) -> + Conf0 = maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config), + Conf0#{clientid => iolist_to_binary([str(Prefix), str(Name)])}. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. \ No newline at end of file diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl index 26e6ee7c3..c75592edb 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl @@ -65,6 +65,5 @@ drop_bridge(Name) -> ok -> supervisor:delete_child(?MODULE, Name); {error, Error} -> - ?LOG(error, "Delete bridge failed, error : ~p", [Error]), {error, Error} end. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl index 18a4a74f9..e78844ed4 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_msg.erl @@ -18,9 +18,8 @@ -export([ to_binary/1 , from_binary/1 - , to_export/3 - , to_broker_msgs/1 - , to_broker_msg/1 + , make_pub_vars/2 + , to_remote_msg/3 , to_broker_msg/2 , estimate_size/1 ]). @@ -44,6 +43,12 @@ payload := binary() }. +make_pub_vars(_, undefined) -> undefined; +make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, remote_topic := Topic} = Conf) -> + Conf#{topic => Topic, mountpoint => Mountpoint}; +make_pub_vars(Mountpoint, #{payload := _, qos := _, retain := _, local_topic := Topic} = Conf) -> + Conf#{topic => Topic, mountpoint => Mountpoint}. + %% @doc Make export format: %% 1. Mount topic to a prefix %% 2. Fix QoS to 1 @@ -51,37 +56,52 @@ %% Shame that we have to know the callback module here %% would be great if we can get rid of #mqtt_msg{} record %% and use #message{} in all places. --spec to_export(emqx_bridge_rpc | emqx_bridge_worker, variables(), msg()) +-spec to_remote_msg(emqx_bridge_rpc | emqx_bridge_worker, msg(), variables()) -> exp_msg(). -to_export(emqx_bridge_mqtt, Vars, #message{flags = Flags0} = Msg) -> +to_remote_msg(emqx_bridge_mqtt, #message{flags = Flags0} = Msg, Vars) -> Retain0 = maps:get(retain, Flags0, false), MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), - to_export(emqx_bridge_mqtt, Vars, MapMsg); -to_export(emqx_bridge_mqtt, #{topic := TopicToken, payload := PayloadToken, - qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}, - MapMsg) when is_map(MapMsg) -> + to_remote_msg(emqx_bridge_mqtt, MapMsg, Vars); +to_remote_msg(emqx_bridge_mqtt, MapMsg, #{topic := TopicToken, payload := PayloadToken, + qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg), - QoS = replace_vars(QoSToken, MapMsg), - Retain = replace_vars(RetainToken, MapMsg), + QoS = replace_simple_var(QoSToken, MapMsg), + Retain = replace_simple_var(RetainToken, MapMsg), #mqtt_msg{qos = QoS, retain = Retain, topic = topic(Mountpoint, Topic), props = #{}, payload = Payload}; -to_export(_Module, #{mountpoint := Mountpoint}, - #message{topic = Topic} = Msg) -> +to_remote_msg(_Module, #message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. +%% published from remote node over a MQTT connection +to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, + #{topic := TopicToken, payload := PayloadToken, + qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> + Topic = replace_vars_in_str(TopicToken, MapMsg), + Payload = replace_vars_in_str(PayloadToken, MapMsg), + QoS = replace_simple_var(QoSToken, MapMsg), + Retain = replace_simple_var(RetainToken, MapMsg), + set_headers(Props, + emqx_message:set_flags(#{dup => Dup, retain => Retain}, + emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). + +%% Replace a string contains vars to another string in which the placeholders are replace by the +%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: +%% "a: 1". replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary}); replace_vars_in_str(Val, _Data) -> Val. -replace_vars(Tokens, Data) when is_list(Tokens) -> +%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result +%% value will be an integer 1. +replace_simple_var(Tokens, Data) when is_list(Tokens) -> [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), Var; -replace_vars(Val, _Data) -> +replace_simple_var(Val, _Data) -> Val. %% @doc Make `binary()' in order to make iodata to be persisted on disk. @@ -98,22 +118,6 @@ from_binary(Bin) -> binary_to_term(Bin). estimate_size(#message{topic = Topic, payload = Payload}) -> size(Topic) + size(Payload). -%% @doc By message/batch receiver, transform received batch into -%% messages to deliver to local brokers. -to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch). - -to_broker_msg(#message{} = Msg) -> - %% internal format from another EMQX node via rpc - Msg; -to_broker_msg(Msg) -> - to_broker_msg(Msg, undefined). -to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic, - properties := Props, payload := Payload}, Mountpoint) -> - %% published from remote node over a MQTT connection - set_headers(Props, - emqx_message:set_flags(#{dup => Dup, retain => Retain}, - emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). - set_headers(undefined, Msg) -> Msg; set_headers(Val, Msg) -> diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 3d6bf115f..552ebe9a1 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -90,13 +90,9 @@ ]). -export([ get_forwards/1 - , ensure_forward_present/2 - , ensure_forward_absent/2 ]). -export([ get_subscriptions/1 - , ensure_subscription_present/3 - , ensure_subscription_absent/2 ]). %% Internal @@ -183,45 +179,23 @@ get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(10 -spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions). -%% @doc Add a new forward (local topic subscription). --spec ensure_forward_present(id(), topic()) -> ok. -ensure_forward_present(Name, Topic) -> - gen_statem:call(name(Name), {ensure_forward_present, topic(Topic)}). - -%% @doc Ensure a forward topic is deleted. --spec ensure_forward_absent(id(), topic()) -> ok. -ensure_forward_absent(Name, Topic) -> - gen_statem:call(name(Name), {ensure_forward_absent, topic(Topic)}). - -%% @doc Ensure subscribed to remote topic. -%% NOTE: only applicable when connection module is emqx_bridge_mqtt -%% return `{error, no_remote_subscription_support}' otherwise. --spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}. -ensure_subscription_present(Name, Topic, QoS) -> - gen_statem:call(name(Name), {ensure_subscription_present, topic(Topic), QoS}). - -%% @doc Ensure unsubscribed from remote topic. -%% NOTE: only applicable when connection module is emqx_bridge_mqtt --spec ensure_subscription_absent(id(), topic()) -> ok. -ensure_subscription_absent(Name, Topic) -> - gen_statem:call(name(Name), {ensure_subscription_absent, topic(Topic)}). - callback_mode() -> [state_functions]. %% @doc Config should be a map(). -init(Opts) -> +init(#{name := Name} = ConnectOpts) -> + ?LOG(info, "starting bridge worker for ~p", [Name]), erlang:process_flag(trap_exit, true), - ConnectOpts = maps:get(config, Opts), ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)), - Forwards = maps:get(forwards, Opts, []), - Queue = open_replayq(maps:get(replayq, Opts, #{})), - State = init_state(Opts), + Forwards = maps:get(forwards, ConnectOpts, #{}), + Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), + State = init_state(ConnectOpts), self() ! idle, - {ok, idle, State#{connect_module => ConnectModule, - connect_opts => pre_process_opts(ConnectOpts), - forwards => Forwards, - replayq => Queue - }}. + {ok, idle, State#{ + connect_module => ConnectModule, + connect_opts => pre_process_opts(ConnectOpts), + forwards => Forwards, + replayq => Queue + }}. init_state(Opts) -> IfRecordMetrics = maps:get(if_record_metrics, Opts, true), @@ -241,27 +215,29 @@ init_state(Opts) -> if_record_metrics => IfRecordMetrics, name => Name}. -open_replayq(QCfg) -> +open_replayq(Name, QCfg) -> Dir = maps:get(dir, QCfg, undefined), SegBytes = maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES), MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE), QueueConfig = case Dir =:= undefined orelse Dir =:= "" of true -> #{mem_only => true}; - false -> #{dir => Dir, seg_bytes => SegBytes, max_total_size => MaxTotalSize} + false -> #{dir => filename:join([Dir, node(), Name]), + seg_bytes => SegBytes, max_total_size => MaxTotalSize} end, replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1, marshaller => fun ?MODULE:msg_marshaller/1}). pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> - ConnectOpts#{subscriptions => [pre_process_in_out(In) || In <- InConf], - forwards => [pre_process_in_out(Out) || Out <- OutConf]}. + ConnectOpts#{subscriptions => pre_process_in_out(InConf), + forwards => pre_process_in_out(OutConf)}. -pre_process_in_out(Conf) -> - Conf1 = pre_process_conf(publish_local_topic, Conf), - Conf2 = pre_process_conf(publish_remote_topic, Conf1), - Conf3 = pre_process_conf(publish_payload, Conf2), - Conf4 = pre_process_conf(publish_qos, Conf3), - pre_process_conf(publish_retain, Conf4). +pre_process_in_out(undefined) -> undefined; +pre_process_in_out(Conf) when is_map(Conf) -> + Conf1 = pre_process_conf(local_topic, Conf), + Conf2 = pre_process_conf(remote_topic, Conf1), + Conf3 = pre_process_conf(payload, Conf2), + Conf4 = pre_process_conf(qos, Conf3), + pre_process_conf(retain, Conf4). pre_process_conf(Key, Conf) -> case maps:find(Key, Conf) of @@ -350,19 +326,7 @@ common(_StateName, {call, From}, ensure_stopped, #{connection := Conn, common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> {keep_state_and_data, [{reply, From, Forwards}]}; common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> - {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, [])}]}; -common(_StateName, {call, From}, {ensure_forward_present, Topic}, State) -> - {Result, NewState} = do_ensure_forward_present(Topic, State), - {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, {call, From}, {ensure_subscription_present, Topic, QoS}, State) -> - {Result, NewState} = do_ensure_subscription_present(Topic, QoS, State), - {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, {call, From}, {ensure_forward_absent, Topic}, State) -> - {Result, NewState} = do_ensure_forward_absent(Topic, State), - {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, {call, From}, {ensure_subscription_absent, Topic}, State) -> - {Result, NewState} = do_ensure_subscription_absent(Topic, State), - {keep_state, NewState, [{reply, From, Result}]}; + {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; common(_StateName, info, {deliver, _, Msg}, State = #{replayq := Q, if_record_metrics := IfRecordMetric}) -> Msgs = collect([Msg]), @@ -379,75 +343,15 @@ common(StateName, Type, Content, #{name := Name} = State) -> [Name, Type, StateName, Content]), {keep_state, State}. -do_ensure_forward_present(Topic, #{forwards := Forwards, name := Name} = State) -> - case is_local_sub_present(Topic, Forwards) of - true -> - {ok, State}; - false -> - R = subscribe_local_topic(Topic, Name), - {R, State#{forwards => [Topic | Forwards]}} - end. - -do_ensure_subscription_present(_Topic, _QoS, #{connection := undefined} = State) -> - {{error, no_connection}, State}; -do_ensure_subscription_present(_Topic, _QoS, #{connect_module := emqx_bridge_rpc} = State) -> - {{error, no_remote_subscription_support}, State}; -do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule, - connection := Conn} = State) -> - case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of - true -> - {ok, State}; - false -> - case ConnectModule:ensure_subscribed(Conn, Topic, QoS) of - {error, Error} -> - {{error, Error}, State}; - Conn1 -> - {ok, State#{connection => Conn1}} - end - end. - -do_ensure_forward_absent(Topic, #{forwards := Forwards} = State) -> - case is_local_sub_present(Topic, Forwards) of - true -> - R = do_unsubscribe(Topic), - {R, State#{forwards => lists:delete(Topic, Forwards)}}; - false -> - {ok, State} - end. -do_ensure_subscription_absent(_Topic, #{connection := undefined} = State) -> - {{error, no_connection}, State}; -do_ensure_subscription_absent(_Topic, #{connect_module := emqx_bridge_rpc} = State) -> - {{error, no_remote_subscription_support}, State}; -do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule, - connection := Conn} = State) -> - case is_remote_sub_present(Topic, maps:get(subscriptions, Conn, [])) of - true -> - case ConnectModule:ensure_unsubscribed(Conn, Topic) of - {error, Error} -> - {{error, Error}, State}; - Conn1 -> - {ok, State#{connection => Conn1}} - end; - false -> - {ok, State} - end. - -is_local_sub_present(Topic, Configs) -> - is_topic_present(subscribe_local_topic, Topic, Configs). -is_remote_sub_present(Topic, Configs) -> - is_topic_present(subscribe_remote_topic, Topic, Configs). - -is_topic_present(Type, Topic, Configs) -> - lists:any(fun(Conf) -> - Topic == maps:get(Type, Conf, undefined) - end, Configs). - do_connect(#{forwards := Forwards, connect_module := ConnectModule, connect_opts := ConnectOpts, inflight := Inflight, name := Name} = State) -> - ok = subscribe_local_topics(Forwards, Name), + case Forwards of + undefined -> ok; + #{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name) + end, case ConnectModule:start(ConnectOpts) of {ok, Conn} -> ?tp(info, connected, #{name => Name, inflight => length(Inflight)}), @@ -503,16 +407,18 @@ pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) -> end. %% Assert non-empty batch because we have a is_empty check earlier. +do_send(#{forwards := undefined}, _QAckRef, Batch) -> + ?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt..in' not configured, msg: ~p", [Batch]); do_send(#{inflight := Inflight, connect_module := Module, connection := Connection, mountpoint := Mountpoint, forwards := Forwards, if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) -> - Vars = make_export_variables(Mountpoint, Forwards), + Vars = emqx_bridge_msg:make_pub_vars(Mountpoint, Forwards), ExportMsg = fun(Message) -> bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'), - emqx_bridge_msg:to_export(Module, Vars, Message) + emqx_bridge_msg:to_remote_msg(Module, Message, Vars) end, case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of {ok, Refs} -> @@ -524,15 +430,6 @@ do_send(#{inflight := Inflight, {error, State} end. -make_export_variables(Mountpoint, #{ - publish_remote_topic := PubTopic, - publish_payload := PayloadTmpl, - publish_qos := PubQoS, - publish_retain := PubRetain}) -> - #{topic => PubTopic, payload => PayloadTmpl, - qos => PubQoS, retain => PubRetain, - mountpoint => Mountpoint}. - %% map as set, ack-reference -> 1 map_set(Ref) when is_reference(Ref) -> %% QoS-0 or RPC call returns a reference @@ -578,9 +475,6 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs, All end. -subscribe_local_topics(Topics, Name) -> - lists:foreach(fun(Topic) -> subscribe_local_topic(Topic, Name) end, Topics). - subscribe_local_topic(Topic, Name) -> do_subscribe(Topic, Name). @@ -597,14 +491,9 @@ validate(RawTopic) -> do_subscribe(RawTopic, Name) -> TopicFilter = validate(RawTopic), - {Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_1}), + {Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}), emqx_broker:subscribe(Topic, Name, SubOpts). -do_unsubscribe(RawTopic) -> - TopicFilter = validate(RawTopic), - {Topic, _SubOpts} = emqx_topic:parse(TopicFilter), - emqx_broker:unsubscribe(Topic). - disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> @@ -622,7 +511,7 @@ format_mountpoint(undefined) -> format_mountpoint(Prefix) -> binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). -name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). +name(Id) -> list_to_atom(str(Id)). register_metrics() -> lists:foreach(fun emqx_metrics:ensure/1, @@ -657,3 +546,10 @@ conn_type(mqtt) -> emqx_bridge_mqtt; conn_type(Mod) when is_atom(Mod) -> Mod. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. \ No newline at end of file diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 475d83ac7..e3a8f143e 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -40,7 +40,7 @@ fields("config") -> , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} , {proto_ver, fun proto_ver/1} , {bridge_mode, emqx_schema:t(boolean(), undefined, true)} - , {clientid, emqx_schema:t(string())} + , {clientid_prefix, emqx_schema:t(string())} , {username, emqx_schema:t(string())} , {password, emqx_schema:t(string())} , {clean_start, emqx_schema:t(boolean(), undefined, true)} @@ -54,14 +54,14 @@ fields("config") -> fields("in") -> [ {subscribe_remote_topic, #{type => binary(), nullable => false}} - , {publish_local_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)} + , {local_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)} , {subscribe_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, 1)} - ] ++ publish_confs(); + ] ++ common_inout_confs(); fields("out") -> [ {subscribe_local_topic, #{type => binary(), nullable => false}} - , {publish_remote_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)} - ] ++ publish_confs(); + , {remote_topic, emqx_schema:t(binary(), undefined, <<"${topic}">>)} + ] ++ common_inout_confs(); fields("replayq") -> [ {dir, hoconsc:union([boolean(), string()])} @@ -70,10 +70,13 @@ fields("replayq") -> , {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")} ]. +common_inout_confs() -> + [{id, #{type => binary(), nullable => false}}] ++ publish_confs(). + publish_confs() -> - [ {publish_qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, <<"${qos}">>)} - , {publish_retain, emqx_schema:t(hoconsc:union([boolean(), binary()]), undefined, <<"${retain}">>)} - , {publish_payload, emqx_schema:t(binary(), undefined, <<"${payload}">>)} + [ {qos, emqx_schema:t(hoconsc:union([0, 1, 2, binary()]), undefined, <<"${qos}">>)} + , {retain, emqx_schema:t(hoconsc:union([boolean(), binary()]), undefined, <<"${retain}">>)} + , {payload, emqx_schema:t(binary(), undefined, <<"${payload}">>)} ]. proto_ver(type) -> hoconsc:enum([v3, v4, v5]); @@ -81,72 +84,125 @@ proto_ver(default) -> v4; proto_ver(_) -> undefined. %% =================================================================== -on_start(InstId, #{server := Server, - reconnect_interval := ReconnIntv, - proto_ver := ProtoVer, - bridge_mode := BridgeMod, - clientid := ClientID, - username := User, - password := Password, - clean_start := CleanStart, - keepalive := KeepAlive, - retry_interval := RetryIntv, - max_inflight := MaxInflight, - replayq := ReplayQ, - in := In, - out := Out, - ssl := #{enable := EnableSsl} = Ssl} = Conf) -> +on_start(InstId, Conf) -> logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), - BridgeName = binary_to_atom(InstId, latin1), - BridgeConf = Conf#{ - name => BridgeName, - config => #{ - conn_type => mqtt, - subscriptions => In, - forwards => Out, - replayq => ReplayQ, - %% connection opts - server => Server, - reconnect_interval => ReconnIntv, - proto_ver => ProtoVer, - bridge_mode => BridgeMod, - clientid => ClientID, - username => User, - password => Password, - clean_start => CleanStart, - keepalive => KeepAlive, - retry_interval => RetryIntv, - max_inflight => MaxInflight, - ssl => EnableSsl, - ssl_opts => maps:to_list(maps:remove(enable, Ssl)), - if_record_metrics => true - } - }, - case emqx_bridge_mqtt_sup:create_bridge(BridgeConf) of - {ok, _Pid} -> - start_bridge(BridgeName); - {error, {already_started, _Pid}} -> - start_bridge(BridgeName); - {error, Reason} -> - {error, Reason} - end. + NamePrefix = binary_to_list(InstId), + BasicConf = basic_config(Conf), + InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}}, + InOutConfigs = check_channel_id_dup(maps:get(in, Conf, []) ++ maps:get(out, Conf, [])), + lists:foldl(fun + (_InOutConf, {error, Reason}) -> + {error, Reason}; + (InOutConf, {ok, #{sub_bridges := SubBridges} = Res}) -> + case create_channel(InOutConf, NamePrefix, BasicConf) of + {error, Reason} -> {error, Reason}; + {ok, Name} -> {ok, Res#{sub_bridges => [Name | SubBridges]}} + end + end, InitRes, InOutConfigs). on_stop(InstId, #{}) -> logger:info("stopping mqtt connector: ~p", [InstId]), - emqx_bridge_mqtt_sup:drop_bridge(InstId). + case emqx_bridge_mqtt_sup:drop_bridge(InstId) of + ok -> ok; + {error, not_found} -> ok; + {error, Reason} -> + logger:error("stop bridge failed, error: ~p", [Reason]) + end. %% TODO: let the emqx_resource trigger on_query/4 automatically according to the %% `in` and `out` config +on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, + baisc_conf := BasicConf}) -> + logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]), + create_channel(Conf, Prefix, BasicConf); on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) -> logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]); on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) -> logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]). -on_health_check(_InstId, #{bridge_name := Name}) -> - {ok, emqx_bridge_worker:ping(Name)}. +on_health_check(_InstId, #{sub_bridges := NameList} = State) -> + Results = [{Name, emqx_bridge_worker:ping(Name)} || Name <- NameList], + case lists:all(fun({_, pong}) -> true; ({_, _}) -> false end, Results) of + true -> {ok, State}; + false -> {error, {some_sub_bridge_down, Results}, State} + end. -start_bridge(Name) -> +check_channel_id_dup(Confs) -> + lists:foreach(fun(#{id := Id}) -> + case length([Id || #{id := Id0} <- Confs, Id0 == Id]) of + 1 -> ok; + L when L > 1 -> error({mqtt_bridge_conf, {duplicate_id_found, Id}}) + end + end, Confs), + Confs. + +%% this is an `in` bridge +create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix, BasicConf) -> + logger:info("creating 'in' channel for: ~p", [BridgeId]), + create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId), + subscriptions => InConf, forwards => undefined}); +%% this is an `out` bridge +create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix, BasicConf) -> + logger:info("creating 'out' channel for: ~p", [BridgeId]), + create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId), + subscriptions => undefined, forwards => OutConf}). + +create_sub_bridge(#{name := Name} = Conf) -> + case emqx_bridge_mqtt_sup:create_bridge(Conf) of + {ok, _Pid} -> + start_sub_bridge(Name); + {error, {already_started, _Pid}} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +start_sub_bridge(Name) -> case emqx_bridge_worker:ensure_started(Name) of - ok -> {ok, #{bridge_name => Name}}; + ok -> {ok, Name}; {error, Reason} -> {error, Reason} end. + +basic_config(#{ + server := Server, + reconnect_interval := ReconnIntv, + proto_ver := ProtoVer, + bridge_mode := BridgeMod, + clientid_prefix := ClientIdPrefix, + username := User, + password := Password, + clean_start := CleanStart, + keepalive := KeepAlive, + retry_interval := RetryIntv, + max_inflight := MaxInflight, + replayq := ReplayQ, + ssl := #{enable := EnableSsl} = Ssl}) -> + #{ + conn_type => mqtt, + replayq => ReplayQ, + %% connection opts + server => Server, + reconnect_interval => ReconnIntv, + proto_ver => ProtoVer, + bridge_mode => BridgeMod, + clientid_prefix => ClientIdPrefix, + username => User, + password => Password, + clean_start => CleanStart, + keepalive => KeepAlive, + retry_interval => RetryIntv, + max_inflight => MaxInflight, + ssl => EnableSsl, + ssl_opts => maps:to_list(maps:remove(enable, Ssl)), + if_record_metrics => true + }. + +bridge_name(Prefix, Id) -> + list_to_atom(str(Prefix) ++ ":" ++ str(Id)). + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f9e210ab3..e8dcc8a58 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -222,6 +222,7 @@ do_compare('<=', L, R) -> L =< R; do_compare('>=', L, R) -> L >= R; do_compare('<>', L, R) -> L /= R; do_compare('!=', L, R) -> L /= R; +do_compare('~=', T, F) -> emqx_topic:match(T, F); do_compare('=~', T, F) -> emqx_topic:match(T, F). number(Bin) ->