From 20091f8d2ad0e204ad8551b58e3f26b7852b29bb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 09:52:15 +0800 Subject: [PATCH 1/9] fix(connector): some desc for connector APIs --- apps/emqx_bridge/src/emqx_bridge_schema.erl | 6 +++--- apps/emqx_connector/src/emqx_connector_api.erl | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index e06065c7d..b960cd8c8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -48,10 +48,10 @@ common_bridge_fields() -> , {connector, mk(binary(), #{ nullable => false + , example => <<"mqtt:my_mqtt_connector">> , desc =>""" -The connector name to be used for this bridge. -Connectors are configured as 'connectors.{type}.{name}', -for example 'connectors.http.mybridge'. +The connector Id to be used for this bridge. Connector Ids must be of format: '{type}:{name}'.
+In config files, you can find the corresponding config entry for a connector by such path: 'connectors.{type}.{name}'.
""" })} ]. diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 9db9f2a93..cdba638d8 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -38,9 +38,9 @@ _ = ConnName, EXPR catch - error:{invalid_bridge_id, Id0} -> - {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary, - ". Bridge Ids must be of format {type}:{name}">>}} + error:{invalid_connector_id, Id0} -> + {400, #{code => 'INVALID_ID', message => <<"invalid_connector_id: ", Id0/binary, + ". Connector Ids must be of format {type}:{name}">>}} end). namespace() -> "connector". From f08f37ec9c10a46c7af57695f19aaa7f202b2394 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 11:19:22 +0800 Subject: [PATCH 2/9] fix(bridge): create bridge failed but the config was saved --- apps/emqx_bridge/src/emqx_bridge.erl | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 50c39007a..fbbab59ac 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -111,13 +111,15 @@ bridge_type(emqx_connector_http) -> http. post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - _ = perform_bridge_changes([ + %% The config update will be failed if any task in `perform_bridge_changes` failed. + Result = perform_bridge_changes([ {fun remove/3, Removed}, {fun create/3, Added}, {fun update/3, Updated} ]), ok = unload_hook(), - ok = load_hook(NewConf). + ok = load_hook(NewConf), + Result. perform_bridge_changes(Tasks) -> perform_bridge_changes(Tasks, ok). @@ -214,6 +216,10 @@ update(Type, Name, {_OldConf, Conf}) -> config => Conf}), case recreate(Type, Name, Conf) of {ok, _} -> maybe_disable_bridge(Type, Name, Conf); + {error, not_found} -> + ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one" + , type => Type, name => Name, config => Conf}), + create(Type, Name, Conf); {error, _} = Err -> Err end. From 11e8e0db694ff2c58155cd345af322448038981a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 15:33:25 +0800 Subject: [PATCH 3/9] fix(bridge): stop http failed due to econnrefused --- apps/emqx_bridge/src/emqx_bridge.erl | 38 +++++++++++----- apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_app.erl | 15 +++++-- .../src/emqx_connector_http.erl | 32 +++++++++----- apps/emqx_resource/src/emqx_resource.erl | 2 +- .../src/emqx_resource_instance.erl | 43 ++++++++++--------- 6 files changed, 86 insertions(+), 46 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index fbbab59ac..ad3fee858 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -203,7 +203,7 @@ create(Type, Name, Conf) -> {error, Reason} -> {error, Reason} end. -update(Type, Name, {_OldConf, Conf}) -> +update(Type, Name, {OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% %% - if the connection related configs like `servers` is updated, we should restart/start @@ -212,15 +212,22 @@ update(Type, Name, {_OldConf, Conf}) -> %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated %% without restarting the bridge. %% - ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, - config => Conf}), - case recreate(Type, Name, Conf) of - {ok, _} -> maybe_disable_bridge(Type, Name, Conf); - {error, not_found} -> - ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one" - , type => Type, name => Name, config => Conf}), - create(Type, Name, Conf); - {error, _} = Err -> Err + case if_only_to_toggole_enable(OldConf, Conf) of + false -> + ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, + config => Conf}), + case recreate(Type, Name, Conf) of + {ok, _} -> maybe_disable_bridge(Type, Name, Conf); + {error, not_found} -> + ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one" + , type => Type, name => Name, config => Conf}), + create(Type, Name, Conf); + {error, Reason} -> {update_bridge_failed, Reason} + end; + true -> + %% we don't need to recreate the bridge if this config change is only to + %% toggole the config 'bridge.{type}.{name}.enable' + ok end. recreate(Type, Name) -> @@ -344,6 +351,17 @@ maybe_disable_bridge(Type, Name, Conf) -> true -> ok end. +if_only_to_toggole_enable(OldConf, Conf) -> + #{added := Added, removed := Removed, changed := Updated} = + emqx_map_lib:diff_maps(OldConf, Conf), + case {Added, Removed, Updated} of + {Added, Removed, #{enable := _}= Updated} + when map_size(Added) =:= 0, + map_size(Removed) =:= 0, + map_size(Updated) =:= 1 -> true; + {_, _, _} -> false + end. + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 6f2d5c7ad..121d99f85 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -334,7 +334,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> invalid -> {404, error_msg('BAD_ARG', <<"invalid operation">>)}; UpReq -> case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - UpReq, #{override_to => cluster}) of + {UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of {ok, _} -> {200}; {error, {pre_config_update, _, bridge_not_found}} -> {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 519368523..2c84e5630 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -40,10 +40,15 @@ stop(_State) -> ok. -define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart). -pre_config_update(_, Oper, undefined) ?IS_OPER(Oper) -> +pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) -> {error, bridge_not_found}; -pre_config_update(_, Oper, OldConfig) ?IS_OPER(Oper) -> - {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; +pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) -> + case perform_operation(Oper, Type, Name) of + ok -> + %% we also need to save the 'enable' to the config files + {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; + {error, _} = Err -> Err + end; pre_config_update(_, Conf, _OldConfig) -> {ok, Conf}. @@ -51,3 +56,7 @@ pre_config_update(_, Conf, _OldConfig) -> operation_to_enable(start) -> true; operation_to_enable(stop) -> false; operation_to_enable(restart) -> true. + +perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name); +perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name); +perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name). diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 0249d51b1..2cdc9595c 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -169,14 +169,20 @@ on_start(InstId, #{base_url := #{scheme := Scheme, pool_name => PoolName, host => Host, port => Port, + connect_timeout => ConnectTimeout, base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case ehttpc_sup:start_pool(PoolName, PoolOpts) of - {ok, _} -> {ok, State}; - {error, {already_started, _}} -> {ok, State}; + case do_health_check(Host, Port, ConnectTimeout) of + ok -> + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> {ok, State}; + {error, {already_started, _}} -> {ok, State}; + {error, Reason} -> + {error, Reason} + end; {error, Reason} -> - {error, Reason} + {error, {http_start_failed, Reason}} end. on_stop(InstId, #{pool_name := PoolName}) -> @@ -216,13 +222,17 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, end, Result. -on_health_check(_InstId, #{host := Host, port := Port} = State) -> - case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), 3000) of - {ok, Sock} -> - gen_tcp:close(Sock), - {ok, State}; - {error, _Reason} -> - {error, test_query_failed, State} +on_health_check(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> + case do_health_check(Host, Port, Timeout) of + ok -> {ok, State}; + {error, Reason} -> + {error, {http_health_check_failed, Reason}, State} + end. + +do_health_check(Host, Port, Timeout) -> + case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of + {ok, Sock} -> gen_tcp:close(Sock), ok; + {error, Reason} -> {error, Reason} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b062e83ae..fd4046505 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -188,7 +188,7 @@ query(InstId, Request) -> query(InstId, Request, AfterQuery) -> case get_instance(InstId) of {ok, #{status := stopped}} -> - error({InstId, stopped}); + error({resource_stopped, InstId}); {ok, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index eaf6db0b2..8c5232706 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -173,18 +173,19 @@ do_create(InstId, ResourceType, Config) -> case lookup(InstId) of {ok, _} -> {ok, already_created}; _ -> + Res0 = #{id => InstId, mod => ResourceType, config => Config, + status => stopped, state => undefined}, case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> - ets:insert(emqx_resource_instance, {InstId, - #{mod => ResourceType, config => Config, - state => ResourceState, status => stopped}}), - _ = do_health_check(InstId), ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), + %% this is the first time we do health check, this will update the + %% status and then do ets:insert/2 + _ = do_health_check(Res0#{state => ResourceState}), {ok, force_lookup(InstId)}; {error, Reason} -> logger:error("start ~ts resource ~ts failed: ~p", [ResourceType, InstId, Reason]), - {error, Reason} + {ok, Res0} end end. @@ -243,22 +244,24 @@ do_stop(InstId) -> Error end. -do_health_check(InstId) -> +do_health_check(InstId) when is_binary(InstId) -> case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState0} = Data} -> - case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of - {ok, ResourceState1} -> - ets:insert(emqx_resource_instance, - {InstId, Data#{status => started, state => ResourceState1}}), - ok; - {error, Reason, ResourceState1} -> - logger:error("health check for ~p failed: ~p", [InstId, Reason]), - ets:insert(emqx_resource_instance, - {InstId, Data#{status => stopped, state => ResourceState1}}), - {error, Reason} - end; - Error -> - Error + {ok, Data} -> do_health_check(Data); + Error -> Error + end; +do_health_check(#{state := undefined}) -> + {error, resource_not_initialized}; +do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> + case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of + {ok, ResourceState1} -> + ets:insert(emqx_resource_instance, + {InstId, Data#{status => started, state => ResourceState1}}), + ok; + {error, Reason, ResourceState1} -> + logger:error("health check for ~p failed: ~p", [InstId, Reason]), + ets:insert(emqx_resource_instance, + {InstId, Data#{status => stopped, state => ResourceState1}}), + {error, Reason} end. %%------------------------------------------------------------------------------ From a44e18e869467a06be6ff03864908fd429c6491c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 19:19:58 +0800 Subject: [PATCH 4/9] fix(bridge): filter out some extra fields from the request body --- apps/emqx_bridge/src/emqx_bridge_api.erl | 18 +++++-- .../src/emqx_bridge_http_schema.erl | 2 +- .../src/emqx_bridge_mqtt_schema.erl | 4 +- apps/emqx_bridge/src/emqx_bridge_schema.erl | 47 ++++++++++++++++++- .../emqx_connector/src/emqx_connector_api.erl | 7 ++- .../src/mqtt/emqx_connector_mqtt_mod.erl | 3 ++ .../test/emqx_connector_api_SUITE.erl | 44 ++++++++++++++++- .../src/emqx_resource_instance.erl | 4 +- .../src/emqx_rule_api_schema.erl | 4 +- .../src/emqx_rule_engine_api.erl | 29 ++++++++---- 10 files changed, 142 insertions(+), 20 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 121d99f85..0f291ac1a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -134,7 +134,12 @@ method_example(Type, Direction, get) -> #{ id => bin(SType ++ ":" ++ SName), type => bin(SType), - name => bin(SName) + name => bin(SName), + metrics => ?METRICS(0, 0, 0, 0, 0, 0), + node_metrics => [ + #{node => node(), + metrics => ?METRICS(0, 0, 0, 0, 0, 0)} + ] }; method_example(Type, Direction, post) -> SType = atom_to_list(Type), @@ -269,7 +274,8 @@ schema("/bridges/:id/operation/:operation") -> } }. -'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) -> +'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) -> + Conf = filter_out_request_body(Conf0), BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()), case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> @@ -291,7 +297,8 @@ list_local_bridges(Node) -> '/bridges/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); -'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) -> +'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf0}) -> + Conf = filter_out_request_body(Conf0), ?TRY_PARSE_ID(Id, case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> @@ -423,6 +430,11 @@ rpc_multicall(Func, Args) -> ErrL -> {error, ErrL} end. +filter_out_request_body(Conf) -> + ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, + <<"metrics">>, <<"node">>], + maps:without(ExtraConfs, Conf). + rpc_call(Node, Fun, Args) -> rpc_call(Node, ?MODULE, Fun, Args). diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index f7644af1d..88cc90db7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -76,7 +76,7 @@ fields("put") -> fields("get") -> [ id_field() - ] ++ fields("post"). + ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post"). basic_config() -> [ {enable, diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl index 4b6965349..3de011b4c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl @@ -38,10 +38,10 @@ fields("put_egress") -> fields("get_ingress") -> [ id_field() - ] ++ fields("post_ingress"); + ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress"); fields("get_egress") -> [ id_field() - ] ++ fields("post_egress"). + ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress"). %%====================================================================================== id_field() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index b960cd8c8..3acfbcdef 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -12,6 +12,7 @@ ]). -export([ common_bridge_fields/0 + , metrics_status_fields/0 , direction_field/2 ]). @@ -56,6 +57,17 @@ In config files, you can find the corresponding config entry for a connector by })} ]. +metrics_status_fields() -> + [ {"metrics", mk(ref(?MODULE, "metrics"), #{desc => "The metrics of the bridge"})} + , {"node_metrics", mk(hoconsc:array(ref(?MODULE, "node_metrics")), + #{ desc => "The metrics of the bridge for each node" + })} + , {"status", mk(ref(?MODULE, "status"), #{desc => "The status of the bridge"})} + , {"node_status", mk(hoconsc:array(ref(?MODULE, "node_status")), + #{ desc => "The status of the bridge for each node" + })} + ]. + direction_field(Dir, Desc) -> {direction, mk(Dir, #{ nullable => false @@ -72,7 +84,40 @@ fields(bridges) -> ++ [{T, mk(hoconsc:map(name, hoconsc:union([ ref(schema_mod(T), "ingress"), ref(schema_mod(T), "egress") - ])), #{})} || T <- ?CONN_TYPES]. + ])), #{})} || T <- ?CONN_TYPES]; + +fields("metrics") -> + [ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})} + , {"success", mk(integer(), #{desc => "Count of query success"})} + , {"failed", mk(integer(), #{desc => "Count of query failed"})} + , {"rate", mk(float(), #{desc => "The rate of matched, times/second"})} + , {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})} + , {"rate_last5m", mk(float(), + #{desc => "The average rate of matched in last 5 mins, times/second"})} + ]; + +fields("node_metrics") -> + [ node_name() + , {"metrics", mk(ref(?MODULE, "metrics"), #{})} + ]; + +fields("status") -> + [ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})} + , {"success", mk(integer(), #{desc => "Count of query success"})} + , {"failed", mk(integer(), #{desc => "Count of query failed"})} + , {"rate", mk(float(), #{desc => "The rate of matched, times/second"})} + , {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})} + , {"rate_last5m", mk(float(), + #{desc => "The average rate of matched in last 5 mins, times/second"})} + ]; + +fields("node_status") -> + [ node_name() + , {"status", mk(ref(?MODULE, "status"), #{})} + ]. + +node_name() -> + {"node", mk(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}. schema_mod(Type) -> list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index cdba638d8..5d6bddb6a 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -234,7 +234,8 @@ schema("/connectors/:id") -> {404, error_msg('NOT_FOUND', <<"connector not found">>)} end); -'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params}) -> +'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> + Params = filter_out_request_body(Params0), ?TRY_PARSE_ID(Id, case emqx_connector:lookup(ConnType, ConnName) of {ok, _} -> @@ -277,5 +278,9 @@ format_resp(ConnId, RawConf) -> <<"num_of_bridges">> => NumOfBridges }. +filter_out_request_body(Conf) -> + ExtraConfs = [<<"num_of_bridges">>, <<"type">>, <<"name">>], + maps:without(ExtraConfs, Conf). + bin(S) when is_list(S) -> list_to_binary(S). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 7fb260130..7d5bb1283 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -84,6 +84,9 @@ stop(#{client_pid := Pid}) -> safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000), ok. +ping(undefined) -> + pang; + ping(#{client_pid := Pid}) -> emqtt:ping(Pid). diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 699579d7f..11f9460b4 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -379,7 +379,7 @@ t_mqtt_conn_update(_) -> %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)), %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), @@ -391,6 +391,48 @@ t_mqtt_conn_update(_) -> {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). +t_mqtt_conn_update2(_) -> + %% assert we there's no connectors and no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a mqtt connector, using POST + %% but this connector is point to a unreachable server "2603" + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>) + #{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + + ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + , <<"server">> := <<"127.0.0.1:2603">> + }, jsx:decode(Connector)), + + %% ... and a MQTT bridge, using POST + %% we bind this bridge to the connector created just now + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"disconnected">> + , <<"connector">> := ?CONNECTR_ID + }, jsx:decode(Bridge)), + %% we fix the 'server' parameter to a normal one, it should work + {ok, 200, Bridge2} = request(put, uri(["connectors", ?CONNECTR_ID]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), + ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge2)), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% delete the connector + {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). + t_mqtt_conn_testing(_) -> %% APIs for testing the connectivity %% then we add a mqtt connector, using POST diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 8c5232706..708dc6030 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -156,7 +156,8 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> Config = emqx_resource:call_config_merge(ResourceType, OldConfig, NewConfig, Params), - case do_create_dry_run(InstId, ResourceType, Config) of + TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), + case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> do_remove(ResourceType, InstId, ResourceState), do_create(InstId, ResourceType, Config); @@ -185,6 +186,7 @@ do_create(InstId, ResourceType, Config) -> {error, Reason} -> logger:error("start ~ts resource ~ts failed: ~p", [ResourceType, InstId, Reason]), + ets:insert(emqx_resource_instance, {InstId, Res0}), {ok, Res0} end end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index b3b7afe4e..1caa8da23 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -43,7 +43,9 @@ fields("rule_creation") -> fields("rule_info") -> [ rule_id() , {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})} - , {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})} + , {"node_metrics", sc(hoconsc:array(ref("node_metrics")), + #{ desc => "The metrics of the rule for each node" + })} , {"from", sc(hoconsc:array(binary()), #{desc => "The topics of the rule", example => "t/#"})} , {"created_at", sc(binary(), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 7cfeb5d7e..85f27a651 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -164,14 +164,15 @@ param_path_id() -> Records = emqx_rule_engine:get_rules_ordered_by_ts(), {200, format_rule_resp(Records)}; -'/rules'(post, #{body := Params}) -> - Id = maps:get(<<"id">>, Params, list_to_binary(emqx_misc:gen_id(8))), +'/rules'(post, #{body := Params0}) -> + Id = maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))), + Params = filter_out_request_body(Params0), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}}; not_found -> - case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of + case emqx:update_config(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {201, format_rule_resp(Rule)}; @@ -197,8 +198,9 @@ param_path_id() -> end; '/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) -> + Params = filter_out_request_body(Params), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], - case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of + case emqx:update_config(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {200, format_rule_resp(Rule)}; @@ -266,10 +268,12 @@ get_rule_metrics(Id) -> rate_max := Max, rate_last5m := Last5M }) -> - #{ matched => Matched - , rate => Current - , rate_max => Max - , rate_last5m => Last5M + #{ metrics => #{ + matched => Matched, + rate => Current, + rate_max => Max, + rate_last5m => Last5M + } , node => Node } end, @@ -279,7 +283,8 @@ get_rule_metrics(Id) -> aggregate_metrics(AllMetrics) -> InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0}, lists:foldl(fun - (#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1}, + (#{metrics := #{matched := Match1, rate := Rate1, + rate_max := RateMax1, rate_last5m := Rate5m1}}, #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) -> #{matched => Match1 + Match0, rate => Rate1 + Rate0, rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0} @@ -287,3 +292,9 @@ aggregate_metrics(AllMetrics) -> get_one_rule(AllRules, Id) -> [R || R = #{id := Id0} <- AllRules, Id0 == Id]. + +filter_out_request_body(Conf) -> + ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, + <<"metrics">>, <<"node">>], + maps:without(ExtraConfs, Conf). + From cfaad153648e61ce2e3898a4fda598bc35bcde7d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 20:19:34 +0800 Subject: [PATCH 5/9] fix(connector): update the connector config failed --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 15 ++++++++++----- .../test/emqx_connector_api_SUITE.erl | 7 +++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 079f17716..f8d17ce32 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -129,12 +129,11 @@ on_start(InstId, Conf) -> }, case ?MODULE:create_bridge(BridgeConf) of {ok, _Pid} -> - case emqx_connector_mqtt_worker:ensure_started(InstanceId) of - ok -> {ok, #{name => InstanceId}}; - {error, Reason} -> {error, Reason} - end; + ensure_mqtt_worker_started(InstanceId); {error, {already_started, _Pid}} -> - {ok, #{name => InstanceId}}; + ok = ?MODULE:drop_bridge(InstanceId), + {ok, _} = ?MODULE:create_bridge(BridgeConf), + ensure_mqtt_worker_started(InstanceId); {error, Reason} -> {error, Reason} end. @@ -162,6 +161,12 @@ on_health_check(_InstId, #{name := InstanceId} = State) -> _ -> {error, {connector_down, InstanceId}, State} end. +ensure_mqtt_worker_started(InstanceId) -> + case emqx_connector_mqtt_worker:ensure_started(InstanceId) of + ok -> {ok, #{name => InstanceId}}; + {error, Reason} -> {error, Reason} + end. + make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 11f9460b4..307852546 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -422,9 +422,12 @@ t_mqtt_conn_update2(_) -> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, Bridge2} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge2)), + {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + , <<"status">> := <<"connected">> + }, jsx:decode(BridgeStr)), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), From a9c9d9d805f24b33505ebff18e4221944268ac0e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 20:24:23 +0800 Subject: [PATCH 6/9] fix(rule): rename enabled to enable --- apps/emqx_rule_engine/include/rule_engine.hrl | 2 +- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 2 +- apps/emqx_rule_engine/src/emqx_rule_engine_api.erl | 8 ++++---- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 2 +- apps/emqx_rule_engine/src/emqx_rule_sqltester.erl | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 4884f329e..d7e02cf2e 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -47,7 +47,7 @@ , name := binary() , sql := binary() , outputs := [output()] - , enabled := boolean() + , enable := boolean() , description => binary() , created_at := integer() %% epoch in millisecond precision , from := list(topic()) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 35be28610..5316ca5ef 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -223,7 +223,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> id => RuleId, name => maps:get(name, Params, <<"">>), created_at => erlang:system_time(millisecond), - enabled => maps:get(enabled, Params, true), + enable => maps:get(enable, Params, true), sql => Sql, outputs => parse_outputs(Outputs), description => maps:get(description, Params, ""), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 85f27a651..205f85488 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -197,8 +197,8 @@ param_path_id() -> {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}} end; -'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) -> - Params = filter_out_request_body(Params), +'/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> + Params = filter_out_request_body(Params0), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx:update_config(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> @@ -235,7 +235,7 @@ format_rule_resp(#{ id := Id, name := Name, from := Topics, outputs := Output, sql := SQL, - enabled := Enabled, + enable := Enable, description := Descr}) -> NodeMetrics = get_rule_metrics(Id), #{id => Id, @@ -245,7 +245,7 @@ format_rule_resp(#{ id := Id, name := Name, sql => SQL, metrics => aggregate_metrics(NodeMetrics), node_metrics => NodeMetrics, - enabled => Enabled, + enable => Enable, created_at => format_datetime(CreatedAt, millisecond), description => Descr }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 1cabf3e32..4225c6f72 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -48,7 +48,7 @@ -spec(apply_rules(list(rule()), input()) -> ok). apply_rules([], _Input) -> ok; -apply_rules([#{enabled := false}|More], Input) -> +apply_rules([#{enable := false}|More], Input) -> apply_rules(More, Input); apply_rules([Rule = #{id := RuleID}|More], Input) -> try apply_rule_discard_result(Rule, Input) diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 7cd9448db..74ec1bb1c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -47,7 +47,7 @@ test_rule(Sql, Select, Context, EventTopics) -> sql => Sql, from => EventTopics, outputs => [#{mod => ?MODULE, func => get_selected_data, args => #{}}], - enabled => true, + enable => true, is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), fields => emqx_rule_sqlparser:select_fields(Select), doeach => emqx_rule_sqlparser:select_doeach(Select), From 05e24b457ab7b5ce8b3600ce97c55c4f33a4dc35 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 20:53:14 +0800 Subject: [PATCH 7/9] fix(bridge): update emqx_bridge.conf --- apps/emqx_bridge/etc/emqx_bridge.conf | 9 +++++---- apps/emqx_bridge/src/emqx_bridge.erl | 17 +++++------------ .../emqx_bridge/src/emqx_bridge_http_schema.erl | 6 +++++- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index fdd4005bf..04f4709b8 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -9,10 +9,10 @@ # direction = ingress # ## topic mappings for this bridge # remote_topic = "aws/#" -# subscribe_qos = 1 +# remote_qos = 1 # local_topic = "from_aws/${topic}" +# local_qos = "${qos}" # payload = "${payload}" -# qos = "${qos}" # retain = "${retain}" #} # @@ -23,14 +23,15 @@ # ## topic mappings for this bridge # local_topic = "emqx/#" # remote_topic = "from_emqx/${topic}" +# remote_qos = "${qos}" # payload = "${payload}" -# qos = 1 # retain = false #} # ## HTTP bridges to an HTTP server #bridges.http.my_http_bridge { # enable = true +# direction = egress # ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url # url = "http://localhost:9901/messages/${topic}" # request_timeout = "30s" @@ -47,7 +48,7 @@ # cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" # } # -# from_local_topic = "emqx_http/#" +# local_topic = "emqx_http/#" # ## the following config entries can use placehodler variables: # ## url, method, body, headers # method = post diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index ad3fee858..1e11046db 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -68,14 +68,9 @@ load_hook(Bridges) -> end, maps:to_list(Bridges)). do_load_hook(#{local_topic := _} = Conf) -> - case maps:find(direction, Conf) of - error -> - %% this bridge has no direction field, it means that it has only egress bridges - emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}); - {ok, egress} -> - emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}); - {ok, ingress} -> - ok + case maps:get(direction, Conf, egress) of + egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}); + ingress -> ok end; do_load_hook(_Conf) -> ok. @@ -276,10 +271,8 @@ get_matched_bridges(Topic) -> (_BName, #{direction := ingress}, Acc1) -> Acc1; (BName, #{direction := egress} = Egress, Acc1) -> - get_matched_bridge_id(Egress, Topic, BType, BName, Acc1); - %% HTTP, MySQL bridges only have egress direction - (BName, BridgeConf, Acc1) -> - get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1) + %% HTTP, MySQL bridges only have egress direction + get_matched_bridge_id(Egress, Topic, BType, BName, Acc1) end, Acc0, Conf) end, [], Bridges). diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index 88cc90db7..43cace332 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -81,9 +81,13 @@ fields("get") -> basic_config() -> [ {enable, mk(boolean(), - #{ desc =>"Enable or disable this bridge" + #{ desc => "Enable or disable this bridge" , default => true })} + , {direction, + mk(egress, + #{ desc => "The direction of this bridge, MUST be egress" + })} ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). From a879ec0f3aca9212e4eee2580de91d5d6e5c5055 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 20 Dec 2021 10:24:47 +0800 Subject: [PATCH 8/9] feat(resource): add option 'force_create' to emqx_resource:create/4 --- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 6 ++++ apps/emqx_resource/src/emqx_resource.erl | 32 ++++++++++++++++--- .../src/emqx_resource_instance.erl | 30 +++++++---------- .../test/emqx_resource_SUITE.erl | 5 +-- 5 files changed, 48 insertions(+), 27 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 1e11046db..d4fc3df2d 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -192,7 +192,7 @@ create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf)) of + parse_confs(Type, Name, Conf), #{force_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index e5eb1785f..08c230401 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -29,6 +29,12 @@ metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). +-type create_opts() :: #{ + %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, + %% the 'status' of the resource will be 'stopped' in this case. + %% Defaults to 'false' + force_create => boolean() + }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index fd4046505..37c4caa2e 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -33,7 +33,9 @@ -export([ check_config/2 , check_and_create/3 + , check_and_create/4 , check_and_create_local/3 + , check_and_create_local/4 , check_and_recreate/4 , check_and_recreate_local/4 ]). @@ -42,7 +44,9 @@ %% provisional solution: rpc:multical to all the nodes for creating/updating/removing %% todo: replicate operations -export([ create/3 %% store the config and start the instance + , create/4 , create_local/3 + , create_local/4 , create_dry_run/2 %% run start/2, health_check/2 and stop/1 sequentially , create_dry_run_local/2 , recreate/4 %% this will do create_dry_run, stop the old instance and start a new one @@ -141,12 +145,22 @@ apply_query_after_calls(Funcs) -> -spec create(instance_id(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> - cluster_call(create_local, [InstId, ResourceType, Config]). + create(InstId, ResourceType, Config, #{}). + +-spec create(instance_id(), resource_type(), resource_config(), create_opts()) -> + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. +create(InstId, ResourceType, Config, Opts) -> + cluster_call(create_local, [InstId, ResourceType, Config, Opts]). -spec create_local(instance_id(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create_local(InstId, ResourceType, Config) -> - call_instance(InstId, {create, InstId, ResourceType, Config}). + create_local(InstId, ResourceType, Config, #{}). + +-spec create_local(instance_id(), resource_type(), resource_config(), create_opts()) -> + {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. +create_local(InstId, ResourceType, Config, Opts) -> + call_instance(InstId, {create, InstId, ResourceType, Config, Opts}). -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -294,14 +308,24 @@ check_config(ResourceType, RawConfigTerm) -> -spec check_and_create(instance_id(), resource_type(), raw_resource_config()) -> {ok, resource_data() | 'already_created'} | {error, term()}. check_and_create(InstId, ResourceType, RawConfig) -> + check_and_create(InstId, ResourceType, RawConfig, #{}). + +-spec check_and_create(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> + {ok, resource_data() | 'already_created'} | {error, term()}. +check_and_create(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> create(InstId, ResourceType, InstConf) end). + fun(InstConf) -> create(InstId, ResourceType, InstConf, Opts) end). -spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) -> {ok, resource_data()} | {error, term()}. check_and_create_local(InstId, ResourceType, RawConfig) -> + check_and_create_local(InstId, ResourceType, RawConfig, #{}). + +-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config(), + create_opts()) -> {ok, resource_data()} | {error, term()}. +check_and_create_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end). + fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). -spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) -> {ok, resource_data()} | {error, term()}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 708dc6030..497affa5e 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -26,7 +26,6 @@ -export([ lookup/1 , get_metrics/1 , list_all/0 - , create_local/3 ]). -export([ hash_call/2 @@ -85,15 +84,6 @@ list_all() -> error:badarg -> [] end. - --spec create_local(instance_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, term()}. -create_local(InstId, ResourceType, InstConf) -> - case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of - {ok, Data} -> {ok, Data}; - Error -> Error - end. - %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ @@ -105,8 +95,8 @@ init({Pool, Id}) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{worker_pool = Pool, worker_id = Id}}. -handle_call({create, InstId, ResourceType, Config}, _From, State) -> - {reply, do_create(InstId, ResourceType, Config), State}; +handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> + {reply, do_create(InstId, ResourceType, Config, Opts), State}; handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> {reply, do_create_dry_run(InstId, ResourceType, Config), State}; @@ -146,7 +136,7 @@ code_change(_OldVsn, State, _Extra) -> %% suppress the race condition check, as these functions are protected in gproc workers -dialyzer({nowarn_function, [do_recreate/4, - do_create/3, + do_create/4, do_restart/1, do_stop/1, do_health_check/1]}). @@ -160,7 +150,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> do_remove(ResourceType, InstId, ResourceState), - do_create(InstId, ResourceType, Config); + do_create(InstId, ResourceType, Config, #{force_create => true}); Error -> Error end; @@ -170,7 +160,8 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> {error, not_found} end. -do_create(InstId, ResourceType, Config) -> +do_create(InstId, ResourceType, Config, Opts) -> + ForceCreate = maps:get(force_create, Opts, false), case lookup(InstId) of {ok, _} -> {ok, already_created}; _ -> @@ -183,11 +174,14 @@ do_create(InstId, ResourceType, Config) -> %% status and then do ets:insert/2 _ = do_health_check(Res0#{state => ResourceState}), {ok, force_lookup(InstId)}; - {error, Reason} -> - logger:error("start ~ts resource ~ts failed: ~p", + {error, Reason} when ForceCreate == true -> + logger:error("start ~ts resource ~ts failed: ~p, " + "force_create it as a stopped resource", [ResourceType, InstId, Reason]), ets:insert(emqx_resource_instance, {InstId, Res0}), - {ok, Res0} + {ok, Res0}; + {error, Reason} when ForceCreate == false -> + {error, Reason} end end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 4f641c85a..6b2e5903e 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -142,10 +142,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertException( - error, - {?ID, stopped}, - emqx_resource:query(?ID, get_state)), + ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), From 27de3139a45aaf02b12948ce54efc7e4be990c7a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 20 Dec 2021 11:36:52 +0800 Subject: [PATCH 9/9] fix(http_connect): don't check status on_start --- apps/emqx_connector/src/emqx_connector_http.erl | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 2cdc9595c..2b9bd48aa 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -173,16 +173,11 @@ on_start(InstId, #{base_url := #{scheme := Scheme, base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case do_health_check(Host, Port, ConnectTimeout) of - ok -> - case ehttpc_sup:start_pool(PoolName, PoolOpts) of - {ok, _} -> {ok, State}; - {error, {already_started, _}} -> {ok, State}; - {error, Reason} -> - {error, Reason} - end; + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> {ok, State}; + {error, {already_started, _}} -> {ok, State}; {error, Reason} -> - {error, {http_start_failed, Reason}} + {error, Reason} end. on_stop(InstId, #{pool_name := PoolName}) ->