emqx/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

265 lines
8.5 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-import(hoconsc, [mk/2, ref/2]).
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
-export([
get_response/0,
put_request/0,
post_request/0
]).
-export([
common_bridge_fields/0,
metrics_fields/0,
status_fields/0,
type_and_name_fields/1
]).
%% for testing only
-export([enterprise_api_schemas/1, enterprise_fields_bridges/0]).
%%======================================================================================
%% Hocon Schema Definitions
%%======================================================================================
%% For HTTP APIs
get_response() ->
api_schema("get").
put_request() ->
api_schema("put").
post_request() ->
api_schema("post").
api_schema(Method) ->
Broker = [
{Type, ref(Mod, Method)}
|| {Type, Mod} <- [
{<<"webhook">>, emqx_bridge_http_schema},
{<<"mqtt">>, emqx_bridge_mqtt_schema}
]
],
EE = ?MODULE:enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(Broker ++ EE)).
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),
fun
(all_union_members) ->
maps:values(Index);
({value, V}) ->
case V of
#{<<"type">> := T} ->
case maps:get(T, Index, undefined) of
undefined ->
throw(#{
field_name => type,
reason => <<"unknown bridge type">>
});
Ref ->
[Ref]
end;
_ ->
throw(#{
field_name => type,
reason => <<"unknown bridge type">>
})
end
end.
-if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_enterprise:module_info(),
case erlang:function_exported(emqx_bridge_enterprise, api_schemas, 1) of
true -> emqx_bridge_enterprise:api_schemas(Method);
false -> []
end.
enterprise_fields_bridges() ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_enterprise:module_info(),
case erlang:function_exported(emqx_bridge_enterprise, fields, 1) of
true -> emqx_bridge_enterprise:fields(bridges);
false -> []
end.
-else.
enterprise_api_schemas(_) -> [].
enterprise_fields_bridges() -> [].
-endif.
common_bridge_fields() ->
[
{enable,
mk(
boolean(),
#{
desc => ?DESC("desc_enable"),
importance => ?IMPORTANCE_NO_DOC,
default => true
}
)},
{tags, emqx_schema:tags_schema()},
%% Create v2 connector then usr v1 /bridges_probe api to test connector
%% /bridges_probe should pass through v2 connector's description.
{description, emqx_schema:description_schema()}
].
status_fields() ->
[
{"status", mk(status(), #{desc => ?DESC("desc_status")})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})},
{"node_status",
mk(
hoconsc:array(ref(?MODULE, "node_status")),
#{desc => ?DESC("desc_node_status")}
)}
].
metrics_fields() ->
[
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})},
{"node_metrics",
mk(
hoconsc:array(ref(?MODULE, "node_metrics")),
#{desc => ?DESC("desc_node_metrics")}
)}
].
type_and_name_fields(ConnectorType) ->
[
{type, mk(ConnectorType, #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].
%%======================================================================================
%% For config files
namespace() -> "bridge".
tags() ->
[<<"Bridge">>].
roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_HIDDEN})}].
fields(bridges) ->
[
{webhook,
mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{
desc => ?DESC("bridges_webhook"),
required => false,
converter => fun http_bridge_converter/2
}
)},
{mqtt,
mk(
hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")),
#{
desc => ?DESC("bridges_mqtt"),
required => false,
converter => fun(X, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee(
X, fun emqx_bridge_compatible_config:maybe_upgrade/1
)
end
}
)}
] ++ ?MODULE:enterprise_fields_bridges();
fields("metrics") ->
[
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
{"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
{"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
{"dropped.resource_not_found",
mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})},
{"dropped.resource_stopped",
mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})},
{"matched", mk(integer(), #{desc => ?DESC("metric_matched")})},
{"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})},
{"retried", mk(integer(), #{desc => ?DESC("metric_retried")})},
{"failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
{"inflight", mk(integer(), #{desc => ?DESC("metric_inflight")})},
{"success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},
{"rate", mk(float(), #{desc => ?DESC("metric_rate")})},
{"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})},
{"rate_last5m",
mk(
float(),
#{desc => ?DESC("metric_rate_last5m")}
)},
{"received", mk(float(), #{desc => ?DESC("metric_received")})}
];
fields("node_metrics") ->
[
node_name(),
{"metrics", mk(ref(?MODULE, "metrics"), #{})}
];
fields("node_status") ->
[
node_name(),
{"status", mk(status(), #{})},
{"status_reason",
mk(binary(), #{
required => false,
desc => ?DESC("desc_status_reason"),
example => <<"Connection refused">>
})}
].
desc(bridges) ->
?DESC("desc_bridges");
desc("metrics") ->
?DESC("desc_metrics");
desc("node_metrics") ->
?DESC("desc_node_metrics");
desc("node_status") ->
?DESC("desc_node_status");
desc(_) ->
undefined.
status() ->
hoconsc:enum([connected, disconnected, connecting, inconsistent]).
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
http_bridge_converter(Conf0, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee(
Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
).