%%-*- mode: erlang -*- %%-------------------------------------------------------------------- %% Bridges %%-------------------------------------------------------------------- {mapping, "bridge.mqtt.$name.address", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.proto_ver", "emqx_bridge_mqtt.bridges", [ {datatype, {enum, [mqttv3, mqttv4, mqttv5]}} ]}. {mapping, "bridge.mqtt.$name.bridge_mode", "emqx_bridge_mqtt.bridges", [ {default, false}, {datatype, {enum, [true, false]}} ]}. {mapping, "bridge.mqtt.$name.start_type", "emqx_bridge_mqtt.bridges", [ {datatype, {enum, [manual, auto]}}, {default, auto} ]}. {mapping, "bridge.mqtt.$name.clientid", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.clean_start", "emqx_bridge_mqtt.bridges", [ {default, true}, {datatype, {enum, [true, false]}} ]}. {mapping, "bridge.mqtt.$name.username", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.password", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.forwards", "emqx_bridge_mqtt.bridges", [ {datatype, string}, {default, ""} ]}. {mapping, "bridge.mqtt.$name.forward_mountpoint", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.subscription.$id.topic", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.subscription.$id.qos", "emqx_bridge_mqtt.bridges", [ {datatype, integer} ]}. {mapping, "bridge.mqtt.$name.receive_mountpoint", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.ssl", "emqx_bridge_mqtt.bridges", [ {datatype, flag}, {default, off} ]}. {mapping, "bridge.mqtt.$name.cacertfile", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.certfile", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.keyfile", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.ciphers", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.psk_ciphers", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.keepalive", "emqx_bridge_mqtt.bridges", [ {default, "10s"}, {datatype, {duration, s}} ]}. {mapping, "bridge.mqtt.$name.tls_versions", "emqx_bridge_mqtt.bridges", [ {datatype, string}, {default, "tlsv1.3,tlsv1.2,tlsv1.1,tlsv1"} ]}. {mapping, "bridge.mqtt.$name.reconnect_interval", "emqx_bridge_mqtt.bridges", [ {default, "30s"}, {datatype, {duration, ms}} ]}. {mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [ {default, "20s"}, {datatype, {duration, s}} ]}. {mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [ {default, 0}, {datatype, integer} ]}. {mapping, "bridge.mqtt.$name.batch_size", "emqx_bridge_mqtt.bridges", [ {default, 0}, {datatype, integer} ]}. {mapping, "bridge.mqtt.$name.queue.replayq_dir", "emqx_bridge_mqtt.bridges", [ {datatype, string} ]}. {mapping, "bridge.mqtt.$name.queue.replayq_seg_bytes", "emqx_bridge_mqtt.bridges", [ {datatype, bytesize} ]}. {mapping, "bridge.mqtt.$name.queue.max_total_size", "emqx_bridge_mqtt.bridges", [ {datatype, bytesize} ]}. {translation, "emqx_bridge_mqtt.bridges", fun(Conf) -> MapPSKCiphers = fun(PSKCiphers) -> lists:map( fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; ("PSK-RC4-SHA") -> {psk, rc4_128, sha} end, PSKCiphers) end, Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, IsSsl = fun(cacertfile) -> true; (certfile) -> true; (keyfile) -> true; (ciphers) -> true; (psk_ciphers) -> true; (tls_versions) -> true; (_Opt) -> false end, Parse = fun(tls_versions, Vers) -> [{versions, [list_to_atom(S) || S <- Split(Vers)]}]; (ciphers, Ciphers) -> [{ciphers, Split(Ciphers)}]; (psk_ciphers, Ciphers) -> [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}]; (Opt, Val) -> [{Opt, Val}] end, Merge = fun(forwards, Val, Opts) -> [{forwards, string:tokens(Val, ",")}|Opts]; (Opt, Val, Opts) -> case IsSsl(Opt) of true -> SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []), lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); false -> [{Opt, Val}|Opts] end end, Queue = fun(Name) -> Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".queue", Conf), QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, _, "queue", QOpt], QValue} <- Configs], maps:from_list(QOpts) end, Subscriptions = fun(Name) -> Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".subscription", Conf), lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, _, "subscription", I, "topic"], Topic} <- Configs])], [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, _, "subscription", I, "qos"], QoS} <- Configs])]) end, IsNodeAddr = fun(Addr) -> case string:tokens(Addr, "@") of [_NodeName, _Hostname] -> true; _ -> false end end, ConnMod = fun(Name) -> [AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".address", Conf), {_, Addr} = AddrConfig, Subs = Subscriptions(Name), case IsNodeAddr(Addr) of true when Subs =/= [] -> error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs}); true -> emqx_bridge_rpc; false -> emqx_bridge_mqtt end end, %% to be backward compatible Translate = fun Tr(queue, Q, Cfg) -> NewQ = maps:fold(Tr, #{}, Q), Cfg#{queue => NewQ}; Tr(address, Addr0, Cfg) -> Addr = case IsNodeAddr(Addr0) of true -> list_to_atom(Addr0); false -> Addr0 end, Cfg#{address => Addr}; Tr(reconnect_interval, Ms, Cfg) -> Cfg#{reconnect_delay_ms => Ms}; Tr(proto_ver, Ver, Cfg) -> Cfg#{proto_ver => case Ver of mqttv3 -> v3; mqttv4 -> v4; mqttv5 -> v5; _ -> v4 end}; Tr(max_inflight_size, Size, Cfg) -> Cfg#{max_inflight => Size}; Tr(Key, Value, Cfg) -> Cfg#{Key => Value} end, C = lists:foldl( fun({["bridge", "mqtt", Name, Opt], Val}, Acc) -> %% e.g #{aws => [{OptKey, OptVal}]} Init = [{list_to_atom(Opt), Val}, {connect_module, ConnMod(Name)}, {subscriptions, Subscriptions(Name)}, {queue, Queue(Name)}], maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); (_, Acc) -> Acc end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.mqtt", Conf))), C1 = maps:map(fun(Bn, Bc) -> maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc))) end, C), maps:to_list(C1) end}.