Merge pull request #11294 from ieQu1/mria-fixes

Mria fixes
This commit is contained in:
ieQu1 2023-07-19 15:10:42 +02:00 committed by GitHub
commit 57e39f42c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 199 additions and 138 deletions

View File

@ -27,7 +27,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.7"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.13"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},

View File

@ -110,17 +110,21 @@ broker(_) ->
%% @doc Cluster with other nodes
cluster(["join", SNode]) ->
case ekka:join(ekka_node:parse_name(SNode)) of
case mria:join(ekka_node:parse_name(SNode)) of
ok ->
emqx_ctl:print("Join the cluster successfully.~n"),
cluster(["status"]);
%% FIXME: running status on the replicant immediately
%% after join produces stale output
mria_rlog:role() =:= core andalso
cluster(["status"]),
ok;
ignore ->
emqx_ctl:print("Ignore.~n");
{error, Error} ->
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
end;
cluster(["leave"]) ->
case ekka:leave() of
case mria:leave() of
ok ->
emqx_ctl:print("Leave the cluster successfully.~n"),
cluster(["status"]);
@ -128,7 +132,7 @@ cluster(["leave"]) ->
emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
end;
cluster(["force-leave", SNode]) ->
case ekka:force_leave(ekka_node:parse_name(SNode)) of
case mria:force_leave(ekka_node:parse_name(SNode)) of
ok ->
emqx_ctl:print("Remove the node from cluster successfully.~n"),
cluster(["status"]);
@ -138,9 +142,9 @@ cluster(["force-leave", SNode]) ->
emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
end;
cluster(["status"]) ->
emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]);
emqx_ctl:print("Cluster status: ~p~n", [cluster_info()]);
cluster(["status", "--json"]) ->
Info = sort_map_list_fields(ekka_cluster:info()),
Info = sort_map_list_fields(cluster_info()),
emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]);
cluster(_) ->
emqx_ctl:usage([
@ -158,9 +162,7 @@ sort_map_list_fields(Map) when is_map(Map) ->
end,
Map,
maps:keys(Map)
);
sort_map_list_fields(NotMap) ->
NotMap.
).
sort_map_list_field(Field, Map) ->
case maps:get(Field, Map) of
@ -925,3 +927,26 @@ with_log(Fun, Msg) ->
{error, Reason} ->
emqx_ctl:print("~s FAILED~n~p~n", [Msg, Reason])
end.
cluster_info() ->
RunningNodes = safe_call_mria(running_nodes, [], []),
StoppedNodes = safe_call_mria(cluster_nodes, [stopped], []),
#{
running_nodes => RunningNodes,
stopped_nodes => StoppedNodes
}.
%% CLI starts before mria, so we should handle errors gracefully:
safe_call_mria(Fun, Args, OnFail) ->
try
apply(mria, Fun, Args)
catch
EC:Err:Stack ->
?SLOG(warning, #{
msg => "Call to mria failed",
call => {mria, Fun, Args},
EC => Err,
stacktrace => Stack
}),
OnFail
end.

View File

@ -0,0 +1 @@
Fix `emqx_ctl cluster join`, `leave`, and `status` commands.

207
mix.exs
View File

@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.5", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.7", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.11", override: true},
@ -326,7 +326,7 @@ defmodule EMQXUmbrella.MixProject do
end
def applications(edition_type) do
[
system_apps = [
crypto: :permanent,
public_key: :permanent,
asn1: :permanent,
@ -339,96 +339,121 @@ defmodule EMQXUmbrella.MixProject do
redbug: :permanent,
xmerl: :permanent,
hocon: :load,
telemetry: :permanent,
emqx: :load,
emqx_conf: :load,
emqx_machine: :permanent
] ++
if(enable_rocksdb?(),
do: [mnesia_rocksdb: :load],
else: []
) ++
[
mnesia: :load,
ekka: :load,
esasl: :load,
observer_cli: :permanent,
tools: :permanent,
covertool: :load,
system_monitor: :load,
emqx_utils: :load,
emqx_http_lib: :permanent,
emqx_resource: :permanent,
emqx_connector: :permanent,
emqx_authn: :permanent,
emqx_authz: :permanent,
emqx_auto_subscribe: :permanent,
emqx_gateway: :permanent,
emqx_gateway_stomp: :permanent,
emqx_gateway_mqttsn: :permanent,
emqx_gateway_coap: :permanent,
emqx_gateway_lwm2m: :permanent,
emqx_gateway_exproto: :permanent,
emqx_exhook: :permanent,
emqx_bridge: :permanent,
emqx_bridge_mqtt: :permanent,
emqx_bridge_http: :permanent,
emqx_rule_engine: :permanent,
emqx_modules: :permanent,
emqx_management: :permanent,
emqx_dashboard: :permanent,
emqx_retainer: :permanent,
emqx_prometheus: :permanent,
emqx_psk: :permanent,
emqx_slow_subs: :permanent,
emqx_mongodb: :permanent,
emqx_redis: :permanent,
emqx_mysql: :permanent,
emqx_plugins: :permanent,
emqx_mix: :none
] ++
if(enable_quicer?(), do: [quicer: :permanent], else: []) ++
if(enable_bcrypt?(), do: [bcrypt: :permanent], else: []) ++
if(enable_jq?(), do: [jq: :load], else: []) ++
if(is_app(:observer),
do: [observer: :load],
else: []
) ++
if(edition_type == :enterprise,
do: [
emqx_license: :permanent,
emqx_enterprise: :load,
emqx_bridge_kafka: :permanent,
emqx_bridge_pulsar: :permanent,
emqx_bridge_gcp_pubsub: :permanent,
emqx_bridge_cassandra: :permanent,
emqx_bridge_opents: :permanent,
emqx_bridge_clickhouse: :permanent,
emqx_bridge_dynamo: :permanent,
emqx_bridge_hstreamdb: :permanent,
emqx_bridge_influxdb: :permanent,
emqx_bridge_iotdb: :permanent,
emqx_bridge_matrix: :permanent,
emqx_bridge_mongodb: :permanent,
emqx_bridge_mysql: :permanent,
emqx_bridge_pgsql: :permanent,
emqx_bridge_redis: :permanent,
emqx_bridge_rocketmq: :permanent,
emqx_bridge_tdengine: :permanent,
emqx_bridge_timescale: :permanent,
emqx_bridge_sqlserver: :permanent,
emqx_oracle: :permanent,
emqx_bridge_oracle: :permanent,
emqx_bridge_rabbitmq: :permanent,
emqx_schema_registry: :permanent,
emqx_eviction_agent: :permanent,
emqx_node_rebalance: :permanent,
emqx_ft: :permanent
],
else: [
emqx_telemetry: :permanent
telemetry: :permanent
]
db_apps =
if enable_rocksdb?() do
[:mnesia_rocksdb]
else
[]
end ++
[
:mnesia,
:mria,
:ekka
]
)
business_apps =
[
:emqx,
:emqx_conf,
:esasl,
:observer_cli,
:tools,
:covertool,
:system_monitor,
:emqx_utils,
:emqx_http_lib,
:emqx_resource,
:emqx_connector,
:emqx_authn,
:emqx_authz,
:emqx_auto_subscribe,
:emqx_gateway,
:emqx_gateway_stomp,
:emqx_gateway_mqttsn,
:emqx_gateway_coap,
:emqx_gateway_lwm2m,
:emqx_gateway_exproto,
:emqx_exhook,
:emqx_bridge,
:emqx_bridge_mqtt,
:emqx_bridge_http,
:emqx_rule_engine,
:emqx_modules,
:emqx_management,
:emqx_dashboard,
:emqx_retainer,
:emqx_prometheus,
:emqx_psk,
:emqx_slow_subs,
:emqx_mongodb,
:emqx_redis,
:emqx_mysql,
:emqx_plugins,
:emqx_mix
] ++
if enable_quicer?() do
[:quicer]
else
[]
end ++
if enable_bcrypt?() do
[:bcrypt]
else
[]
end ++
if enable_jq?() do
[:jq]
else
[]
end ++
if(is_app(:observer),
do: [:observer],
else: []
) ++
case edition_type do
:enterprise ->
[
:emqx_license,
:emqx_enterprise,
:emqx_bridge_kafka,
:emqx_bridge_pulsar,
:emqx_bridge_gcp_pubsub,
:emqx_bridge_cassandra,
:emqx_bridge_opents,
:emqx_bridge_clickhouse,
:emqx_bridge_dynamo,
:emqx_bridge_hstreamdb,
:emqx_bridge_influxdb,
:emqx_bridge_iotdb,
:emqx_bridge_matrix,
:emqx_bridge_mongodb,
:emqx_bridge_mysql,
:emqx_bridge_pgsql,
:emqx_bridge_redis,
:emqx_bridge_rocketmq,
:emqx_bridge_tdengine,
:emqx_bridge_timescale,
:emqx_bridge_sqlserver,
:emqx_oracle,
:emqx_bridge_oracle,
:emqx_bridge_rabbitmq,
:emqx_schema_registry,
:emqx_eviction_agent,
:emqx_node_rebalance,
:emqx_ft
]
_ ->
[:emqx_telemetry]
end
system_apps ++
Enum.map(db_apps, &{&1, :load}) ++
[emqx_machine: :permanent] ++
Enum.map(business_apps, &{&1, :load})
end
defp is_app(name) do

View File

@ -62,7 +62,7 @@
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.5"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.7"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}}

