feat: support array bridge_mqtt conf

This commit is contained in:
Turtle 2021-08-25 12:53:53 +08:00 committed by Rory Z
parent 78cf115a90
commit e4f5e9332e
6 changed files with 62 additions and 64 deletions

View File

@ -15,7 +15,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.12.1"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}

View File

@ -299,7 +299,7 @@ save_schema_mod_and_names(SchemaMod) ->
RootNames = SchemaMod:structs(), RootNames = SchemaMod:structs(),
OldMods = get_schema_mod(), OldMods = get_schema_mod(),
OldNames = get_root_names(), OldNames = get_root_names(),
NewMods = maps:from_list([{bin(Name), SchemaMod} || Name <- RootNames]), NewMods = maps:from_list([{root_bin(Name), SchemaMod} || Name <- RootNames]),
persistent_term:put(?PERSIS_SCHEMA_MODS, #{ persistent_term:put(?PERSIS_SCHEMA_MODS, #{
mods => maps:merge(OldMods, NewMods), mods => maps:merge(OldMods, NewMods),
names => lists:usort(OldNames ++ RootNames) names => lists:usort(OldNames ++ RootNames)
@ -440,3 +440,6 @@ conf_key(?CONF, RootName) ->
atom(RootName); atom(RootName);
conf_key(?RAW_CONF, RootName) -> conf_key(?RAW_CONF, RootName) ->
bin(RootName). bin(RootName).
root_bin({array, Bin}) -> bin(Bin);
root_bin(Bin) -> bin(Bin).

View File

@ -2,57 +2,55 @@
## Configuration for EMQ X MQTT Broker Bridge ## Configuration for EMQ X MQTT Broker Bridge
##==================================================================== ##====================================================================
emqx_bridge_mqtt { bridge_mqtt: [
bridges:[ # {
# { # name: "mqtt1"
# name: "mqtt1" # start_type: auto
# start_type: auto # forwards: ["test/#"],
# forwards: ["test/#"], # forward_mountpoint: ""
# forward_mountpoint: "" # reconnect_interval: "30s"
# reconnect_interval: "30s" # batch_size: 100
# batch_size: 100 # queue {
# queue { # replayq_dir: "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# replayq_dir: "{{ platform_data_dir }}/replayq/bridge_mqtt/" # replayq_seg_bytes: "100MB"
# replayq_seg_bytes: "100MB" # replayq_offload_mode: false
# replayq_offload_mode: false # replayq_max_total_bytes: "1GB"
# replayq_max_total_bytes: "1GB" # },
# }, # config {
# config { # conn_type: mqtt
# conn_type: mqtt # address: "127.0.0.1:1883"
# address: "127.0.0.1:1883" # proto_ver: v4
# proto_ver: v4 # bridge_mode: true
# bridge_mode: true # clientid: "client1"
# clientid: "client1" # clean_start: true
# clean_start: true # username: "username1"
# username: "username1" # password: ""
# password: "" # keepalive: 300
# keepalive: 300 # subscriptions: [{
# subscriptions: [{ # topic: "t/#"
# topic: "t/#" # qos: 1
# qos: 1 # }]
# }] # receive_mountpoint: ""
# receive_mountpoint: "" # retry_interval: "30s"
# retry_interval: "30s" # max_inflight: 32
# max_inflight: 32 # }
# } # },
# }, # {
# { # name: "rpc1"
# name: "rpc1" # start_type: auto
# start_type: auto # forwards: ["test/#"],
# forwards: ["test/#"], # forward_mountpoint: ""
# forward_mountpoint: "" # reconnect_interval: "30s"
# reconnect_interval: "30s" # batch_size: 100
# batch_size: 100 # queue {
# queue { # replayq_dir: "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# replayq_dir: "{{ platform_data_dir }}/replayq/bridge_mqtt/" # replayq_seg_bytes: "100MB"
# replayq_seg_bytes: "100MB" # replayq_offload_mode: false
# replayq_offload_mode: false # replayq_max_total_bytes: "1GB"
# replayq_max_total_bytes: "1GB" # },
# }, # config {
# config { # conn_type: rpc
# conn_type: rpc # node: "emqx@127.0.0.1"
# node: "emqx@127.0.0.1" # }
# } # }
# } ]
]
}

View File

@ -23,13 +23,9 @@
-export([ structs/0 -export([ structs/0
, fields/1]). , fields/1]).
structs() -> ["emqx_bridge_mqtt"]. structs() -> [{array, "bridge_mqtt"}].
fields("emqx_bridge_mqtt") -> fields("bridge_mqtt") ->
[ {bridges, hoconsc:array(hoconsc:ref(?MODULE, "bridges"))}
];
fields("bridges") ->
[ {name, emqx_schema:t(string(), undefined, true)} [ {name, emqx_schema:t(string(), undefined, true)}
, {start_type, fun start_type/1} , {start_type, fun start_type/1}
, {forwards, fun forwards/1} , {forwards, fun forwards/1}

View File

@ -204,7 +204,8 @@ fields(Name) ->
find_field(Name, []) -> find_field(Name, []) ->
error({unknown_config_struct_field, Name}); error({unknown_config_struct_field, Name});
find_field(Name, [SchemaModule | Rest]) -> find_field(Name, [SchemaModule | Rest]) ->
case lists:member(Name, SchemaModule:structs()) of case lists:member(Name, SchemaModule:structs()) orelse
lists:keymember(Name, 2, SchemaModule:structs()) of
true -> SchemaModule:fields(Name); true -> SchemaModule:fields(Name);
false -> find_field(Name, Rest) false -> find_field(Name, Rest)
end. end.

View File

@ -61,7 +61,7 @@
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.12.1"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
]}. ]}.