refactor(bridge): update config struct for mqtt bridge

This commit is contained in:
Shawn 2021-10-21 17:13:13 +08:00
parent d046f9c6e7
commit 0cbdaa0f40
2 changed files with 97 additions and 93 deletions

View File

@ -3,74 +3,50 @@
##--------------------------------------------------------------------
## MQTT bridges to/from another MQTT broker
#bridges.mqtt.my_mqtt_bridge_from_aws {
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# clientid = "my_mqtt_bridge_from_aws"
# username = "username1"
# password = ""
# clean_start = true
# keepalive = 300
# retry_interval = "30s"
# max_inflight = 32
# reconnect_interval = "30s"
# bridge_mode = true
# replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB"
# offload = false
# max_total_bytes = "1GB"
# }
# ssl {
# enable = false
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
#
# ## topic mappings for this bridge
# direction = in
# from_remote_topic = "aws/#"
# subscribe_qos = 1
# to_local_topic = "from_aws/${topic}"
# payload = "${payload}"
# qos = "${qos}"
# retain = "${retain}"
#}
#
#bridges.mqtt.my_mqtt_bridge_to_aws {
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# clientid = "my_mqtt_bridge_to_aws"
# username = "username1"
# password = ""
# clean_start = true
# keepalive = 300
# retry_interval = "30s"
# max_inflight = 32
# reconnect_interval = "30s"
# bridge_mode = true
# replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB"
# offload = false
# max_total_bytes = "1GB"
# }
# ssl {
# enable = false
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
#
# ## topic mappings for this bridge
# direction = out
# from_local_topic = "emqx/#"
# to_remote_topic = "from_emqx/${topic}"
# payload = "${payload}"
# qos = 1
# retain = false
#}
bridges.mqtt.my_mqtt_bridge_to_aws {
server = "127.0.0.1:1883"
proto_ver = "v4"
clientid = "my_mqtt_bridge_to_aws"
username = "username1"
password = ""
clean_start = true
keepalive = 300
retry_interval = "30s"
max_inflight = 32
reconnect_interval = "30s"
bridge_mode = true
replayq {
dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
seg_bytes = "100MB"
offload = false
max_total_bytes = "1GB"
}
ssl {
enable = false
keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
}
## topic mappings for this bridge
ingress {
from_remote_topic = "aws/#"
subscribe_qos = 1
to_local_topic = "from_aws/${topic}"
payload = "${payload}"
qos = "${qos}"
retain = "${retain}"
}
egress {
from_local_topic = "emqx/#"
to_remote_topic = "from_emqx/${topic}"
payload = "${payload}"
qos = 1
retain = false
}
}
## HTTP bridges to an HTTP server
bridges.http.my_http_bridge {

View File

@ -26,53 +26,81 @@
-import(emqx_schema, [mk_duration/2]).
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, "config")}}].
fields("config").
fields("config") ->
[ {server, hoconsc:mk(emqx_schema:ip_port(), #{default => "127.0.0.1:1883"})}
[ {server,
sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker"
})}
, {reconnect_interval, mk_duration("reconnect interval", #{default => "30s"})}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, hoconsc:mk(boolean(), #{default => true})}
, {username, hoconsc:mk(string())}
, {password, hoconsc:mk(string())}
, {clean_start, hoconsc:mk(boolean(), #{default => true})}
, {proto_ver,
sc(hoconsc:enum([v3, v4, v5]),
#{ default => v4
, desc => "The MQTT protocol version"
})}
, {bridge_mode,
sc(boolean(),
#{ default => true
, desc => "The bridge mode of the MQTT protocol"
})}
, {username,
sc(binary(),
#{ default => "emqx"
, desc => "The username of the MQTT protocol"
})}
, {password,
sc(binary(),
#{ default => "emqx"
, desc => "The password of the MQTT protocol"
})}
, {clientid,
sc(binary(),
#{ default => "emqx_${nodename}"
, desc => "The clientid of the MQTT protocol"
})}
, {clean_start,
sc(boolean(),
#{ default => true
, desc => "The clean-start or the clean-session of the MQTT protocol"
})}
, {keepalive, mk_duration("keepalive", #{default => "300s"})}
, {retry_interval, mk_duration("retry interval", #{default => "30s"})}
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
, {ingress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})}
, {egress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "egress_channels")), #{default => []})}
, {max_inflight, sc(integer(), #{default => 32})}
, {replayq, sc(ref("replayq"))}
, {ingress_channels, sc(hoconsc:map(id, ref("ingress_channels")), #{default => []})}
, {egress_channels, sc(hoconsc:map(id, ref("egress_channels")), #{default => []})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress_channels") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
[ {subscribe_remote_topic, hoconsc:mk(binary(), #{nullable => false})}
, {local_topic, hoconsc:mk(binary())}
, {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
[ {subscribe_remote_topic, sc(binary(), #{nullable => false})}
, {local_topic, sc(binary())}
, {subscribe_qos, sc(qos(), #{default => 1})}
] ++ common_inout_confs();
fields("egress_channels") ->
%% the message maybe sent from rules, in this case 'subscribe_local_topic' is not necessary
[ {subscribe_local_topic, hoconsc:mk(binary())}
, {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
[ {subscribe_local_topic, sc(binary())}
, {remote_topic, sc(binary(), #{default => <<"${topic}">>})}
] ++ common_inout_confs();
fields("replayq") ->
[ {dir, hoconsc:union([boolean(), string()])}
, {seg_bytes, hoconsc:mk(emqx_schema:bytesize(), #{default => "100MB"})}
, {offload, hoconsc:mk(boolean(), #{default => false})}
, {max_total_bytes, hoconsc:mk(emqx_schema:bytesize(), #{default => "1024MB"})}
, {seg_bytes, sc(emqx_schema:bytesize(), #{default => "100MB"})}
, {offload, sc(boolean(), #{default => false})}
, {max_total_bytes, sc(emqx_schema:bytesize(), #{default => "1024MB"})}
].
common_inout_confs() ->
[ {qos, hoconsc:mk(qos(), #{default => <<"${qos}">>})}
, {retain, hoconsc:mk(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})}
, {payload, hoconsc:mk(binary(), #{default => <<"${payload}">>})}
[ {qos, sc(qos(), #{default => <<"${qos}">>})}
, {retain, sc(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})}
, {payload, sc(binary(), #{default => <<"${payload}">>})}
].
qos() ->
hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
proto_ver(type) -> hoconsc:enum([v3, v4, v5]);
proto_ver(default) -> v4;
proto_ver(_) -> undefined.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).