%%-*- mode: erlang -*-
%%--------------------------------------------------------------------
%% Bridges
%%--------------------------------------------------------------------

%% Every entry in this file has the shape {mapping, ConfKey, EnvKey, Attrs}.
%% All of them target "emqx_bridge_mqtt.bridges"; the translation registered
%% for that key (further down in this file) gathers the settings of each
%% bridge ($name) into one option list per bridge.

%% Remote endpoint of the bridge. The translation treats a value made of
%% exactly two non-empty "@"-separated parts (e.g. "node@host") as an Erlang
%% node name: it is converted to an atom and emqx_bridge_rpc is selected as
%% connect module. Any other value is kept as a string and emqx_bridge_mqtt
%% is selected. The translation requires this key to be present for every
%% configured bridge.
{mapping, "bridge.mqtt.$name.address", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% MQTT protocol version. The translation rewrites mqttv3 | mqttv4 | mqttv5
%% to v3 | v4 | v5.
{mapping, "bridge.mqtt.$name.proto_ver", "emqx_bridge_mqtt.bridges", [
    {datatype, {enum, [mqttv3, mqttv4, mqttv5]}}
]}.

%% Boolean switch (default false); passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.bridge_mode", "emqx_bridge_mqtt.bridges", [
    {default, false},
    {datatype, {enum, [true, false]}}
]}.

%% How the bridge is started: manual | auto (default auto).
{mapping, "bridge.mqtt.$name.start_type", "emqx_bridge_mqtt.bridges", [
    {datatype, {enum, [manual, auto]}},
    {default, auto}
]}.

%% Client identifier string; passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.clientid", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Boolean switch (default true); passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.clean_start", "emqx_bridge_mqtt.bridges", [
    {default, true},
    {datatype, {enum, [true, false]}}
]}.

%% Username string; passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.username", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Password string; passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.password", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Comma-separated list of topics (default: empty). The translation splits
%% the string on "," and stores the resulting list under `forwards`.
{mapping, "bridge.mqtt.$name.forwards", "emqx_bridge_mqtt.bridges", [
    {datatype, string},
    {default, ""}
]}.

