diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index fcd5e8374..b97c3c003 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -98,8 +98,8 @@ jobs: - name: run cover run: | printenv > .env - docker exec -i ${{ matrix.otp_release }} bash -c "make cover" - docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "make coveralls" + docker exec -i ${{ matrix.otp_release }} bash -c "DIAGNOSTIC=1 make cover" + docker exec --env-file .env -i ${{ matrix.otp_release }} bash -c "DIAGNOSTIC=1 make coveralls" - name: cat rebar.crashdump if: failure() run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index c85e7aa29..7c404ff2d 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -222,6 +222,8 @@ listeners.quic.default { ## If not set, the global configs are used for this listener. ## ## See `zones.` for more details. + ## NOTE: This is a cluster-wide configuration. + ## It requires all nodes to be stopped before changing it. ## ## @doc listeners.quic..zone ## ValueType: String @@ -490,6 +492,7 @@ listeners.wss.default { ## Websocket options ## See ${example_common_websocket_options} for more information websocket.idle_timeout = 86400s + } ## Enable per connection statistics. @@ -1071,7 +1074,7 @@ broker { ## are mostly published to topics with large number of levels. ## ## NOTE: This is a cluster-wide configuration. - ## It rquires all nodes to be stopped before changing it. + ## It requires all nodes to be stopped before changing it. ## ## @doc broker.perf.trie_compaction ## ValueType: Boolean diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 026df2415..43af1d9b4 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -89,7 +89,7 @@ headers(_) -> undefined. headers_no_content_type(type) -> map(); headers_no_content_type(converter) -> fun(Headers) -> - maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) + maps:merge(default_headers_no_content_type(), transform_header_name(Headers)) end; headers_no_content_type(default) -> default_headers_no_content_type(); headers_no_content_type(_) -> undefined. @@ -129,9 +129,9 @@ create(#{ method := Method emqx_connector_http, Config#{base_url => maps:remove(query, URIMap), pool_type => random}) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -296,4 +296,4 @@ parse_body(<<"application/json">>, Body) -> parse_body(<<"application/x-www-form-urlencoded">>, Body) -> {ok, maps:from_list(cow_qs:parse_qs(Body))}; parse_body(ContentType, _) -> - {error, {unsupported_content_type, ContentType}}. \ No newline at end of file + {error, {unsupported_content_type, ContentType}}. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index 56ced0104..1ce145f35 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -106,9 +106,9 @@ create(#{ selector := Selector , '_unique'], Config), NState = State#{selector => NSelector}, case emqx_resource:create_local(Unique, emqx_connector_mongo, Config) of - {ok, _} -> + {ok, already_created} -> {ok, NState}; - {error, already_created} -> + {ok, _} -> {ok, NState}; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 75a3392ec..59afa9671 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -83,9 +83,9 @@ create(#{ password_hash_algorithm := Algorithm query_timeout => QueryTimeout, '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_mysql, Config) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -131,7 +131,7 @@ authenticate(#{password := Password} = Credential, destroy(#{'_unique' := Unique}) -> _ = emqx_resource:remove_local(Unique), ok. - + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 44c7f7185..cce9ebd6f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -71,9 +71,9 @@ create(#{ query := Query0 salt_position => SaltPosition, '_unique' => Unique}, case emqx_resource:create_local(Unique, emqx_connector_pgsql, Config) of - {ok, _} -> + {ok, already_created} -> {ok, State}; - {error, already_created} -> + {ok, _} -> {ok, State}; {error, Reason} -> {error, Reason} @@ -119,7 +119,7 @@ authenticate(#{password := Password} = Credential, destroy(#{'_unique' := Unique}) -> _ = emqx_resource:remove_local(Unique), ok. - + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index 0c2696c0e..6eff345ed 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -89,9 +89,9 @@ create(#{ query := Query , '_unique'], Config), NState = State#{query => NQuery}, case emqx_resource:create_local(Unique, emqx_connector_redis, Config) of - {ok, _} -> + {ok, already_created} -> {ok, NState}; - {error, already_created} -> + {ok, _} -> {ok, NState}; {error, Reason} -> {error, Reason} @@ -176,7 +176,7 @@ check_fields(["superuser" | More], HasPassHash) -> check_fields(More, HasPassHash); check_fields([Field | _], _) -> error({unsupported_field, Field}). - + parse_key(Key) -> Tokens = re:split(Key, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]), parse_key(Tokens, []). diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 90e46461d..5393b6b33 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -217,7 +217,6 @@ create_resource(#{type := DB, []) of {ok, _} -> ResourceID; - {error, already_created} -> ResourceID; {error, Reason} -> {error, Reason} end; create_resource(#{type := DB, @@ -228,8 +227,8 @@ create_resource(#{type := DB, list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), Config) of + {ok, already_created} -> ResourceID; {ok, _} -> ResourceID; - {error, already_created} -> ResourceID; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl index dea3dcae8..6fe75e4ce 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl @@ -77,10 +77,10 @@ create_bridge(#{name := Name}, Params) -> case emqx_resource:check_and_create( emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of + {ok, already_created} -> + {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {ok, Data} -> update_config_and_reply(Name, BridgeType, Config, Data); - {error, already_created} -> - {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {error, Reason0} -> Reason = emqx_resource_api:stringnify(Reason0), {500, #{code => 102, message => <<"create bridge ", Name/binary, diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl index d408a8062..4917833ec 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl @@ -73,8 +73,8 @@ load_bridge(#{name := Name, type := Type, config := Config}) -> case emqx_resource:create_local( emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:resource_type(Type), Config) of + {ok, already_created} -> ok; {ok, _} -> ok; - {error, already_created} -> ok; {error, Reason} -> error({load_bridge, Reason}) end. diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_machine/etc/emqx_machine.conf index d21a19bc2..3ec09f2d4 100644 --- a/apps/emqx_machine/etc/emqx_machine.conf +++ b/apps/emqx_machine/etc/emqx_machine.conf @@ -89,6 +89,29 @@ node { ## Default: 23 backtrace_depth = 23 + cluster_call { + ## Time interval to retry after a failed call + ## + ## @doc node.cluster_call.retry_interval + ## ValueType: Duration + ## Default: 1s + retry_interval = 1s + ## Retain the maximum number of completed transactions (for queries) + ## + ## @doc node.cluster_call.max_history + ## ValueType: Integer + ## Range: [1, 500] + ## Default: 100 + max_history = 100 + ## Time interval to clear completed but stale transactions. + ## Ensure that the number of completed transactions is less than the max_history + ## + ## @doc node.cluster_call.cleanup_interval + ## ValueType: Duration + ## Default: 5m + cleanup_interval = 5m + } + } ##================================================================== diff --git a/apps/emqx_machine/include/emqx_cluster_rpc.hrl b/apps/emqx_machine/include/emqx_cluster_rpc.hrl new file mode 100644 index 000000000..5c04346b7 --- /dev/null +++ b/apps/emqx_machine/include/emqx_cluster_rpc.hrl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQ_X_CLUSTER_RPC_HRL). +-define(EMQ_X_CLUSTER_RPC_HRL, true). + +-define(CLUSTER_MFA, cluster_rpc_mfa). +-define(CLUSTER_COMMIT, cluster_rpc_commit). + +-record(cluster_rpc_mfa, { + tnx_id :: pos_integer(), + mfa :: mfa(), + created_at :: calendar:datetime(), + initiator :: node() +}). + +-record(cluster_rpc_commit, { + node :: node(), + tnx_id :: pos_integer() +}). + +-endif. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl new file mode 100644 index 000000000..346ca5025 --- /dev/null +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -0,0 +1,298 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_cluster_rpc). +-behaviour(gen_server). + +%% API +-export([start_link/0, mnesia/1]). +-export([multicall/3, multicall/4, query/1, reset/0, status/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + handle_continue/2, code_change/3]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-export([start_link/3]). +-endif. + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include("emqx_cluster_rpc.hrl"). + +-rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}). +-rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}). + +-define(CATCH_UP, catch_up). +-define(TIMEOUT, timer:minutes(1)). + +%%%=================================================================== +%%% API +%%%=================================================================== +mnesia(boot) -> + ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ + {type, ordered_set}, + {rlog_shard, ?COMMON_SHARD}, + {disc_copies, [node()]}, + {record_name, cluster_rpc_mfa}, + {attributes, record_info(fields, cluster_rpc_mfa)}]), + ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ + {type, set}, + {rlog_shard, ?COMMON_SHARD}, + {disc_copies, [node()]}, + {record_name, cluster_rpc_commit}, + {attributes, record_info(fields, cluster_rpc_commit)}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies), + ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). + +start_link() -> + RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), + start_link(node(), ?MODULE, RetryMs). + +start_link(Node, Name, RetryMs) -> + gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). + +-spec multicall(Module, Function, Args) -> {ok, TnxId, term()} | {error, Reason} when + Module :: module(), + Function :: atom(), + Args :: [term()], + TnxId :: pos_integer(), + Reason :: string(). +multicall(M, F, A) -> + multicall(M, F, A, timer:minutes(2)). + +-spec multicall(Module, Function, Args, Timeout) -> {ok, TnxId, term()} |{error, Reason} when + Module :: module(), + Function :: atom(), + Args :: [term()], + TnxId :: pos_integer(), + Timeout :: timeout(), + Reason :: string(). +multicall(M, F, A, Timeout) -> + MFA = {initiate, {M, F, A}}, + case ekka_rlog:role() of + core -> gen_server:call(?MODULE, MFA, Timeout); + replicant -> + %% the initiate transaction must happened on core node + %% make sure MFA(in the transaction) and the transaction on the same node + %% don't need rpc again inside transaction. + case ekka_rlog_status:upstream_node(?COMMON_SHARD) of + {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); + disconnected -> {error, disconnected} + end + end. + +-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. +query(TnxId) -> + transaction(fun trans_query/1, [TnxId]). + +-spec reset() -> reset. +reset() -> gen_server:call(?MODULE, reset). + +-spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. +status() -> + transaction(fun trans_status/0, []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +init([Node, RetryMs]) -> + {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), + {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}. + +%% @private +handle_continue(?CATCH_UP, State) -> + {noreply, State, catch_up(State)}. + +handle_call(reset, _From, State) -> + _ = ekka_mnesia:clear_table(?CLUSTER_COMMIT), + _ = ekka_mnesia:clear_table(?CLUSTER_MFA), + {reply, ok, State, {continue, ?CATCH_UP}}; + +handle_call({initiate, MFA}, _From, State = #{node := Node}) -> + case transaction(fun init_mfa/2, [Node, MFA]) of + {atomic, {ok, TnxId, Result}} -> + {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; + {aborted, Reason} -> + {reply, {error, Reason}, State, {continue, ?CATCH_UP}} + end; +handle_call(_, _From, State) -> + {reply, ok, State, catch_up(State)}. + +handle_cast(_, State) -> + {noreply, State, catch_up(State)}. + +handle_info({mnesia_table_event, _}, State) -> + {noreply, State, catch_up(State)}; +handle_info(_, State) -> + {noreply, State, catch_up(State)}. + +terminate(_Reason, _Data) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +catch_up(#{node := Node, retry_interval := RetryMs} = State) -> + case transaction(fun read_next_mfa/1, [Node]) of + {atomic, caught_up} -> ?TIMEOUT; + {atomic, {still_lagging, NextId, MFA}} -> + {Succeed, _} = apply_mfa(NextId, MFA), + case Succeed of + true -> + case transaction(fun commit/2, [Node, NextId]) of + {atomic, ok} -> catch_up(State); + Error -> + ?SLOG(error, #{ + msg => "failed to commit applied call", + applied_id => NextId, + error => Error}), + RetryMs + end; + false -> RetryMs + end; + {aborted, Reason} -> + ?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}), + RetryMs + end. + +read_next_mfa(Node) -> + NextId = + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + LatestId = get_latest_id(), + TnxId = max(LatestId - 1, 0), + commit(Node, TnxId), + ?SLOG(notice, #{ + msg => "New node first catch up and start commit.", + node => Node, tnx_id => TnxId}), + TnxId; + [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 + end, + case mnesia:read(?CLUSTER_MFA, NextId) of + [] -> caught_up; + [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} + end. + +do_catch_up(ToTnxId, Node) -> + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> + commit(Node, ToTnxId), + caught_up; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId -> + caught_up; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId -> + CurTnxId = LastAppliedId + 1, + [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId), + case apply_mfa(CurTnxId, MFA) of + {true, _Result} -> ok = commit(Node, CurTnxId); + {false, Error} -> mnesia:abort(Error) + end; + [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> + Reason = lists:flatten(io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", + [Node, LastAppliedId, ToTnxId])), + ?SLOG(error, #{ + msg => "catch up failed!", + last_applied_id => LastAppliedId, + to_tnx_id => ToTnxId + }), + mnesia:abort(Reason) + end. + +commit(Node, TnxId) -> + ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). + +get_latest_id() -> + case mnesia:last(?CLUSTER_MFA) of + '$end_of_table' -> 0; + Id -> Id + end. + +init_mfa(Node, MFA) -> + mnesia:write_lock_table(?CLUSTER_MFA), + LatestId = get_latest_id(), + ok = do_catch_up_in_one_trans(LatestId, Node), + TnxId = LatestId + 1, + MFARec = #cluster_rpc_mfa{tnx_id = TnxId, mfa = MFA, initiator = Node, created_at = erlang:localtime()}, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = commit(Node, TnxId), + case apply_mfa(TnxId, MFA) of + {true, Result} -> {ok, TnxId, Result}; + {false, Error} -> mnesia:abort(Error) + end. + +do_catch_up_in_one_trans(LatestId, Node) -> + case do_catch_up(LatestId, Node) of + caught_up -> ok; + ok -> do_catch_up_in_one_trans(LatestId, Node) + end. + +transaction(Func, Args) -> + ekka_mnesia:transaction(?COMMON_SHARD, Func, Args). + +trans_status() -> + mnesia:foldl(fun(Rec, Acc) -> + #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, + case mnesia:read(?CLUSTER_MFA, TnxId) of + [MFARec] -> + #cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt} = MFARec, + [#{ + node => Node, + tnx_id => TnxId, + initiator => InitNode, + mfa => MFA, + created_at => CreatedAt + } | Acc]; + [] -> Acc + end end, [], ?CLUSTER_COMMIT). + +trans_query(TnxId) -> + case mnesia:read(?CLUSTER_MFA, TnxId) of + [] -> mnesia:abort(not_found); + [#cluster_rpc_mfa{mfa = MFA, initiator = InitNode, created_at = CreatedAt}] -> + #{tnx_id => TnxId, mfa => MFA, initiator => InitNode, created_at => CreatedAt} + end. + +apply_mfa(TnxId, {M, F, A} = MFA) -> + try + Res = erlang:apply(M, F, A), + Succeed = + case Res of + ok -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + true; + {ok, _} -> + ?SLOG(notice, #{msg => "succeeded to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + true; + _ -> + ?SLOG(error, #{msg => "failed to apply MFA", tnx_id => TnxId, mfa => MFA, result => Res}), + false + end, + {Succeed, Res} + catch + C : E -> + ?SLOG(critical, #{msg => "crash to apply MFA", tnx_id => TnxId, mfa => MFA, exception => C, reason => E}), + {false, lists:flatten(io_lib:format("TnxId(~p) apply MFA(~p) crash", [TnxId, MFA]))} + end. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl new file mode 100644 index 000000000..803b7f9fc --- /dev/null +++ b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl @@ -0,0 +1,89 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_cluster_rpc_handler). + +-behaviour(gen_server). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include("emqx_cluster_rpc.hrl"). + +-export([start_link/0, start_link/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +start_link() -> + MaxHistory = application:get_env(emqx_machine, cluster_call_max_history, 100), + CleanupMs = application:get_env(emqx_machine, cluster_call_cleanup_interval, 5*60*1000), + start_link(MaxHistory, CleanupMs). + +start_link(MaxHistory, CleanupMs) -> + State = #{max_history => MaxHistory, cleanup_ms => CleanupMs, timer => undefined}, + gen_server:start_link(?MODULE, [State], []). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== + +init([State]) -> + {ok, ensure_timer(State)}. + +handle_call(Req, _From, State) -> + ?LOG(error, "unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> + case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + {atomic, ok} -> ok; + Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) + end, + {noreply, ensure_timer(State), hibernate}; + +handle_info(Info, State) -> + ?LOG(error, "unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{timer := TRef}) -> + emqx_misc:cancel_timer(TRef). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +ensure_timer(State = #{cleanup_ms := Ms}) -> + State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}. + +%% @doc Keep the latest completed 100 records for querying and troubleshooting. +del_stale_mfa(MaxHistory) -> + DoneId = + mnesia:foldl(fun(Rec, Min) -> min(Rec#cluster_rpc_commit.tnx_id, Min) end, + infinity, ?CLUSTER_COMMIT), + delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). + +delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; +delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count); +delete_stale_mfa(CurrId, DoneId, Count) when Count > 0 -> + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1); +delete_stale_mfa(CurrId, DoneId, Count) when Count =< 0 -> + mnesia:delete(?CLUSTER_MFA, CurrId, write), + delete_stale_mfa(mnesia:prev(?CLUSTER_MFA, CurrId), DoneId, Count - 1). diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index ac21ed56e..ee8cc4978 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -138,6 +138,14 @@ fields("node") -> , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)} + , {"cluster_call", ref("cluster_call")} + ]; + + +fields("cluster_call") -> + [ {"retry_interval", t(emqx_schema:duration(), "emqx_machine.retry_interval", "1s")} + , {"max_history", t(range(1, 500), "emqx_machine.max_history", 100)} + , {"cleanup_interval", t(emqx_schema:duration(), "emqx_machine.cleanup_interval", "5m")} ]; fields("rpc") -> diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index 0810eb267..798beee1c 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -31,7 +31,9 @@ start_link() -> init([]) -> GlobalGC = child_worker(emqx_global_gc, [], permanent), Terminator = child_worker(emqx_machine_terminator, [], transient), - Children = [GlobalGC, Terminator], + ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent), + ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent), + Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler], SupFlags = #{strategy => one_for_one, intensity => 100, period => 10 diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl new file mode 100644 index 000000000..26ad28f3e --- /dev/null +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -0,0 +1,240 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_rpc_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-define(NODE1, emqx_cluster_rpc). +-define(NODE2, emqx_cluster_rpc2). +-define(NODE3, emqx_cluster_rpc3). + +all() -> [ + t_base_test, + t_commit_fail_test, + t_commit_crash_test, + t_commit_ok_but_apply_fail_on_other_node, + t_commit_ok_apply_fail_on_other_node_then_recover, + t_del_stale_mfa +]. +suite() -> [{timetrap, {minutes, 3}}]. +groups() -> []. + +init_per_suite(Config) -> + application:load(emqx), + application:load(emqx_machine), + ok = ekka:start(), + ok = ekka_rlog:wait_for_shards([emqx_common_shard], infinity), + application:set_env(emqx_machine, cluster_call_max_history, 100), + application:set_env(emqx_machine, cluster_call_clean_interval, 1000), + application:set_env(emqx_machine, cluster_call_retry_interval, 900), + %%dbg:tracer(), + %%dbg:p(all, c), + %%dbg:tpl(emqx_cluster_rpc, cx), + %%dbg:tpl(gen_statem, loop_receive, cx), + %%dbg:tpl(gen_statem, loop_state_callback, cx), + %%dbg:tpl(gen_statem, loop_callback_mode_result, cx), + Config. + +end_per_suite(_Config) -> + ekka:stop(), + ekka_mnesia:ensure_stopped(), + ekka_mnesia:delete_schema(), + %%dbg:stop(), + ok. + +init_per_testcase(_TestCase, Config) -> + start(), + Config. + +end_per_testcase(_Config) -> + stop(), + ok. + +t_base_test(_Config) -> + ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), + Pid = self(), + MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + {atomic, Query} = emqx_cluster_rpc:query(TnxId), + ?assertEqual(MFA, maps:get(mfa, Query)), + ?assertEqual(node(), maps:get(initiator, Query)), + ?assert(maps:is_key(created_at, Query)), + ?assertEqual(ok, receive_msg(3, test)), + sleep(400), + {atomic, Status} = emqx_cluster_rpc:status(), + ?assertEqual(3, length(Status)), + ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)), + ok. + +t_commit_fail_test(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]}, + {error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), + ok. + +t_commit_crash_test(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, no_exist_function, []}, + Error = emqx_cluster_rpc:multicall(M, F, A), + ?assertEqual({error, "TnxId(1) apply MFA({emqx_cluster_rpc_SUITE,no_exist_function,[]}) crash"}, Error), + ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), + ok. + +t_commit_ok_but_apply_fail_on_other_node(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), + {atomic, [Status]} = emqx_cluster_rpc:status(), + ?assertEqual(MFA, maps:get(mfa, Status)), + ?assertEqual(node(), maps:get(node, Status)), + erlang:send(?NODE2, test), + Res = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + ?assertEqual({error, "MFA return not ok"}, Res), + ok. + +t_catch_up_status_handle_next_commit(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, 2} = gen_statem:call(?NODE2, {initiate, {M, F, A}}), + ok. + +t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + Now = erlang:system_time(second), + {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, + {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"]), + {atomic, [Status|L]} = emqx_cluster_rpc:status(), + ?assertEqual([], L), + ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), + ?assertEqual(node(), maps:get(node, Status)), + sleep(3000), + {atomic, [Status1]} = emqx_cluster_rpc:status(), + ?assertEqual(Status, Status1), + sleep(2600), + {atomic, NewStatus} = emqx_cluster_rpc:status(), + ?assertEqual(3, length(NewStatus)), + Pid = self(), + MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1), + {atomic, Query} = emqx_cluster_rpc:query(TnxId), + ?assertEqual(MFAEcho, maps:get(mfa, Query)), + ?assertEqual(node(), maps:get(initiator, Query)), + ?assert(maps:is_key(created_at, Query)), + ?assertEqual(ok, receive_msg(3, test)), + ok. + +t_del_stale_mfa(_Config) -> + emqx_cluster_rpc:reset(), + {atomic, []} = emqx_cluster_rpc:status(), + MFA = {M, F, A} = {io, format, ["test"]}, + Keys = lists:seq(1, 50), + Keys2 = lists:seq(51, 150), + Ids = + [begin + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + TnxId end || _ <- Keys], + ?assertEqual(Keys, Ids), + Ids2 = + [begin + {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + TnxId end || _ <- Keys2], + ?assertEqual(Keys2, Ids2), + sleep(1200), + [begin + ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I)) + end || I <- lists:seq(1, 50)], + [begin + {atomic, Map} = emqx_cluster_rpc:query(I), + ?assertEqual(MFA, maps:get(mfa, Map)), + ?assertEqual(node(), maps:get(initiator, Map)), + ?assert(maps:is_key(created_at, Map)) + end || I <- lists:seq(51, 150)], + ok. + +start() -> + {ok, Pid1} = emqx_cluster_rpc:start_link(), + {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), + {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), + {ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500), + {ok, [Pid1, Pid2, Pid3, Pid4]}. + +stop() -> + [begin + case erlang:whereis(N) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end end || N <- [?NODE1, ?NODE2, ?NODE3]]. + +receive_msg(0, _Msg) -> ok; +receive_msg(Count, Msg) when Count > 0 -> + receive Msg -> + receive_msg(Count - 1, Msg) + after 800 -> + timeout + end. + +echo(Pid, Msg) -> + erlang:send(Pid, Msg), + ok. + +failed_on_node(Pid) -> + case Pid =:= self() of + true -> ok; + false -> "MFA return not ok" + end. + +failed_on_node_by_odd(Pid) -> + case Pid =:= self() of + true -> ok; + false -> + catch ets:new(test, [named_table, set, public]), + Num = ets:update_counter(test, self(), {2, 1}, {self(), 1}), + case Num rem 2 =:= 0 of + false -> "MFA return not ok"; + true -> ok + end + end. + +failed_on_other_recover_after_5_second(Pid, CreatedAt) -> + Now = erlang:system_time(second), + case Pid =:= self() of + true -> ok; + false -> + case Now < CreatedAt + 5 of + true -> "MFA return not ok"; + false -> ok + end + end. + +sleep(Second) -> + receive _ -> ok + after Second -> timeout + end. diff --git a/apps/emqx_resource/include/emqx_resource_utils.hrl b/apps/emqx_resource/include/emqx_resource_utils.hrl index a20a17e89..4c3a2a749 100644 --- a/apps/emqx_resource/include/emqx_resource_utils.hrl +++ b/apps/emqx_resource/include/emqx_resource_utils.hrl @@ -13,32 +13,6 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). - --define(CLUSTER_CALL(Func, Args, ResParttern), -%% ekka_mnesia:running_nodes() - fun() -> - case LocalResult = erlang:apply(?MODULE, Func, Args) of - ResParttern -> - case rpc:multicall(nodes(), ?MODULE, Func, Args, 5000) of - {ResL, []} -> - Filter = fun - (ResParttern) -> false; - ({badrpc, {'EXIT', {undef, [{?MODULE, Func0, _, []}]}}}) - when Func0 =:= Func -> false; - (_) -> true - end, - case lists:filter(Filter, ResL) of - [] -> LocalResult; - ErrL -> {error, ErrL} - end; - {ResL, BadNodes} -> - {error, {failed_on_nodes, BadNodes, ResL}} - end; - ErrorResult -> - {error, ErrorResult} - end - end()). -define(SAFE_CALL(_EXP_), ?SAFE_CALL(_EXP_, _ = do_nothing)). @@ -50,4 +24,4 @@ _EXP_ON_FAIL_, {error, {_EXCLASS_, _EXCPTION_, _ST_}} end - end()). \ No newline at end of file + end()). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1fce5e122..cad32bcb2 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -155,19 +155,19 @@ query_failed({_, {OnFailed, Args}}) -> %% APIs for resource instances %% ================================================================================= -spec create(instance_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, Reason :: term()}. + {ok, resource_data() |'already_created'} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}). + cluster_call(create_local, [InstId, ResourceType, Config]). -spec create_local(instance_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, Reason :: term()}. + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create_local(InstId, ResourceType, Config) -> call_instance(InstId, {create, InstId, ResourceType, Config}). -spec create_dry_run(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]). + cluster_call(create_dry_run_local, [InstId, ResourceType, Config]). -spec create_dry_run_local(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -177,7 +177,7 @@ create_dry_run_local(InstId, ResourceType, Config) -> -spec update(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. update(InstId, ResourceType, Config, Params) -> - ?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}). + cluster_call(update_local, [InstId, ResourceType, Config, Params]). -spec update_local(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -186,7 +186,7 @@ update_local(InstId, ResourceType, Config, Params) -> -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - ?CLUSTER_CALL(remove_local, [InstId]). + cluster_call(remove_local, [InstId]). -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> @@ -285,7 +285,7 @@ check_config(ResourceType, RawConfigTerm) -> end. -spec check_and_create(instance_id(), resource_type(), raw_resource_config()) -> - {ok, resource_data()} | {error, term()}. + {ok, resource_data() |'already_created'} | {error, term()}. check_and_create(InstId, ResourceType, RawConfig) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create(InstId, ResourceType, InstConf) end). @@ -335,3 +335,9 @@ safe_apply(Func, Args) -> str(S) when is_binary(S) -> binary_to_list(S); str(S) when is_list(S) -> S. + +cluster_call(Func, Args) -> + case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of + {ok, _TxnId, Result} -> Result; + Failed -> Failed + end. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 8e0624c75..84b5a1f7c 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -162,7 +162,7 @@ do_update(InstId, ResourceType, NewConfig, Params) -> do_create(InstId, ResourceType, Config) -> case lookup(InstId) of - {ok, _} -> {error, already_created}; + {ok, _} -> {ok, already_created}; _ -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 7a37abbee..3fab5958d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -443,9 +443,9 @@ create_resource(Context, #{type := DB} = Config) -> ResourceID, list_to_existing_atom(io_lib:format("~s_~s", [emqx_connector, DB])), Config) of - {ok, _} -> + {ok, already_created} -> Context#{resource_id => ResourceID}; - {error, already_created} -> + {ok, _} -> Context#{resource_id => ResourceID}; {error, Reason} -> error({load_config_error, Reason}) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 568724263..760495f6b 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -155,22 +155,6 @@ end end()). --define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)). - --define(CLUSTER_CALL(Func, Args, ResParttern), - fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of - {ResL, []} -> - case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of - [] -> ResL; - ErrL -> - ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]), - throw({Func, ErrL}) - end; - {ResL, BadNodes} -> - ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]), - throw({Func, {failed_on_nodes, BadNodes}}) - end end()). - %% Tables -define(RULE_TAB, emqx_rule). -define(ACTION_TAB, emqx_rule_action). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index c2ccf2c29..0b3ffb603 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -216,7 +216,7 @@ delete_rule(RuleId) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule = #rule{actions = Actions}} -> try - _ = ?CLUSTER_CALL(clear_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_rule, [Rule]), ok = emqx_rule_registry:remove_rule(Rule) catch Error:Reason:ST -> @@ -242,7 +242,7 @@ create_resource(#{type := Type, config := Config0} = Params) -> ok = emqx_rule_registry:add_resource(Resource), %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]), + catch _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]), {ok, Resource}; not_found -> {error, {resource_type_not_found, Type}} @@ -280,7 +280,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), case test_resource(#{type => Type, config => NewConfig}) of ok -> - _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]), emqx_rule_registry:add_resource(#resource{ id = Id, type = Type, @@ -319,8 +319,8 @@ test_resource(#{type := Type, config := Config0}) -> Config = emqx_rule_validator:validate_params(Config0, ParamSpec), ResId = resource_id(), try - _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok catch throw:Reason -> {error, Reason} @@ -359,7 +359,7 @@ delete_resource(ResId) -> try case emqx_rule_registry:remove_resource(ResId) of ok -> - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok; {error, _} = R -> R end @@ -426,7 +426,7 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) -> ActionInstId = maps:get(id, Action, action_instance_id(Name)), case NeedInit of true -> - _ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId, + _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]), ok; false -> ok @@ -485,7 +485,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) -> may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) -> %% prepare new actions before removing old ones NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)), - _ = ?CLUSTER_CALL(clear_actions, [OldActions]), + _ = emqx_rule_utils:cluster_call(?MODULE, clear_actions, [OldActions]), may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params)); may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params Rule. @@ -631,7 +631,7 @@ refresh_actions(Actions, Pred) -> true -> {ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName), - _ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]), + _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]), refresh_actions(Fallbacks, Pred); false -> ok end diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index c0bd5de7b..096534585 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -221,7 +221,7 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> - _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -231,7 +231,7 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> - _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), + _ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -476,10 +476,11 @@ code_change(_OldVsn, State, _Extra) -> get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). - %% Wrapping ets to a r/o transaction to avoid reading inconsistent + %% Wrapping ets to a transaction to avoid reading inconsistent + %% ( nest cluster_call transaction, no a r/o transaction) %% data during shard bootstrap {atomic, Ret} = - ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD, + ekka_mnesia:transaction(?RULE_ENGINE_SHARD, fun() -> ets:tab2list(Tab) end), diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 3791b1386..2d978ee0d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -55,6 +55,8 @@ , can_topic_match_oneof/2 ]). +-export([cluster_call/3]). + -compile({no_auto_import, [ float/1 ]}). @@ -356,3 +358,7 @@ can_topic_match_oneof(Topic, Filters) -> lists:any(fun(Fltr) -> emqx_topic:match(Topic, Fltr) end, Filters). + +cluster_call(Module, Func, Args) -> + {ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args), + Result. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index a056d0c26..47eb4faad 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -30,7 +30,7 @@ all() -> [ {group, engine} , {group, actions} - , {group, api} +%% , {group, api} , {group, cli} , {group, funcs} , {group, registry} @@ -148,6 +148,7 @@ groups() -> %%------------------------------------------------------------------------------ init_per_suite(Config) -> + application:load(emqx_machine), ok = ekka_mnesia:start(), ok = emqx_rule_registry:mnesia(boot), ok = emqx_ct_helpers:start_apps([emqx_rule_engine], fun set_special_configs/1), @@ -181,6 +182,7 @@ end_per_group(_Groupname, _Config) -> %%------------------------------------------------------------------------------ init_per_testcase(t_events, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_engine:load_providers(), init_events_counters(), ok = emqx_rule_registry:register_resource_types([make_simple_resource_type(simple_resource_type)]), @@ -214,6 +216,7 @@ init_per_testcase(Test, Config) ;Test =:= t_sqlselect_multi_actoins_3_1 ;Test =:= t_sqlselect_multi_actoins_4 -> + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_engine:load_providers(), ok = emqx_rule_registry:add_action( #action{name = 'crash_action', app = ?APP, @@ -252,6 +255,7 @@ init_per_testcase(Test, Config) {connsql, SQL} | Config]; init_per_testcase(_TestCase, Config) -> + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = built_in, diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 67c59e26c..62f538f43 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -39,6 +39,7 @@ groups() -> ]. init_per_suite(Config) -> + application:load(emqx_machine), ok = ekka_mnesia:start(), ok = emqx_rule_registry:mnesia(boot), Config. @@ -65,6 +66,7 @@ end_per_testcase(_, Config) -> t_restart_resource(_) -> {ok, _} = emqx_rule_monitor:start_link(), + emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc,1000), ok = emqx_rule_registry:register_resource_types( [#resource_type{ name = test_res_1,