diff --git a/Dockerfile.ubuntu20.04.runner b/Dockerfile.ubuntu20.04.runner new file mode 100644 index 000000000..124021c89 --- /dev/null +++ b/Dockerfile.ubuntu20.04.runner @@ -0,0 +1,42 @@ +## This is a fast-build Dockerfile only for testing +FROM ubuntu:20.04 +ARG PROFILE=emqx + +RUN apt-get update; \ + apt-get install -y --no-install-recommends ca-certificates procps; \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir /opt/emqx +RUN date > /opt/emqx/BUILD_TIME +COPY _build/${PROFILE}/rel/emqx /opt/emqx +RUN ln -s /opt/emqx/bin/* /usr/local/bin/ +COPY deploy/docker/docker-entrypoint.sh /usr/bin/ + +WORKDIR /opt/emqx + + +RUN groupadd -r -g 1000 emqx; \ + useradd -r -m -u 1000 -g emqx emqx; \ + chgrp -Rf emqx /opt/emqx; \ + chmod -Rf g+w /opt/emqx; \ + chown -Rf emqx /opt/emqx + +USER emqx + +VOLUME ["/opt/emqx/log", "/opt/emqx/data"] + +# emqx will occupy these port: +# - 1883 port for MQTT +# - 8081 for mgmt API +# - 8083 for WebSocket/HTTP +# - 8084 for WSS/HTTPS +# - 8883 port for MQTT(SSL) +# - 11883 port for internal MQTT/TCP +# - 18083 for dashboard +# - 4370 default Erlang distrbution port +# - 5369 for backplain gen_rpc +EXPOSE 1883 8081 8083 8084 8883 11883 18083 4370 5369 + +ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"] + +CMD ["/opt/emqx/bin/emqx", "foreground"] diff --git a/Dockerfile.ubuntu20.04.runner.dockerignore b/Dockerfile.ubuntu20.04.runner.dockerignore new file mode 100644 index 000000000..bf291d51e --- /dev/null +++ b/Dockerfile.ubuntu20.04.runner.dockerignore @@ -0,0 +1,4 @@ +* +!_build/emqx +!_build/emqx-enterprise +!deploy diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 5e50189be..ab98ded69 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -233,7 +233,8 @@ put(Config) -> erase(RootName) -> persistent_term:erase(?PERSIS_KEY(?CONF, bin(RootName))), - persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))). + persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))), + ok. -spec put(emqx_map_lib:config_key_path(), term()) -> ok. put(KeyPath, Config) -> diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index 575dcca6c..21e73ff51 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -163,8 +163,7 @@ get_counters(Name, Id) -> reset_counters(Name, Id) -> Indexes = maps:values(get_indexes(Name, Id)), Ref = get_ref(Name, Id), - [counters:put(Ref, Idx, 0) || Idx <- Indexes], - ok. + lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, Indexes). -spec get_metrics(handler_name(), metric_id()) -> metrics(). get_metrics(Name, Id) -> diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 6be38df0d..6ca0f3ec2 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -53,8 +53,7 @@ -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). %% List of known functions also known to do RPC: -define(RPC_FUNCTIONS, - "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5, " - "emqx_plugin_libs_rule:cluster_call/3" + "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5" ). %% List of functions in the RPC backend modules that we can ignore: @@ -63,11 +62,9 @@ %% List of business-layer functions that are exempt from the checks: %% erlfmt-ignore -define(EXEMPTIONS, - "emqx_mgmt_api:do_query/6," % Reason: legacy code. A fun and a QC query are - % passed in the args, it's futile to try to statically - % check it - "emqx_plugin_libs_rule:cluster_call/3" % Reason: some sort of external plugin API that we - % don't want to break? + "emqx_mgmt_api:do_query/6" % Reason: legacy code. A fun and a QC query are + % passed in the args, it's futile to try to statically + % check it ). -define(XREF, myxref). diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index da49e31a5..be0e6232c 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -30,7 +30,11 @@ skip_failed_commit/1, fast_forward_to_commit/2 ]). --export([get_node_tnx_id/1, latest_tnx_id/0]). +-export([ + get_node_tnx_id/1, + latest_tnx_id/0, + make_initiate_call_req/3 +]). -export([ init/1, @@ -44,7 +48,7 @@ -export([get_tables_status/0]). --export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]). +-export_type([tnx_id/0, succeed_num/0]). -ifdef(TEST). -compile(export_all). @@ -56,19 +60,21 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). +-define(INITIATE(MFA, LatestIdLastSeen), {initiate, MFA, LatestIdLastSeen}). -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). --type txn_id() :: pos_integer(). +-type tnx_id() :: pos_integer(). -type succeed_num() :: pos_integer() | all. -type multicall_return(Result) :: - {ok, txn_id(), Result} - | {error, term()} - | {retry, txn_id(), Result, node()}. + {ok, tnx_id(), Result} + | {init_failure, term()} + | {peers_lagging, tnx_id(), Result, [node()]}. -type multicall_return() :: multicall_return(_). +-type init_call_req() :: ?INITIATE({module(), atom(), list()}, tnx_id()). %%%=================================================================== %%% API @@ -102,27 +108,73 @@ start_link(Node, Name, RetryMs) -> {error, Reason} end. -%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok. -%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}. -%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. --spec multicall(module(), atom(), list()) -> multicall_return(). +%% @doc Initiate a local call (or core node), +%% then async-ly replicate the call to peer nodes in the cluster. +%% The evaluation result of the provided MFA is returned, +%% the result is expected to be `ok | {ok, _}' to indicate success, +%% and `{error, _}' to indicate failure. +%% +%% The excpetion of the MFA evaluation is captured and translated +%% into an `{error, _}' tuple. +%% This call tries to wait for all peer nodes to be in-sync before +%% returning the result. +%% +%% In case of partial success, an `error' level log is emitted +%% but the initial localy apply result is returned. +-spec multicall(module(), atom(), list()) -> term(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). --spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return(). -multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 -> - MFA = {initiate, {M, F, A}}, +-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> term(). +multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse RequiredSyncs >= 1 -> + case do_multicall(M, F, A, RequiredSyncs, Timeout) of + {ok, _TxnId, Result} -> + Result; + {init_failure, Error} -> + Error; + {peers_lagging, TnxId, Res, Nodes} -> + %% The init MFA return ok, but some other nodes failed. + ?SLOG(error, #{ + msg => "cluster_rpc_peers_lagging", + lagging_nodes => Nodes, + tnx_id => TnxId + }), + Res + end. + +%% Return {ok, TnxId, MFARes} the first MFA result when all MFA run ok. +%% return {init_failure, Error} when the initial MFA result is no ok or {ok, term()}. +%% return {peers_lagging, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. +-spec do_multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return(). +do_multicall(M, F, A, RequiredSyncs, Timeout) when + RequiredSyncs =:= all orelse RequiredSyncs >= 1 +-> + %% Idealy 'LatestId' should be provided by the multicall originator, + %% which is the viewer of the state e.g. + %% * Sysadmin who issues CLI-commands or REST-API calls to make config changes + %% * Dashboard viewer who is making decision based on what they can see from the UI + %% To reach the ideal state, it would require adding transaction ID to each and + %% every view/GET requests and also provide the ID as a part of the view/GET responses. + %% + %% To keep things simple, we try to get the 'old' view when a multicall request + %% is received as early as possible. + %% + %% Reason to do this: + %% The 'initiate' call handler tries to take a table lock (cluster-wide) before + %% bumping the transaction ID. While waiting for the lock, the ID might have been + %% bumpped by another node in the cluster. + InitReq = make_initiate_call_req(M, F, A), Begin = erlang:monotonic_time(), InitRes = case mria_rlog:role() of core -> - gen_server:call(?MODULE, MFA, Timeout); + gen_server:call(?MODULE, InitReq, Timeout); replicant -> %% the initiate transaction must happened on core node %% make sure MFA(in the transaction) and the transaction on the same node %% don't need rpc again inside transaction. case mria_status:upstream_node(?CLUSTER_RPC_SHARD) of - {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); + {ok, Node} -> gen_server:call({?MODULE, Node}, InitReq, Timeout); disconnected -> {error, disconnected} end end, @@ -132,23 +184,23 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu RetryTimeout = ceil(3 * max(MinDelay, get_retry_ms())), OkOrFailed = case InitRes of - {ok, _TnxId, _} when RequireNum =:= 1 -> + {ok, _TnxId, _} when RequiredSyncs =:= 1 -> ok; - {ok, TnxId, _} when RequireNum =:= all -> + {ok, TnxId, _} when RequiredSyncs =:= all -> wait_for_all_nodes_commit(TnxId, MinDelay, RetryTimeout); - {ok, TnxId, _} when is_integer(RequireNum) -> - wait_for_nodes_commit(RequireNum, TnxId, MinDelay, RetryTimeout); + {ok, TnxId, _} when is_integer(RequiredSyncs) -> + wait_for_nodes_commit(RequiredSyncs, TnxId, MinDelay, RetryTimeout); Error -> Error end, case OkOrFailed of ok -> InitRes; - {error, Error0} -> - {error, Error0}; - {retry, Node0} -> + {init_failure, Error0} -> + {init_failure, Error0}; + {peers_lagging, Nodes} -> {ok, TnxId0, MFARes} = InitRes, - {retry, TnxId0, MFARes, Node0} + {peers_lagging, TnxId0, MFARes, Nodes} end. -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. @@ -167,6 +219,11 @@ latest_tnx_id() -> {atomic, TnxId} = transaction(fun get_latest_id/0, []), TnxId. +-spec make_initiate_call_req(module(), atom(), list()) -> init_call_req(). +make_initiate_call_req(M, F, A) -> + TnxId = get_latest_id(dirty), + ?INITIATE({M, F, A}, TnxId). + -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of @@ -232,12 +289,12 @@ handle_call(reset, _From, State) -> _ = mria:clear_table(?CLUSTER_COMMIT), _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; -handle_call({initiate, MFA}, _From, State = #{node := Node}) -> - case transaction(fun init_mfa/2, [Node, MFA]) of +handle_call(?INITIATE(MFA, LatestId), _From, State = #{node := Node}) -> + case transaction(fun init_mfa/3, [Node, MFA, LatestId]) of {atomic, {ok, TnxId, Result}} -> {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; - {aborted, Reason} -> - {reply, {error, Reason}, State, {continue, ?CATCH_UP}} + {aborted, Error} -> + {reply, {init_failure, Error}, State, {continue, ?CATCH_UP}} end; handle_call(skip_failed_commit, _From, State = #{node := Node}) -> Timeout = catch_up(State, true), @@ -273,7 +330,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> - {Succeed, _} = apply_mfa(NextId, MFA), + {Succeed, _} = apply_mfa(NextId, MFA, catch_up), case Succeed orelse SkipResult of true -> case transaction(fun commit/2, [Node, NextId]) of @@ -316,35 +373,6 @@ read_next_mfa(Node) -> [#cluster_rpc_mfa{mfa = MFA}] -> {still_lagging, NextId, MFA} end. -do_catch_up(ToTnxId, Node) -> - case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [] -> - commit(Node, ToTnxId), - caught_up; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId =:= LastAppliedId -> - caught_up; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] when ToTnxId > LastAppliedId -> - CurTnxId = LastAppliedId + 1, - [#cluster_rpc_mfa{mfa = MFA}] = mnesia:read(?CLUSTER_MFA, CurTnxId), - case apply_mfa(CurTnxId, MFA) of - {true, _Result} -> ok = commit(Node, CurTnxId); - {false, Error} -> mnesia:abort(Error) - end; - [#cluster_rpc_commit{tnx_id = LastAppliedId}] -> - Reason = lists:flatten( - io_lib:format( - "~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", - [Node, LastAppliedId, ToTnxId] - ) - ), - ?SLOG(error, #{ - msg => "catch_up_failed!", - last_applied_id => LastAppliedId, - to_tnx_id => ToTnxId - }), - mnesia:abort(Reason) - end. - commit(Node, TnxId) -> ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). @@ -365,33 +393,44 @@ do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> end. get_latest_id() -> - case mnesia:last(?CLUSTER_MFA) of + get_latest_id(tnx). + +get_latest_id(IsolationLevel) -> + F = + case IsolationLevel of + tnx -> fun mnesia:last/1; + dirty -> fun mnesia:dirty_last/1 + end, + case F(?CLUSTER_MFA) of '$end_of_table' -> 0; Id -> Id end. -init_mfa(Node, MFA) -> +init_mfa(Node, MFA, LatestIdLastSeen) -> mnesia:write_lock_table(?CLUSTER_MFA), LatestId = get_latest_id(), - ok = do_catch_up_in_one_trans(LatestId, Node), - TnxId = LatestId + 1, - MFARec = #cluster_rpc_mfa{ - tnx_id = TnxId, - mfa = MFA, - initiator = Node, - created_at = erlang:localtime() - }, - ok = mnesia:write(?CLUSTER_MFA, MFARec, write), - ok = commit(Node, TnxId), - case apply_mfa(TnxId, MFA) of - {true, Result} -> {ok, TnxId, Result}; - {false, Error} -> mnesia:abort(Error) - end. - -do_catch_up_in_one_trans(LatestId, Node) -> - case do_catch_up(LatestId, Node) of - caught_up -> ok; - ok -> do_catch_up_in_one_trans(LatestId, Node) + case LatestIdLastSeen =:= LatestId of + true -> + TnxId = LatestId + 1, + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = commit(Node, TnxId), + case apply_mfa(TnxId, MFA, init) of + {true, Result} -> {ok, TnxId, Result}; + {false, Error} -> mnesia:abort(Error) + end; + false -> + ?SLOG(error, #{ + msg => stale_view_of_cluster_state, + tnx_id => LatestId, + last_seen_tnx_id => LatestIdLastSeen + }), + mnesia:abort({error, stale_view_of_cluster_state}) end. transaction(Func, Args) -> @@ -433,7 +472,7 @@ trans_query(TnxId) -> -define(TO_BIN(_B_), iolist_to_binary(io_lib:format("~p", [_B_]))). -apply_mfa(TnxId, {M, F, A}) -> +apply_mfa(TnxId, {M, F, A}, Kind) -> Res = try erlang:apply(M, F, A) @@ -444,7 +483,7 @@ apply_mfa(TnxId, {M, F, A}) -> {error, #{exception => Class, reason => Reason, stacktrace => Stacktrace}} end, %% Do not log args as it might be sensitive information - Meta = #{tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))}, + Meta = #{kind => Kind, tnx_id => TnxId, entrypoint => format_mfa(M, F, length(A))}, IsSuccess = is_success(Res), log_and_alarm(IsSuccess, Res, Meta), {IsSuccess, Res}. @@ -475,21 +514,21 @@ wait_for_all_nodes_commit(TnxId, Delay, Remain) -> [] -> ok; Nodes -> - {retry, Nodes} + {peers_lagging, Nodes} end. -wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain) -> +wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> ok = timer:sleep(Delay), - case length(synced_nodes(TnxId)) >= RequiredNum of + case length(synced_nodes(TnxId)) >= RequiredSyncs of true -> ok; false when Remain > 0 -> - wait_for_nodes_commit(RequiredNum, TnxId, Delay, Remain - Delay); + wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay); false -> case lagging_node(TnxId) of %% All commit but The succeedNum > length(nodes()). [] -> ok; - Nodes -> {retry, Nodes} + Nodes -> {peers_lagging, Nodes} end end. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 0ab710e28..0e6b3ef0c 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -92,7 +92,7 @@ get_node_and_config(KeyPath) -> ) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> - check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)). + emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts). %% @doc Update the specified node's key path in local-override.conf. -spec update( @@ -111,7 +111,7 @@ update(Node, KeyPath, UpdateReq, Opts) -> -spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove(KeyPath, Opts) -> - check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)). + emqx_conf_proto_v1:remove_config(KeyPath, Opts). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -125,7 +125,7 @@ remove(Node, KeyPath, Opts) -> -spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> - check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)). + emqx_conf_proto_v1:reset(KeyPath, Opts). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -208,24 +208,6 @@ gen_example(File, SchemaModule, I18nFile, Lang) -> Example = hocon_schema_example:gen(SchemaModule, Opts), file:write_file(File, Example). -check_cluster_rpc_result(Result) -> - case Result of - {ok, _TnxId, Res} -> - Res; - {retry, TnxId, Res, Nodes} -> - %% The init MFA return ok, but other nodes failed. - %% We return ok and alert an alarm. - ?SLOG(error, #{ - msg => "failed_to_update_config_in_cluster", - nodes => Nodes, - tnx_id => TnxId - }), - Res; - %% all MFA return not ok or {ok, term()}. - {error, Error} -> - Error - end. - %% Only gen hot_conf schema, not all configuration fields. gen_hot_conf_schema(File) -> {ApiSpec0, Components0} = emqx_dashboard_swagger:spec( diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl index 68380b88a..97e14b7c4 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl @@ -61,7 +61,7 @@ get_all(KeyPath) -> update_config_key_path(), emqx_config:update_request(), emqx_config:update_opts() -) -> emqx_cluster_rpc:multicall_return(). +) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). @@ -78,7 +78,7 @@ update(Node, KeyPath, UpdateReq, Opts) -> rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). -spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> - emqx_cluster_rpc:multicall_result(). + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove_config(KeyPath, Opts) -> emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). @@ -90,7 +90,7 @@ remove_config(Node, KeyPath, Opts) -> rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). -spec reset(update_config_key_path(), emqx_config:update_opts()) -> - emqx_cluster_rpc:multicall_return(). + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index ac2d8d90b..48fd91a33 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -69,13 +69,13 @@ t_base_test(_Config) -> ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = multicall(M, F, A), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFA, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), - ?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)), + ?assertEqual({ok, 2, ok}, multicall(M, F, A)), {atomic, Status} = emqx_cluster_rpc:status(), case length(Status) =:= 3 of true -> @@ -95,7 +95,7 @@ t_commit_fail_test(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE2)]}, - {error, "MFA return not ok"} = emqx_cluster_rpc:multicall(M, F, A), + {init_failure, "MFA return not ok"} = multicall(M, F, A), ?assertEqual({atomic, []}, emqx_cluster_rpc:status()), ok. @@ -103,7 +103,7 @@ t_commit_crash_test(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, no_exist_function, []}, - {error, {error, Meta}} = emqx_cluster_rpc:multicall(M, F, A), + {init_failure, {error, Meta}} = multicall(M, F, A), ?assertEqual(undef, maps:get(reason, Meta)), ?assertEqual(error, maps:get(exception, Meta)), ?assertEqual(true, maps:is_key(stacktrace, Meta)), @@ -114,21 +114,23 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), MFA = {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, _, ok} = multicall(M, F, A, 1, 1000), {atomic, [Status]} = emqx_cluster_rpc:status(), ?assertEqual(MFA, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), erlang:send(?NODE2, test), - Res = gen_server:call(?NODE2, {initiate, {M, F, A}}), - ?assertEqual({error, "MFA return not ok"}, Res), + Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A), + Res = gen_server:call(?NODE2, Call), + ?assertEqual({init_failure, "MFA return not ok"}, Res), ok. t_catch_up_status_handle_next_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {M, F, A} = {?MODULE, failed_on_node_by_odd, [erlang:whereis(?NODE1)]}, - {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 2} = gen_server:call(?NODE2, {initiate, {M, F, A}}), + {ok, 1, ok} = multicall(M, F, A, 1, 1000), + Call = emqx_cluster_rpc:make_initiate_call_req(M, F, A), + {ok, 2} = gen_server:call(?NODE2, Call), ok. t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> @@ -138,19 +140,19 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> ets:insert(test, {other_mfa_result, failed}), ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]), {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]}, - {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 1, ok} = multicall(M, F, A, 1, 1000), ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]), ct:pal("333:~p~n", [emqx_cluster_rpc:status()]), {atomic, [_Status | L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ets:insert(test, {other_mfa_result, ok}), - {ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), + {ok, 2, ok} = multicall(io, format, ["test"], 1, 1000), ct:sleep(1000), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), MFAEcho = {M1, F1, A1} = {?MODULE, echo, [Pid, test]}, - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M1, F1, A1), + {ok, TnxId, ok} = multicall(M1, F1, A1), {atomic, Query} = emqx_cluster_rpc:query(TnxId), ?assertEqual(MFAEcho, maps:get(mfa, Query)), ?assertEqual(node(), maps:get(initiator, Query)), @@ -167,7 +169,7 @@ t_del_stale_mfa(_Config) -> Ids = [ begin - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = multicall(M, F, A), TnxId end || _ <- Keys @@ -176,7 +178,7 @@ t_del_stale_mfa(_Config) -> Ids2 = [ begin - {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), + {ok, TnxId, ok} = multicall(M, F, A), TnxId end || _ <- Keys2 @@ -203,7 +205,7 @@ t_del_stale_mfa(_Config) -> t_skip_failed_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -212,7 +214,7 @@ t_skip_failed_commit(_Config) -> tnx_ids(List1) ), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + {ok, 2, ok} = multicall(M, F, A, 1, 1000), 2 = gen_server:call(?NODE2, skip_failed_commit, 5000), {atomic, List2} = emqx_cluster_rpc:status(), ?assertEqual( @@ -224,7 +226,7 @@ t_skip_failed_commit(_Config) -> t_fast_forward_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), + {ok, 1, ok} = multicall(io, format, ["test~n"], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -233,11 +235,11 @@ t_fast_forward_commit(_Config) -> tnx_ids(List1) ), {M, F, A} = {?MODULE, failed_on_node, [erlang:whereis(?NODE1)]}, - {ok, 2, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 3, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 4, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, 5, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {retry, 6, ok, _} = emqx_cluster_rpc:multicall(M, F, A, 2, 1000), + {ok, 2, ok} = multicall(M, F, A, 1, 1000), + {ok, 3, ok} = multicall(M, F, A, 1, 1000), + {ok, 4, ok} = multicall(M, F, A, 1, 1000), + {ok, 5, ok} = multicall(M, F, A, 1, 1000), + {peers_lagging, 6, ok, _} = multicall(M, F, A, 2, 1000), 3 = gen_server:call(?NODE2, {fast_forward_to_commit, 3}, 5000), 4 = gen_server:call(?NODE2, {fast_forward_to_commit, 4}, 5000), 6 = gen_server:call(?NODE2, {fast_forward_to_commit, 7}, 5000), @@ -333,3 +335,9 @@ failed_on_other_recover_after_retry(Pid) -> [{_, Res}] = ets:lookup(test, other_mfa_result), Res end. + +multicall(M, F, A, N, T) -> + emqx_cluster_rpc:do_multicall(M, F, A, N, T). + +multicall(M, F, A) -> + multicall(M, F, A, all, timer:minutes(2)). diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index 429f5d3fa..c4a6418c2 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -70,6 +70,7 @@ end_per_testcase(_, _Config) -> ok. t_list_raw_empty(_) -> + ok = emqx_config:erase(hd(emqx_connector:config_key_path())), Result = emqx_connector:list_raw(), ?assertEqual([], Result). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 67228a31d..5cddc9305 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -556,7 +556,7 @@ with_gateway(GwName0, Fun) -> end, case emqx_gateway:lookup(GwName) of undefined -> - return_http_error(404, "Gateway not load"); + return_http_error(404, "Gateway not loaded"); Gateway -> Fun(GwName, Gateway) end diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 7c4131443..6c625ba8c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -317,7 +317,7 @@ get_plugins() -> upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -> [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)), %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall - %% TODO what happened when a new node join in? + %% TODO what happens when a new node join in? %% emqx_plugins_monitor should copy plugins from other core node when boot-up. case emqx_plugins:describe(string:trim(FileName, trailing, ".tar.gz")) of {error, #{error := "bad_info_file", return := {enoent, _}}} -> @@ -358,16 +358,11 @@ upload_install(post, #{}) -> }}. do_install_package(FileName, Bin) -> - {Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin), - case lists:filter(fun(R) -> R =/= ok end, Res) of - [] -> - {200}; - [{error, Reason} | _] -> - {400, #{ - code => 'UNEXPECTED_ERROR', - message => iolist_to_binary(io_lib:format("~p", [Reason])) - }} - end. + %% TODO: handle bad nodes + {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin), + %% TODO: handle non-OKs + [] = lists:filter(fun(R) -> R =/= ok end, Res), + {200}. plugin(get, #{bindings := #{name := Name}}) -> {Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name), @@ -376,11 +371,11 @@ plugin(get, #{bindings := #{name := Name}}) -> [] -> {404, #{code => 'NOT_FOUND', message => Name}} end; plugin(delete, #{bindings := #{name := Name}}) -> - {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:delete_package(Name), + Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name), return(204, Res). update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> - {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action), + Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action), return(204, Res). update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> @@ -422,7 +417,8 @@ delete_package(Name) -> ok -> _ = emqx_plugins:ensure_disabled(Name), _ = emqx_plugins:ensure_uninstalled(Name), - _ = emqx_plugins:delete_package(Name); + _ = emqx_plugins:delete_package(Name), + ok; Error -> Error end. @@ -430,20 +426,19 @@ delete_package(Name) -> %% for RPC plugin update ensure_action(Name, start) -> _ = emqx_plugins:ensure_enabled(Name), - _ = emqx_plugins:ensure_started(Name); + _ = emqx_plugins:ensure_started(Name), + ok; ensure_action(Name, stop) -> _ = emqx_plugins:ensure_stopped(Name), - _ = emqx_plugins:ensure_disabled(Name); + _ = emqx_plugins:ensure_disabled(Name), + ok; ensure_action(Name, restart) -> _ = emqx_plugins:ensure_enabled(Name), - _ = emqx_plugins:restart(Name). + _ = emqx_plugins:restart(Name), + ok. return(Code, ok) -> {Code}; -return(Code, {ok, Result}) -> - {Code, Result}; -return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) -> - {404, #{code => 'NOT_FOUND', message => Path}}; return(_, {error, Reason}) -> {400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}. diff --git a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl index dc7904d01..4ac594d2c 100644 --- a/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_mgmt_api_plugins_proto_v1.erl @@ -43,11 +43,10 @@ install_package(Filename, Bin) -> describe_package(Name) -> rpc:multicall(emqx_mgmt_api_plugins, describe_package, [Name], 10000). --spec delete_package(binary() | string()) -> emqx_cluster_rpc:multicall_return(). +-spec delete_package(binary() | string()) -> ok | {error, any()}. delete_package(Name) -> emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000). --spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> - emqx_cluster_rpc:multicall_return(). +-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}. ensure_action(Name, Action) -> emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index fe8e3c210..03304c209 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -61,8 +61,6 @@ can_topic_match_oneof/2 ]). --export([cluster_call/3]). - -compile({no_auto_import, [float/1]}). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). @@ -307,7 +305,3 @@ can_topic_match_oneof(Topic, Filters) -> end, Filters ). - -cluster_call(Module, Func, Args) -> - {ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args), - Result. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 8abc86d53..0879d5936 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -337,7 +337,7 @@ do_ensure_started(NameVsn) -> ). %% try the function, catch 'throw' exceptions as normal 'error' return -%% other exceptions with stacktrace returned. +%% other exceptions with stacktrace logged. tryit(WhichOp, F) -> try F() @@ -648,7 +648,7 @@ put_config(Key, Value) when is_atom(Key) -> put_config([Key], Value); put_config(Path, Values) when is_list(Path) -> Opts = #{rawconf_with_defaults => true, override_to => cluster}, - case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of + case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of {ok, _} -> ok; Error -> Error end. diff --git a/apps/emqx_plugins/test/emqx_plugins_tests.erl b/apps/emqx_plugins/test/emqx_plugins_tests.erl index cda6dbf0f..58829c07f 100644 --- a/apps/emqx_plugins/test/emqx_plugins_tests.erl +++ b/apps/emqx_plugins/test/emqx_plugins_tests.erl @@ -125,20 +125,10 @@ purge_test() -> meck_emqx() -> meck:new(emqx, [unstick, passthrough]), meck:expect( - emqx, - update_config, + emqx_conf, + update, fun(Path, Values, _Opts) -> emqx_config:put(Path, Values) end ), - %meck:expect(emqx, get_config, - % fun(KeyPath, Default) -> - % Map = emqx:get_raw_config(KeyPath, Default), - % Map1 = emqx_map_lib:safe_atom_key_map(Map), - % case Map1 of - % #{states := Plugins} -> - % Map1#{states => [emqx_map_lib:safe_atom_key_map(P) ||P <- Plugins]}; - % _ -> Map1 - % end - % end), ok. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5e3242135..2a3f29122 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -174,7 +174,7 @@ create(InstId, Group, ResourceType, Config) -> -spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(InstId, Group, ResourceType, Config, Opts) -> - wrap_rpc(emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts)). + emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts). % -------------------------------------------- -spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) -> @@ -196,7 +196,7 @@ create_local(InstId, Group, ResourceType, Config, Opts) -> -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> - wrap_rpc(emqx_resource_proto_v1:create_dry_run(ResourceType, Config)). + emqx_resource_proto_v1:create_dry_run(ResourceType, Config). -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -211,7 +211,7 @@ recreate(InstId, ResourceType, Config) -> -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(InstId, ResourceType, Config, Opts) -> - wrap_rpc(emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts)). + emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts). -spec recreate_local(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -225,7 +225,7 @@ recreate_local(InstId, ResourceType, Config, Opts) -> -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - wrap_rpc(emqx_resource_proto_v1:remove(InstId)). + emqx_resource_proto_v1:remove(InstId). -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> @@ -237,7 +237,7 @@ reset_metrics_local(InstId) -> -spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}. reset_metrics(InstId) -> - wrap_rpc(emqx_resource_proto_v1:reset_metrics(InstId)). + emqx_resource_proto_v1:reset_metrics(InstId). %% ================================================================================= -spec query(instance_id(), Request :: term()) -> Result :: term(). @@ -430,11 +430,5 @@ inc_metrics_funcs(InstId) -> safe_apply(Func, Args) -> ?SAFE_CALL(erlang:apply(Func, Args)). -wrap_rpc(Ret) -> - case Ret of - {ok, _TxnId, Result} -> Result; - Failed -> Failed - end. - query_error(Reason, Msg) -> {error, {?MODULE, #{reason => Reason, msg => Msg}}}. diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index 04e489f78..cbbc4e552 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -40,7 +40,7 @@ introduced_in() -> resource_config(), create_opts() ) -> - emqx_cluster_rpc:multicall_return(resource_data()). + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(InstId, Group, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall(emqx_resource, create_local, [ InstId, Group, ResourceType, Config, Opts @@ -50,7 +50,7 @@ create(InstId, Group, ResourceType, Config, Opts) -> resource_type(), resource_config() ) -> - emqx_cluster_rpc:multicall_return(resource_data()). + ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]). @@ -60,16 +60,14 @@ create_dry_run(ResourceType, Config) -> resource_config(), create_opts() ) -> - emqx_cluster_rpc:multicall_return(resource_data()). + {ok, resource_data()} | {error, Reason :: term()}. recreate(InstId, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]). --spec remove(instance_id()) -> - emqx_cluster_rpc:multicall_return(ok). +-spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]). --spec reset_metrics(instance_id()) -> - emqx_cluster_rpc:multicall_return(ok). +-spec reset_metrics(instance_id()) -> ok | {error, any()}. reset_metrics(InstId) -> emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 0f4b4a53d..27a627e3d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -321,7 +321,7 @@ replace_sql_clrf(#{<<"sql">> := SQL} = Params) -> end. '/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) -> case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of - {ok, _TxnId, _Result} -> + ok -> {200, <<"Reset Success">>}; Failed -> {400, #{ diff --git a/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl b/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl index 501a1d05c..ea7e4a53b 100644 --- a/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl +++ b/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl @@ -30,7 +30,6 @@ introduced_in() -> "5.0.0". --spec reset_metrics(rule_id()) -> - emqx_cluster_rpc:multicall_return(ok). +-spec reset_metrics(rule_id()) -> ok | {error, any()}. reset_metrics(RuleId) -> emqx_cluster_rpc:multicall(emqx_rule_engine, reset_metrics_for_rule, [RuleId]). diff --git a/scripts/make-docker-image-from-host-build.sh b/scripts/make-docker-image-from-host-build.sh new file mode 100755 index 000000000..8911ed251 --- /dev/null +++ b/scripts/make-docker-image-from-host-build.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash + +set -euo pipefail +set -x + +PROFILE="$1" +COMPILE="${2:-no}" +DISTRO="$(./scripts/get-distro.sh)" +PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh "$PROFILE")}" + +case "$DISTRO" in + ubuntu20*) + EMQX_DOCKERFILE="Dockerfile.ubuntu20.04.runner" + ;; + *) + echo "sorry, no support for $DISTRO yet" + exit 1 +esac + +if [ "$COMPILE" = '--compile' ]; then + make "$PROFILE" + sync +fi + +export DOCKER_BUILDKIT=1 +docker build --build-arg PROFILE="${PROFILE}" \ + -t "emqx/emqx:${PKG_VSN}-${DISTRO}" \ + -f "$EMQX_DOCKERFILE" . diff --git a/scripts/start-two-nodes-in-docker.sh b/scripts/start-two-nodes-in-docker.sh index ec3989934..64a6647ce 100755 --- a/scripts/start-two-nodes-in-docker.sh +++ b/scripts/start-two-nodes-in-docker.sh @@ -2,14 +2,6 @@ set -euo pipefail -## This script takes the first argument as docker image name, -## starts two containers running with the built code mount -## into docker containers. -## -## NOTE: containers are not instructed to rebuild emqx, -## Please use a docker image which is compatible with -## the docker host. -## ## EMQX can only start with longname (https://erlang.org/doc/reference_manual/distributed.html) ## The host name part of EMQX's node name has to be static, this means we should either ## pre-assign static IP for containers, or ensure containers can communiate with each other by name @@ -19,7 +11,6 @@ set -euo pipefail cd -P -- "$(dirname -- "$0")/.." IMAGE="${1}" -PROJ_DIR="$(pwd)" NET='emqx.io' NODE1="node1.$NET" @@ -35,23 +26,23 @@ docker network create "$NET" docker run -d -t --restart=always --name "$NODE1" \ --net "$NET" \ + -e EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug \ -e EMQX_NODE_NAME="emqx@$NODE1" \ -e EMQX_NODE_COOKIE="$COOKIE" \ -p 18083:18083 \ - -v "$PROJ_DIR"/_build/emqx/rel/emqx:/built \ - "$IMAGE" sh -c 'cp -r /built /emqx && /emqx/bin/emqx console' + "$IMAGE" docker run -d -t --restart=always --name "$NODE2" \ --net "$NET" \ + -e EMQX_LOG__CONSOLE_HANDLER__LEVEL=debug \ -e EMQX_NODE_NAME="emqx@$NODE2" \ -e EMQX_NODE_COOKIE="$COOKIE" \ -p 18084:18083 \ - -v "$PROJ_DIR"/_build/emqx/rel/emqx:/built \ - "$IMAGE" sh -c 'cp -r /built /emqx && /emqx/bin/emqx console' + "$IMAGE" wait (){ container="$1" - while ! docker exec "$container" /emqx/bin/emqx_ctl status >/dev/null 2>&1; do + while ! docker exec "$container" emqx_ctl status >/dev/null 2>&1; do echo -n '.' sleep 1 done @@ -61,4 +52,4 @@ wait $NODE1 wait $NODE2 echo -docker exec $NODE1 /emqx/bin/emqx_ctl cluster join "emqx@$NODE2" +docker exec $NODE1 emqx_ctl cluster join "emqx@$NODE2"