fix(bridge): filter out some extra fields from the request body

This commit is contained in:
Shawn 2021-12-18 19:19:58 +08:00
parent 11e8e0db69
commit a44e18e869
10 changed files with 142 additions and 20 deletions

View File

@ -134,7 +134,12 @@ method_example(Type, Direction, get) ->
#{ #{
id => bin(SType ++ ":" ++ SName), id => bin(SType ++ ":" ++ SName),
type => bin(SType), type => bin(SType),
name => bin(SName) name => bin(SName),
metrics => ?METRICS(0, 0, 0, 0, 0, 0),
node_metrics => [
#{node => node(),
metrics => ?METRICS(0, 0, 0, 0, 0, 0)}
]
}; };
method_example(Type, Direction, post) -> method_example(Type, Direction, post) ->
SType = atom_to_list(Type), SType = atom_to_list(Type),
@ -269,7 +274,8 @@ schema("/bridges/:id/operation/:operation") ->
} }
}. }.
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) -> '/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) ->
Conf = filter_out_request_body(Conf0),
BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()), BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
case emqx_bridge:lookup(BridgeType, BridgeName) of case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {ok, _} ->
@ -291,7 +297,8 @@ list_local_bridges(Node) ->
'/bridges/:id'(get, #{bindings := #{id := Id}}) -> '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) -> '/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf0}) ->
Conf = filter_out_request_body(Conf0),
?TRY_PARSE_ID(Id, ?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {ok, _} ->
@ -423,6 +430,11 @@ rpc_multicall(Func, Args) ->
ErrL -> {error, ErrL} ErrL -> {error, ErrL}
end. end.
filter_out_request_body(Conf) ->
ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>,
<<"metrics">>, <<"node">>],
maps:without(ExtraConfs, Conf).
rpc_call(Node, Fun, Args) -> rpc_call(Node, Fun, Args) ->
rpc_call(Node, ?MODULE, Fun, Args). rpc_call(Node, ?MODULE, Fun, Args).

View File

@ -76,7 +76,7 @@ fields("put") ->
fields("get") -> fields("get") ->
[ id_field() [ id_field()
] ++ fields("post"). ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post").
basic_config() -> basic_config() ->
[ {enable, [ {enable,

View File

@ -38,10 +38,10 @@ fields("put_egress") ->
fields("get_ingress") -> fields("get_ingress") ->
[ id_field() [ id_field()
] ++ fields("post_ingress"); ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress");
fields("get_egress") -> fields("get_egress") ->
[ id_field() [ id_field()
] ++ fields("post_egress"). ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress").
%%====================================================================================== %%======================================================================================
id_field() -> id_field() ->

View File

@ -12,6 +12,7 @@
]). ]).
-export([ common_bridge_fields/0 -export([ common_bridge_fields/0
, metrics_status_fields/0
, direction_field/2 , direction_field/2
]). ]).
@ -56,6 +57,17 @@ In config files, you can find the corresponding config entry for a connector by
})} })}
]. ].
metrics_status_fields() ->
[ {"metrics", mk(ref(?MODULE, "metrics"), #{desc => "The metrics of the bridge"})}
, {"node_metrics", mk(hoconsc:array(ref(?MODULE, "node_metrics")),
#{ desc => "The metrics of the bridge for each node"
})}
, {"status", mk(ref(?MODULE, "status"), #{desc => "The status of the bridge"})}
, {"node_status", mk(hoconsc:array(ref(?MODULE, "node_status")),
#{ desc => "The status of the bridge for each node"
})}
].
direction_field(Dir, Desc) -> direction_field(Dir, Desc) ->
{direction, mk(Dir, {direction, mk(Dir,
#{ nullable => false #{ nullable => false
@ -72,7 +84,40 @@ fields(bridges) ->
++ [{T, mk(hoconsc:map(name, hoconsc:union([ ++ [{T, mk(hoconsc:map(name, hoconsc:union([
ref(schema_mod(T), "ingress"), ref(schema_mod(T), "ingress"),
ref(schema_mod(T), "egress") ref(schema_mod(T), "egress")
])), #{})} || T <- ?CONN_TYPES]. ])), #{})} || T <- ?CONN_TYPES];
fields("metrics") ->
[ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})}
, {"success", mk(integer(), #{desc => "Count of query success"})}
, {"failed", mk(integer(), #{desc => "Count of query failed"})}
, {"rate", mk(float(), #{desc => "The rate of matched, times/second"})}
, {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})}
, {"rate_last5m", mk(float(),
#{desc => "The average rate of matched in last 5 mins, times/second"})}
];
fields("node_metrics") ->
[ node_name()
, {"metrics", mk(ref(?MODULE, "metrics"), #{})}
];
fields("status") ->
[ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})}
, {"success", mk(integer(), #{desc => "Count of query success"})}
, {"failed", mk(integer(), #{desc => "Count of query failed"})}
, {"rate", mk(float(), #{desc => "The rate of matched, times/second"})}
, {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})}
, {"rate_last5m", mk(float(),
#{desc => "The average rate of matched in last 5 mins, times/second"})}
];
fields("node_status") ->
[ node_name()
, {"status", mk(ref(?MODULE, "status"), #{})}
].
node_name() ->
{"node", mk(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}.
schema_mod(Type) -> schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])). list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).

View File

@ -234,7 +234,8 @@ schema("/connectors/:id") ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)} {404, error_msg('NOT_FOUND', <<"connector not found">>)}
end); end);
'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params}) -> '/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
Params = filter_out_request_body(Params0),
?TRY_PARSE_ID(Id, ?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} -> {ok, _} ->
@ -277,5 +278,9 @@ format_resp(ConnId, RawConf) ->
<<"num_of_bridges">> => NumOfBridges <<"num_of_bridges">> => NumOfBridges
}. }.
filter_out_request_body(Conf) ->
ExtraConfs = [<<"num_of_bridges">>, <<"type">>, <<"name">>],
maps:without(ExtraConfs, Conf).
bin(S) when is_list(S) -> bin(S) when is_list(S) ->
list_to_binary(S). list_to_binary(S).

View File

@ -84,6 +84,9 @@ stop(#{client_pid := Pid}) ->
safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000), safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
ok. ok.
ping(undefined) ->
pang;
ping(#{client_pid := Pid}) -> ping(#{client_pid := Pid}) ->
emqtt:ping(Pid). emqtt:ping(Pid).

View File

@ -379,7 +379,7 @@ t_mqtt_conn_update(_) ->
%% then we try to update 'server' of the connector, to an unavailable IP address %% then we try to update 'server' of the connector, to an unavailable IP address
%% the update should fail because of 'unreachable' or 'connrefused' %% the update should fail because of 'unreachable' or 'connrefused'
{ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)),
%% we fix the 'server' parameter to a normal one, it should work %% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
@ -391,6 +391,48 @@ t_mqtt_conn_update(_) ->
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update2(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST
%% but this connector is point to a unreachable server "2603"
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)
#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:2603">>
}, jsx:decode(Connector)),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
, <<"type">> := <<"mqtt">>
, <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"disconnected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
%% we fix the 'server' parameter to a normal one, it should work
{ok, 200, Bridge2} = request(put, uri(["connectors", ?CONNECTR_ID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge2)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_testing(_) -> t_mqtt_conn_testing(_) ->
%% APIs for testing the connectivity %% APIs for testing the connectivity
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST

View File

@ -156,7 +156,8 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
Config = emqx_resource:call_config_merge(ResourceType, OldConfig, Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
NewConfig, Params), NewConfig, Params),
case do_create_dry_run(InstId, ResourceType, Config) of TestInstId = iolist_to_binary(emqx_misc:gen_id(16)),
case do_create_dry_run(TestInstId, ResourceType, Config) of
ok -> ok ->
do_remove(ResourceType, InstId, ResourceState), do_remove(ResourceType, InstId, ResourceState),
do_create(InstId, ResourceType, Config); do_create(InstId, ResourceType, Config);
@ -185,6 +186,7 @@ do_create(InstId, ResourceType, Config) ->
{error, Reason} -> {error, Reason} ->
logger:error("start ~ts resource ~ts failed: ~p", logger:error("start ~ts resource ~ts failed: ~p",
[ResourceType, InstId, Reason]), [ResourceType, InstId, Reason]),
ets:insert(emqx_resource_instance, {InstId, Res0}),
{ok, Res0} {ok, Res0}
end end
end. end.

View File

@ -43,7 +43,9 @@ fields("rule_creation") ->
fields("rule_info") -> fields("rule_info") ->
[ rule_id() [ rule_id()
, {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})} , {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
, {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})} , {"node_metrics", sc(hoconsc:array(ref("node_metrics")),
#{ desc => "The metrics of the rule for each node"
})}
, {"from", sc(hoconsc:array(binary()), , {"from", sc(hoconsc:array(binary()),
#{desc => "The topics of the rule", example => "t/#"})} #{desc => "The topics of the rule", example => "t/#"})}
, {"created_at", sc(binary(), , {"created_at", sc(binary(),

View File

@ -164,14 +164,15 @@ param_path_id() ->
Records = emqx_rule_engine:get_rules_ordered_by_ts(), Records = emqx_rule_engine:get_rules_ordered_by_ts(),
{200, format_rule_resp(Records)}; {200, format_rule_resp(Records)};
'/rules'(post, #{body := Params}) -> '/rules'(post, #{body := Params0}) ->
Id = maps:get(<<"id">>, Params, list_to_binary(emqx_misc:gen_id(8))), Id = maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))),
Params = filter_out_request_body(Params0),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id], ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_rule_engine:get_rule(Id) of case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} -> {ok, _Rule} ->
{400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}}; {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}};
not_found -> not_found ->
case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of case emqx:update_config(ConfPath, Params, #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id), [Rule] = get_one_rule(AllRules, Id),
{201, format_rule_resp(Rule)}; {201, format_rule_resp(Rule)};
@ -197,8 +198,9 @@ param_path_id() ->
end; end;
'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) -> '/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
Params = filter_out_request_body(Params),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id], ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of case emqx:update_config(ConfPath, Params, #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id), [Rule] = get_one_rule(AllRules, Id),
{200, format_rule_resp(Rule)}; {200, format_rule_resp(Rule)};
@ -266,10 +268,12 @@ get_rule_metrics(Id) ->
rate_max := Max, rate_max := Max,
rate_last5m := Last5M rate_last5m := Last5M
}) -> }) ->
#{ matched => Matched #{ metrics => #{
, rate => Current matched => Matched,
, rate_max => Max rate => Current,
, rate_last5m => Last5M rate_max => Max,
rate_last5m => Last5M
}
, node => Node , node => Node
} }
end, end,
@ -279,7 +283,8 @@ get_rule_metrics(Id) ->
aggregate_metrics(AllMetrics) -> aggregate_metrics(AllMetrics) ->
InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0}, InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0},
lists:foldl(fun lists:foldl(fun
(#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1}, (#{metrics := #{matched := Match1, rate := Rate1,
rate_max := RateMax1, rate_last5m := Rate5m1}},
#{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) -> #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) ->
#{matched => Match1 + Match0, rate => Rate1 + Rate0, #{matched => Match1 + Match0, rate => Rate1 + Rate0,
rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0} rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0}
@ -287,3 +292,9 @@ aggregate_metrics(AllMetrics) ->
get_one_rule(AllRules, Id) -> get_one_rule(AllRules, Id) ->
[R || R = #{id := Id0} <- AllRules, Id0 == Id]. [R || R = #{id := Id0} <- AllRules, Id0 == Id].
filter_out_request_body(Conf) ->
ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>,
<<"metrics">>, <<"node">>],
maps:without(ExtraConfs, Conf).