%% Mountpoint string; passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.forward_mountpoint", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Topic of subscription $id. The translation sorts topics and QoS values by
%% $id and pairs them with lists:zip/2 into {Topic, QoS} tuples stored under
%% `subscriptions`, so every $id must define both a topic and a qos.
{mapping, "bridge.mqtt.$name.subscription.$id.topic", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% QoS of subscription $id; paired with the topic that has the same $id.
{mapping, "bridge.mqtt.$name.subscription.$id.qos", "emqx_bridge_mqtt.bridges", [
    {datatype, integer}
]}.

%% Mountpoint string; passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.receive_mountpoint", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% TLS on/off flag (default off). Unlike the TLS options below, this key is
%% kept as a top-level bridge option by the translation.
{mapping, "bridge.mqtt.$name.ssl", "emqx_bridge_mqtt.bridges", [
    {datatype, flag},
    {default, off}
]}.

%% The translation collects cacertfile, certfile, keyfile, ciphers,
%% psk_ciphers and tls_versions into a single `ssl_opts` proplist.

%% Stored as {cacertfile, Value} inside `ssl_opts`.
{mapping, "bridge.mqtt.$name.cacertfile", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Stored as {certfile, Value} inside `ssl_opts`.
{mapping, "bridge.mqtt.$name.certfile", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Stored as {keyfile, Value} inside `ssl_opts`.
{mapping, "bridge.mqtt.$name.keyfile", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Comma-separated cipher names; split on "," and stored as
%% {ciphers, [Name]} inside `ssl_opts`.
{mapping, "bridge.mqtt.$name.ciphers", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Comma-separated PSK cipher names. The translation accepts only
%% PSK-AES128-CBC-SHA, PSK-AES256-CBC-SHA, PSK-3DES-EDE-CBC-SHA and
%% PSK-RC4-SHA (any other name has no matching clause), stores the mapped
%% tuples as {ciphers, ...} inside `ssl_opts` and adds a `user_lookup_fun`
%% entry referring to emqx_psk:lookup/3.
{mapping, "bridge.mqtt.$name.psk_ciphers", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

%% Keepalive as a duration in seconds (default "10s"); passed through
%% unchanged by the translation.
{mapping, "bridge.mqtt.$name.keepalive", "emqx_bridge_mqtt.bridges", [
    {default, "10s"},
    {datatype, {duration, s}}
]}.

%% Comma-separated TLS versions. Each element is converted to an atom and the
%% list is stored as {versions, [Atom]} inside `ssl_opts`.
{mapping, "bridge.mqtt.$name.tls_versions", "emqx_bridge_mqtt.bridges", [
    {datatype, string},
    {default, "tlsv1.3,tlsv1.2,tlsv1.1,tlsv1"}
]}.

%% Duration in milliseconds (default "30s"). The translation stores it under
%% the key `reconnect_delay_ms`.
{mapping, "bridge.mqtt.$name.reconnect_interval", "emqx_bridge_mqtt.bridges", [
    {default, "30s"},
    {datatype, {duration, ms}}
]}.

%% Duration in seconds (default "20s"); passed through unchanged by the
%% translation.
{mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [
    {default, "20s"},
    {datatype, {duration, s}}
]}.

%% Integer (default 0). The translation stores it under the key
%% `max_inflight`.
{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [
    {default, 0},
    {datatype, integer}
]}.

%% Integer (default 0); passed through unchanged by the translation.
{mapping, "bridge.mqtt.$name.batch_size", "emqx_bridge_mqtt.bridges", [
    {default, 0},
    {datatype, integer}
]}.

%% The "queue.*" keys of a bridge are gathered by the translation into one
%% map stored under `queue`, keyed by the last segment of the setting name
%% (replayq_dir, replayq_seg_bytes, max_total_size).

{mapping, "bridge.mqtt.$name.queue.replayq_dir", "emqx_bridge_mqtt.bridges", [
    {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.queue.replayq_seg_bytes", "emqx_bridge_mqtt.bridges", [
    {datatype, bytesize}
]}.

{mapping, "bridge.mqtt.$name.queue.max_total_size", "emqx_bridge_mqtt.bridges", [
    {datatype, bytesize}
]}.

%% Translation for "emqx_bridge_mqtt.bridges".
%%
%% Reads every "bridge.mqtt.<Name>.<Opt>" setting from Conf and returns a list
%% of {BridgeName, Options} pairs, where BridgeName is the bridge name as an
%% atom and Options is a list of {Key, Value} tuples. Besides the plain
%% options, each bridge gets `connect_module`, `subscriptions` and `queue`
%% entries, TLS-related settings are grouped under `ssl_opts`, and a few keys
%% are renamed (see Translate below).
%%
%% Fails with a badmatch when a bridge has no single "address" setting, and
%% with an error when subscriptions are configured for a bridge whose address
%% is an Erlang node name.
{translation, "emqx_bridge_mqtt.bridges", fun(Conf) ->

    %% Convert PSK cipher names into cipher tuples. Only the four names
    %% listed here are accepted; anything else has no matching clause.
    MapPSKCiphers = fun(PSKCiphers) ->
        lists:map(
            fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
               ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
               ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
               ("PSK-RC4-SHA") -> {psk, rc4_128, sha}
            end, PSKCiphers)
    end,

    %% Split a comma-separated string into its elements.
    Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,

    %% True for the option names that belong inside `ssl_opts`.
    IsSsl = fun(cacertfile) -> true;
               (certfile) -> true;
               (keyfile) -> true;
               (ciphers) -> true;
               (psk_ciphers) -> true;
               (tls_versions) -> true;
               (_Opt) -> false
            end,

    %% Turn one TLS-related setting into the entries to put in `ssl_opts`.
    %% Both `ciphers` and `psk_ciphers` produce a {ciphers, _} entry.
    Parse = fun(tls_versions, Vers) ->
                    [{versions, [list_to_atom(S) || S <- Split(Vers)]}];
               (ciphers, Ciphers) ->
                    [{ciphers, Split(Ciphers)}];
               (psk_ciphers, Ciphers) ->
                    [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}];
               (Opt, Val) ->
                    [{Opt, Val}]
            end,

    %% Add one option to the option list of a bridge:
    %%  - `forwards` is split into a list of topics;
    %%  - TLS options are prepended to the `ssl_opts` entry (created on
    %%    demand); the option list comes back sorted in that case because
    %%    lists:ukeymerge/3 needs key-sorted input;
    %%  - everything else is prepended as is.
    Merge = fun(forwards, Val, Opts) ->
                    [{forwards, string:tokens(Val, ",")}|Opts];
               (Opt, Val, Opts) ->
                    case IsSsl(Opt) of
                        true ->
                            SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []),
                            lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
                        false ->
                            [{Opt, Val}|Opts]
                    end
            end,

    %% Collect the "bridge.mqtt.<Name>.queue.<QOpt>" settings into a map
    %% keyed by QOpt (as an atom).
    Queue = fun(Name) ->
        Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".queue", Conf),
        QOpts = [{list_to_atom(QOpt), QValue} || {[_, _, _, "queue", QOpt], QValue} <- Configs],
        maps:from_list(QOpts)
    end,

    %% Build the [{Topic, QoS}] list of a bridge. Topics and QoS values are
    %% each ordered by their subscription id (compared as strings, so "10"
    %% sorts before "2") and then paired positionally; lists:zip/2 fails if
    %% the two lists differ in length.
    Subscriptions = fun(Name) ->
        Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".subscription", Conf),
        lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, _, "subscription", I, "topic"], Topic} <- Configs])],
                  [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, _, "subscription", I, "qos"], QoS} <- Configs])])
    end,

    %% True when Addr consists of exactly two non-empty "@"-separated parts,
    %% i.e. looks like an Erlang node name.
    IsNodeAddr = fun(Addr) ->
        case string:tokens(Addr, "@") of
            [_NodeName, _Hostname] -> true;
            _ -> false
        end
    end,

    %% Pick the connect module of a bridge from its address. The match on a
    %% one-element list asserts that the bridge has exactly one address
    %% setting.
    ConnMod = fun(Name) ->
        [AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".address", Conf),
        {_, Addr} = AddrConfig,
        Subs = Subscriptions(Name),
        case IsNodeAddr(Addr) of
            true when Subs =/= [] ->
                error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs});
            true ->
                emqx_bridge_rpc;
            false ->
                emqx_bridge_mqtt
        end
    end,

    %% to be backward compatible
    %% Fold function (Key, Value, AccMap) that renames or converts selected
    %% keys: node-style addresses become atoms, reconnect_interval becomes
    %% reconnect_delay_ms, proto_ver becomes v3 | v4 | v5, max_inflight_size
    %% becomes max_inflight, and the queue map is translated recursively.
    Translate =
        fun Tr(queue, Q, Cfg) ->
                NewQ = maps:fold(Tr, #{}, Q),
                Cfg#{queue => NewQ};
            Tr(address, Addr0, Cfg) ->
                Addr = case IsNodeAddr(Addr0) of
                           true -> list_to_atom(Addr0);
                           false -> Addr0
                       end,
                Cfg#{address => Addr};
            Tr(reconnect_interval, Ms, Cfg) ->
                Cfg#{reconnect_delay_ms => Ms};
            Tr(proto_ver, Ver, Cfg) ->
                Cfg#{proto_ver =>
                         case Ver of
                             mqttv3 -> v3;
                             mqttv4 -> v4;
                             mqttv5 -> v5;
                             _ -> v4
                         end};
            Tr(max_inflight_size, Size, Cfg) ->
                Cfg#{max_inflight => Size};
            Tr(Key, Value, Cfg) ->
                Cfg#{Key => Value}
        end,

    %% Group the four-segment settings by bridge name,
    %% e.g #{aws => [{OptKey, OptVal}]}
    %% Longer keys (queue.*, subscription.*) are skipped here; they are
    %% picked up by Queue/1 and Subscriptions/1.
    C = lists:foldl(
          fun({["bridge", "mqtt", Name, Opt], Val}, Acc) ->
                  BridgeName = list_to_atom(Name),
                  %% The derived entries are computed once, when the bridge
                  %% is first seen, instead of once per option.
                  Current = case maps:find(BridgeName, Acc) of
                                {ok, Existing} ->
                                    Existing;
                                error ->
                                    [{connect_module, ConnMod(Name)},
                                     {subscriptions, Subscriptions(Name)},
                                     {queue, Queue(Name)}]
                            end,
                  %% Every option, including the first one of a bridge, goes
                  %% through Merge so that list and TLS options are always
                  %% parsed.
                  Acc#{BridgeName => Merge(list_to_atom(Opt), Val, Current)};
             (_, Acc) -> Acc
          end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.mqtt", Conf))),

    %% Apply the key renames/conversions to every bridge.
    C1 = maps:map(fun(_Bn, Bc) ->
                      maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc)))
                  end, C),
    maps:to_list(C1)
end}.