fix(ekka): cluster cannot get started with the new config

This commit is contained in:
Shawn 2021-07-15 20:28:12 +08:00
parent a1488b3946
commit 543f2c78c5
2 changed files with 45 additions and 3 deletions

View File

@ -101,7 +101,7 @@ fields("static") ->
fields("mcast") -> fields("mcast") ->
[ {"addr", t(string(), undefined, "239.192.0.1")} [ {"addr", t(string(), undefined, "239.192.0.1")}
, {"ports", t(comma_separated_list(), undefined, "4369")} , {"ports", t(hoconsc:array(integer()), undefined, [4369, 4370])}
, {"iface", t(string(), undefined, "0.0.0.0")} , {"iface", t(string(), undefined, "0.0.0.0")}
, {"ttl", t(integer(), undefined, 255)} , {"ttl", t(integer(), undefined, 255)}
, {"loop", t(boolean(), undefined, true)} , {"loop", t(boolean(), undefined, true)}
@ -523,12 +523,19 @@ base_listener() ->
, {"rate_limit", ref("rate_limit")} , {"rate_limit", ref("rate_limit")}
]. ].
translations() -> ["kernel"]. translations() -> ["ekka", "kernel"].
translation("ekka") ->
[ {"cluster_discovery", fun tr_cluster__discovery/1}];
translation("kernel") -> translation("kernel") ->
[ {"logger_level", fun tr_logger_level/1} [ {"logger_level", fun tr_logger_level/1}
, {"logger", fun tr_logger/1}]. , {"logger", fun tr_logger/1}].
tr_cluster__discovery(Conf) ->
Strategy = conf_get("cluster.discovery_strategy", Conf),
{Strategy, filter(options(Strategy, Conf))}.
tr_logger_level(Conf) -> conf_get("log.primary_level", Conf). tr_logger_level(Conf) -> conf_get("log.primary_level", Conf).
tr_logger(Conf) -> tr_logger(Conf) ->
@ -806,3 +813,34 @@ to_erl_cipher_suite(Str) ->
{error, Reason} -> error({invalid_cipher, Reason}); {error, Reason} -> error({invalid_cipher, Reason});
Cipher -> Cipher Cipher -> Cipher
end. end.
options(static, Conf) ->
[{seeds, [list_to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}];
options(mcast, Conf) ->
{ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)),
{ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)),
Ports = conf_get("cluster.mcast.ports", Conf),
[{addr, Addr}, {ports, Ports}, {iface, Iface},
{ttl, conf_get("cluster.mcast.ttl", Conf, 1)},
{loop, conf_get("cluster.mcast.loop", Conf, true)}];
options(dns, Conf) ->
[{name, conf_get("cluster.dns.name", Conf)},
{app, conf_get("cluster.dns.app", Conf)}];
options(etcd, Conf) ->
Namespace = "cluster.etcd.ssl",
SslOpts = fun(C) ->
Options = keys(Namespace, C),
lists:map(fun(Key) -> {list_to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end,
[{server, conf_get("cluster.etcd.server", Conf)},
{prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")},
{node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)},
{ssl_options, filter(SslOpts(Conf))}];
options(k8s, Conf) ->
[{apiserver, conf_get("cluster.k8s.apiserver", Conf)},
{service_name, conf_get("cluster.k8s.service_name", Conf)},
{address_type, conf_get("cluster.k8s.address_type", Conf, ip)},
{app_name, conf_get("cluster.k8s.app_name", Conf)},
{namespace, conf_get("cluster.k8s.namespace", Conf)},
{suffix, conf_get("cluster.k8s.suffix", Conf, "")}];
options(manual, _Conf) ->
[].

View File

@ -49,6 +49,7 @@
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-define(CHAIN_TAB, emqx_authn_chain). -define(CHAIN_TAB, emqx_authn_chain).
@ -69,7 +70,10 @@ mnesia(boot) ->
{record_name, chain}, {record_name, chain},
{local_content, true}, {local_content, true},
{attributes, record_info(fields, chain)}, {attributes, record_info(fields, chain)},
{storage_properties, StoreProps}]). {storage_properties, StoreProps}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?CHAIN_TAB, ram_copies).
enable() -> enable() ->
case emqx:hook('client.authenticate', {?MODULE, authenticate, []}) of case emqx:hook('client.authenticate', {?MODULE, authenticate, []}) of