Merge pull request #12893 from zmstone/0416-add-is-template-flag-to-dashboard-schema
0416 add `is_template` flag to dashboard schema
This commit is contained in:
commit
a8b1224225
|
@ -61,6 +61,8 @@
|
||||||
}.
|
}.
|
||||||
-type url() :: binary().
|
-type url() :: binary().
|
||||||
-type json_binary() :: binary().
|
-type json_binary() :: binary().
|
||||||
|
-type template() :: binary().
|
||||||
|
-type template_str() :: string().
|
||||||
|
|
||||||
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
|
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
|
||||||
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
|
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
|
||||||
|
@ -78,6 +80,8 @@
|
||||||
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
|
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
|
||||||
-typerefl_from_string({url/0, emqx_schema, to_url}).
|
-typerefl_from_string({url/0, emqx_schema, to_url}).
|
||||||
-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}).
|
-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}).
|
||||||
|
-typerefl_from_string({template/0, emqx_schema, to_template}).
|
||||||
|
-typerefl_from_string({template_str/0, emqx_schema, to_template_str}).
|
||||||
|
|
||||||
-type parsed_server() :: #{
|
-type parsed_server() :: #{
|
||||||
hostname := string(),
|
hostname := string(),
|
||||||
|
@ -120,7 +124,9 @@
|
||||||
to_erl_cipher_suite/1,
|
to_erl_cipher_suite/1,
|
||||||
to_comma_separated_atoms/1,
|
to_comma_separated_atoms/1,
|
||||||
to_url/1,
|
to_url/1,
|
||||||
to_json_binary/1
|
to_json_binary/1,
|
||||||
|
to_template/1,
|
||||||
|
to_template_str/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -160,7 +166,9 @@
|
||||||
comma_separated_atoms/0,
|
comma_separated_atoms/0,
|
||||||
url/0,
|
url/0,
|
||||||
json_binary/0,
|
json_binary/0,
|
||||||
port_number/0
|
port_number/0,
|
||||||
|
template/0,
|
||||||
|
template_str/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
|
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
|
||||||
|
@ -2594,6 +2602,12 @@ to_json_binary(Str) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
to_template(Str) ->
|
||||||
|
{ok, iolist_to_binary(Str)}.
|
||||||
|
|
||||||
|
to_template_str(Str) ->
|
||||||
|
{ok, unicode:characters_to_list(Str, utf8)}.
|
||||||
|
|
||||||
%% @doc support the following format:
|
%% @doc support the following format:
|
||||||
%% - 127.0.0.1:1883
|
%% - 127.0.0.1:1883
|
||||||
%% - ::1:1883
|
%% - ::1:1883
|
||||||
|
|
|
@ -353,13 +353,13 @@ init(_Opts) ->
|
||||||
ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], Module),
|
ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], Module),
|
||||||
ok = hook_deny(),
|
ok = hook_deny(),
|
||||||
{ok, #{hooked => false, providers => #{}, init_done => false},
|
{ok, #{hooked => false, providers => #{}, init_done => false},
|
||||||
{continue, initialize_authentication}}.
|
{continue, {initialize_authentication, init}}}.
|
||||||
|
|
||||||
handle_call(get_providers, _From, #{providers := Providers} = State) ->
|
handle_call(get_providers, _From, #{providers := Providers} = State) ->
|
||||||
reply(Providers, State);
|
reply(Providers, State);
|
||||||
handle_call(
|
handle_call(
|
||||||
{register_providers, Providers},
|
{register_providers, Providers},
|
||||||
_From,
|
From,
|
||||||
#{providers := Reg0} = State
|
#{providers := Reg0} = State
|
||||||
) ->
|
) ->
|
||||||
case lists:filter(fun({T, _}) -> maps:is_key(T, Reg0) end, Providers) of
|
case lists:filter(fun({T, _}) -> maps:is_key(T, Reg0) end, Providers) of
|
||||||
|
@ -371,7 +371,7 @@ handle_call(
|
||||||
Reg0,
|
Reg0,
|
||||||
Providers
|
Providers
|
||||||
),
|
),
|
||||||
reply(ok, State#{providers := Reg}, initialize_authentication);
|
reply(ok, State#{providers := Reg}, {initialize_authentication, From});
|
||||||
Clashes ->
|
Clashes ->
|
||||||
reply({error, {authentication_type_clash, Clashes}}, State)
|
reply({error, {authentication_type_clash, Clashes}}, State)
|
||||||
end;
|
end;
|
||||||
|
@ -447,10 +447,10 @@ handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_continue(initialize_authentication, #{init_done := true} = State) ->
|
handle_continue({initialize_authentication, _From}, #{init_done := true} = State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_continue(initialize_authentication, #{providers := Providers} = State) ->
|
handle_continue({initialize_authentication, From}, #{providers := Providers} = State) ->
|
||||||
InitDone = initialize_authentication(Providers),
|
InitDone = initialize_authentication(Providers, From),
|
||||||
{noreply, maybe_hook(State#{init_done := InitDone})}.
|
{noreply, maybe_hook(State#{init_done := InitDone})}.
|
||||||
|
|
||||||
handle_cast(Req, State) ->
|
handle_cast(Req, State) ->
|
||||||
|
@ -484,11 +484,13 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Private functions
|
%% Private functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
initialize_authentication(Providers) ->
|
initialize_authentication(Providers, From) ->
|
||||||
ProviderTypes = maps:keys(Providers),
|
ProviderTypes = maps:keys(Providers),
|
||||||
Chains = chain_configs(),
|
Chains = chain_configs(),
|
||||||
HasProviders = has_providers_for_configs(Chains, ProviderTypes),
|
HasProviders = has_providers_for_configs(Chains, ProviderTypes),
|
||||||
do_initialize_authentication(Providers, Chains, HasProviders).
|
Result = do_initialize_authentication(Providers, Chains, HasProviders),
|
||||||
|
?tp(info, authn_chains_initialization_done, #{from => From, result => Result}),
|
||||||
|
Result.
|
||||||
|
|
||||||
do_initialize_authentication(_Providers, _Chains, _HasProviders = false) ->
|
do_initialize_authentication(_Providers, _Chains, _HasProviders = false) ->
|
||||||
false;
|
false;
|
||||||
|
@ -500,7 +502,6 @@ do_initialize_authentication(Providers, Chains, _HasProviders = true) ->
|
||||||
Chains
|
Chains
|
||||||
),
|
),
|
||||||
ok = unhook_deny(),
|
ok = unhook_deny(),
|
||||||
?tp(info, authn_chains_initialization_done, #{}),
|
|
||||||
true.
|
true.
|
||||||
|
|
||||||
initialize_chain_authentication(_Providers, _ChainName, []) ->
|
initialize_chain_authentication(_Providers, _ChainName, []) ->
|
||||||
|
|
|
@ -69,9 +69,10 @@ t_initialize(_Config) ->
|
||||||
emqx_access_control:authenticate(?CLIENTINFO)
|
emqx_access_control:authenticate(?CLIENTINFO)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
Self = self(),
|
||||||
?assertWaitEvent(
|
?assertWaitEvent(
|
||||||
ok = emqx_authn_test_lib:register_fake_providers([{password_based, built_in_database}]),
|
ok = emqx_authn_test_lib:register_fake_providers([{password_based, built_in_database}]),
|
||||||
#{?snk_kind := authn_chains_initialization_done},
|
#{?snk_kind := authn_chains_initialization_done, from := {Self, _}},
|
||||||
100
|
100
|
||||||
),
|
),
|
||||||
|
|
||||||
|
|
|
@ -1007,7 +1007,13 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, _ConfRootKey, BridgeType, Br
|
||||||
{error, not_implemented} ->
|
{error, not_implemented} ->
|
||||||
?NOT_IMPLEMENTED;
|
?NOT_IMPLEMENTED;
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
?BAD_REQUEST(<<"Request timeout">>);
|
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "bridge_bpapi_call_timeout",
|
||||||
|
bridge => BridgeId,
|
||||||
|
call => OperFunc
|
||||||
|
}),
|
||||||
|
?SERVICE_UNAVAILABLE(<<"Request timeout">>);
|
||||||
{error, {start_pool_failed, Name, Reason}} ->
|
{error, {start_pool_failed, Name, Reason}} ->
|
||||||
Msg = bin(
|
Msg = bin(
|
||||||
io_lib:format("Failed to start ~p pool for reason ~p", [Name, redact(Reason)])
|
io_lib:format("Failed to start ~p pool for reason ~p", [Name, redact(Reason)])
|
||||||
|
@ -1018,9 +1024,8 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, _ConfRootKey, BridgeType, Br
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "bridge_inconsistent_in_cluster_for_call_operation",
|
msg => "bridge_inconsistent_in_cluster_for_call_operation",
|
||||||
reason => not_found,
|
reason => not_found,
|
||||||
type => BridgeType,
|
bridge => BridgeId,
|
||||||
name => BridgeName,
|
call => OperFunc
|
||||||
bridge => BridgeId
|
|
||||||
}),
|
}),
|
||||||
?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>);
|
?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>);
|
||||||
{error, {node_not_found, Node}} ->
|
{error, {node_not_found, Node}} ->
|
||||||
|
|
|
@ -181,7 +181,7 @@ fields("post", Type) ->
|
||||||
cql_field() ->
|
cql_field() ->
|
||||||
{cql,
|
{cql,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
|
#{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
|
||||||
)}.
|
)}.
|
||||||
|
|
||||||
|
|
|
@ -184,8 +184,12 @@ fields("post", Type) ->
|
||||||
sql_field() ->
|
sql_field() ->
|
||||||
{sql,
|
{sql,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
#{
|
||||||
|
desc => ?DESC("sql_template"),
|
||||||
|
default => ?DEFAULT_SQL,
|
||||||
|
format => <<"sql">>
|
||||||
|
}
|
||||||
)}.
|
)}.
|
||||||
|
|
||||||
batch_value_separator_field() ->
|
batch_value_separator_field() ->
|
||||||
|
|
|
@ -160,13 +160,7 @@ fields(dynamo_action) ->
|
||||||
);
|
);
|
||||||
fields(action_parameters) ->
|
fields(action_parameters) ->
|
||||||
Parameters =
|
Parameters =
|
||||||
[
|
[{template, template_field_schema()}] ++ emqx_bridge_dynamo_connector:fields(config),
|
||||||
{template,
|
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
|
|
||||||
)}
|
|
||||||
] ++ emqx_bridge_dynamo_connector:fields(config),
|
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(Key, Acc) ->
|
fun(Key, Acc) ->
|
||||||
proplists:delete(Key, Acc)
|
proplists:delete(Key, Acc)
|
||||||
|
@ -199,11 +193,7 @@ fields(connector_resource_opts) ->
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{template,
|
{template, template_field_schema()},
|
||||||
mk(
|
|
||||||
binary(),
|
|
||||||
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
|
|
||||||
)},
|
|
||||||
{local_topic,
|
{local_topic,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
binary(),
|
||||||
|
@ -230,6 +220,15 @@ fields("put") ->
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:status_fields() ++ fields("post").
|
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||||
|
|
||||||
|
template_field_schema() ->
|
||||||
|
mk(
|
||||||
|
emqx_schema:template(),
|
||||||
|
#{
|
||||||
|
desc => ?DESC("template"),
|
||||||
|
default => ?DEFAULT_TEMPLATE
|
||||||
|
}
|
||||||
|
).
|
||||||
|
|
||||||
desc("config") ->
|
desc("config") ->
|
||||||
?DESC("desc_config");
|
?DESC("desc_config");
|
||||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||||
|
|
|
@ -135,7 +135,7 @@ overwrite() ->
|
||||||
index() ->
|
index() ->
|
||||||
{index,
|
{index,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
example => <<"${payload.index}">>,
|
example => <<"${payload.index}">>,
|
||||||
|
@ -146,7 +146,7 @@ index() ->
|
||||||
id(Required) ->
|
id(Required) ->
|
||||||
{id,
|
{id,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => Required,
|
required => Required,
|
||||||
example => <<"${payload.id}">>,
|
example => <<"${payload.id}">>,
|
||||||
|
@ -157,7 +157,7 @@ id(Required) ->
|
||||||
doc() ->
|
doc() ->
|
||||||
{doc,
|
{doc,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => false,
|
required => false,
|
||||||
example => <<"${payload.doc}">>,
|
example => <<"${payload.doc}">>,
|
||||||
|
@ -187,7 +187,7 @@ doc_as_upsert() ->
|
||||||
routing() ->
|
routing() ->
|
||||||
{routing,
|
{routing,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => false,
|
required => false,
|
||||||
example => <<"${payload.routing}">>,
|
example => <<"${payload.routing}">>,
|
||||||
|
|
|
@ -122,7 +122,7 @@ fields(producer) ->
|
||||||
)},
|
)},
|
||||||
{ordering_key_template,
|
{ordering_key_template,
|
||||||
sc(
|
sc(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
default => <<>>,
|
default => <<>>,
|
||||||
desc => ?DESC("ordering_key_template")
|
desc => ?DESC("ordering_key_template")
|
||||||
|
@ -130,7 +130,7 @@ fields(producer) ->
|
||||||
)},
|
)},
|
||||||
{payload_template,
|
{payload_template,
|
||||||
sc(
|
sc(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
default => <<>>,
|
default => <<>>,
|
||||||
desc => ?DESC("payload_template")
|
desc => ?DESC("payload_template")
|
||||||
|
@ -201,8 +201,11 @@ fields(consumer_topic_mapping) ->
|
||||||
{qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
|
{qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
|
||||||
{payload_template,
|
{payload_template,
|
||||||
mk(
|
mk(
|
||||||
string(),
|
emqx_schema:template(),
|
||||||
#{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)}
|
#{
|
||||||
|
default => <<"${.}">>,
|
||||||
|
desc => ?DESC(consumer_mqtt_payload)
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields("consumer_resource_opts") ->
|
fields("consumer_resource_opts") ->
|
||||||
|
@ -221,14 +224,18 @@ fields("consumer_resource_opts") ->
|
||||||
fields(key_value_pair) ->
|
fields(key_value_pair) ->
|
||||||
[
|
[
|
||||||
{key,
|
{key,
|
||||||
mk(binary(), #{
|
mk(emqx_schema:template(), #{
|
||||||
required => true,
|
required => true,
|
||||||
validator => [
|
validator => [
|
||||||
emqx_resource_validator:not_empty("Key templates must not be empty")
|
emqx_resource_validator:not_empty("Key templates must not be empty")
|
||||||
],
|
],
|
||||||
desc => ?DESC(kv_pair_key)
|
desc => ?DESC(kv_pair_key)
|
||||||
})},
|
})},
|
||||||
{value, mk(binary(), #{required => true, desc => ?DESC(kv_pair_value)})}
|
{value,
|
||||||
|
mk(emqx_schema:template(), #{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC(kv_pair_value)
|
||||||
|
})}
|
||||||
];
|
];
|
||||||
fields("get_producer") ->
|
fields("get_producer") ->
|
||||||
emqx_bridge_schema:status_fields() ++ fields("post_producer");
|
emqx_bridge_schema:status_fields() ++ fields("post_producer");
|
||||||
|
|
|
@ -167,13 +167,13 @@ fields(action_parameters) ->
|
||||||
})},
|
})},
|
||||||
|
|
||||||
{partition_key,
|
{partition_key,
|
||||||
mk(binary(), #{
|
mk(emqx_schema:template(), #{
|
||||||
required => false, desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key")
|
required => false,
|
||||||
|
desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key")
|
||||||
})},
|
})},
|
||||||
|
|
||||||
{grpc_flush_timeout, fun grpc_flush_timeout/1},
|
{grpc_flush_timeout, fun grpc_flush_timeout/1},
|
||||||
{record_template,
|
{record_template, record_template_schema()},
|
||||||
mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})},
|
|
||||||
{aggregation_pool_size,
|
{aggregation_pool_size,
|
||||||
mk(pos_integer(), #{
|
mk(pos_integer(), #{
|
||||||
default => ?DEFAULT_AGG_POOL_SIZE, desc => ?DESC("aggregation_pool_size")
|
default => ?DEFAULT_AGG_POOL_SIZE, desc => ?DESC("aggregation_pool_size")
|
||||||
|
@ -222,6 +222,12 @@ fields("put") ->
|
||||||
hstream_bridge_common_fields() ++
|
hstream_bridge_common_fields() ++
|
||||||
connector_fields().
|
connector_fields().
|
||||||
|
|
||||||
|
record_template_schema() ->
|
||||||
|
mk(emqx_schema:template(), #{
|
||||||
|
default => <<"${payload}">>,
|
||||||
|
desc => ?DESC("record_template")
|
||||||
|
}).
|
||||||
|
|
||||||
grpc_timeout(type) -> emqx_schema:timeout_duration_ms();
|
grpc_timeout(type) -> emqx_schema:timeout_duration_ms();
|
||||||
grpc_timeout(desc) -> ?DESC(emqx_bridge_hstreamdb_connector, "grpc_timeout");
|
grpc_timeout(desc) -> ?DESC(emqx_bridge_hstreamdb_connector, "grpc_timeout");
|
||||||
grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW;
|
grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW;
|
||||||
|
@ -239,8 +245,7 @@ hstream_bridge_common_fields() ->
|
||||||
[
|
[
|
||||||
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
||||||
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
||||||
{record_template,
|
{record_template, record_template_schema()}
|
||||||
mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})}
|
|
||||||
] ++
|
] ++
|
||||||
emqx_resource_schema:fields("resource_opts").
|
emqx_resource_schema:fields("resource_opts").
|
||||||
|
|
||||||
|
|
|
@ -128,9 +128,10 @@ fields("request") ->
|
||||||
desc => ?DESC("method"),
|
desc => ?DESC("method"),
|
||||||
validator => fun ?MODULE:validate_method/1
|
validator => fun ?MODULE:validate_method/1
|
||||||
})},
|
})},
|
||||||
{path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
|
{path, hoconsc:mk(emqx_schema:template(), #{required => false, desc => ?DESC("path")})},
|
||||||
{body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
|
{body, hoconsc:mk(emqx_schema:template(), #{required => false, desc => ?DESC("body")})},
|
||||||
{headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
|
{headers,
|
||||||
|
hoconsc:mk(map(), #{required => false, desc => ?DESC("headers"), is_template => true})},
|
||||||
{max_retries,
|
{max_retries,
|
||||||
sc(
|
sc(
|
||||||
non_neg_integer(),
|
non_neg_integer(),
|
||||||
|
|
|
@ -114,7 +114,7 @@ fields("parameters_opts") ->
|
||||||
[
|
[
|
||||||
{path,
|
{path,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("config_path"),
|
desc => ?DESC("config_path"),
|
||||||
required => false
|
required => false
|
||||||
|
@ -270,7 +270,8 @@ headers_field() ->
|
||||||
<<"content-type">> => <<"application/json">>,
|
<<"content-type">> => <<"application/json">>,
|
||||||
<<"keep-alive">> => <<"timeout=5">>
|
<<"keep-alive">> => <<"timeout=5">>
|
||||||
},
|
},
|
||||||
desc => ?DESC("config_headers")
|
desc => ?DESC("config_headers"),
|
||||||
|
is_template => true
|
||||||
}
|
}
|
||||||
)}.
|
)}.
|
||||||
|
|
||||||
|
@ -287,7 +288,7 @@ method_field() ->
|
||||||
body_field() ->
|
body_field() ->
|
||||||
{body,
|
{body,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
default => undefined,
|
default => undefined,
|
||||||
desc => ?DESC("config_body")
|
desc => ?DESC("config_body")
|
||||||
|
|
|
@ -42,7 +42,7 @@
|
||||||
%% api
|
%% api
|
||||||
|
|
||||||
write_syntax_type() ->
|
write_syntax_type() ->
|
||||||
typerefl:alias("string", write_syntax()).
|
typerefl:alias("template", write_syntax()).
|
||||||
|
|
||||||
%% Examples
|
%% Examples
|
||||||
conn_bridge_examples(Method) ->
|
conn_bridge_examples(Method) ->
|
||||||
|
|
|
@ -84,7 +84,7 @@ fields(action_parameters) ->
|
||||||
)},
|
)},
|
||||||
{device_id,
|
{device_id,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("config_device_id")
|
desc => ?DESC("config_device_id")
|
||||||
}
|
}
|
||||||
|
@ -114,7 +114,7 @@ fields(action_parameters_data) ->
|
||||||
)},
|
)},
|
||||||
{measurement,
|
{measurement,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_measurement")
|
desc => ?DESC("config_parameters_measurement")
|
||||||
|
@ -122,7 +122,9 @@ fields(action_parameters_data) ->
|
||||||
)},
|
)},
|
||||||
{data_type,
|
{data_type,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:union([enum([text, boolean, int32, int64, float, double]), binary()]),
|
hoconsc:union([
|
||||||
|
enum([text, boolean, int32, int64, float, double]), emqx_schema:template()
|
||||||
|
]),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_data_type")
|
desc => ?DESC("config_parameters_data_type")
|
||||||
|
@ -130,7 +132,7 @@ fields(action_parameters_data) ->
|
||||||
)},
|
)},
|
||||||
{value,
|
{value,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_value")
|
desc => ?DESC("config_parameters_value")
|
||||||
|
|
|
@ -389,7 +389,7 @@ fields(producer_kafka_opts) ->
|
||||||
)},
|
)},
|
||||||
{kafka_headers,
|
{kafka_headers,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => false,
|
required => false,
|
||||||
validator => fun kafka_header_validator/1,
|
validator => fun kafka_header_validator/1,
|
||||||
|
@ -462,12 +462,12 @@ fields(producer_kafka_ext_headers) ->
|
||||||
[
|
[
|
||||||
{kafka_ext_header_key,
|
{kafka_ext_header_key,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{required => true, desc => ?DESC(producer_kafka_ext_header_key)}
|
#{required => true, desc => ?DESC(producer_kafka_ext_header_key)}
|
||||||
)},
|
)},
|
||||||
{kafka_ext_header_value,
|
{kafka_ext_header_value,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
validator => fun kafka_ext_header_value_validator/1,
|
validator => fun kafka_ext_header_value_validator/1,
|
||||||
|
@ -477,11 +477,20 @@ fields(producer_kafka_ext_headers) ->
|
||||||
];
|
];
|
||||||
fields(kafka_message) ->
|
fields(kafka_message) ->
|
||||||
[
|
[
|
||||||
{key, mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC(kafka_message_key)})},
|
{key,
|
||||||
{value, mk(string(), #{default => <<"${.}">>, desc => ?DESC(kafka_message_value)})},
|
mk(emqx_schema:template(), #{
|
||||||
|
default => <<"${.clientid}">>,
|
||||||
|
desc => ?DESC(kafka_message_key)
|
||||||
|
})},
|
||||||
|
{value,
|
||||||
|
mk(emqx_schema:template(), #{
|
||||||
|
default => <<"${.}">>,
|
||||||
|
desc => ?DESC(kafka_message_value)
|
||||||
|
})},
|
||||||
{timestamp,
|
{timestamp,
|
||||||
mk(string(), #{
|
mk(emqx_schema:template(), #{
|
||||||
default => <<"${.timestamp}">>, desc => ?DESC(kafka_message_timestamp)
|
default => <<"${.timestamp}">>,
|
||||||
|
desc => ?DESC(kafka_message_timestamp)
|
||||||
})}
|
})}
|
||||||
];
|
];
|
||||||
fields(producer_buffer) ->
|
fields(producer_buffer) ->
|
||||||
|
@ -536,8 +545,11 @@ fields(consumer_topic_mapping) ->
|
||||||
{qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
|
{qos, mk(emqx_schema:qos(), #{default => 0, desc => ?DESC(consumer_mqtt_qos)})},
|
||||||
{payload_template,
|
{payload_template,
|
||||||
mk(
|
mk(
|
||||||
string(),
|
emqx_schema:template(),
|
||||||
#{default => <<"${.}">>, desc => ?DESC(consumer_mqtt_payload)}
|
#{
|
||||||
|
default => <<"${.}">>,
|
||||||
|
desc => ?DESC(consumer_mqtt_payload)
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(consumer_kafka_opts) ->
|
fields(consumer_kafka_opts) ->
|
||||||
|
@ -744,8 +756,8 @@ producer_strategy_key_validator(
|
||||||
producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
|
producer_strategy_key_validator(emqx_utils_maps:binary_key_map(Conf));
|
||||||
producer_strategy_key_validator(#{
|
producer_strategy_key_validator(#{
|
||||||
<<"partition_strategy">> := key_dispatch,
|
<<"partition_strategy">> := key_dispatch,
|
||||||
<<"message">> := #{<<"key">> := ""}
|
<<"message">> := #{<<"key">> := Key}
|
||||||
}) ->
|
}) when Key =:= "" orelse Key =:= <<>> ->
|
||||||
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
|
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
|
||||||
producer_strategy_key_validator(_) ->
|
producer_strategy_key_validator(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -150,7 +150,7 @@ fields(producer) ->
|
||||||
[
|
[
|
||||||
{payload_template,
|
{payload_template,
|
||||||
sc(
|
sc(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
default => <<"${.}">>,
|
default => <<"${.}">>,
|
||||||
desc => ?DESC("payload_template")
|
desc => ?DESC("payload_template")
|
||||||
|
|
|
@ -44,8 +44,10 @@ roots() -> [].
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
|
||||||
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
|
{collection,
|
||||||
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})},
|
mk(emqx_schema:template(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
|
||||||
|
{payload_template,
|
||||||
|
mk(emqx_schema:template(), #{required => false, desc => ?DESC("payload_template")})},
|
||||||
{resource_opts,
|
{resource_opts,
|
||||||
mk(
|
mk(
|
||||||
ref(?MODULE, "creation_opts"),
|
ref(?MODULE, "creation_opts"),
|
||||||
|
|
|
@ -200,7 +200,7 @@ fields("ingress_local") ->
|
||||||
[
|
[
|
||||||
{topic,
|
{topic,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
validator => fun emqx_schema:non_empty_string/1,
|
validator => fun emqx_schema:non_empty_string/1,
|
||||||
desc => ?DESC("ingress_local_topic"),
|
desc => ?DESC("ingress_local_topic"),
|
||||||
|
@ -217,7 +217,7 @@ fields("ingress_local") ->
|
||||||
)},
|
)},
|
||||||
{retain,
|
{retain,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:union([boolean(), binary()]),
|
hoconsc:union([boolean(), emqx_schema:template()]),
|
||||||
#{
|
#{
|
||||||
default => <<"${retain}">>,
|
default => <<"${retain}">>,
|
||||||
desc => ?DESC("retain")
|
desc => ?DESC("retain")
|
||||||
|
@ -225,7 +225,7 @@ fields("ingress_local") ->
|
||||||
)},
|
)},
|
||||||
{payload,
|
{payload,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
default => undefined,
|
default => undefined,
|
||||||
desc => ?DESC("payload")
|
desc => ?DESC("payload")
|
||||||
|
@ -268,7 +268,7 @@ fields("egress_remote") ->
|
||||||
[
|
[
|
||||||
{topic,
|
{topic,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
validator => fun emqx_schema:non_empty_string/1,
|
validator => fun emqx_schema:non_empty_string/1,
|
||||||
|
@ -286,7 +286,7 @@ fields("egress_remote") ->
|
||||||
)},
|
)},
|
||||||
{retain,
|
{retain,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:union([boolean(), binary()]),
|
hoconsc:union([boolean(), emqx_schema:template()]),
|
||||||
#{
|
#{
|
||||||
required => false,
|
required => false,
|
||||||
default => false,
|
default => false,
|
||||||
|
@ -295,7 +295,7 @@ fields("egress_remote") ->
|
||||||
)},
|
)},
|
||||||
{payload,
|
{payload,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
default => undefined,
|
default => undefined,
|
||||||
desc => ?DESC("payload")
|
desc => ?DESC("payload")
|
||||||
|
@ -344,7 +344,7 @@ desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
qos() ->
|
qos() ->
|
||||||
hoconsc:union([emqx_schema:qos(), binary()]).
|
hoconsc:union([emqx_schema:qos(), emqx_schema:template()]).
|
||||||
|
|
||||||
parse_server(Str) ->
|
parse_server(Str) ->
|
||||||
#{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS),
|
#{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS),
|
||||||
|
|
|
@ -117,7 +117,7 @@ fields("config") ->
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{sql,
|
{sql,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||||
)},
|
)},
|
||||||
{local_topic,
|
{local_topic,
|
||||||
|
|
|
@ -146,7 +146,7 @@ fields(action_parameters_data) ->
|
||||||
[
|
[
|
||||||
{timestamp,
|
{timestamp,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("config_parameters_timestamp"),
|
desc => ?DESC("config_parameters_timestamp"),
|
||||||
required => false
|
required => false
|
||||||
|
@ -154,7 +154,7 @@ fields(action_parameters_data) ->
|
||||||
)},
|
)},
|
||||||
{metric,
|
{metric,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_metric")
|
desc => ?DESC("config_parameters_metric")
|
||||||
|
@ -162,7 +162,7 @@ fields(action_parameters_data) ->
|
||||||
)},
|
)},
|
||||||
{tags,
|
{tags,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:union([map(), binary()]),
|
hoconsc:union([map(), emqx_schema:template()]),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_tags"),
|
desc => ?DESC("config_parameters_tags"),
|
||||||
|
@ -188,7 +188,7 @@ fields(action_parameters_data) ->
|
||||||
)},
|
)},
|
||||||
{value,
|
{value,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:union([integer(), float(), binary()]),
|
hoconsc:union([integer(), float(), emqx_schema:template()]),
|
||||||
#{
|
#{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("config_parameters_value")
|
desc => ?DESC("config_parameters_value")
|
||||||
|
|
|
@ -158,7 +158,7 @@ fields(action_parameters) ->
|
||||||
[
|
[
|
||||||
{sql,
|
{sql,
|
||||||
hoconsc:mk(
|
hoconsc:mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
@ -177,7 +177,7 @@ fields("config") ->
|
||||||
)},
|
)},
|
||||||
{sql,
|
{sql,
|
||||||
hoconsc:mk(
|
hoconsc:mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||||
)},
|
)},
|
||||||
{local_topic,
|
{local_topic,
|
||||||
|
|
|
@ -61,7 +61,7 @@ fields(action_parameters) ->
|
||||||
[
|
[
|
||||||
{sql,
|
{sql,
|
||||||
hoconsc:mk(
|
hoconsc:mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
|
#{desc => ?DESC("sql_template"), default => default_sql(), format => <<"sql">>}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
|
|
@ -51,12 +51,12 @@ fields(action_parameters) ->
|
||||||
fields(producer_pulsar_message) ->
|
fields(producer_pulsar_message) ->
|
||||||
[
|
[
|
||||||
{key,
|
{key,
|
||||||
?HOCON(string(), #{
|
?HOCON(emqx_schema:template(), #{
|
||||||
default => <<"${.clientid}">>,
|
default => <<"${.clientid}">>,
|
||||||
desc => ?DESC("producer_key_template")
|
desc => ?DESC("producer_key_template")
|
||||||
})},
|
})},
|
||||||
{value,
|
{value,
|
||||||
?HOCON(string(), #{
|
?HOCON(emqx_schema:template(), #{
|
||||||
default => <<"${.}">>,
|
default => <<"${.}">>,
|
||||||
desc => ?DESC("producer_value_template")
|
desc => ?DESC("producer_value_template")
|
||||||
})}
|
})}
|
||||||
|
|
|
@ -1235,7 +1235,7 @@ t_resilience(Config) ->
|
||||||
after 1_000 -> ct:fail("producer didn't stop!")
|
after 1_000 -> ct:fail("producer didn't stop!")
|
||||||
end,
|
end,
|
||||||
Consumed = lists:flatmap(
|
Consumed = lists:flatmap(
|
||||||
fun(_) -> receive_consumed(5_000) end, lists:seq(1, NumProduced)
|
fun(_) -> receive_consumed(10_000) end, lists:seq(1, NumProduced)
|
||||||
),
|
),
|
||||||
?assertEqual(NumProduced, length(Consumed)),
|
?assertEqual(NumProduced, length(Consumed)),
|
||||||
ExpectedPayloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumProduced)),
|
ExpectedPayloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumProduced)),
|
||||||
|
|
|
@ -99,7 +99,7 @@ fields(action_parameters) ->
|
||||||
)},
|
)},
|
||||||
{payload_template,
|
{payload_template,
|
||||||
hoconsc:mk(
|
hoconsc:mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
default => <<"">>,
|
default => <<"">>,
|
||||||
desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template")
|
desc => ?DESC(?CONNECTOR_SCHEMA, "payload_template")
|
||||||
|
|
|
@ -211,7 +211,7 @@ desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
command_template(type) ->
|
command_template(type) ->
|
||||||
list(binary());
|
hoconsc:array(emqx_schema:template());
|
||||||
command_template(required) ->
|
command_template(required) ->
|
||||||
true;
|
true;
|
||||||
command_template(validator) ->
|
command_template(validator) ->
|
||||||
|
|
|
@ -162,8 +162,11 @@ fields(action_parameters) ->
|
||||||
[
|
[
|
||||||
{template,
|
{template,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
|
#{
|
||||||
|
desc => ?DESC("template"),
|
||||||
|
default => ?DEFAULT_TEMPLATE
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
] ++ emqx_bridge_rocketmq_connector:fields(config),
|
] ++ emqx_bridge_rocketmq_connector:fields(config),
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
|
@ -205,7 +208,7 @@ fields("config") ->
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{template,
|
{template,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
|
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
|
||||||
)},
|
)},
|
||||||
{local_topic,
|
{local_topic,
|
||||||
|
@ -214,8 +217,7 @@ fields("config") ->
|
||||||
#{desc => ?DESC("local_topic"), required => false}
|
#{desc => ?DESC("local_topic"), required => false}
|
||||||
)}
|
)}
|
||||||
] ++ emqx_resource_schema:fields("resource_opts") ++
|
] ++ emqx_resource_schema:fields("resource_opts") ++
|
||||||
(emqx_bridge_rocketmq_connector:fields(config) --
|
emqx_bridge_rocketmq_connector:fields(config);
|
||||||
emqx_connector_schema_lib:prepare_statement_fields());
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[type_field(), name_field() | fields("config")];
|
[type_field(), name_field() | fields("config")];
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
|
|
|
@ -47,7 +47,7 @@ fields(config) ->
|
||||||
{servers, servers()},
|
{servers, servers()},
|
||||||
{topic,
|
{topic,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{default => <<"TopicTest">>, desc => ?DESC(topic)}
|
#{default => <<"TopicTest">>, desc => ?DESC(topic)}
|
||||||
)},
|
)},
|
||||||
{access_key,
|
{access_key,
|
||||||
|
|
|
@ -77,7 +77,7 @@ fields(s3_upload_parameters) ->
|
||||||
[
|
[
|
||||||
{content,
|
{content,
|
||||||
hoconsc:mk(
|
hoconsc:mk(
|
||||||
string(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
required => false,
|
required => false,
|
||||||
default => <<"${.}">>,
|
default => <<"${.}">>,
|
||||||
|
|
|
@ -192,7 +192,7 @@ fields(action_parameters) ->
|
||||||
[
|
[
|
||||||
{sql,
|
{sql,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
|
|
@ -112,7 +112,7 @@ fields("parameters") ->
|
||||||
[
|
[
|
||||||
{target_topic,
|
{target_topic,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("target_topic"), default => <<"${topic}">>}
|
#{desc => ?DESC("target_topic"), default => <<"${topic}">>}
|
||||||
)},
|
)},
|
||||||
{target_qos,
|
{target_qos,
|
||||||
|
@ -122,7 +122,7 @@ fields("parameters") ->
|
||||||
)},
|
)},
|
||||||
{template,
|
{template,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{desc => ?DESC("template"), default => <<"${payload}">>}
|
#{desc => ?DESC("template"), default => <<"${payload}">>}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
|
|
@ -83,7 +83,7 @@ fields("config") ->
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{sql,
|
{sql,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("sql_template"),
|
desc => ?DESC("sql_template"),
|
||||||
default => ?DEFAULT_SQL,
|
default => ?DEFAULT_SQL,
|
||||||
|
@ -125,7 +125,7 @@ fields(action_parameters) ->
|
||||||
{database, fun emqx_connector_schema_lib:database/1},
|
{database, fun emqx_connector_schema_lib:database/1},
|
||||||
{sql,
|
{sql,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
emqx_schema:template(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("sql_template"),
|
desc => ?DESC("sql_template"),
|
||||||
default => ?DEFAULT_SQL,
|
default => ?DEFAULT_SQL,
|
||||||
|
|
|
@ -304,12 +304,22 @@ gen_flat_doc(RootNames, #{full_name := FullName, fields := Fields} = S, DescReso
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
#{
|
try
|
||||||
text => short_name(FullName),
|
#{
|
||||||
hash => format_hash(FullName),
|
text => short_name(FullName),
|
||||||
doc => maps:get(desc, S, <<"">>),
|
hash => format_hash(FullName),
|
||||||
fields => format_fields(Fields, DescResolver)
|
doc => maps:get(desc, S, <<"">>),
|
||||||
}.
|
fields => format_fields(Fields, DescResolver)
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
throw:Reason ->
|
||||||
|
io:format(
|
||||||
|
standard_error,
|
||||||
|
"failed_to_build_doc for ~s:~n~p~n",
|
||||||
|
[FullName, Reason]
|
||||||
|
),
|
||||||
|
error(failed_to_build_doc)
|
||||||
|
end.
|
||||||
|
|
||||||
format_fields(Fields, DescResolver) ->
|
format_fields(Fields, DescResolver) ->
|
||||||
[format_field(F, DescResolver) || F <- Fields].
|
[format_field(F, DescResolver) || F <- Fields].
|
||||||
|
|
|
@ -33,8 +33,19 @@ readable(Module, TypeStr) when is_list(TypeStr) ->
|
||||||
%% Module is ignored so far as all types are distinguished by their names
|
%% Module is ignored so far as all types are distinguished by their names
|
||||||
readable(TypeStr)
|
readable(TypeStr)
|
||||||
catch
|
catch
|
||||||
throw:unknown_type ->
|
throw:Reason ->
|
||||||
fail(#{reason => unknown_type, type => TypeStr, module => Module})
|
throw(#{
|
||||||
|
reason => Reason,
|
||||||
|
type => TypeStr,
|
||||||
|
module => Module
|
||||||
|
});
|
||||||
|
error:Reason:Stacktrace ->
|
||||||
|
throw(#{
|
||||||
|
reason => Reason,
|
||||||
|
stacktrace => Stacktrace,
|
||||||
|
type => TypeStr,
|
||||||
|
module => Module
|
||||||
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
readable_swagger(Module, TypeStr) ->
|
readable_swagger(Module, TypeStr) ->
|
||||||
|
@ -49,22 +60,28 @@ readable_docgen(Module, TypeStr) ->
|
||||||
get_readable(Module, TypeStr, Flavor) ->
|
get_readable(Module, TypeStr, Flavor) ->
|
||||||
Map = readable(Module, TypeStr),
|
Map = readable(Module, TypeStr),
|
||||||
case maps:get(Flavor, Map, undefined) of
|
case maps:get(Flavor, Map, undefined) of
|
||||||
undefined -> fail(#{reason => unknown_type, module => Module, type => TypeStr});
|
undefined -> throw(#{reason => unknown_type, module => Module, type => TypeStr});
|
||||||
Value -> Value
|
Value -> Value
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Fail the build or test. Production code should never get here.
|
|
||||||
-spec fail(_) -> no_return().
|
|
||||||
fail(Reason) ->
|
|
||||||
io:format(standard_error, "ERROR: ~p~n", [Reason]),
|
|
||||||
error(Reason).
|
|
||||||
|
|
||||||
readable("boolean()") ->
|
readable("boolean()") ->
|
||||||
#{
|
#{
|
||||||
swagger => #{type => boolean},
|
swagger => #{type => boolean},
|
||||||
dashboard => #{type => boolean},
|
dashboard => #{type => boolean},
|
||||||
docgen => #{type => "Boolean"}
|
docgen => #{type => "Boolean"}
|
||||||
};
|
};
|
||||||
|
readable("template()") ->
|
||||||
|
#{
|
||||||
|
swagger => #{type => string},
|
||||||
|
dashboard => #{type => string, is_template => true},
|
||||||
|
docgen => #{type => "String", desc => ?DESC(template)}
|
||||||
|
};
|
||||||
|
readable("template_str()") ->
|
||||||
|
#{
|
||||||
|
swagger => #{type => string},
|
||||||
|
dashboard => #{type => string, is_template => true},
|
||||||
|
docgen => #{type => "String", desc => ?DESC(template)}
|
||||||
|
};
|
||||||
readable("binary()") ->
|
readable("binary()") ->
|
||||||
#{
|
#{
|
||||||
swagger => #{type => string},
|
swagger => #{type => string},
|
||||||
|
|
|
@ -44,6 +44,8 @@
|
||||||
])
|
])
|
||||||
).
|
).
|
||||||
|
|
||||||
|
-define(SCHEMA_VERSION, <<"0.2.0">>).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% minirest API and schema
|
%% minirest API and schema
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -97,20 +99,31 @@ gen_schema(connectors) ->
|
||||||
connectors_schema_json().
|
connectors_schema_json().
|
||||||
|
|
||||||
hotconf_schema_json() ->
|
hotconf_schema_json() ->
|
||||||
SchemaInfo = #{title => <<"EMQX Hot Conf API Schema">>, version => <<"0.1.0">>},
|
SchemaInfo = #{
|
||||||
|
title => <<"Hot Conf Schema">>,
|
||||||
|
version => ?SCHEMA_VERSION
|
||||||
|
},
|
||||||
gen_api_schema_json_iodata(emqx_mgmt_api_configs, SchemaInfo).
|
gen_api_schema_json_iodata(emqx_mgmt_api_configs, SchemaInfo).
|
||||||
|
|
||||||
bridge_schema_json() ->
|
bridge_schema_json() ->
|
||||||
SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>},
|
SchemaInfo = #{
|
||||||
|
title => <<"Data Bridge Schema">>,
|
||||||
|
version => ?SCHEMA_VERSION
|
||||||
|
},
|
||||||
gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo).
|
gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo).
|
||||||
|
|
||||||
actions_schema_json() ->
|
actions_schema_json() ->
|
||||||
SchemaInfo = #{title => <<"EMQX Data Actions API Schema">>, version => <<"0.1.0">>},
|
SchemaInfo = #{
|
||||||
%% Note: this will be moved to `emqx_actions' application in the future.
|
title => <<"Actions and Sources Schema">>,
|
||||||
|
version => ?SCHEMA_VERSION
|
||||||
|
},
|
||||||
gen_api_schema_json_iodata(emqx_bridge_v2_api, SchemaInfo).
|
gen_api_schema_json_iodata(emqx_bridge_v2_api, SchemaInfo).
|
||||||
|
|
||||||
connectors_schema_json() ->
|
connectors_schema_json() ->
|
||||||
SchemaInfo = #{title => <<"EMQX Connectors Schema">>, version => <<"0.1.0">>},
|
SchemaInfo = #{
|
||||||
|
title => <<"Connectors Schema">>,
|
||||||
|
version => ?SCHEMA_VERSION
|
||||||
|
},
|
||||||
gen_api_schema_json_iodata(emqx_connector_api, SchemaInfo).
|
gen_api_schema_json_iodata(emqx_connector_api, SchemaInfo).
|
||||||
|
|
||||||
gen_api_schema_json_iodata(SchemaMod, SchemaInfo) ->
|
gen_api_schema_json_iodata(SchemaMod, SchemaInfo) ->
|
||||||
|
|
|
@ -57,7 +57,11 @@
|
||||||
allowEmptyValue,
|
allowEmptyValue,
|
||||||
deprecated,
|
deprecated,
|
||||||
minimum,
|
minimum,
|
||||||
maximum
|
maximum,
|
||||||
|
%% is_template is a type property,
|
||||||
|
%% but some exceptions are made for them to be field property
|
||||||
|
%% for example, HTTP headers (which is a map type)
|
||||||
|
is_template
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(INIT_SCHEMA, #{
|
-define(INIT_SCHEMA, #{
|
||||||
|
@ -652,19 +656,6 @@ trans_required(Spec, true, _) -> Spec#{required => true};
|
||||||
trans_required(Spec, _, path) -> Spec#{required => true};
|
trans_required(Spec, _, path) -> Spec#{required => true};
|
||||||
trans_required(Spec, _, _) -> Spec.
|
trans_required(Spec, _, _) -> Spec.
|
||||||
|
|
||||||
trans_desc(Init, Hocon, Func, Name, Options) ->
|
|
||||||
Spec0 = trans_description(Init, Hocon, Options),
|
|
||||||
case Func =:= fun hocon_schema_to_spec/2 of
|
|
||||||
true ->
|
|
||||||
Spec0;
|
|
||||||
false ->
|
|
||||||
Spec1 = trans_label(Spec0, Hocon, Name, Options),
|
|
||||||
case Spec1 of
|
|
||||||
#{description := _} -> Spec1;
|
|
||||||
_ -> Spec1
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
trans_description(Spec, Hocon, Options) ->
|
trans_description(Spec, Hocon, Options) ->
|
||||||
Desc =
|
Desc =
|
||||||
case desc_struct(Hocon) of
|
case desc_struct(Hocon) of
|
||||||
|
@ -702,19 +693,6 @@ get_i18n_text(Lang, Namespace, Id, Tag, Default) ->
|
||||||
get_lang(#{i18n_lang := Lang}) -> Lang;
|
get_lang(#{i18n_lang := Lang}) -> Lang;
|
||||||
get_lang(_) -> emqx:get_config([dashboard, i18n_lang]).
|
get_lang(_) -> emqx:get_config([dashboard, i18n_lang]).
|
||||||
|
|
||||||
trans_label(Spec, Hocon, Default, Options) ->
|
|
||||||
Label =
|
|
||||||
case desc_struct(Hocon) of
|
|
||||||
?DESC(_, _) = Struct -> get_i18n(<<"label">>, Struct, Default, Options);
|
|
||||||
_ -> Default
|
|
||||||
end,
|
|
||||||
case Label =:= undefined of
|
|
||||||
true ->
|
|
||||||
Spec;
|
|
||||||
false ->
|
|
||||||
Spec#{label => Label}
|
|
||||||
end.
|
|
||||||
|
|
||||||
desc_struct(Hocon) ->
|
desc_struct(Hocon) ->
|
||||||
R =
|
R =
|
||||||
case hocon_schema:field_schema(Hocon, desc) of
|
case hocon_schema:field_schema(Hocon, desc) of
|
||||||
|
@ -772,7 +750,7 @@ response(Status, #{content := _} = Content, {Acc, RefsAcc, Module, Options}) ->
|
||||||
response(Status, ?REF(StructName), {Acc, RefsAcc, Module, Options}) ->
|
response(Status, ?REF(StructName), {Acc, RefsAcc, Module, Options}) ->
|
||||||
response(Status, ?R_REF(Module, StructName), {Acc, RefsAcc, Module, Options});
|
response(Status, ?R_REF(Module, StructName), {Acc, RefsAcc, Module, Options});
|
||||||
response(Status, ?R_REF(_Mod, _Name) = RRef, {Acc, RefsAcc, Module, Options}) ->
|
response(Status, ?R_REF(_Mod, _Name) = RRef, {Acc, RefsAcc, Module, Options}) ->
|
||||||
SchemaToSpec = schema_converter(Options),
|
SchemaToSpec = get_schema_converter(Options),
|
||||||
{Spec, Refs} = SchemaToSpec(RRef, Module),
|
{Spec, Refs} = SchemaToSpec(RRef, Module),
|
||||||
Content = content(Spec),
|
Content = content(Spec),
|
||||||
{
|
{
|
||||||
|
@ -910,7 +888,7 @@ parse_object(PropList = [_ | _], Module, Options) when is_list(PropList) ->
|
||||||
parse_object(Other, Module, Options) ->
|
parse_object(Other, Module, Options) ->
|
||||||
erlang:throw(
|
erlang:throw(
|
||||||
{error, #{
|
{error, #{
|
||||||
msg => <<"Object only supports not empty proplists">>,
|
msg => <<"Object only supports non-empty fields list">>,
|
||||||
args => Other,
|
args => Other,
|
||||||
module => Module,
|
module => Module,
|
||||||
options => Options
|
options => Options
|
||||||
|
@ -950,10 +928,10 @@ parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs
|
||||||
true ->
|
true ->
|
||||||
HoconType = hocon_schema:field_schema(Hocon, type),
|
HoconType = hocon_schema:field_schema(Hocon, type),
|
||||||
Init0 = init_prop([default | ?DEFAULT_FIELDS], #{}, Hocon),
|
Init0 = init_prop([default | ?DEFAULT_FIELDS], #{}, Hocon),
|
||||||
SchemaToSpec = schema_converter(Options),
|
SchemaToSpec = get_schema_converter(Options),
|
||||||
Init = maps:remove(
|
Init = maps:remove(
|
||||||
summary,
|
summary,
|
||||||
trans_desc(Init0, Hocon, SchemaToSpec, NameBin, Options)
|
trans_description(Init0, Hocon, Options)
|
||||||
),
|
),
|
||||||
{Prop, Refs1} = SchemaToSpec(HoconType, Module),
|
{Prop, Refs1} = SchemaToSpec(HoconType, Module),
|
||||||
NewRequiredAcc =
|
NewRequiredAcc =
|
||||||
|
@ -1002,7 +980,7 @@ to_ref(Mod, StructName, Acc, RefsAcc) ->
|
||||||
Ref = #{<<"$ref">> => ?TO_COMPONENTS_PARAM(Mod, StructName)},
|
Ref = #{<<"$ref">> => ?TO_COMPONENTS_PARAM(Mod, StructName)},
|
||||||
{[Ref | Acc], [{Mod, StructName, parameter} | RefsAcc]}.
|
{[Ref | Acc], [{Mod, StructName, parameter} | RefsAcc]}.
|
||||||
|
|
||||||
schema_converter(Options) ->
|
get_schema_converter(Options) ->
|
||||||
maps:get(schema_converter, Options, fun hocon_schema_to_spec/2).
|
maps:get(schema_converter, Options, fun hocon_schema_to_spec/2).
|
||||||
|
|
||||||
hocon_error_msg(Reason) ->
|
hocon_error_msg(Reason) ->
|
||||||
|
|
|
@ -359,7 +359,7 @@ t_bad_ref(_Config) ->
|
||||||
Refs = [{?MODULE, bad_ref}],
|
Refs = [{?MODULE, bad_ref}],
|
||||||
Fields = fields(bad_ref),
|
Fields = fields(bad_ref),
|
||||||
?assertThrow(
|
?assertThrow(
|
||||||
{error, #{msg := <<"Object only supports not empty proplists">>, args := Fields}},
|
{error, #{msg := <<"Object only supports non-empty fields list">>, args := Fields}},
|
||||||
validate(Path, Spec, Refs)
|
validate(Path, Spec, Refs)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -189,7 +189,7 @@ t_nest_object(_Config) ->
|
||||||
t_empty(_Config) ->
|
t_empty(_Config) ->
|
||||||
?assertThrow(
|
?assertThrow(
|
||||||
{error, #{
|
{error, #{
|
||||||
msg := <<"Object only supports not empty proplists">>,
|
msg := <<"Object only supports non-empty fields list">>,
|
||||||
args := [],
|
args := [],
|
||||||
module := ?MODULE
|
module := ?MODULE
|
||||||
}},
|
}},
|
||||||
|
@ -273,7 +273,7 @@ t_bad_ref(_Config) ->
|
||||||
?assertThrow(
|
?assertThrow(
|
||||||
{error, #{
|
{error, #{
|
||||||
module := ?MODULE,
|
module := ?MODULE,
|
||||||
msg := <<"Object only supports not empty proplists">>
|
msg := <<"Object only supports non-empty fields list">>
|
||||||
}},
|
}},
|
||||||
validate(Path, Object, ExpectRefs)
|
validate(Path, Object, ExpectRefs)
|
||||||
),
|
),
|
||||||
|
|
|
@ -1058,7 +1058,8 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
|
||||||
get_config_for_channels(Data0, ChannelsNotAdded),
|
get_config_for_channels(Data0, ChannelsNotAdded),
|
||||||
Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
|
Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
|
||||||
%% Now that we have done the adding, we can get the status of all channels
|
%% Now that we have done the adding, we can get the status of all channels
|
||||||
trigger_health_check_for_added_channels(Data1);
|
Data2 = trigger_health_check_for_added_channels(Data1),
|
||||||
|
update_state(Data2, Data0);
|
||||||
channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
|
channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
|
||||||
%% Whenever the resource is connecting:
|
%% Whenever the resource is connecting:
|
||||||
%% 1. Change the status of all added channels to connecting
|
%% 1. Change the status of all added channels to connecting
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_s3, [
|
{application, emqx_s3, [
|
||||||
{description, "EMQX S3"},
|
{description, "EMQX S3"},
|
||||||
{vsn, "5.0.14"},
|
{vsn, "5.1.0"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_s3_sup]},
|
{registered, [emqx_s3_sup]},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -74,7 +74,7 @@ fields(s3_upload) ->
|
||||||
[
|
[
|
||||||
{bucket,
|
{bucket,
|
||||||
mk(
|
mk(
|
||||||
string(),
|
emqx_schema:template_str(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("bucket"),
|
desc => ?DESC("bucket"),
|
||||||
required => true
|
required => true
|
||||||
|
@ -82,7 +82,7 @@ fields(s3_upload) ->
|
||||||
)},
|
)},
|
||||||
{key,
|
{key,
|
||||||
mk(
|
mk(
|
||||||
string(),
|
emqx_schema:template_str(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("key"),
|
desc => ?DESC("key"),
|
||||||
required => true
|
required => true
|
||||||
|
|
|
@ -9,4 +9,7 @@ emqx_conf_schema_types {
|
||||||
secret.desc:
|
secret.desc:
|
||||||
"""A string holding some sensitive information, such as a password. When secret starts with <code>file://</code>, the rest of the string is interpreted as a path to a file containing the secret itself: whole content of the file except any trailing whitespace characters is considered a secret value. Note: when clustered, all EMQX nodes should have the same file present before using <code>file://</code> secrets."""
|
"""A string holding some sensitive information, such as a password. When secret starts with <code>file://</code>, the rest of the string is interpreted as a path to a file containing the secret itself: whole content of the file except any trailing whitespace characters is considered a secret value. Note: when clustered, all EMQX nodes should have the same file present before using <code>file://</code> secrets."""
|
||||||
|
|
||||||
|
template.desc: """~
|
||||||
|
A string for `${.path.to.var}` style value interpolation,
|
||||||
|
where the leading dot is optional, and `${.}` represents all values as an object."""
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,8 +82,10 @@ main() {
|
||||||
## The json status feature was added after hotconf and bridges schema API
|
## The json status feature was added after hotconf and bridges schema API
|
||||||
if [ "$JSON_STATUS" != 'NOT_JSON' ]; then
|
if [ "$JSON_STATUS" != 'NOT_JSON' ]; then
|
||||||
check_swagger_json
|
check_swagger_json
|
||||||
check_schema_json hotconf "EMQX Hot Conf API Schema"
|
check_schema_json hotconf "Hot Conf Schema"
|
||||||
check_schema_json bridges "EMQX Data Bridge API Schema"
|
check_schema_json bridges "Data Bridge Schema"
|
||||||
|
check_schema_json actions "Actions and Sources Schema"
|
||||||
|
check_schema_json connectors "Connectors Schema"
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue