From 186e26e417c114cbd88d38d2cb063e2b047edf2b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 11 Jun 2022 16:21:32 +0200 Subject: [PATCH 1/4] fix(emqx_plugins): call cluster_rpc to update config --- apps/emqx_plugins/src/emqx_plugins.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 8abc86d53..bca0defc5 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -648,7 +648,7 @@ put_config(Key, Value) when is_atom(Key) -> put_config([Key], Value); put_config(Path, Values) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, - case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of + case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; Error -> Error end. From 0e06e4acaac63036f1a091b84a843192acb75da1 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 11 Jun 2022 16:22:02 +0200 Subject: [PATCH 2/4] fix(emqx_conf): throw exception on cluster_call failure Not all callers handle error, some even ignore errors! --- apps/emqx_conf/src/emqx_conf.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index fb0082819..8b769097e 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -223,7 +223,10 @@ check_cluster_rpc_result(Result) -> Res; %% all MFA return not ok or {ok, term()}. {error, Error} -> - Error + %% a lot of the callers do not handle + %% this error return, some even ignore + %% throw here to ensure the code will not proceed + erlang:throw(Error) end. %% Only gen hot_conf schema, not all configuration fields. 
From b92708726aa3ab74c8fee339a7883b3c86f3e4d3 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 11 Jun 2022 18:48:13 +0200 Subject: [PATCH 3/4] test: Dockerfile:s to build test images --- Dockerfile.ubuntu20.04.runner | 42 ++++++++++++++++++++ Dockerfile.ubuntu20.04.runner.dockerignore | 4 ++ scripts/make-docker-image-from-host-build.sh | 28 +++++++++++++ scripts/start-two-nodes-in-docker.sh | 21 +++------- 4 files changed, 80 insertions(+), 15 deletions(-) create mode 100644 Dockerfile.ubuntu20.04.runner create mode 100644 Dockerfile.ubuntu20.04.runner.dockerignore create mode 100755 scripts/make-docker-image-from-host-build.sh diff --git a/Dockerfile.ubuntu20.04.runner b/Dockerfile.ubuntu20.04.runner new file mode 100644 index 000000000..124021c89 --- /dev/null +++ b/Dockerfile.ubuntu20.04.runner @@ -0,0 +1,42 @@ +## This is a fast-build Dockerfile only for testing +FROM ubuntu:20.04 +ARG PROFILE=emqx + +RUN apt-get update; \ + apt-get install -y --no-install-recommends ca-certificates procps; \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir /opt/emqx +RUN date > /opt/emqx/BUILD_TIME +COPY _build/${PROFILE}/rel/emqx /opt/emqx +RUN ln -s /opt/emqx/bin/* /usr/local/bin/ +COPY deploy/docker/docker-entrypoint.sh /usr/bin/ + +WORKDIR /opt/emqx + + +RUN groupadd -r -g 1000 emqx; \ + useradd -r -m -u 1000 -g emqx emqx; \ + chgrp -Rf emqx /opt/emqx; \ + chmod -Rf g+w /opt/emqx; \ + chown -Rf emqx /opt/emqx + +USER emqx + +VOLUME ["/opt/emqx/log", "/opt/emqx/data"] + +# emqx will occupy these ports: +# - 1883 port for MQTT +# - 8081 for mgmt API +# - 8083 for WebSocket/HTTP +# - 8084 for WSS/HTTPS +# - 8883 port for MQTT(SSL) +# - 11883 port for internal MQTT/TCP +# - 18083 for dashboard +# - 4370 default Erlang distribution port +# - 5369 for backplane gen_rpc +EXPOSE 1883 8081 8083 8084 8883 11883 18083 4370 5369 + +ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"] + +CMD ["/opt/emqx/bin/emqx", "foreground"] diff --git a/Dockerfile.ubuntu20.04.runner.dockerignore 
b/Dockerfile.ubuntu20.04.runner.dockerignore new file mode 100644 index 000000000..bf291d51e --- /dev/null +++ b/Dockerfile.ubuntu20.04.runner.dockerignore @@ -0,0 +1,4 @@ +* +!_build/emqx +!_build/emqx-enterprise +!deploy diff --git a/scripts/make-docker-image-from-host-build.sh b/scripts/make-docker-image-from-host-build.sh new file mode 100755 index 000000000..8911ed251 --- /dev/null +++ b/scripts/make-docker-image-from-host-build.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash + +set -euo pipefail +set -x + +PROFILE="$1" +COMPILE="${2:-no}" +DISTRO="$(./scripts/get-distro.sh)" +PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh "$PROFILE")}" + +case "$DISTRO" in + ubuntu20*) + EMQX_DOCKERFILE="Dockerfile.ubuntu20.04.runner" + ;; + *) + echo "sorry, no support for $DISTRO yet" + exit 1 +esac + +if [ "$COMPILE" = '--compile' ]; then + make "$PROFILE" + sync +fi + +export DOCKER_BUILDKIT=1 +docker build --build-arg PROFILE="${PROFILE}" \ + -t "emqx/emqx:${PKG_VSN}-${DISTRO}" \ + -f "$EMQX_DOCKERFILE" . diff --git a/scripts/start-two-nodes-in-docker.sh b/scripts/start-two-nodes-in-docker.sh index ec3989934..64a6647ce 100755 --- a/scripts/start-two-nodes-in-docker.sh +++ b/scripts/start-two-nodes-in-docker.sh @@ -2,14 +2,6 @@ set -euo pipefail -## This script takes the first argument as docker image name, -## starts two containers running with the built code mount -## into docker containers. -## -## NOTE: containers are not instructed to rebuild emqx, -## Please use a docker image which is compatible with -## the docker host. -## ## EMQX can only start with longname (https://erlang.org/doc/reference_manual/distributed.html) ## The host name part of EMQX's node name has to be static, this means we should either ## pre-assign static IP for containers, or ensure containers can communiate with each other by name @@ -19,7 +11,6 @@ set -euo pipefail cd -P -- "$(dirname -- "$0")/.." 
IMAGE="${1}" -PROJ_DIR="$(pwd)" NET='emqx.io' NODE1="node1.$NET" @@ -35,23 +26,23 @@ docker network create "$NET" docker run -d -t --restart=always --name "$NODE1" \ --net "$NET" \ + -e EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug \ -e EMQX_NODE_NAME="emqx@$NODE1" \ -e EMQX_NODE_COOKIE="$COOKIE" \ -p 18083:18083 \ - -v "$PROJ_DIR"/_build/emqx/rel/emqx:/built \ - "$IMAGE" sh -c 'cp -r /built /emqx && /emqx/bin/emqx console' + "$IMAGE" docker run -d -t --restart=always --name "$NODE2" \ --net "$NET" \ + -e EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug \ -e EMQX_NODE_NAME="emqx@$NODE2" \ -e EMQX_NODE_COOKIE="$COOKIE" \ -p 18084:18083 \ - -v "$PROJ_DIR"/_build/emqx/rel/emqx:/built \ - "$IMAGE" sh -c 'cp -r /built /emqx && /emqx/bin/emqx console' + "$IMAGE" wait (){ container="$1" - while ! docker exec "$container" /emqx/bin/emqx_ctl status >/dev/null 2>&1; do + while ! docker exec "$container" emqx_ctl status >/dev/null 2>&1; do echo -n '.' sleep 1 done @@ -61,4 +52,4 @@ wait $NODE1 wait $NODE2 echo -docker exec $NODE1 /emqx/bin/emqx_ctl cluster join "emqx@$NODE2" +docker exec $NODE1 emqx_ctl cluster join "emqx@$NODE2" From 2065be569eca0bc4871dc7f5f7993ea26aee0f22 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 11 Jun 2022 20:47:46 +0200 Subject: [PATCH 4/4] fix(emqx_cluster_rpc): fail fast on stale state Due to: * Cluster RPC MFA is not idempotent! * There is a lack of rollback for callback's side-effects For instance, when two nodes try to add a cluster-singleton concurrently, one of them will have to wait for the table lock then try to catch-up, then try to apply MFA. The catch-up will have the singleton created, but the initiated multicall apply will fail causing the commit to rollback, but not to 'undo' the singleton creation. Later, the retries will fail indefinitely. 
--- apps/emqx/src/emqx_config.erl | 3 +- apps/emqx/src/emqx_metrics_worker.erl | 3 +- apps/emqx/test/emqx_bpapi_static_checks.erl | 11 +- apps/emqx_conf/src/emqx_cluster_rpc.erl | 209 +++++++++++------- apps/emqx_conf/src/emqx_conf.erl | 27 +-- .../src/proto/emqx_conf_proto_v1.erl | 6 +- .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 52 +++-- .../test/emqx_connector_SUITE.erl | 1 + apps/emqx_gateway/src/emqx_gateway_http.erl | 2 +- .../src/emqx_mgmt_api_plugins.erl | 37 ++-- .../proto/emqx_mgmt_api_plugins_proto_v1.erl | 5 +- .../src/emqx_plugin_libs_rule.erl | 6 - apps/emqx_plugins/src/emqx_plugins.erl | 2 +- apps/emqx_plugins/test/emqx_plugins_tests.erl | 14 +- apps/emqx_resource/src/emqx_resource.erl | 16 +- .../src/proto/emqx_resource_proto_v1.erl | 12 +- .../src/emqx_rule_engine_api.erl | 2 +- .../src/proto/emqx_rule_engine_proto_v1.erl | 3 +- 18 files changed, 202 insertions(+), 209 deletions(-) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 5e50189be..ab98ded69 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -233,7 +233,8 @@ put(Config) -> erase(RootName) -> persistent_term:erase(?PERSIS_KEY(?CONF, bin(RootName))), - persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))). + persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))), + ok. -spec put(emqx_map_lib:config_key_path(), term()) -> ok. put(KeyPath, Config) -> diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index 575dcca6c..21e73ff51 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -163,8 +163,7 @@ get_counters(Name, Id) -> reset_counters(Name, Id) -> Indexes = maps:values(get_indexes(Name, Id)), Ref = get_ref(Name, Id), - [counters:put(Ref, Idx, 0) || Idx <- Indexes], - ok. + lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, Indexes). -spec get_metrics(handler_name(), metric_id()) -> metrics(). 
get_metrics(Name, Id) -> diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 6be38df0d..6ca0f3ec2 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -53,8 +53,7 @@ -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). %% List of known functions also known to do RPC: -define(RPC_FUNCTIONS, - "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5, " - "emqx_plugin_libs_rule:cluster_call/3" + "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5" ). %% List of functions in the RPC backend modules that we can ignore: @@ -63,11 +62,9 @@ %% List of business-layer functions that are exempt from the checks: %% erlfmt-ignore -define(EXEMPTIONS, - "emqx_mgmt_api:do_query/6," % Reason: legacy code. A fun and a QC query are - % passed in the args, it's futile to try to statically - % check it - "emqx_plugin_libs_rule:cluster_call/3" % Reason: some sort of external plugin API that we - % don't want to break? + "emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are + % passed in the args, it's futile to try to statically + % check it ). -define(XREF, myxref). diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index da49e31a5..be0e6232c 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -30,7 +30,11 @@ skip_failed_commit/1, fast_forward_to_commit/2 ]). --export([get_node_tnx_id/1, latest_tnx_id/0]). +-export([ + get_node_tnx_id/1, + latest_tnx_id/0, + make_initiate_call_req/3 +]). -export([ init/1, @@ -44,7 +48,7 @@ -export([get_tables_status/0]). --export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]). +-export_type([tnx_id/0, succeed_num/0]). -ifdef(TEST). -compile(export_all). @@ -56,19 +60,21 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). 
+-define(INITIATE(MFA, LatestIdLastSeen), {initiate, MFA, LatestIdLastSeen}). -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). --type txn_id() :: pos_integer(). +-type tnx_id() :: pos_integer(). -type succeed_num() :: pos_integer() | all. -type multicall_return(Result) :: - {ok, txn_id(), Result} - | {error, term()} - | {retry, txn_id(), Result, node()}. + {ok, tnx_id(), Result} + | {init_failure, term()} + | {peers_lagging, tnx_id(), Result, [node()]}. -type multicall_return() :: multicall_return(_). +-type init_call_req() :: ?INITIATE({module(), atom(), list()}, tnx_id()). %%%=================================================================== %%% API @@ -102,27 +108,73 @@ start_link(Node, Name, RetryMs) -> {error, Reason} end. -%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok. -%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}. -%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. --spec multicall(module(), atom(), list()) -> multicall_return(). +%% @doc Initiate a local call (or core node), +%% then asynchronously replicate the call to peer nodes in the cluster. +%% The evaluation result of the provided MFA is returned, +%% the result is expected to be `ok | {ok, _}' to indicate success, +%% and `{error, _}' to indicate failure. +%% +%% The exception of the MFA evaluation is captured and translated +%% into an `{error, _}' tuple. +%% This call tries to wait for all peer nodes to be in-sync before +%% returning the result. +%% +%% In case of partial success, an `error' level log is emitted +%% but the initial local apply result is returned. +-spec multicall(module(), atom(), list()) -> term(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). --spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return(). 
-multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 -> - MFA = {initiate, {M, F, A}}, +-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> term(). +multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse RequiredSyncs >= 1 -> + case do_multicall(M, F, A, RequiredSyncs, Timeout) of + {ok, _TxnId, Result} -> + Result; + {init_failure, Error} -> + Error; + {peers_lagging, TnxId, Res, Nodes} -> + %% The init MFA return ok, but some other nodes failed. + ?SLOG(error, #{ + msg => "cluster_rpc_peers_lagging", + lagging_nodes => Nodes, + tnx_id => TnxId + }), + Res + end. + +%% Return {ok, TnxId, MFARes} the first MFA result when all MFA run ok. +%% return {init_failure, Error} when the initial MFA result is not ok or {ok, term()}. +%% return {peers_lagging, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. +-spec do_multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return(). +do_multicall(M, F, A, RequiredSyncs, Timeout) when + RequiredSyncs =:= all orelse RequiredSyncs >= 1 +-> + %% Ideally 'LatestId' should be provided by the multicall originator, + %% which is the viewer of the state e.g. + %% * Sysadmin who issues CLI-commands or REST-API calls to make config changes + %% * Dashboard viewer who is making decision based on what they can see from the UI + %% To reach the ideal state, it would require adding transaction ID to each and + %% every view/GET requests and also provide the ID as a part of the view/GET responses. + %% + %% To keep things simple, we try to get the 'old' view when a multicall request + %% is received as early as possible. + %% + %% Reason to do this: + %% The 'initiate' call handler tries to take a table lock (cluster-wide) before + %% bumping the transaction ID. While waiting for the lock, the ID might have been + %% bumped by another node in the cluster. 
+ InitReq = make_initiate_call_req(M, F, A), Begin = erlang:monotonic_time(), InitRes = case mria_rlog:role() of core -> - gen_server:call(?MODULE, MFA, Timeout); + gen_server:call(?MODULE, InitReq, Timeout); replicant -> %% the initiate transaction must happened on core node %% make sure MFA(in the transaction) and the transaction on the same node %% don't need rpc again inside transaction. case mria_status:upstream_node(?CLUSTER_RPC_SHARD) of - {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); + {ok, Node} -> gen_server:call({?MODULE, Node}, InitReq, Timeout); disconnected -> {error, disconnected} end end, @@ -132,23 +184,23 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu RetryTimeout = ceil(3 * max(MinDelay, get_retry_ms())), OkOrFailed = case InitRes of - {ok, _TnxId, _} when RequireNum =:= 1 -> + {ok, _TnxId, _} when RequiredSyncs =:= 1 -> ok; - {ok, TnxId, _} when RequireNum =:= all -> + {ok, TnxId, _} when RequiredSyncs =:= all -> wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout); - {ok, TnxId, _} when is_integer(RequireNum) -> - wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout); + {ok, TnxId, _} when is_integer(RequiredSyncs) -> + wait_for_nodes_commit(RequiredSyncs, TnxId, MinDelay, RetryTimeout); Error -> Error end, case OkOrFailed of ok -> InitRes; - {error, Error0} -> - {error, Error0}; - {retry, Node0} -> + {init_failure, Error0} -> + {init_failure, Error0}; + {peers_lagging, Nodes} -> {ok, TnxId0, MFARes} = InitRes, - {retry, TnxId0, MFARes, Node0} + {peers_lagging, TnxId0, MFARes, Nodes} end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. @@ -167,6 +219,11 @@ latest_tnx_id() -> {atomic, TnxId} = transaction(fun get_latest_id/0, []), TnxId. +-spec make_initiate_call_req(module(), atom(), list()) -> init_call_req(). +make_initiate_call_req(M, F, A) -> + TnxId = get_latest_id(dirty), + ?INITIATE({M, F, A}, TnxId). 
+ -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of @@ -232,12 +289,12 @@ handle_call(reset, _From, State) -> _ = mria:clear_table(?CLUSTER_COMMIT), _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; -handle_call({initiate, MFA}, _From, State = #{node := Node}) -> - case transaction(fun init_mfa/2, [Node, MFA]) of +handle_call(?INITIATE(MFA, LatestId), _From, State = #{node := Node}) -> + case transaction(fun init_mfa/3, [Node, MFA, LatestId]) of {atomic, {ok, TnxId, Result}} -> {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; - {aborted, Reason} -> - {reply, {error, Reason}, State, {continue, ?CATCH_UP}} + {aborted, Error} -> + {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}} end; handle_call(skip_failed_commit, _From, State = #{node := Node}) -> Timeout = catch_up(State, true), @@ -273,7 +330,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> - {Succeed, _} = apply_mfa(NextId, MFA), + {Succeed, _} = apply_mfa(NextId, MFA, catch_up), case Succeed orelse SkipResult of true -> case transaction(fun commit/2, [Node, NextId]) of @@ -316,35 +373,6 @@ read_next_mfa(Node) -> [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} end. 
-do_catch_up(ToTnxId, Node) -> - case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [] -> - commit(Node, ToTnxId), - caught_up; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId -> - caught_up; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId -> - CurTnxId = LastAppliedId + 1, - [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId), - case apply_mfa(CurTnxId, MFA) of - {true, _Result} -> ok = commit(Node, CurTnxId); - {false, Error} -> mnesia:abort(Error) - end; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> - Reason = lists:flatten( - io_lib:format( - "~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", - [Node, LastAppliedId, ToTnxId] - ) - ), - ?SLOG(error, #{ - msg => "catch_up_failed!", - last_applied_id => LastAppliedId, - to_tnx_id => ToTnxId - }), - mnesia:abort(Reason) - end. - commit(Node, TnxId) -> ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). @@ -365,33 +393,44 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> end. get_latest_id() -> - case mnesia:last(?CLUSTER_MFA) of + get_latest_id(tnx). + +get_latest_id(IsolationLevel) -> + F = + case IsolationLevel of + tnx -> fun mnesia:last/1; + dirty -> fun mnesia:dirty_last/1 + end, + case F(?CLUSTER_MFA) of '$end_of_table' -> 0; Id -> Id end. -init_mfa(Node, MFA) -> +init_mfa(Node, MFA, LatestIdLastSeen) -> mnesia:write_lock_table(?CLUSTER_MFA), LatestId = get_latest_id(), - ok = do_catch_up_in_one_trans(LatestId, Node), - TnxId = LatestId + 1, - MFARec = #cluster_rpc_mfa{ - tnx_id = TnxId, - mfa = MFA, - initiator = Node, - created_at = erlang:localtime() - }, - ok = mnesia:write(?CLUSTER_MFA, MFARec, write), - ok = commit(Node, TnxId), - case apply_mfa(TnxId, MFA) of - {true, Result} -> {ok, TnxId, Result}; - {false, Error} -> mnesia:abort(Error) - end. 
- -do_catch_up_in_one_trans(LatestId, Node) -> - case do_catch_up(LatestId, Node) of - caught_up -> ok; - ok -> do_catch_up_in_one_trans(LatestId, Node) + case LatestIdLastSeen =:= LatestId of + true -> + TnxId = LatestId + 1, + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = commit(Node, TnxId), + case apply_mfa(TnxId, MFA, init) of + {true, Result} -> {ok, TnxId, Result}; + {false, Error} -> mnesia:abort(Error) + end; + false -> + ?SLOG(error, #{ + msg => stale_view_of_cluster_state, + tnx_id => LatestId, + last_seen_tnx_id => LatestIdLastSeen + }), + mnesia:abort({error, stale_view_of_cluster_state}) end. transaction(Func, Args) -> @@ -433,7 +472,7 @@ trans_query(TnxId) -> -define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))). -apply_mfa(TnxId, {M, F, A}) -> +apply_mfa(TnxId, {M, F, A}, Kind) -> Res = try erlang:apply(M, F, A) @@ -444,7 +483,7 @@ apply_mfa(TnxId, {M, F, A}) -> {error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}} end, %% Do not log args as it might be sensitive information - Meta = #{tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))}, + Meta = #{kind => Kind, tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))}, IsSuccess = is_success(Res), log_and_alarm(IsSuccess, Res, Meta), {IsSuccess, Res}. @@ -475,21 +514,21 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) -> [] -> ok; Nodes -> - {retry, Nodes} + {peers_lagging, Nodes} end. 
-wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> +wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> ok = timer:sleep(Delay), - case length(synced_nodes(TnxId)) >= RequiredNum of + case length(synced_nodes(TnxId)) >= RequiredSyncs of true -> ok; false when Remain > 0 -> - wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay); + wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay); false -> case lagging_node(TnxId) of %% All commit but The succeedNum > length(nodes()). [] -> ok; - Nodes -> {retry, Nodes} + Nodes -> {peers_lagging, Nodes} end end. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 8b769097e..22969c272 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -92,7 +92,7 @@ get_node_and_config(KeyPath) -> ) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> - check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)). + emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts). %% @doc Update the specified node's key path in local-override.conf. -spec update( @@ -111,7 +111,7 @@ update(Node, KeyPath, UpdateReq, Opts) -> -spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove(KeyPath, Opts) -> - check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)). + emqx_conf_proto_v1:remove_config(KeyPath, Opts). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -125,7 +125,7 @@ remove(Node, KeyPath, Opts) -> -spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> - check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)). 
+ emqx_conf_proto_v1:reset(KeyPath, Opts). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -208,27 +208,6 @@ gen_example(File, SchemaModule, I18nFile, Lang) -> Example = hocon_schema_example:gen(SchemaModule, Opts), file:write_file(File, Example). -check_cluster_rpc_result(Result) -> - case Result of - {ok, _TnxId, Res} -> - Res; - {retry, TnxId, Res, Nodes} -> - %% The init MFA return ok, but other nodes failed. - %% We return ok and alert an alarm. - ?SLOG(error, #{ - msg => "failed_to_update_config_in_cluster", - nodes => Nodes, - tnx_id => TnxId - }), - Res; - %% all MFA return not ok or {ok, term()}. - {error, Error} -> - %% a lot of the callers do not handle - %% this error return, some even ignore - %% throw here to ensure the code will not proceed - erlang:throw(Error) - end. - %% Only gen hot_conf schema, not all configuration fields. gen_hot_conf_schema(File) -> {ApiSpec0, Components0} = emqx_dashboard_swagger:spec( diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl index 68380b88a..97e14b7c4 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl @@ -61,7 +61,7 @@ get_all(KeyPath) -> update_config_key_path(), emqx_config:update_request(), emqx_config:update_opts() -) -> emqx_cluster_rpc:multicall_return(). +) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). @@ -78,7 +78,7 @@ update(Node, KeyPath, UpdateReq, Opts) -> rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). -spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> - emqx_cluster_rpc:multicall_result(). + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. 
remove_config(KeyPath, Opts) -> emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). @@ -90,7 +90,7 @@ remove_config(Node, KeyPath, Opts) -> rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). -spec reset(update_config_key_path(), emqx_config:update_opts()) -> - emqx_cluster_rpc:multicall_return(). + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index ac2d8d90b..48fd91a33 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -69,13 +69,13 @@ t_base_test(_Config) -> ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = multicall(M, F, A), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFA, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), - ?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)), + ?assertEqual({ok, 2, ok}, multicall(M, F, A)), {atomic, Status} = emqx_cluster_rpc:status(), case length(Status) =:= 3 of true -> @@ -95,7 +95,7 @@ t_commit_fail_test(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]}, - {error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A), + {init_failure, "MFA return not ok"} = multicall(M, F, A), ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), ok. 
@@ -103,7 +103,7 @@ t_commit_crash_test(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, no_exist_function, []}, - {error, {error, Meta}} = emqx_cluster_rpc:multicall(M, F, A), + {init_failure, {error, Meta}} = multicall(M, F, A), ?assertEqual(undef, maps:get(reason, Meta)), ?assertEqual(error, maps:get(exception, Meta)), ?assertEqual(true, maps:is_key(stacktrace, Meta)), @@ -114,21 +114,23 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, _, ok} = multicall(M, F, A, 1, 1000), {atomic, [Status]} = emqx_cluster_rpc:status(), ?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), erlang:send(?NODE2, test), - Res = gen_server:call(?NODE2, {initiate, {M, F, A}}), - ?assertEqual({error, "MFA return not ok"}, Res), + Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A), + Res = gen_server:call(?NODE2, Call), + ?assertEqual({init_failure, "MFA return not ok"}, Res), ok. t_catch_up_status_handle_next_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, - {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}), + {ok, 1, ok} = multicall(M, F, A, 1, 1000), + Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A), + {ok, 2} = gen_server:call(?NODE2, Call), ok. 
t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> @@ -138,19 +140,19 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> ets:insert(test, {other_mfa_result, failed}), ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]), {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]}, - {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 1, ok} = multicall(M, F, A, 1, 1000), ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]), ct:pal("333:~p~n", [emqx_cluster_rpc:status()]), {atomic, [_Status | L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ets:insert(test, {other_mfa_result, ok}), - {ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), + {ok, 2, ok} = multicall(io, format, ["test"], 1, 1000), ct:sleep(1000), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1), + {ok, TnxId, ok} = multicall(M1, F1, A1), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFAEcho, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), @@ -167,7 +169,7 @@ t_del_stale_mfa(_Config) -> Ids = [ begin - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = multicall(M, F, A), TnxId end || _ <- Keys @@ -176,7 +178,7 @@ t_del_stale_mfa(_Config) -> Ids2 = [ begin - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = multicall(M, F, A), TnxId end || _ <- Keys2 @@ -203,7 +205,7 @@ t_del_stale_mfa(_Config) -> t_skip_failed_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -212,7 +214,7 @@ t_skip_failed_commit(_Config) -> 
tnx_ids(List1) ), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 2, ok} = multicall(M, F, A, 1, 1000), 2 = gen_server:call(?NODE2, skip_failed_commit, 5000), {atomic, List2} = emqx_cluster_rpc:status(), ?assertEqual( @@ -224,7 +226,7 @@ t_skip_failed_commit(_Config) -> t_fast_forward_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -233,11 +235,11 @@ t_fast_forward_commit(_Config) -> tnx_ids(List1) ), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000), + {ok, 2, ok} = multicall(M, F, A, 1, 1000), + {ok, 3, ok} = multicall(M, F, A, 1, 1000), + {ok, 4, ok} = multicall(M, F, A, 1, 1000), + {ok, 5, ok} = multicall(M, F, A, 1, 1000), + {peers_lagging, 6, ok, _} = multicall(M, F, A, 2, 1000), 3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000), 4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000), 6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000), @@ -333,3 +335,9 @@ failed_on_other_recover_after_retry(Pid) -> [{_, Res}] = ets:lookup(test, other_mfa_result), Res end. + +multicall(M, F, A, N, T) -> + emqx_cluster_rpc:do_multicall(M, F, A, N, T). + +multicall(M, F, A) -> + multicall(M, F, A, all, timer:minutes(2)). 
diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index 429f5d3fa..c4a6418c2 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -70,6 +70,7 @@ end_per_testcase(_, _Config) -> ok. t_list_raw_empty(_) -> + ok = emqx_config:erase(hd(emqx_connector:config_key_path())), Result = emqx_connector:list_raw(), ?assertEqual([], Result). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 0165efed9..a1640aee1 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -556,7 +556,7 @@ with_gateway(GwName0, Fun) -> end, case emqx_gateway:lookup(GwName) of undefined -> - return_http_error(404, "Gateway not load"); + return_http_error(404, "Gateway not loaded"); Gateway -> Fun(GwName, Gateway) end diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 7c4131443..6c625ba8c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -317,7 +317,7 @@ get_plugins() -> upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -> [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)), %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall - %% TODO what happened when a new node join in? + %% TODO what happens when a new node joins in? %% emqx_plugins_monitor should copy plugins from other core node when boot-up. case emqx_plugins:describe(string:trim(FileName, trailing, ".tar.gz")) of {error, #{error := "bad_info_file", return := {enoent, _}}} -> @@ -358,16 +358,11 @@ upload_install(post, #{}) -> }}. 
do_install_package(FileName, Bin) -> - {Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin), - case lists:filter(fun(R) -> R =/= ok end, Res) of - [] -> - {200}; - [{error, Reason} | _] -> - {400, #{ - code => 'UNEXPECTED_ERROR', - message => iolist_to_binary(io_lib:format("~p", [Reason])) - }} - end. + %% TODO: handle bad nodes + {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin), + %% TODO: handle non-OKs + [] = lists:filter(fun(R) -> R =/= ok end, Res), + {200}. plugin(get, #{bindings := #{name := Name}}) -> {Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name), @@ -376,11 +371,11 @@ plugin(get, #{bindings := #{name := Name}}) -> [] -> {404, #{code => 'NOT_FOUND', message => Name}} end; plugin(delete, #{bindings := #{name := Name}}) -> - {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:delete_package(Name), + Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name), return(204, Res). update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> - {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action), + Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action), return(204, Res). update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> @@ -422,7 +417,8 @@ delete_package(Name) -> ok -> _ = emqx_plugins:ensure_disabled(Name), _ = emqx_plugins:ensure_uninstalled(Name), - _ = emqx_plugins:delete_package(Name); + _ = emqx_plugins:delete_package(Name), + ok; Error -> Error end. 
@@ -430,20 +426,19 @@ delete_package(Name) -> %% for RPC plugin update ensure_action(Name, start) -> _ = emqx_plugins:ensure_enabled(Name), - _ = emqx_plugins:ensure_started(Name); + _ = emqx_plugins:ensure_started(Name), + ok; ensure_action(Name, stop) -> _ = emqx_plugins:ensure_stopped(Name), - _ = emqx_plugins:ensure_disabled(Name); + _ = emqx_plugins:ensure_disabled(Name), + ok; ensure_action(Name, restart) -> _ = emqx_plugins:ensure_enabled(Name), - _ = emqx_plugins:restart(Name). + _ = emqx_plugins:restart(Name), + ok. return(Code, ok) -> {Code}; -return(Code, {ok, Result}) -> - {Code, Result}; -return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) -> - {404, #{code => 'NOT_FOUND', message => Path}}; return(_, {error, Reason}) -> {400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}. diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl index dc7904d01..4ac594d2c 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl @@ -43,11 +43,10 @@ install_package(Filename, Bin) -> describe_package(Name) -> rpc:multicall(emqx_mgmt_api_plugins, describe_package, [Name], 10000). --spec delete_package(binary() | string()) -> emqx_cluster_rpc:multicall_return(). +-spec delete_package(binary() | string()) -> ok | {error, any()}. delete_package(Name) -> emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000). --spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> - emqx_cluster_rpc:multicall_return(). +-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}. ensure_action(Name, Action) -> emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000). 
diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index fe8e3c210..03304c209 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -61,8 +61,6 @@ can_topic_match_oneof/2 ]). --export([cluster_call/3]). - -compile({no_auto_import, [float/1]}). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). @@ -307,7 +305,3 @@ can_topic_match_oneof(Topic, Filters) -> end, Filters ). - -cluster_call(Module, Func, Args) -> - {ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args), - Result. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index bca0defc5..0879d5936 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -337,7 +337,7 @@ do_ensure_started(NameVsn) -> ). %% try the function, catch 'throw' exceptions as normal 'error' return -%% other exceptions with stacktrace returned. +%% other exceptions with stacktrace logged. tryit(WhichOp, F) -> try F() diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index cda6dbf0f..58829c07f 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -125,20 +125,10 @@ purge_test() -> meck_emqx() -> meck:new(emqx, [unstick, passthrough]), meck:expect( - emqx, - update_config, + emqx_conf, + update, fun(Path, Values, _Opts) -> emqx_config:put(Path, Values) end ), - %meck:expect(emqx, get_config, - % fun(KeyPath, Default) -> - % Map = emqx:get_raw_config(KeyPath, Default), - % Map1 = emqx_map_lib:safe_atom_key_map(Map), - % case Map1 of - % #{states := Plugins} -> - % Map1#{states => [emqx_map_lib:safe_atom_key_map(P) ||P <- Plugins]}; - % _ -> Map1 - % end - % end), ok. 
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5e3242135..2a3f29122 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -174,7 +174,7 @@ create(InstId, Group, ResourceType, Config) -> -spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(InstId, Group, ResourceType, Config, Opts) -> - wrap_rpc(emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts)). + emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts). % -------------------------------------------- -spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) -> @@ -196,7 +196,7 @@ create_local(InstId, Group, ResourceType, Config, Opts) -> -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> - wrap_rpc(emqx_resource_proto_v1:create_dry_run(ResourceType, Config)). + emqx_resource_proto_v1:create_dry_run(ResourceType, Config). -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -211,7 +211,7 @@ recreate(InstId, ResourceType, Config) -> -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(InstId, ResourceType, Config, Opts) -> - wrap_rpc(emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts)). + emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts). -spec recreate_local(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -225,7 +225,7 @@ recreate_local(InstId, ResourceType, Config, Opts) -> -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - wrap_rpc(emqx_resource_proto_v1:remove(InstId)). 
+ emqx_resource_proto_v1:remove(InstId). -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> @@ -237,7 +237,7 @@ reset_metrics_local(InstId) -> -spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}. reset_metrics(InstId) -> - wrap_rpc(emqx_resource_proto_v1:reset_metrics(InstId)). + emqx_resource_proto_v1:reset_metrics(InstId). %% ================================================================================= -spec query(instance_id(), Request :: term()) -> Result :: term(). @@ -430,11 +430,5 @@ inc_metrics_funcs(InstId) -> safe_apply(Func, Args) -> ?SAFE_CALL(erlang:apply(Func, Args)). -wrap_rpc(Ret) -> - case Ret of - {ok, _TxnId, Result} -> Result; - Failed -> Failed - end. - query_error(Reason, Msg) -> {error, {?MODULE, #{reason => Reason, msg => Msg}}}. diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index 04e489f78..cbbc4e552 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -40,7 +40,7 @@ introduced_in() -> resource_config(), create_opts() ) -> - emqx_cluster_rpc:multicall_return(resource_data()). + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(InstId, Group, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall(emqx_resource, create_local, [ InstId, Group, ResourceType, Config, Opts @@ -50,7 +50,7 @@ create(InstId, Group, ResourceType, Config, Opts) -> resource_type(), resource_config() ) -> - emqx_cluster_rpc:multicall_return(resource_data()). + ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]). @@ -60,16 +60,14 @@ create_dry_run(ResourceType, Config) -> resource_config(), create_opts() ) -> - emqx_cluster_rpc:multicall_return(resource_data()). + {ok, resource_data()} | {error, Reason :: term()}. 
recreate(InstId, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]). --spec remove(instance_id()) -> - emqx_cluster_rpc:multicall_return(ok). +-spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]). --spec reset_metrics(instance_id()) -> - emqx_cluster_rpc:multicall_return(ok). +-spec reset_metrics(instance_id()) -> ok | {error, any()}. reset_metrics(InstId) -> emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 0f4b4a53d..27a627e3d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -321,7 +321,7 @@ replace_sql_clrf(#{<<"sql">> := SQL} = Params) -> end. '/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) -> case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of - {ok, _TxnId, _Result} -> + ok -> {200, <<"Reset Success">>}; Failed -> {400, #{ diff --git a/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl b/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl index 501a1d05c..ea7e4a53b 100644 --- a/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl +++ b/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl @@ -30,7 +30,6 @@ introduced_in() -> "5.0.0". --spec reset_metrics(rule_id()) -> - emqx_cluster_rpc:multicall_return(ok). +-spec reset_metrics(rule_id()) -> ok | {error, any()}. reset_metrics(RuleId) -> emqx_cluster_rpc:multicall(emqx_rule_engine, reset_metrics_for_rule, [RuleId]).