From a44e18e869467a06be6ff03864908fd429c6491c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 19:19:58 +0800 Subject: [PATCH] fix(bridge): filter out some extra fields from the request body --- apps/emqx_bridge/src/emqx_bridge_api.erl | 18 +++++-- .../src/emqx_bridge_http_schema.erl | 2 +- .../src/emqx_bridge_mqtt_schema.erl | 4 +- apps/emqx_bridge/src/emqx_bridge_schema.erl | 47 ++++++++++++++++++- .../emqx_connector/src/emqx_connector_api.erl | 7 ++- .../src/mqtt/emqx_connector_mqtt_mod.erl | 3 ++ .../test/emqx_connector_api_SUITE.erl | 44 ++++++++++++++++- .../src/emqx_resource_instance.erl | 4 +- .../src/emqx_rule_api_schema.erl | 4 +- .../src/emqx_rule_engine_api.erl | 29 ++++++++---- 10 files changed, 142 insertions(+), 20 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 121d99f85..0f291ac1a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -134,7 +134,12 @@ method_example(Type, Direction, get) -> #{ id => bin(SType ++ ":" ++ SName), type => bin(SType), - name => bin(SName) + name => bin(SName), + metrics => ?METRICS(0, 0, 0, 0, 0, 0), + node_metrics => [ + #{node => node(), + metrics => ?METRICS(0, 0, 0, 0, 0, 0)} + ] }; method_example(Type, Direction, post) -> SType = atom_to_list(Type), @@ -269,7 +274,8 @@ schema("/bridges/:id/operation/:operation") -> } }. -'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) -> +'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) -> + Conf = filter_out_request_body(Conf0), BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()), case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> @@ -291,7 +297,8 @@ list_local_bridges(Node) -> '/bridges/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); -'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) -> +'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf0}) -> + Conf = filter_out_request_body(Conf0), ?TRY_PARSE_ID(Id, case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> @@ -423,6 +430,11 @@ rpc_multicall(Func, Args) -> ErrL -> {error, ErrL} end. +filter_out_request_body(Conf) -> + ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, + <<"metrics">>, <<"node">>], + maps:without(ExtraConfs, Conf). + rpc_call(Node, Fun, Args) -> rpc_call(Node, ?MODULE, Fun, Args). diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index f7644af1d..88cc90db7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -76,7 +76,7 @@ fields("put") -> fields("get") -> [ id_field() - ] ++ fields("post"). + ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post"). basic_config() -> [ {enable, diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl index 4b6965349..3de011b4c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl @@ -38,10 +38,10 @@ fields("put_egress") -> fields("get_ingress") -> [ id_field() - ] ++ fields("post_ingress"); + ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress"); fields("get_egress") -> [ id_field() - ] ++ fields("post_egress"). + ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress"). %%====================================================================================== id_field() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index b960cd8c8..3acfbcdef 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -12,6 +12,7 @@ ]). -export([ common_bridge_fields/0 + , metrics_status_fields/0 , direction_field/2 ]). @@ -56,6 +57,17 @@ In config files, you can find the corresponding config entry for a connector by })} ]. +metrics_status_fields() -> + [ {"metrics", mk(ref(?MODULE, "metrics"), #{desc => "The metrics of the bridge"})} + , {"node_metrics", mk(hoconsc:array(ref(?MODULE, "node_metrics")), + #{ desc => "The metrics of the bridge for each node" + })} + , {"status", mk(ref(?MODULE, "status"), #{desc => "The status of the bridge"})} + , {"node_status", mk(hoconsc:array(ref(?MODULE, "node_status")), + #{ desc => "The status of the bridge for each node" + })} + ]. + direction_field(Dir, Desc) -> {direction, mk(Dir, #{ nullable => false @@ -72,7 +84,40 @@ fields(bridges) -> ++ [{T, mk(hoconsc:map(name, hoconsc:union([ ref(schema_mod(T), "ingress"), ref(schema_mod(T), "egress") - ])), #{})} || T <- ?CONN_TYPES]. + ])), #{})} || T <- ?CONN_TYPES]; + +fields("metrics") -> + [ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})} + , {"success", mk(integer(), #{desc => "Count of query success"})} + , {"failed", mk(integer(), #{desc => "Count of query failed"})} + , {"rate", mk(float(), #{desc => "The rate of matched, times/second"})} + , {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})} + , {"rate_last5m", mk(float(), + #{desc => "The average rate of matched in last 5 mins, times/second"})} + ]; + +fields("node_metrics") -> + [ node_name() + , {"metrics", mk(ref(?MODULE, "metrics"), #{})} + ]; + +fields("status") -> + [ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})} + , {"success", mk(integer(), #{desc => "Count of query success"})} + , {"failed", mk(integer(), #{desc => "Count of query failed"})} + , {"rate", mk(float(), #{desc => "The rate of matched, times/second"})} + , {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})} + , {"rate_last5m", mk(float(), + #{desc => "The average rate of matched in last 5 mins, times/second"})} + ]; + +fields("node_status") -> + [ node_name() + , {"status", mk(ref(?MODULE, "status"), #{})} + ]. + +node_name() -> + {"node", mk(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}. schema_mod(Type) -> list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index cdba638d8..5d6bddb6a 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -234,7 +234,8 @@ schema("/connectors/:id") -> {404, error_msg('NOT_FOUND', <<"connector not found">>)} end); -'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params}) -> +'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> + Params = filter_out_request_body(Params0), ?TRY_PARSE_ID(Id, case emqx_connector:lookup(ConnType, ConnName) of {ok, _} -> @@ -277,5 +278,9 @@ format_resp(ConnId, RawConf) -> <<"num_of_bridges">> => NumOfBridges }. +filter_out_request_body(Conf) -> + ExtraConfs = [<<"num_of_bridges">>, <<"type">>, <<"name">>], + maps:without(ExtraConfs, Conf). + bin(S) when is_list(S) -> list_to_binary(S). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 7fb260130..7d5bb1283 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -84,6 +84,9 @@ stop(#{client_pid := Pid}) -> safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000), ok. +ping(undefined) -> + pang; + ping(#{client_pid := Pid}) -> emqtt:ping(Pid). diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 699579d7f..11f9460b4 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -379,7 +379,7 @@ t_mqtt_conn_update(_) -> %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)), %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), @@ -391,6 +391,48 @@ t_mqtt_conn_update(_) -> {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). +t_mqtt_conn_update2(_) -> + %% assert we there's no connectors and no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a mqtt connector, using POST + %% but this connector is point to a unreachable server "2603" + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>) + #{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + + ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + , <<"server">> := <<"127.0.0.1:2603">> + }, jsx:decode(Connector)), + + %% ... and a MQTT bridge, using POST + %% we bind this bridge to the connector created just now + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"disconnected">> + , <<"connector">> := ?CONNECTR_ID + }, jsx:decode(Bridge)), + %% we fix the 'server' parameter to a normal one, it should work + {ok, 200, Bridge2} = request(put, uri(["connectors", ?CONNECTR_ID]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), + ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge2)), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% delete the connector + {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). + t_mqtt_conn_testing(_) -> %% APIs for testing the connectivity %% then we add a mqtt connector, using POST diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 8c5232706..708dc6030 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -156,7 +156,8 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> Config = emqx_resource:call_config_merge(ResourceType, OldConfig, NewConfig, Params), - case do_create_dry_run(InstId, ResourceType, Config) of + TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), + case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> do_remove(ResourceType, InstId, ResourceState), do_create(InstId, ResourceType, Config); @@ -185,6 +186,7 @@ do_create(InstId, ResourceType, Config) -> {error, Reason} -> logger:error("start ~ts resource ~ts failed: ~p", [ResourceType, InstId, Reason]), + ets:insert(emqx_resource_instance, {InstId, Res0}), {ok, Res0} end end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index b3b7afe4e..1caa8da23 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -43,7 +43,9 @@ fields("rule_creation") -> fields("rule_info") -> [ rule_id() , {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})} - , {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})} + , {"node_metrics", sc(hoconsc:array(ref("node_metrics")), + #{ desc => "The metrics of the rule for each node" + })} , {"from", sc(hoconsc:array(binary()), #{desc => "The topics of the rule", example => "t/#"})} , {"created_at", sc(binary(), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 7cfeb5d7e..85f27a651 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -164,14 +164,15 @@ param_path_id() -> Records = emqx_rule_engine:get_rules_ordered_by_ts(), {200, format_rule_resp(Records)}; -'/rules'(post, #{body := Params}) -> - Id = maps:get(<<"id">>, Params, list_to_binary(emqx_misc:gen_id(8))), +'/rules'(post, #{body := Params0}) -> + Id = maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))), + Params = filter_out_request_body(Params0), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], case emqx_rule_engine:get_rule(Id) of {ok, _Rule} -> {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}}; not_found -> - case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of + case emqx:update_config(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {201, format_rule_resp(Rule)}; @@ -197,8 +198,9 @@ param_path_id() -> end; '/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) -> + Params = filter_out_request_body(Params), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], - case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of + case emqx:update_config(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {200, format_rule_resp(Rule)}; @@ -266,10 +268,12 @@ get_rule_metrics(Id) -> rate_max := Max, rate_last5m := Last5M }) -> - #{ matched => Matched - , rate => Current - , rate_max => Max - , rate_last5m => Last5M + #{ metrics => #{ + matched => Matched, + rate => Current, + rate_max => Max, + rate_last5m => Last5M + } , node => Node } end, @@ -279,7 +283,8 @@ get_rule_metrics(Id) -> aggregate_metrics(AllMetrics) -> InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0}, lists:foldl(fun - (#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1}, + (#{metrics := #{matched := Match1, rate := Rate1, + rate_max := RateMax1, rate_last5m := Rate5m1}}, #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) -> #{matched => Match1 + Match0, rate => Rate1 + Rate0, rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0} @@ -287,3 +292,9 @@ aggregate_metrics(AllMetrics) -> get_one_rule(AllRules, Id) -> [R || R = #{id := Id0} <- AllRules, Id0 == Id]. + +filter_out_request_body(Conf) -> + ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, + <<"metrics">>, <<"node">>], + maps:without(ExtraConfs, Conf). +