diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 23506669c..65f612f21 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -51,9 +51,9 @@ end_per_testcase(_, _Config) -> -define(HTTP_BRIDGE(PATH), #{ <<"base_url">> => <<"http://localhost:9901">>, - <<"egress_channels">> => #{ + <<"egress">> => #{ <<"a">> => #{ - <<"subscribe_local_topic">> => <<"emqx_http/#">>, + <<"from_local_topic">> => <<"emqx_http/#">>, <<"method">> => <<"post">>, <<"path">> => PATH, <<"body">> => <<"${payload}">>, @@ -114,7 +114,7 @@ t_crud_apis(_) -> , bridge_type := http , is_connected := _ , node := _ - , <<"egress_channels">> := #{ + , <<"egress">> := #{ <<"a">> := #{<<"path">> := ?PATH1} } }, Bridge), @@ -127,7 +127,7 @@ t_crud_apis(_) -> ?assertMatch(#{ id := <<"http:test_bridge">> , bridge_type := http , is_connected := _ - , <<"egress_channels">> := #{ + , <<"egress">> := #{ <<"a">> := #{<<"path">> := ?PATH2} } }, Bridge2), diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index a4527984a..f98a5fa77 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -100,8 +100,8 @@ on_start(InstId, Conf) -> "bridge:" ++ NamePrefix = binary_to_list(InstId), BasicConf = basic_config(Conf), InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, - InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{})) - ++ taged_map_list(egress_channels, maps:get(egress_channels, Conf, #{})), + InOutConfigs = taged_map_list(ingress, maps:get(ingress, Conf, #{})) + ++ taged_map_list(egress, maps:get(egress, Conf, #{})), lists:foldl(fun (_InOutConf, {error, Reason}) -> {error, Reason}; @@ -120,7 +120,7 @@ on_stop(InstId, #{channels := NameList}) -> end, NameList). %% TODO: let the emqx_resource trigger on_query/4 automatically according to the -%% `ingress_channels` and `egress_channels` config +%% `ingress` and `egress` config on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, baisc_conf := BasicConf}) -> create_channel(Conf, Prefix, BasicConf); @@ -136,36 +136,36 @@ on_health_check(_InstId, #{channels := NameList} = State) -> false -> {error, {some_channel_down, Results}, State} end. -create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf}, +create_channel({{ingress, Id}, #{from_remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) -> - LocalT = maps:get(local_topic, Conf, undefined), + LocalT = maps:get(to_local_topic, Conf, undefined), ChannId = ingress_channel_id(NamePrefix, Id), ?SLOG(info, #{msg => "creating ingress channel", - remote_topic => RemoteT, - local_topic => LocalT, + to_remote_topic => RemoteT, + to_local_topic => LocalT, channel_id => ChannId}), do_create_channel(BasicConf#{ name => ChannId, clientid => clientid(ChannId), subscriptions => Conf#{ - local_topic => LocalT, + to_local_topic => LocalT, on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]} }, forwards => undefined}); -create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf}, +create_channel({{egress, Id}, #{to_remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) -> - LocalT = maps:get(subscribe_local_topic, Conf, undefined), + LocalT = maps:get(from_local_topic, Conf, undefined), ChannId = egress_channel_id(NamePrefix, Id), ?SLOG(info, #{msg => "creating egress channel", - remote_topic => RemoteT, - local_topic => LocalT, + to_remote_topic => RemoteT, + to_local_topic => LocalT, channel_id => ChannId}), do_create_channel(BasicConf#{ name => ChannId, clientid => clientid(ChannId), subscriptions => undefined, - forwards => Conf#{subscribe_local_topic => LocalT}}). + forwards => Conf#{from_local_topic => LocalT}}). remove_channel(ChannId) -> ?SLOG(info, #{msg => "removing channel", @@ -229,9 +229,9 @@ taged_map_list(Tag, Map) -> [{{Tag, K}, V} || {K, V} <- maps:to_list(Map)]. ingress_channel_id(Prefix, Id) -> - channel_name("ingress_channels", Prefix, Id). + channel_name("ingress", Prefix, Id). egress_channel_id(Prefix, Id) -> - channel_name("egress_channels", Prefix, Id). + channel_name("egress", Prefix, Id). channel_name(Type, Prefix, Id) -> list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 853221eec..9b529b340 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -65,7 +65,7 @@ start(Config) -> case emqtt:connect(Pid) of {ok, _} -> try - ok = subscribe_remote_topics(Pid, Subscriptions), + ok = from_remote_topics(Pid, Subscriptions), {ok, #{client_pid => Pid, subscriptions => Subscriptions}} catch throw : Reason -> @@ -167,7 +167,7 @@ handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) -> message => Msg, vars => Vars}), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), _ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]), - case maps:get(local_topic, Vars, undefined) of + case maps:get(to_local_topic, Vars, undefined) of undefined -> ok; _Topic -> emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) @@ -182,8 +182,8 @@ make_hdlr(Parent, Vars) -> disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} }. -subscribe_remote_topics(_ClientPid, undefined) -> ok; -subscribe_remote_topics(ClientPid, #{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) -> +from_remote_topics(_ClientPid, undefined) -> ok; +from_remote_topics(ClientPid, #{from_remote_topic := FromTopic, subscribe_qos := QoS}) -> case emqtt:subscribe(ClientPid, FromTopic, QoS) of {ok, _, _} -> ok; Error -> throw(Error) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 6009cc084..7b49f21fe 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -36,7 +36,7 @@ -type variables() :: #{ mountpoint := undefined | binary(), - remote_topic := binary(), + to_remote_topic := binary(), qos := original | integer(), retain := original | boolean(), payload := binary() @@ -59,7 +59,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> Retain0 = maps:get(retain, Flags0, false), MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), to_remote_msg(MapMsg, Vars); -to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, +to_remote_msg(MapMsg, #{to_remote_topic := TopicToken, payload := PayloadToken, qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg), @@ -75,7 +75,7 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> %% published from remote node over a MQTT connection to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, - #{local_topic := TopicToken, payload := PayloadToken, + #{to_local_topic := TopicToken, payload := PayloadToken, qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 67df50213..d53716ced 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -67,36 +67,131 @@ fields("config") -> })} , {keepalive, mk_duration("keepalive", #{default => "300s"})} , {retry_interval, mk_duration("retry interval", #{default => "30s"})} - , {max_inflight, sc(integer(), #{default => 32})} - , {replayq, sc(ref("replayq"))} - , {ingress_channels, sc(hoconsc:map(id, ref("ingress_channels")), #{default => []})} - , {egress_channels, sc(hoconsc:map(id, ref("egress_channels")), #{default => []})} + , {max_inflight, + sc(integer(), + #{ default => 32 + , desc => "Max inflight messages (sent but ACK has not received) of the MQTT protocol" + })} + , {replayq, + sc(ref("replayq"), + #{ desc => """ +Queue messages in disk files. +""" + })} + , {ingress, + sc(ref("ingress"), + #{ default => #{} + , desc => """ +The ingress config defines how this bridge receive messages from the remote MQTT broker, and then +send them to the local broker.
+Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain', +'payload'.
+NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is +configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and +the rule. +""" + })} + , {egress, + sc(hoconsc:map(id, ref("egress")), + #{ default => #{} + , desc => """ +The egress config defines how this bridge forwards messages from the local broker to the remote +broker.
+Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic +is configured, then both the data got from the rule and the MQTT messages that matches +from_local_topic will be forwarded. +""" + })} ] ++ emqx_connector_schema_lib:ssl_fields(); -fields("ingress_channels") -> - %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary - [ {subscribe_remote_topic, sc(binary(), #{nullable => false})} - , {local_topic, sc(binary())} - , {subscribe_qos, sc(qos(), #{default => 1})} +fields("ingress") -> + %% the message maybe subscribed by rules, in this case 'to_local_topic' is not necessary + [ {from_remote_topic, + sc(binary(), + #{ nullable => false + , desc => "Receive messages from which topic of the remote broker" + })} + , {subscribe_qos, + sc(qos(), + #{ default => 1 + , desc => "The QoS level to be used when subscribing to the remote broker" + })} + , {to_local_topic, + sc(binary(), + #{ desc => """ +Send messages to which topic of the local broker.
+Template with variables is allowed. +""" + })} ] ++ common_inout_confs(); -fields("egress_channels") -> - %% the message maybe sent from rules, in this case 'subscribe_local_topic' is not necessary - [ {subscribe_local_topic, sc(binary())} - , {remote_topic, sc(binary(), #{default => <<"${topic}">>})} +fields("egress") -> + %% the message maybe sent from rules, in this case 'from_local_topic' is not necessary + [ {from_local_topic, + sc(binary(), + #{ desc => "The local topic to be forwarded to the remote broker" + })} + , {to_remote_topic, + sc(binary(), + #{ default => <<"${topic}">> + , desc => """ +Forward to which topic of the remote broker.
+Template with variables is allowed. +""" + })} ] ++ common_inout_confs(); fields("replayq") -> - [ {dir, hoconsc:union([boolean(), string()])} - , {seg_bytes, sc(emqx_schema:bytesize(), #{default => "100MB"})} - , {offload, sc(boolean(), #{default => false})} - , {max_total_bytes, sc(emqx_schema:bytesize(), #{default => "1024MB"})} + [ {dir, + sc(hoconsc:union([boolean(), string()]), + #{ desc => """ +The dir where the replayq file saved.
+Set to 'false' disables the replayq feature. +""" + })} + , {seg_bytes, + sc(emqx_schema:bytesize(), + #{ default => "100MB" + , desc => """ +The size in bytes of a single segment.
+A segment is mapping to a file in the replayq dir. If the current segment is full, a new segment +(file) will be opened to write. +""" + })} + , {offload, + sc(boolean(), + #{ default => false + , desc => """ +In offload mode, the disk queue is only used to offload queue tail segments.
+The messages are cached in the memory first, then it write to the replayq files after the size of +the memory cache reaches 'seg_bytes'. +""" + })} ]. common_inout_confs() -> - [ {qos, sc(qos(), #{default => <<"${qos}">>})} - , {retain, sc(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})} - , {payload, sc(binary(), #{default => <<"${payload}">>})} + [ {qos, + sc(qos(), + #{ default => <<"${qos}">> + , desc => """ +The QoS of the MQTT message to be sent.
+Template with variables is allowed.""" + })} + , {retain, + sc(hoconsc:union([boolean(), binary()]), + #{ default => <<"${retain}">> + , desc => """ +The retain flag of the MQTT message to be sent.
+Template with variables is allowed.""" + })} + , {payload, + sc(binary(), + #{ default => <<"${payload}">> + , desc => """ +The payload of the MQTT message to be sent.
+Template with variables is allowed.""" + })} ]. qos() -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 990d15ef5..3ab829218 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -235,8 +235,8 @@ pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) pre_process_in_out(undefined) -> undefined; pre_process_in_out(Conf) when is_map(Conf) -> - Conf1 = pre_process_conf(local_topic, Conf), - Conf2 = pre_process_conf(remote_topic, Conf1), + Conf1 = pre_process_conf(to_local_topic, Conf), + Conf2 = pre_process_conf(to_remote_topic, Conf1), Conf3 = pre_process_conf(payload, Conf2), Conf4 = pre_process_conf(qos, Conf3), pre_process_conf(retain, Conf4). @@ -347,7 +347,7 @@ do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards}, name := Name} = State) -> case Forwards of undefined -> ok; - #{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name) + #{from_local_topic := Topic} -> from_local_topic(Topic, Name) end, case emqx_connector_mqtt_mod:start(ConnectOpts) of {ok, Conn} -> @@ -473,9 +473,9 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs, All end. -subscribe_local_topic(undefined, _Name) -> +from_local_topic(undefined, _Name) -> ok; -subscribe_local_topic(Topic, Name) -> +from_local_topic(Topic, Name) -> do_subscribe(Topic, Name). topic(T) -> iolist_to_binary(T).