From 0cbdaa0f40eb54b3c2c51e89d95b3b0cbf015dc5 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 21 Oct 2021 17:13:13 +0800 Subject: [PATCH] refactor(bridge): update config struct for mqtt bridge --- apps/emqx_bridge/etc/emqx_bridge.conf | 112 +++++++----------- .../src/mqtt/emqx_connector_mqtt_schema.erl | 78 ++++++++---- 2 files changed, 97 insertions(+), 93 deletions(-) diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index e7e5dffbb..f4c9ac74b 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -3,74 +3,50 @@ ##-------------------------------------------------------------------- ## MQTT bridges to/from another MQTT broker -#bridges.mqtt.my_mqtt_bridge_from_aws { -# server = "127.0.0.1:1883" -# proto_ver = "v4" -# clientid = "my_mqtt_bridge_from_aws" -# username = "username1" -# password = "" -# clean_start = true -# keepalive = 300 -# retry_interval = "30s" -# max_inflight = 32 -# reconnect_interval = "30s" -# bridge_mode = true -# replayq { -# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" -# seg_bytes = "100MB" -# offload = false -# max_total_bytes = "1GB" -# } -# ssl { -# enable = false -# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" -# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" -# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" -# } -# -# ## topic mappings for this bridge -# direction = in -# from_remote_topic = "aws/#" -# subscribe_qos = 1 -# to_local_topic = "from_aws/${topic}" -# payload = "${payload}" -# qos = "${qos}" -# retain = "${retain}" -#} -# -#bridges.mqtt.my_mqtt_bridge_to_aws { -# server = "127.0.0.1:1883" -# proto_ver = "v4" -# clientid = "my_mqtt_bridge_to_aws" -# username = "username1" -# password = "" -# clean_start = true -# keepalive = 300 -# retry_interval = "30s" -# max_inflight = 32 -# reconnect_interval = "30s" -# bridge_mode = true -# replayq { -# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" -# seg_bytes = "100MB" -# offload = false -# max_total_bytes = "1GB" -# } -# ssl { -# enable = false -# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" -# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" -# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" -# } -# -# ## topic mappings for this bridge -# direction = out -# from_local_topic = "emqx/#" -# to_remote_topic = "from_emqx/${topic}" -# payload = "${payload}" -# qos = 1 -# retain = false -#} +bridges.mqtt.my_mqtt_bridge_to_aws { + server = "127.0.0.1:1883" + proto_ver = "v4" + clientid = "my_mqtt_bridge_to_aws" + username = "username1" + password = "" + clean_start = true + keepalive = 300 + retry_interval = "30s" + max_inflight = 32 + reconnect_interval = "30s" + bridge_mode = true + replayq { + dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" + seg_bytes = "100MB" + offload = false + max_total_bytes = "1GB" + } + ssl { + enable = false + keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" + certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" + cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" + } + + ## topic mappings for this bridge + ingress { + from_remote_topic = "aws/#" + subscribe_qos = 1 + to_local_topic = "from_aws/${topic}" + payload = "${payload}" + qos = "${qos}" + retain = "${retain}" + } + + egress { + from_local_topic = "emqx/#" + to_remote_topic = "from_emqx/${topic}" + payload = "${payload}" + qos = 1 + retain = false + } + +} ## HTTP bridges to an HTTP server bridges.http.my_http_bridge { diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index b0aaeb8b6..67df50213 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -26,53 +26,81 @@ -import(emqx_schema, [mk_duration/2]). roots() -> - [{config, #{type => hoconsc:ref(?MODULE, "config")}}]. + fields("config"). fields("config") -> - [ {server, hoconsc:mk(emqx_schema:ip_port(), #{default => "127.0.0.1:1883"})} + [ {server, + sc(emqx_schema:ip_port(), + #{ default => "127.0.0.1:1883" + , desc => "The host and port of the remote MQTT broker" + })} , {reconnect_interval, mk_duration("reconnect interval", #{default => "30s"})} - , {proto_ver, fun proto_ver/1} - , {bridge_mode, hoconsc:mk(boolean(), #{default => true})} - , {username, hoconsc:mk(string())} - , {password, hoconsc:mk(string())} - , {clean_start, hoconsc:mk(boolean(), #{default => true})} + , {proto_ver, + sc(hoconsc:enum([v3, v4, v5]), + #{ default => v4 + , desc => "The MQTT protocol version" + })} + , {bridge_mode, + sc(boolean(), + #{ default => true + , desc => "The bridge mode of the MQTT protocol" + })} + , {username, + sc(binary(), + #{ default => "emqx" + , desc => "The username of the MQTT protocol" + })} + , {password, + sc(binary(), + #{ default => "emqx" + , desc => "The password of the MQTT protocol" + })} + , {clientid, + sc(binary(), + #{ default => "emqx_${nodename}" + , desc => "The clientid of the MQTT protocol" + })} + , {clean_start, + sc(boolean(), + #{ default => true + , desc => "The clean-start or the clean-session of the MQTT protocol" + })} , {keepalive, mk_duration("keepalive", #{default => "300s"})} , {retry_interval, mk_duration("retry interval", #{default => "30s"})} - , {max_inflight, hoconsc:mk(integer(), #{default => 32})} - , {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))} - , {ingress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})} - , {egress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "egress_channels")), #{default => []})} + , {max_inflight, sc(integer(), #{default => 32})} + , {replayq, sc(ref("replayq"))} + , {ingress_channels, sc(hoconsc:map(id, ref("ingress_channels")), #{default => []})} + , {egress_channels, sc(hoconsc:map(id, ref("egress_channels")), #{default => []})} ] ++ emqx_connector_schema_lib:ssl_fields(); fields("ingress_channels") -> %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary - [ {subscribe_remote_topic, hoconsc:mk(binary(), #{nullable => false})} - , {local_topic, hoconsc:mk(binary())} - , {subscribe_qos, hoconsc:mk(qos(), #{default => 1})} + [ {subscribe_remote_topic, sc(binary(), #{nullable => false})} + , {local_topic, sc(binary())} + , {subscribe_qos, sc(qos(), #{default => 1})} ] ++ common_inout_confs(); fields("egress_channels") -> %% the message maybe sent from rules, in this case 'subscribe_local_topic' is not necessary - [ {subscribe_local_topic, hoconsc:mk(binary())} - , {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})} + [ {subscribe_local_topic, sc(binary())} + , {remote_topic, sc(binary(), #{default => <<"${topic}">>})} ] ++ common_inout_confs(); fields("replayq") -> [ {dir, hoconsc:union([boolean(), string()])} - , {seg_bytes, hoconsc:mk(emqx_schema:bytesize(), #{default => "100MB"})} - , {offload, hoconsc:mk(boolean(), #{default => false})} - , {max_total_bytes, hoconsc:mk(emqx_schema:bytesize(), #{default => "1024MB"})} + , {seg_bytes, sc(emqx_schema:bytesize(), #{default => "100MB"})} + , {offload, sc(boolean(), #{default => false})} + , {max_total_bytes, sc(emqx_schema:bytesize(), #{default => "1024MB"})} ]. common_inout_confs() -> - [ {qos, hoconsc:mk(qos(), #{default => <<"${qos}">>})} - , {retain, hoconsc:mk(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})} - , {payload, hoconsc:mk(binary(), #{default => <<"${payload}">>})} + [ {qos, sc(qos(), #{default => <<"${qos}">>})} + , {retain, sc(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})} + , {payload, sc(binary(), #{default => <<"${payload}">>})} ]. qos() -> hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]). -proto_ver(type) -> hoconsc:enum([v3, v4, v5]); -proto_ver(default) -> v4; -proto_ver(_) -> undefined. +sc(Type, Meta) -> hoconsc:mk(Type, Meta). +ref(Field) -> hoconsc:ref(?MODULE, Field).