View File

@ -386,38 +386,43 @@ overlay_vars_pkg(pkg) ->
].
relx_apps(ReleaseType, Edition) ->
[
kernel,
sasl,
crypto,
public_key,
asn1,
syntax_tools,
ssl,
os_mon,
inets,
compiler,
runtime_tools,
redbug,
xmerl,
{hocon, load},
telemetry,
% started by emqx_machine
{emqx, load},
{emqx_conf, load},
emqx_machine
] ++
[{mnesia_rocksdb, load} || is_rocksdb_supported()] ++
SystemApps =
[
{mnesia, load},
{ekka, load},
{esasl, load},
kernel,
sasl,
crypto,
public_key,
asn1,
syntax_tools,
ssl,
os_mon,
inets,
compiler,
runtime_tools,
redbug,
xmerl,
{hocon, load},
telemetry
],
DBApps =
[mnesia_rocksdb || is_rocksdb_supported()] ++
[
mnesia,
mria,
ekka
],
BusinessApps =
[
emqx,
emqx_conf,
esasl,
observer_cli,
tools,
{covertool, load},
covertool,
% started by emqx_machine
{system_monitor, load},
{emqx_utils, load},
system_monitor,
emqx_utils,
emqx_http_lib,
emqx_resource,
emqx_connector,
@ -447,13 +452,18 @@ relx_apps(ReleaseType, Edition) ->
emqx_mysql,
emqx_plugins
] ++
[quicer || is_quicer_supported()] ++
[bcrypt || provide_bcrypt_release(ReleaseType)] ++
%% Started automatically when needed (only needs to be started when the
%% port implementation is used)
[{jq, load} || is_jq_supported()] ++
[{observer, load} || is_app(observer)] ++
relx_apps_per_edition(Edition).
[quicer || is_quicer_supported()] ++
[bcrypt || provide_bcrypt_release(ReleaseType)] ++
%% Started automatically when needed (only needs to be started when the
%% port implementation is used)
[jq || is_jq_supported()] ++
[observer || is_app(observer)] ++
relx_apps_per_edition(Edition),
SystemApps ++
%% EMQX starts the DB and the business applications:
[{App, load} || App <- DBApps] ++
[emqx_machine] ++
[{App, load} || App <- BusinessApps].
is_app(Name) ->
case application:load(Name) of
@ -465,7 +475,7 @@ is_app(Name) ->
relx_apps_per_edition(ee) ->
[
emqx_license,
{emqx_enterprise, load},
emqx_enterprise,
emqx_bridge_kafka,
emqx_bridge_pulsar,
emqx_bridge_gcp_pubsub,