feat(emqx_bridge_mysql): port to shared connectors
This commit is contained in:
parent
17e6703ba2
commit
71607aa2ad
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_auth_mysql, [
|
||||
{description, "EMQX MySQL Authentication and Authorization"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{mod, {emqx_auth_mysql_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -55,8 +55,7 @@ fields(mysql) ->
|
|||
{password_hash_algorithm, fun emqx_authn_password_hashing:type_ro/1},
|
||||
{query, fun query/1},
|
||||
{query_timeout, fun query_timeout/1}
|
||||
] ++ emqx_authn_schema:common_fields() ++
|
||||
proplists:delete(prepare_statement, emqx_mysql:fields(config)).
|
||||
] ++ emqx_authn_schema:common_fields() ++ emqx_mysql:fields(config).
|
||||
|
||||
desc(mysql) ->
|
||||
?DESC(mysql);
|
||||
|
|
|
@ -37,6 +37,7 @@ type() -> ?AUTHZ_TYPE.
|
|||
fields(mysql) ->
|
||||
emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++
|
||||
emqx_mysql:fields(config) ++
|
||||
emqx_connector_schema_lib:prepare_statement_fields() ++
|
||||
[{query, query()}].
|
||||
|
||||
desc(mysql) ->
|
||||
|
|
|
@ -79,6 +79,7 @@ hard_coded_action_info_modules_ee() ->
|
|||
emqx_bridge_kafka_action_info,
|
||||
emqx_bridge_matrix_action_info,
|
||||
emqx_bridge_mongodb_action_info,
|
||||
emqx_bridge_mysql_action_info,
|
||||
emqx_bridge_pgsql_action_info,
|
||||
emqx_bridge_syskeeper_action_info,
|
||||
emqx_bridge_timescale_action_info,
|
||||
|
|
|
@ -31,7 +31,8 @@
|
|||
get_response/0,
|
||||
put_request/0,
|
||||
post_request/0,
|
||||
examples/1
|
||||
examples/1,
|
||||
action_values/4
|
||||
]).
|
||||
|
||||
%% Exported for mocking
|
||||
|
@ -103,6 +104,54 @@ bridge_api_union(Refs) ->
|
|||
end
|
||||
end.
|
||||
|
||||
-type http_method() :: get | post | put.
|
||||
-type schema_example_map() :: #{atom() => term()}.
|
||||
|
||||
-spec action_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map().
|
||||
action_values(Method, ActionType, ConnectorType, ActionValues) ->
|
||||
ActionTypeBin = atom_to_binary(ActionType),
|
||||
ConnectorTypeBin = atom_to_binary(ConnectorType),
|
||||
lists:foldl(
|
||||
fun(M1, M2) ->
|
||||
maps:merge(M1, M2)
|
||||
end,
|
||||
#{
|
||||
enable => true,
|
||||
description => <<"My example ", ActionTypeBin/binary, " action">>,
|
||||
connector => <<ConnectorTypeBin/binary, "_connector">>,
|
||||
resource_opts => #{
|
||||
health_check_interval => "30s"
|
||||
}
|
||||
},
|
||||
[
|
||||
ActionValues,
|
||||
method_values(Method, ActionType)
|
||||
]
|
||||
).
|
||||
|
||||
-spec method_values(http_method(), atom()) -> schema_example_map().
|
||||
method_values(post, Type) ->
|
||||
TypeBin = atom_to_binary(Type),
|
||||
#{
|
||||
name => <<TypeBin/binary, "_action">>,
|
||||
type => TypeBin
|
||||
};
|
||||
method_values(get, Type) ->
|
||||
maps:merge(
|
||||
method_values(post, Type),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
]
|
||||
}
|
||||
);
|
||||
method_values(put, _Type) ->
|
||||
#{}.
|
||||
|
||||
%%======================================================================================
|
||||
%% HOCON Schema Callbacks
|
||||
%%======================================================================================
|
||||
|
|
|
@ -36,9 +36,7 @@
|
|||
namespace() ->
|
||||
"bridge_mongodb".
|
||||
|
||||
roots() ->
|
||||
%% ???
|
||||
[].
|
||||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
[
|
||||
|
|
|
@ -31,9 +31,9 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
|||
maps:merge(
|
||||
maps:without(
|
||||
[<<"connector">>],
|
||||
map_unindent(<<"parameters">>, ActionConfig)
|
||||
emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
|
||||
),
|
||||
map_unindent(<<"parameters">>, ConnectorConfig)
|
||||
emqx_utils_maps:unindent(<<"parameters">>, ConnectorConfig)
|
||||
)
|
||||
).
|
||||
|
||||
|
@ -66,7 +66,7 @@ bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
|||
|
||||
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||
Conf0 = maps:with(PickKeys, Config),
|
||||
map_indent(<<"parameters">>, IndentKeys, Conf0).
|
||||
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
|
||||
|
||||
bridge_v1_type_name() ->
|
||||
{fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
|
||||
|
@ -86,18 +86,5 @@ v1_type(<<"rs">>) -> mongodb_rs;
|
|||
v1_type(<<"sharded">>) -> mongodb_sharded;
|
||||
v1_type(<<"single">>) -> mongodb_single.
|
||||
|
||||
map_unindent(Key, Map) ->
|
||||
maps:merge(
|
||||
maps:get(Key, Map),
|
||||
maps:remove(Key, Map)
|
||||
).
|
||||
|
||||
map_indent(IndentKey, PickKeys, Map) ->
|
||||
maps:put(
|
||||
IndentKey,
|
||||
maps:with(PickKeys, Map),
|
||||
maps:without(PickKeys, Map)
|
||||
).
|
||||
|
||||
schema_keys(Name) ->
|
||||
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
emqx_resource,
|
||||
emqx_mysql
|
||||
]},
|
||||
{env, []},
|
||||
{env, [{emqx_action_info_modules, [emqx_bridge_mysql_action_info]}]},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
]}.
|
||||
|
|
|
@ -10,7 +10,9 @@
|
|||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
bridge_v2_examples/1,
|
||||
conn_bridge_examples/1,
|
||||
connector_examples/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -20,6 +22,9 @@
|
|||
desc/1
|
||||
]).
|
||||
|
||||
-define(CONNECTOR_TYPE, mysql).
|
||||
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
|
||||
|
||||
-define(DEFAULT_SQL, <<
|
||||
"insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
|
||||
"values (${id}, ${topic}, ${qos}, ${payload}, FROM_UNIXTIME(${timestamp}/1000))"
|
||||
|
@ -28,6 +33,22 @@
|
|||
%% -------------------------------------------------------------------------------------------------
|
||||
%% api
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"mysql">> =>
|
||||
#{
|
||||
summary => <<"MySQL Action">>,
|
||||
value => emqx_bridge_v2_schema:action_values(
|
||||
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
action_values() ->
|
||||
#{parameters => #{sql => ?DEFAULT_SQL}}.
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
|
@ -38,6 +59,29 @@ conn_bridge_examples(Method) ->
|
|||
}
|
||||
].
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"mysql">> =>
|
||||
#{
|
||||
summary => <<"MySQL Connector">>,
|
||||
value => emqx_connector_schema:connector_values(
|
||||
Method, ?CONNECTOR_TYPE, connector_values()
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_values() ->
|
||||
#{
|
||||
server => <<"127.0.0.1:3306">>,
|
||||
database => <<"test">>,
|
||||
pool_size => 8,
|
||||
username => <<"root">>,
|
||||
password => <<"******">>,
|
||||
resource_opts => #{health_check_interval => <<"20s">>}
|
||||
}.
|
||||
|
||||
values(_Method) ->
|
||||
#{
|
||||
enable => true,
|
||||
|
@ -80,17 +124,70 @@ fields("config") ->
|
|||
#{desc => ?DESC("local_topic"), default => undefined}
|
||||
)}
|
||||
] ++ emqx_resource_schema:fields("resource_opts") ++
|
||||
(emqx_mysql:fields(config) --
|
||||
emqx_connector_schema_lib:prepare_statement_fields());
|
||||
emqx_mysql:fields(config);
|
||||
fields(action) ->
|
||||
{mysql,
|
||||
mk(
|
||||
hoconsc:map(name, ref(?MODULE, mysql_action)),
|
||||
#{desc => <<"MySQL Action Config">>, required => false}
|
||||
)};
|
||||
fields(mysql_action) ->
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
mk(
|
||||
ref(?MODULE, action_parameters),
|
||||
#{
|
||||
required => true, desc => ?DESC(action_parameters)
|
||||
}
|
||||
)
|
||||
);
|
||||
fields(action_parameters) ->
|
||||
[
|
||||
{sql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||
)}
|
||||
];
|
||||
fields("config_connector") ->
|
||||
emqx_connector_schema:common_fields() ++
|
||||
emqx_mysql:fields(config) ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||
fields(connector_resource_opts) ->
|
||||
emqx_connector_schema:resource_opts_fields();
|
||||
fields("post") ->
|
||||
[type_field(), name_field() | fields("config")];
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||
emqx_bridge_schema:status_fields() ++ fields("post");
|
||||
fields("get_bridge_v2") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2");
|
||||
fields("post_bridge_v2") ->
|
||||
[type_field(), name_field() | fields(mysql_action)];
|
||||
fields("put_bridge_v2") ->
|
||||
fields(mysql_action);
|
||||
fields(Field) when
|
||||
Field == "get_connector";
|
||||
Field == "put_connector";
|
||||
Field == "post_connector"
|
||||
->
|
||||
emqx_connector_schema:api_fields(
|
||||
Field,
|
||||
?CONNECTOR_TYPE,
|
||||
emqx_mysql:fields(config) ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts)
|
||||
).
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc("config_connector") ->
|
||||
?DESC("desc_config");
|
||||
desc(connector_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, "resource_opts");
|
||||
desc(action_parameters) ->
|
||||
?DESC(action_parameters);
|
||||
desc(mysql_action) ->
|
||||
?DESC(mysql_action);
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for MySQL using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_mysql_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
%% behaviour callbacks
|
||||
-export([
|
||||
action_type_name/0,
|
||||
bridge_v1_config_to_action_config/2,
|
||||
bridge_v1_config_to_connector_config/1,
|
||||
bridge_v1_type_name/0,
|
||||
connector_action_config_to_bridge_v1_config/2,
|
||||
connector_type_name/0,
|
||||
schema_module/0
|
||||
]).
|
||||
|
||||
-import(emqx_utils_conv, [bin/1]).
|
||||
|
||||
-define(MYSQL_TYPE, mysql).
|
||||
-define(SCHEMA_MODULE, emqx_bridge_mysql).
|
||||
|
||||
action_type_name() -> ?MYSQL_TYPE.
|
||||
bridge_v1_type_name() -> ?MYSQL_TYPE.
|
||||
connector_type_name() -> ?MYSQL_TYPE.
|
||||
|
||||
schema_module() -> ?SCHEMA_MODULE.
|
||||
|
||||
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||
MergedConfig =
|
||||
emqx_utils_maps:deep_merge(
|
||||
maps:without(
|
||||
[<<"connector">>],
|
||||
emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
|
||||
),
|
||||
ConnectorConfig
|
||||
),
|
||||
BridgeV1Keys = schema_keys("config"),
|
||||
maps:with(BridgeV1Keys, MergedConfig).
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||
ActionTopLevelKeys = schema_keys(mysql_action),
|
||||
ActionParametersKeys = schema_keys(action_parameters),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
|
||||
ActionConfig#{<<"connector">> => ConnectorName}.
|
||||
|
||||
bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
||||
ConnectorKeys = schema_keys("config_connector"),
|
||||
ResourceOptsKeys = schema_keys(connector_resource_opts),
|
||||
maps:update_with(
|
||||
<<"resource_opts">>,
|
||||
fun(ResourceOpts) -> maps:with(ResourceOptsKeys, ResourceOpts) end,
|
||||
#{},
|
||||
maps:with(ConnectorKeys, BridgeV1Config)
|
||||
).
|
||||
|
||||
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||
Conf0 = maps:with(PickKeys, Config),
|
||||
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
|
||||
|
||||
schema_keys(Name) ->
|
||||
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].
|
|
@ -0,0 +1,150 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_mysql_connector).
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
%% `emqx_resource' API
|
||||
-export([
|
||||
on_remove_channel/3,
|
||||
callback_mode/0,
|
||||
on_add_channel/4,
|
||||
on_batch_query/3,
|
||||
on_get_channel_status/3,
|
||||
on_get_channels/1,
|
||||
on_get_status/2,
|
||||
on_query/3,
|
||||
on_start/2,
|
||||
on_stop/2
|
||||
]).
|
||||
|
||||
%%========================================================================================
|
||||
%% `emqx_resource' API
|
||||
%%========================================================================================
|
||||
|
||||
callback_mode() -> emqx_mysql:callback_mode().
|
||||
|
||||
on_add_channel(
|
||||
_InstanceId,
|
||||
#{channels := Channels, connector_state := ConnectorState} = State0,
|
||||
ChannelId,
|
||||
ChannelConfig0
|
||||
) ->
|
||||
ChannelConfig1 = emqx_utils_maps:unindent(parameters, ChannelConfig0),
|
||||
QueryTemplates = emqx_mysql:parse_prepare_sql(ChannelId, ChannelConfig1),
|
||||
ChannelConfig2 = maps:merge(ChannelConfig1, QueryTemplates),
|
||||
ChannelConfig = set_prepares(ChannelConfig2, ConnectorState),
|
||||
State = State0#{
|
||||
channels => maps:put(ChannelId, ChannelConfig, Channels),
|
||||
connector_state => ConnectorState
|
||||
},
|
||||
{ok, State}.
|
||||
|
||||
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
|
||||
case maps:get(ChannelId, Channels) of
|
||||
#{prepares := ok} ->
|
||||
connected;
|
||||
#{prepares := {error, _}} ->
|
||||
connecting
|
||||
end.
|
||||
|
||||
on_get_channels(InstanceId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
||||
|
||||
on_get_status(InstanceId, #{channels := Channels0, connector_state := ConnectorState} = State0) ->
|
||||
case emqx_mysql:on_get_status(InstanceId, ConnectorState) of
|
||||
WithState when is_tuple(WithState) ->
|
||||
NewConnectorState = element(2, WithState),
|
||||
State = State0#{connector_state => NewConnectorState},
|
||||
setelement(2, WithState, State);
|
||||
connected ->
|
||||
Channels =
|
||||
maps:map(
|
||||
fun
|
||||
(_ChannelId, #{prepares := ok} = ChannelConfig) ->
|
||||
ChannelConfig;
|
||||
(_ChannelId, #{prepares := {error, _}} = ChannelConfig) ->
|
||||
set_prepares(ChannelConfig, ConnectorState)
|
||||
end,
|
||||
Channels0
|
||||
),
|
||||
State = State0#{channels => Channels},
|
||||
{connected, State};
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
||||
on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
|
||||
on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
|
||||
on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
|
||||
on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
|
||||
on_query(
|
||||
InstanceId,
|
||||
{Channel, _Message, _Params, _Timeout} = Request,
|
||||
#{channels := Channels, connector_state := ConnectorState}
|
||||
) when is_binary(Channel) ->
|
||||
ChannelConfig = maps:get(Channel, Channels),
|
||||
Result = emqx_mysql:on_query(
|
||||
InstanceId,
|
||||
Request,
|
||||
maps:merge(ConnectorState, ChannelConfig)
|
||||
),
|
||||
?tp(mysql_connector_on_query_return, #{instance_id => InstanceId, result => Result}),
|
||||
Result;
|
||||
on_query(InstanceId, Request, _State = #{channels := _Channels, connector_state := ConnectorState}) ->
|
||||
emqx_mysql:on_query(InstanceId, Request, ConnectorState).
|
||||
|
||||
on_batch_query(
|
||||
InstanceId,
|
||||
[Req | _] = BatchRequest,
|
||||
#{channels := Channels, connector_state := ConnectorState}
|
||||
) when is_binary(element(1, Req)) ->
|
||||
Channel = element(1, Req),
|
||||
ChannelConfig = maps:get(Channel, Channels),
|
||||
Result = emqx_mysql:on_batch_query(
|
||||
InstanceId,
|
||||
BatchRequest,
|
||||
maps:merge(ConnectorState, ChannelConfig)
|
||||
),
|
||||
?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}),
|
||||
Result;
|
||||
on_batch_query(InstanceId, BatchRequest, _State = #{connector_state := ConnectorState}) ->
|
||||
emqx_mysql:on_batch_query(InstanceId, BatchRequest, ConnectorState).
|
||||
|
||||
on_remove_channel(
|
||||
_InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId
|
||||
) ->
|
||||
ChannelConfig = maps:get(ChannelId, Channels),
|
||||
emqx_mysql:unprepare_sql(maps:merge(ChannelConfig, ConnectorState)),
|
||||
NewState = State#{channels => maps:remove(ChannelId, Channels)},
|
||||
{ok, NewState}.
|
||||
|
||||
-spec on_start(binary(), hocon:config()) ->
|
||||
{ok, #{connector_state := emqx_mysql:state(), channels := map()}} | {error, _}.
|
||||
on_start(InstanceId, Config) ->
|
||||
case emqx_mysql:on_start(InstanceId, Config) of
|
||||
{ok, ConnectorState} ->
|
||||
State = #{
|
||||
connector_state => ConnectorState,
|
||||
channels => #{}
|
||||
},
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
|
||||
ok = emqx_mysql:on_stop(InstanceId, ConnectorState),
|
||||
?tp(mysql_connector_stopped, #{instance_id => InstanceId}),
|
||||
ok.
|
||||
|
||||
%%========================================================================================
|
||||
%% Helper fns
|
||||
%%========================================================================================
|
||||
set_prepares(ChannelConfig, ConnectorState) ->
|
||||
#{prepares := Prepares} =
|
||||
emqx_mysql:init_prepare(maps:merge(ConnectorState, ChannelConfig)),
|
||||
ChannelConfig#{prepares => Prepares}.
|
|
@ -242,13 +242,12 @@ send_message(Config, Payload) ->
|
|||
query_resource(Config, Request) ->
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
emqx_resource:query(ResourceID, Request, #{timeout => 500}).
|
||||
emqx_bridge_v2:query(BridgeType, Name, Request, #{timeout => 500}).
|
||||
|
||||
sync_query_resource(Config, Request) ->
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
ResourceID = emqx_bridge_v2:id(BridgeType, Name),
|
||||
emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request).
|
||||
|
||||
query_resource_async(Config, Request) ->
|
||||
|
@ -256,8 +255,7 @@ query_resource_async(Config, Request) ->
|
|||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
Ref = alias([reply]),
|
||||
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
Return = emqx_resource:query(ResourceID, Request, #{
|
||||
Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{
|
||||
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
|
||||
}),
|
||||
{Return, Ref}.
|
||||
|
@ -274,7 +272,9 @@ unprepare(Config, Key) ->
|
|||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
{ok, _, #{state := #{pool_name := PoolName}}} = emqx_resource:get_instance(ResourceID),
|
||||
{ok, _, #{state := #{connector_state := #{pool_name := PoolName}}}} = emqx_resource:get_instance(
|
||||
ResourceID
|
||||
),
|
||||
[
|
||||
begin
|
||||
{ok, Conn} = ecpool_worker:client(Worker),
|
||||
|
@ -343,6 +343,17 @@ create_rule_and_action_http(Config) ->
|
|||
Error
|
||||
end.
|
||||
|
||||
request_api_status(BridgeId) ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader) of
|
||||
{ok, Res0} ->
|
||||
#{<<"status">> := Status} = _Res = emqx_utils_json:decode(Res0, [return_maps]),
|
||||
{ok, binary_to_existing_atom(Status)};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -519,14 +530,18 @@ t_write_timeout(Config) ->
|
|||
2 * Timeout
|
||||
),
|
||||
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
|
||||
case QueryMode of
|
||||
sync ->
|
||||
?assertMatch(
|
||||
{error, {resource_error, #{reason := timeout}}},
|
||||
query_resource(Config, {send_message, SentData, [], Timeout})
|
||||
query_resource(Config, {ResourceID, SentData, [], Timeout})
|
||||
);
|
||||
async ->
|
||||
query_resource(Config, {send_message, SentData, [], Timeout}),
|
||||
query_resource(Config, {ResourceID, SentData, [], Timeout}),
|
||||
ok
|
||||
end,
|
||||
ok
|
||||
|
@ -703,7 +718,10 @@ t_uninitialized_prepared_statement(Config) ->
|
|||
),
|
||||
Val = integer_to_binary(erlang:unique_integer()),
|
||||
SentData = #{payload => Val, timestamp => 1668602148000},
|
||||
unprepare(Config, send_message),
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_v2:id(BridgeType, Name),
|
||||
unprepare(Config, ResourceID),
|
||||
?check_trace(
|
||||
begin
|
||||
{Res, {ok, _}} =
|
||||
|
@ -721,7 +739,7 @@ t_uninitialized_prepared_statement(Config) ->
|
|||
#{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared},
|
||||
#{
|
||||
?snk_kind := mysql_connector_on_query_prepared_sql,
|
||||
type_or_key := send_message
|
||||
type_or_key := ResourceID
|
||||
},
|
||||
Trace
|
||||
)
|
||||
|
@ -736,33 +754,58 @@ t_uninitialized_prepared_statement(Config) ->
|
|||
ok.
|
||||
|
||||
t_missing_table(Config) ->
|
||||
QueryMode = ?config(query_mode, Config),
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
|
||||
?check_trace(
|
||||
begin
|
||||
connect_and_drop_table(Config),
|
||||
?assertMatch({ok, _}, create_bridge(Config)),
|
||||
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
?assertMatch(
|
||||
{ok, Status} when Status == connecting orelse Status == disconnected,
|
||||
emqx_resource_manager:health_check(ResourceID)
|
||||
request_api_status(BridgeID)
|
||||
)
|
||||
),
|
||||
Val = integer_to_binary(erlang:unique_integer()),
|
||||
SentData = #{payload => Val, timestamp => 1668602148000},
|
||||
Timeout = 1000,
|
||||
%Timeout = 1000,
|
||||
ResourceID = emqx_bridge_v2:id(BridgeType, Name),
|
||||
Request = {ResourceID, SentData},
|
||||
Result =
|
||||
case QueryMode of
|
||||
sync ->
|
||||
query_resource(Config, Request);
|
||||
async ->
|
||||
{_, Ref} = query_resource_async(Config, Request),
|
||||
{ok, Res} = receive_result(Ref, 2_000),
|
||||
Res
|
||||
end,
|
||||
|
||||
BatchSize = ?config(batch_size, Config),
|
||||
IsBatch = BatchSize > 1,
|
||||
case IsBatch of
|
||||
true ->
|
||||
?assertMatch(
|
||||
{error, {resource_error, #{reason := unhealthy_target}}},
|
||||
query_resource(Config, {send_message, SentData, [], Timeout})
|
||||
),
|
||||
{error,
|
||||
{unrecoverable_error,
|
||||
{1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
|
||||
Result
|
||||
);
|
||||
false ->
|
||||
?assertMatch(
|
||||
{error, undefined_table},
|
||||
Result
|
||||
)
|
||||
end,
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)),
|
||||
?assertMatch([_ | _], ?of_kind(mysql_undefined_table, Trace)),
|
||||
ok
|
||||
end
|
||||
).
|
||||
|
@ -770,9 +813,9 @@ t_missing_table(Config) ->
|
|||
t_table_removed(Config) ->
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
connect_and_create_table(Config),
|
||||
?assertMatch({ok, _}, create_bridge(Config)),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
|
@ -782,17 +825,17 @@ t_table_removed(Config) ->
|
|||
Val = integer_to_binary(erlang:unique_integer()),
|
||||
SentData = #{payload => Val, timestamp => 1668602148000},
|
||||
Timeout = 1000,
|
||||
ActionID = emqx_bridge_v2:id(BridgeType, Name),
|
||||
?assertMatch(
|
||||
{error,
|
||||
{unrecoverable_error, {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
|
||||
sync_query_resource(Config, {send_message, SentData, [], Timeout})
|
||||
sync_query_resource(Config, {ActionID, SentData, [], Timeout})
|
||||
),
|
||||
ok.
|
||||
|
||||
t_nested_payload_template(Config) ->
|
||||
Name = ?config(mysql_name, Config),
|
||||
BridgeType = ?config(mysql_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
Value = integer_to_binary(erlang:unique_integer()),
|
||||
{ok, _} = create_bridge(
|
||||
Config,
|
||||
|
@ -803,6 +846,7 @@ t_nested_payload_template(Config) ->
|
|||
}
|
||||
),
|
||||
{ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
|
|
|
@ -34,6 +34,8 @@ resource_type(matrix) ->
|
|||
emqx_postgresql;
|
||||
resource_type(mongodb) ->
|
||||
emqx_bridge_mongodb_connector;
|
||||
resource_type(mysql) ->
|
||||
emqx_bridge_mysql_connector;
|
||||
resource_type(pgsql) ->
|
||||
emqx_postgresql;
|
||||
resource_type(syskeeper_forwarder) ->
|
||||
|
@ -94,6 +96,14 @@ connector_structs() ->
|
|||
required => false
|
||||
}
|
||||
)},
|
||||
{matrix,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
|
||||
#{
|
||||
desc => <<"Matrix Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{mongodb,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")),
|
||||
|
@ -102,6 +112,30 @@ connector_structs() ->
|
|||
required => false
|
||||
}
|
||||
)},
|
||||
{mysql,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
|
||||
#{
|
||||
desc => <<"MySQL Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{pgsql,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")),
|
||||
#{
|
||||
desc => <<"PostgreSQL Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{redis,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
|
||||
#{
|
||||
desc => <<"Redis Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{syskeeper_forwarder,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
|
||||
|
@ -118,14 +152,6 @@ connector_structs() ->
|
|||
required => false
|
||||
}
|
||||
)},
|
||||
{pgsql,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")),
|
||||
#{
|
||||
desc => <<"PostgreSQL Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{timescale,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
|
||||
|
@ -133,22 +159,6 @@ connector_structs() ->
|
|||
desc => <<"Timescale Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{matrix,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")),
|
||||
#{
|
||||
desc => <<"Matrix Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{redis,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")),
|
||||
#{
|
||||
desc => <<"Redis Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
||||
|
@ -160,6 +170,7 @@ schema_modules() ->
|
|||
emqx_bridge_kafka,
|
||||
emqx_bridge_matrix,
|
||||
emqx_bridge_mongodb,
|
||||
emqx_bridge_mysql,
|
||||
emqx_bridge_syskeeper_connector,
|
||||
emqx_bridge_syskeeper_proxy,
|
||||
emqx_bridge_timescale,
|
||||
|
@ -185,6 +196,7 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
|
||||
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
|
||||
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
|
||||
|
|
|
@ -36,9 +36,11 @@
|
|||
-export([get_response/0, put_request/0, post_request/0]).
|
||||
|
||||
-export([connector_type_to_bridge_types/1]).
|
||||
|
||||
-export([
|
||||
api_fields/3,
|
||||
common_fields/0,
|
||||
connector_values/3,
|
||||
status_and_actions_fields/0,
|
||||
type_and_name_fields/1
|
||||
]).
|
||||
|
@ -128,16 +130,18 @@ connector_type_to_bridge_types(matrix) ->
|
|||
[matrix];
|
||||
connector_type_to_bridge_types(mongodb) ->
|
||||
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
|
||||
connector_type_to_bridge_types(mysql) ->
|
||||
[mysql];
|
||||
connector_type_to_bridge_types(pgsql) ->
|
||||
[pgsql];
|
||||
connector_type_to_bridge_types(redis) ->
|
||||
[redis, redis_single, redis_sentinel, redis_cluster];
|
||||
connector_type_to_bridge_types(syskeeper_forwarder) ->
|
||||
[syskeeper_forwarder];
|
||||
connector_type_to_bridge_types(syskeeper_proxy) ->
|
||||
[];
|
||||
connector_type_to_bridge_types(timescale) ->
|
||||
[timescale];
|
||||
connector_type_to_bridge_types(redis) ->
|
||||
[redis, redis_single, redis_sentinel, redis_cluster].
|
||||
[timescale].
|
||||
|
||||
actions_config_name() -> <<"actions">>.
|
||||
|
||||
|
@ -549,6 +553,48 @@ resource_opts_fields(Overrides) ->
|
|||
emqx_resource_schema:create_opts(Overrides)
|
||||
).
|
||||
|
||||
-type http_method() :: get | post | put.
|
||||
-type schema_example_map() :: #{atom() => term()}.
|
||||
|
||||
-spec connector_values(http_method(), atom(), schema_example_map()) -> schema_example_map().
|
||||
connector_values(Method, Type, ConnectorValues) ->
|
||||
TypeBin = atom_to_binary(Type),
|
||||
lists:foldl(
|
||||
fun(M1, M2) ->
|
||||
maps:merge(M1, M2)
|
||||
end,
|
||||
#{
|
||||
description => <<"My example ", TypeBin/binary, " connector">>
|
||||
},
|
||||
[
|
||||
ConnectorValues,
|
||||
method_values(Method, Type)
|
||||
]
|
||||
).
|
||||
|
||||
method_values(post, Type) ->
|
||||
TypeBin = atom_to_binary(Type),
|
||||
#{
|
||||
name => <<TypeBin/binary, "_connector">>,
|
||||
type => TypeBin
|
||||
};
|
||||
method_values(get, Type) ->
|
||||
maps:merge(
|
||||
method_values(post, Type),
|
||||
#{
|
||||
status => <<"connected">>,
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"emqx@localhost">>,
|
||||
status => <<"connected">>
|
||||
}
|
||||
],
|
||||
actions => [<<"my_action">>]
|
||||
}
|
||||
);
|
||||
method_values(put, _Type) ->
|
||||
#{}.
|
||||
|
||||
%%======================================================================================
|
||||
%% Helper Functions
|
||||
%%======================================================================================
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
{erl_opts, [debug_info]}.
|
||||
{deps, [
|
||||
%% NOTE: mind ecpool version when updating eredis_cluster version
|
||||
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4"}}},
|
||||
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4.1"}}},
|
||||
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}}
|
||||
]}.
|
||||
|
|
|
@ -36,7 +36,13 @@
|
|||
%% ecpool connect & reconnect
|
||||
-export([connect/1, prepare_sql_to_conn/2]).
|
||||
|
||||
-export([prepare_sql/2]).
|
||||
-export([
|
||||
init_prepare/1,
|
||||
prepare_sql/2,
|
||||
parse_prepare_sql/1,
|
||||
parse_prepare_sql/2,
|
||||
unprepare_sql/1
|
||||
]).
|
||||
|
||||
-export([roots/0, fields/1]).
|
||||
|
||||
|
@ -51,9 +57,10 @@
|
|||
#{
|
||||
pool_name := binary(),
|
||||
prepares := ok | {error, _},
|
||||
templates := #{{atom(), batch | prepstmt} => template()}
|
||||
templates := #{{atom(), batch | prepstmt} => template()},
|
||||
query_templates := map()
|
||||
}.
|
||||
|
||||
-export_type([state/0]).
|
||||
%%=====================================================================
|
||||
%% Hocon schema
|
||||
roots() ->
|
||||
|
@ -62,8 +69,7 @@ roots() ->
|
|||
fields(config) ->
|
||||
[{server, server()}] ++
|
||||
add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++
|
||||
emqx_connector_schema_lib:ssl_fields() ++
|
||||
emqx_connector_schema_lib:prepare_statement_fields().
|
||||
emqx_connector_schema_lib:ssl_fields().
|
||||
|
||||
add_default_username([{username, OrigUsernameFn} | Tail], Head) ->
|
||||
Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail];
|
||||
|
@ -267,7 +273,7 @@ do_check_prepares(
|
|||
);
|
||||
do_check_prepares(#{prepares := ok}) ->
|
||||
ok;
|
||||
do_check_prepares(#{prepares := {error, _}} = State) ->
|
||||
do_check_prepares(#{prepares := {error, _}, query_templates := _} = State) ->
|
||||
%% retry to prepare
|
||||
case prepare_sql(State) of
|
||||
ok ->
|
||||
|
@ -275,7 +281,9 @@ do_check_prepares(#{prepares := {error, _}} = State) ->
|
|||
{ok, State#{prepares => ok}};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
end;
|
||||
do_check_prepares(_NoTemplates) ->
|
||||
ok.
|
||||
|
||||
%% ===================================================================
|
||||
|
||||
|
@ -323,15 +331,17 @@ prepare_sql(Templates, PoolName) ->
|
|||
end.
|
||||
|
||||
do_prepare_sql(Templates, PoolName) ->
|
||||
Conns =
|
||||
Conns = get_connections_from_pool(PoolName),
|
||||
prepare_sql_to_conn_list(Conns, Templates).
|
||||
|
||||
get_connections_from_pool(PoolName) ->
|
||||
[
|
||||
begin
|
||||
{ok, Conn} = ecpool_worker:client(Worker),
|
||||
Conn
|
||||
end
|
||||
|| {_Name, Worker} <- ecpool:workers(PoolName)
|
||||
],
|
||||
prepare_sql_to_conn_list(Conns, Templates).
|
||||
].
|
||||
|
||||
prepare_sql_to_conn_list([], _Templates) ->
|
||||
ok;
|
||||
|
@ -369,6 +379,18 @@ prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
|
|||
prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
|
||||
prepare_sql_to_conn(Conn, Rest).
|
||||
|
||||
unprepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
|
||||
ecpool:remove_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn}),
|
||||
lists:foreach(
|
||||
fun(Conn) ->
|
||||
lists:foreach(
|
||||
fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
|
||||
maps:to_list(Templates)
|
||||
)
|
||||
end,
|
||||
get_connections_from_pool(PoolName)
|
||||
).
|
||||
|
||||
unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->
|
||||
mysql:unprepare(Conn, Key);
|
||||
unprepare_sql_to_conn(Conn, Key) when is_atom(Key) ->
|
||||
|
@ -377,12 +399,15 @@ unprepare_sql_to_conn(_Conn, _) ->
|
|||
ok.
|
||||
|
||||
parse_prepare_sql(Config) ->
|
||||
parse_prepare_sql(send_message, Config).
|
||||
|
||||
parse_prepare_sql(Key, Config) ->
|
||||
Queries =
|
||||
case Config of
|
||||
#{prepare_statement := Qs} ->
|
||||
Qs;
|
||||
#{sql := Query} ->
|
||||
#{send_message => Query};
|
||||
#{Key => Query};
|
||||
_ ->
|
||||
#{}
|
||||
end,
|
||||
|
@ -436,7 +461,9 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) -
|
|||
{emqx_jsonish, SQLOrData}
|
||||
),
|
||||
{TypeOrKey, Row}
|
||||
end.
|
||||
end;
|
||||
proc_sql_params(_TypeOrKey, SQLOrData, Params, _State) ->
|
||||
{SQLOrData, Params}.
|
||||
|
||||
on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) ->
|
||||
Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs],
|
||||
|
|
|
@ -16,27 +16,29 @@
|
|||
-module(emqx_utils_maps).
|
||||
|
||||
-export([
|
||||
deep_get/2,
|
||||
deep_get/3,
|
||||
deep_find/2,
|
||||
deep_put/3,
|
||||
deep_force_put/3,
|
||||
deep_remove/2,
|
||||
deep_merge/2,
|
||||
best_effort_recursive_sum/3,
|
||||
binary_key_map/1,
|
||||
safe_atom_key_map/1,
|
||||
unsafe_atom_key_map/1,
|
||||
jsonable_map/1,
|
||||
jsonable_map/2,
|
||||
binary_string/1,
|
||||
deep_convert/3,
|
||||
deep_find/2,
|
||||
deep_force_put/3,
|
||||
deep_get/2,
|
||||
deep_get/3,
|
||||
deep_merge/2,
|
||||
deep_put/3,
|
||||
deep_remove/2,
|
||||
diff_maps/2,
|
||||
best_effort_recursive_sum/3,
|
||||
if_only_to_toggle_enable/2,
|
||||
update_if_present/3,
|
||||
indent/3,
|
||||
jsonable_map/1,
|
||||
jsonable_map/2,
|
||||
key_comparer/1,
|
||||
put_if/4,
|
||||
rename/3,
|
||||
key_comparer/1
|
||||
safe_atom_key_map/1,
|
||||
unindent/2,
|
||||
unsafe_atom_key_map/1,
|
||||
update_if_present/3
|
||||
]).
|
||||
|
||||
-export_type([config_key/0, config_key_path/0]).
|
||||
|
@ -332,3 +334,18 @@ key_comparer(K) ->
|
|||
(M1, M2) ->
|
||||
M1 < M2
|
||||
end.
|
||||
|
||||
-spec indent(term(), [term()], map()) -> map().
|
||||
indent(IndentKey, PickKeys, Map) ->
|
||||
maps:put(
|
||||
IndentKey,
|
||||
maps:with(PickKeys, Map),
|
||||
maps:without(PickKeys, Map)
|
||||
).
|
||||
|
||||
-spec unindent(term(), map()) -> map().
|
||||
unindent(Key, Map) ->
|
||||
maps:merge(
|
||||
maps:remove(Key, Map),
|
||||
maps:get(Key, Map, #{})
|
||||
).
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
-module(emqx_utils_maps_tests).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-import(emqx_utils_maps, [indent/3, unindent/2]).
|
||||
|
||||
best_effort_recursive_sum_test_() ->
|
||||
DummyLogger = fun(_) -> ok end,
|
||||
[
|
||||
|
@ -129,3 +131,44 @@ key_comparer_test() ->
|
|||
#{}
|
||||
])
|
||||
).
|
||||
|
||||
map_indent_unindent_test_() ->
|
||||
M = #{a => 1, b => 2},
|
||||
[
|
||||
?_assertEqual(
|
||||
#{a => 1, c => #{b => 2}},
|
||||
indent(c, [b], M)
|
||||
),
|
||||
?_assertEqual(
|
||||
M,
|
||||
unindent(c, indent(c, [b], M))
|
||||
),
|
||||
?_assertEqual(
|
||||
#{a => 1, b => #{b => 2}},
|
||||
indent(b, [b], M)
|
||||
),
|
||||
?_assertEqual(
|
||||
M,
|
||||
unindent(b, #{a => 1, b => #{b => 2}})
|
||||
),
|
||||
?_assertEqual(
|
||||
#{a => 2},
|
||||
unindent(b, #{a => 1, b => #{a => 2}})
|
||||
),
|
||||
?_assertEqual(
|
||||
#{c => #{a => 1, b => 2}},
|
||||
indent(c, [a, b], M)
|
||||
),
|
||||
?_assertEqual(
|
||||
#{a => 1, b => 2, c => #{}},
|
||||
indent(c, [], M)
|
||||
),
|
||||
?_assertEqual(
|
||||
#{a => 1, b => 2, c => #{}},
|
||||
indent(c, [d, e, f], M)
|
||||
),
|
||||
?_assertEqual(
|
||||
#{a => 1, b => 2},
|
||||
unindent(c, M)
|
||||
)
|
||||
].
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.0", override: true},
|
||||
{:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.3.15", override: true},
|
||||
{:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
|
||||
{:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true},
|
||||
{:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
|
||||
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
|
||||
# maybe forbid to fetch quicer
|
||||
|
|
|
@ -75,7 +75,7 @@
|
|||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}}
|
||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.15"}}}
|
||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
|
||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}}
|
||||
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}}
|
||||
|
|
|
@ -40,4 +40,14 @@ sql_template.desc:
|
|||
sql_template.label:
|
||||
"""SQL Template"""
|
||||
|
||||
action_parameters.label:
|
||||
"""Action Parameters"""
|
||||
action_parameters.desc:
|
||||
"""Additional parameters specific to this action type"""
|
||||
|
||||
mysql_action.label:
|
||||
"""MySQL Action"""
|
||||
mysql_action.desc:
|
||||
"""Action to interact with a MySQL connector"""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue