emqx/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema

243 lines
8.2 KiB
Erlang

%%-*- mode: erlang -*-
%%--------------------------------------------------------------------
%% Bridges
%%--------------------------------------------------------------------
{mapping, "bridge.mqtt.$name.address", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.proto_ver", "emqx_bridge_mqtt.bridges", [
{datatype, {enum, [mqttv3, mqttv4, mqttv5]}}
]}.
{mapping, "bridge.mqtt.$name.bridge_mode", "emqx_bridge_mqtt.bridges", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{mapping, "bridge.mqtt.$name.start_type", "emqx_bridge_mqtt.bridges", [
{datatype, {enum, [manual, auto]}},
{default, auto}
]}.
{mapping, "bridge.mqtt.$name.clientid", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.clean_start", "emqx_bridge_mqtt.bridges", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "bridge.mqtt.$name.username", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.password", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.forwards", "emqx_bridge_mqtt.bridges", [
{datatype, string},
{default, ""}
]}.
{mapping, "bridge.mqtt.$name.forward_mountpoint", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.subscription.$id.topic", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.subscription.$id.qos", "emqx_bridge_mqtt.bridges", [
{datatype, integer}
]}.
{mapping, "bridge.mqtt.$name.receive_mountpoint", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.ssl", "emqx_bridge_mqtt.bridges", [
{datatype, flag},
{default, off}
]}.
{mapping, "bridge.mqtt.$name.cacertfile", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.certfile", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.keyfile", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.ciphers", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.psk_ciphers", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.keepalive", "emqx_bridge_mqtt.bridges", [
{default, "10s"},
{datatype, {duration, s}}
]}.
{mapping, "bridge.mqtt.$name.tls_versions", "emqx_bridge_mqtt.bridges", [
{datatype, string},
{default, "tlsv1,tlsv1.1,tlsv1.2"}
]}.
{mapping, "bridge.mqtt.$name.reconnect_interval", "emqx_bridge_mqtt.bridges", [
{default, "30s"},
{datatype, {duration, ms}}
]}.
{mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [
{default, "20s"},
{datatype, {duration, ms}}
]}.
{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [
{default, 0},
{datatype, integer}
]}.
{mapping, "bridge.mqtt.$name.batch_size", "emqx_bridge_mqtt.bridges", [
{default, 0},
{datatype, integer}
]}.
{mapping, "bridge.mqtt.$name.queue.replayq_dir", "emqx_bridge_mqtt.bridges", [
{datatype, string}
]}.
{mapping, "bridge.mqtt.$name.queue.replayq_seg_bytes", "emqx_bridge_mqtt.bridges", [
{datatype, bytesize}
]}.
{mapping, "bridge.mqtt.$name.queue.max_total_size", "emqx_bridge_mqtt.bridges", [
{datatype, bytesize}
]}.
{translation, "emqx_bridge_mqtt.bridges", fun(Conf) ->
MapPSKCiphers = fun(PSKCiphers) ->
lists:map(
fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
("PSK-RC4-SHA") -> {psk, rc4_128, sha}
end, PSKCiphers)
end,
Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
IsSsl = fun(cacertfile) -> true;
(certfile) -> true;
(keyfile) -> true;
(ciphers) -> true;
(psk_ciphers) -> true;
(tls_versions) -> true;
(_Opt) -> false
end,
Parse = fun(tls_versions, Vers) ->
[{versions, [list_to_atom(S) || S <- Split(Vers)]}];
(ciphers, Ciphers) ->
[{ciphers, Split(Ciphers)}];
(psk_ciphers, Ciphers) ->
[{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}];
(Opt, Val) ->
[{Opt, Val}]
end,
Merge = fun(forwards, Val, Opts) ->
[{forwards, string:tokens(Val, ",")}|Opts];
(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []),
lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
false ->
[{Opt, Val}|Opts]
end
end,
Queue = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".queue", Conf),
QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, _, "queue", QOpt], QValue} <- Configs],
maps:from_list(QOpts)
end,
Subscriptions = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".subscription", Conf),
lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, _, "subscription", I, "topic"], Topic} <- Configs])],
[QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, _, "subscription", I, "qos"], QoS} <- Configs])])
end,
IsNodeAddr = fun(Addr) ->
case string:tokens(Addr, "@") of
[_NodeName, _Hostname] -> true;
_ -> false
end
end,
ConnMod = fun(Name) ->
[AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".address", Conf),
{_, Addr} = AddrConfig,
Subs = Subscriptions(Name),
case IsNodeAddr(Addr) of
true when Subs =/= [] ->
error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs});
true ->
emqx_bridge_rpc;
false ->
emqx_bridge_mqtt
end
end,
%% to be backward compatible
Translate =
fun Tr(queue, Q, Cfg) ->
NewQ = maps:fold(Tr, #{}, Q),
Cfg#{queue => NewQ};
Tr(address, Addr0, Cfg) ->
Addr = case IsNodeAddr(Addr0) of
true -> list_to_atom(Addr0);
false -> Addr0
end,
Cfg#{address => Addr};
Tr(reconnect_interval, Ms, Cfg) ->
Cfg#{reconnect_delay_ms => Ms};
Tr(proto_ver, Ver, Cfg) ->
Cfg#{proto_ver =>
case Ver of
mqttv3 -> v3;
mqttv4 -> v4;
mqttv5 -> v5;
_ -> v4
end};
Tr(Key, Value, Cfg) ->
Cfg#{Key => Value}
end,
C = lists:foldl(
fun({["bridge", "mqtt", Name, Opt], Val}, Acc) ->
%% e.g #{aws => [{OptKey, OptVal}]}
Init = [{list_to_atom(Opt), Val},
{connect_module, ConnMod(Name)},
{subscriptions, Subscriptions(Name)},
{queue, Queue(Name)}],
maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
(_, Acc) -> Acc
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.mqtt", Conf))),
C1 = maps:map(fun(Bn, Bc) ->
maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc)))
end, C),
maps:to_list(C1)
end}.