refactor: split clickhouse bridges to actions and connectors
This commit is contained in:
parent
451c8ecaf5
commit
8ae0e78786
|
@ -97,6 +97,7 @@ hard_coded_action_info_modules_ee() ->
|
|||
emqx_bridge_rocketmq_action_info,
|
||||
emqx_bridge_influxdb_action_info,
|
||||
emqx_bridge_cassandra_action_info,
|
||||
emqx_bridge_clickhouse_action_info,
|
||||
emqx_bridge_mysql_action_info,
|
||||
emqx_bridge_pgsql_action_info,
|
||||
emqx_bridge_syskeeper_action_info,
|
||||
|
|
|
@ -32,8 +32,6 @@
|
|||
on_get_status/2
|
||||
]).
|
||||
|
||||
-export([transform_bridge_v1_config_to_connector_config/1]).
|
||||
|
||||
%% callbacks of ecpool
|
||||
-export([
|
||||
connect/1,
|
||||
|
@ -448,9 +446,6 @@ handle_result({error, Error}) ->
|
|||
handle_result(Res) ->
|
||||
Res.
|
||||
|
||||
transform_bridge_v1_config_to_connector_config(_) ->
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% utils
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_bridge_clickhouse, [
|
||||
{description, "EMQX Enterprise ClickHouse Bridge"},
|
||||
{vsn, "0.2.5"},
|
||||
{vsn, "0.3.0"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
@ -8,7 +8,7 @@
|
|||
emqx_resource,
|
||||
clickhouse
|
||||
]},
|
||||
{env, []},
|
||||
{env, [{emqx_action_info_modules, [emqx_bridge_clickhouse_action_info]}]},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
]}.
|
||||
|
|
|
@ -9,8 +9,11 @@
|
|||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
%% Examples
|
||||
-export([
|
||||
conn_bridge_examples/1
|
||||
bridge_v2_examples/1,
|
||||
conn_bridge_examples/1,
|
||||
connector_examples/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -20,12 +23,10 @@
|
|||
desc/1
|
||||
]).
|
||||
|
||||
-define(DEFAULT_SQL,
|
||||
<<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">>
|
||||
).
|
||||
|
||||
-define(DEFAULT_SQL, <<"INSERT INTO messages(data, arrived) VALUES ('${payload}', ${timestamp})">>).
|
||||
-define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>).
|
||||
|
||||
-define(CONNECTOR_TYPE, clickhouse).
|
||||
-define(ACTION_TYPE, clickhouse).
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Callback used by HTTP API
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
@ -40,6 +41,42 @@ conn_bridge_examples(Method) ->
|
|||
}
|
||||
].
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
ParamsExample = #{
|
||||
parameters => #{
|
||||
batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR,
|
||||
sql => ?DEFAULT_SQL
|
||||
}
|
||||
},
|
||||
[
|
||||
#{
|
||||
<<"clickhouse">> => #{
|
||||
summary => <<"ClickHouse Action">>,
|
||||
value => emqx_bridge_v2_schema:action_values(
|
||||
Method, clickhouse, clickhouse, ParamsExample
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"clickhouse">> => #{
|
||||
summary => <<"ClickHouse Connector">>,
|
||||
value => emqx_connector_schema:connector_values(
|
||||
Method, clickhouse, #{
|
||||
server => <<"127.0.0.1:8123">>,
|
||||
database => <<"mqtt">>,
|
||||
pool_size => 8,
|
||||
username => <<"default">>,
|
||||
password => <<"******">>
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
values(_Method, Type) ->
|
||||
#{
|
||||
enable => true,
|
||||
|
@ -71,19 +108,49 @@ namespace() -> "bridge_clickhouse".
|
|||
|
||||
roots() -> [].
|
||||
|
||||
fields("config_connector") ->
|
||||
emqx_connector_schema:common_fields() ++
|
||||
emqx_bridge_clickhouse_connector:fields(config) ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||
fields(action) ->
|
||||
{clickhouse,
|
||||
mk(
|
||||
hoconsc:map(name, ref(?MODULE, clickhouse_action)),
|
||||
#{desc => <<"ClickHouse Action Config">>, required => false}
|
||||
)};
|
||||
fields(clickhouse_action) ->
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
mk(ref(?MODULE, action_parameters), #{
|
||||
required => true, desc => ?DESC(action_parameters)
|
||||
})
|
||||
);
|
||||
fields(action_parameters) ->
|
||||
[
|
||||
sql_field(),
|
||||
batch_value_separator_field()
|
||||
];
|
||||
fields(connector_resource_opts) ->
|
||||
emqx_connector_schema:resource_opts_fields();
|
||||
fields(Field) when
|
||||
Field == "get_connector";
|
||||
Field == "put_connector";
|
||||
Field == "post_connector"
|
||||
->
|
||||
Fields =
|
||||
emqx_bridge_clickhouse_connector:fields(config) ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
|
||||
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
|
||||
fields(Field) when
|
||||
Field == "get_bridge_v2";
|
||||
Field == "post_bridge_v2";
|
||||
Field == "put_bridge_v2"
|
||||
->
|
||||
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(clickhouse_action));
|
||||
fields("config") ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{sql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||
)},
|
||||
{batch_value_separator,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR}
|
||||
)},
|
||||
sql_field(),
|
||||
batch_value_separator_field(),
|
||||
{local_topic,
|
||||
mk(
|
||||
binary(),
|
||||
|
@ -112,6 +179,28 @@ fields("get") ->
|
|||
fields("post", Type) ->
|
||||
[type_field(Type), name_field() | fields("config")].
|
||||
|
||||
sql_field() ->
|
||||
{sql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||
)}.
|
||||
|
||||
batch_value_separator_field() ->
|
||||
{batch_value_separator,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR}
|
||||
)}.
|
||||
|
||||
desc(clickhouse_action) ->
|
||||
?DESC(clickhouse_action);
|
||||
desc(action_parameters) ->
|
||||
?DESC(action_parameters);
|
||||
desc("config_connector") ->
|
||||
?DESC("desc_config");
|
||||
desc(connector_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, "resource_opts");
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
-module(emqx_bridge_clickhouse_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-export([
|
||||
bridge_v1_config_to_action_config/2,
|
||||
bridge_v1_config_to_connector_config/1,
|
||||
connector_action_config_to_bridge_v1_config/2,
|
||||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0
|
||||
]).
|
||||
|
||||
-import(emqx_utils_conv, [bin/1]).
|
||||
|
||||
-define(SCHEMA_MODULE, emqx_bridge_clickhouse).
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||
ActionTopLevelKeys = schema_keys(clickhouse_action),
|
||||
ActionParametersKeys = schema_keys(action_parameters),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
|
||||
ActionConfig#{<<"connector">> => ConnectorName}
|
||||
).
|
||||
|
||||
bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
||||
ActionTopLevelKeys = schema_keys(clickhouse_action),
|
||||
ActionParametersKeys = schema_keys(action_parameters),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ConnectorTopLevelKeys = schema_keys("config_connector"),
|
||||
ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
|
||||
ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_connector_schema:project_to_connector_resource_opts/1,
|
||||
ConnConfig0
|
||||
).
|
||||
|
||||
connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
|
||||
RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config(
|
||||
ConnectorRawConf, ActionRawConf
|
||||
),
|
||||
maps:without([<<"clickhouse_type">>], RawConf).
|
||||
|
||||
bridge_v1_type_name() -> clickhouse.
|
||||
|
||||
action_type_name() -> clickhouse.
|
||||
|
||||
connector_type_name() -> clickhouse.
|
||||
|
||||
schema_module() -> ?SCHEMA_MODULE.
|
||||
|
||||
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||
Conf0 = maps:with(PickKeys, Config),
|
||||
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
|
||||
|
||||
schema_keys(Name) ->
|
||||
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].
|
|
@ -32,6 +32,10 @@
|
|||
callback_mode/0,
|
||||
on_start/2,
|
||||
on_stop/2,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channel_status/3,
|
||||
on_get_channels/1,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
|
@ -62,6 +66,7 @@
|
|||
|
||||
-type state() ::
|
||||
#{
|
||||
channels => #{binary() => templates()},
|
||||
templates := templates(),
|
||||
pool_name := binary(),
|
||||
connect_timeout := pos_integer()
|
||||
|
@ -155,10 +160,9 @@ on_start(
|
|||
{pool, InstanceID}
|
||||
],
|
||||
try
|
||||
Templates = prepare_sql_templates(Config),
|
||||
State = #{
|
||||
channels => #{},
|
||||
pool_name => InstanceID,
|
||||
templates => Templates,
|
||||
connect_timeout => ConnectTimeout
|
||||
},
|
||||
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
|
||||
|
@ -195,10 +199,8 @@ prepare_sql_templates(#{
|
|||
sql := Template,
|
||||
batch_value_separator := Separator
|
||||
}) ->
|
||||
InsertTemplate =
|
||||
emqx_placeholder:preproc_tmpl(Template),
|
||||
BulkExtendInsertTemplate =
|
||||
prepare_sql_bulk_extend_template(Template, Separator),
|
||||
InsertTemplate = emqx_placeholder:preproc_tmpl(Template),
|
||||
BulkExtendInsertTemplate = prepare_sql_bulk_extend_template(Template, Separator),
|
||||
#{
|
||||
send_message_template => InsertTemplate,
|
||||
extend_send_message_template => BulkExtendInsertTemplate
|
||||
|
@ -285,6 +287,27 @@ on_stop(InstanceID, _State) ->
|
|||
}),
|
||||
emqx_resource_pool:stop(InstanceID).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% channel related emqx_resouce callbacks
|
||||
%% -------------------------------------------------------------------
|
||||
on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) ->
|
||||
#{parameters := ParamConf} = ChannConf0,
|
||||
NewChanns = Channs#{ChannId => #{templates => prepare_sql_templates(ParamConf)}},
|
||||
{ok, OldState#{channels => NewChanns}}.
|
||||
|
||||
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
|
||||
NewState = State#{channels => maps:remove(ChannId, Channels)},
|
||||
{ok, NewState}.
|
||||
|
||||
on_get_channel_status(InstanceId, _ChannId, State) ->
|
||||
case on_get_status(InstanceId, State) of
|
||||
{connected, _} -> connected;
|
||||
{disconnected, _, _} -> disconnected
|
||||
end.
|
||||
|
||||
on_get_channels(InstanceId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% on_get_status emqx_resouce callback and related functions
|
||||
%% -------------------------------------------------------------------
|
||||
|
@ -339,8 +362,8 @@ do_get_status(PoolName, Timeout) ->
|
|||
|
||||
-spec on_query
|
||||
(resource_id(), Request, resource_state()) -> query_result() when
|
||||
Request :: {RequestType, Data},
|
||||
RequestType :: send_message,
|
||||
Request :: {ChannId, Data},
|
||||
ChannId :: binary(),
|
||||
Data :: map();
|
||||
(resource_id(), Request, resource_state()) -> query_result() when
|
||||
Request :: {RequestType, SQL},
|
||||
|
@ -361,12 +384,20 @@ on_query(
|
|||
}),
|
||||
%% Have we got a query or data to fit into an SQL template?
|
||||
SimplifiedRequestType = query_type(RequestType),
|
||||
#{templates := Templates} = State,
|
||||
Templates = get_templates(RequestType, State),
|
||||
SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL),
|
||||
ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL),
|
||||
transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
|
||||
|
||||
get_sql(send_message, #{send_message_template := PreparedSQL}, Data) ->
|
||||
get_templates(ChannId, State) ->
|
||||
case maps:find(channels, State) of
|
||||
{ok, Channels} ->
|
||||
maps:get(templates, maps:get(ChannId, Channels, #{}), #{});
|
||||
error ->
|
||||
#{}
|
||||
end.
|
||||
|
||||
get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) ->
|
||||
emqx_placeholder:proc_tmpl(PreparedSQL, Data);
|
||||
get_sql(_, _, SQL) ->
|
||||
SQL.
|
||||
|
@ -376,24 +407,21 @@ query_type(sql) ->
|
|||
query_type(query) ->
|
||||
query;
|
||||
%% Data that goes to bridges use the prepared template
|
||||
query_type(send_message) ->
|
||||
send_message.
|
||||
query_type(ChannId) when is_binary(ChannId) ->
|
||||
channel_message.
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% on_batch_query emqx_resouce callback and related functions
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
|
||||
BatchReq :: nonempty_list({'send_message', map()}).
|
||||
BatchReq :: nonempty_list({binary(), map()}).
|
||||
|
||||
on_batch_query(
|
||||
ResourceID,
|
||||
BatchReq,
|
||||
#{pool_name := PoolName, templates := Templates} = _State
|
||||
) ->
|
||||
%% Currently we only support batch requests with the send_message key
|
||||
{Keys, ObjectsToInsert} = lists:unzip(BatchReq),
|
||||
ensure_keys_are_of_type_send_message(Keys),
|
||||
on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) ->
|
||||
%% Currently we only support batch requests with a binary ChannId
|
||||
{[ChannId | _] = Keys, ObjectsToInsert} = lists:unzip(BatchReq),
|
||||
ensure_channel_messages(Keys),
|
||||
Templates = get_templates(ChannId, State),
|
||||
%% Create batch insert SQL statement
|
||||
SQL = objects_to_sql(ObjectsToInsert, Templates),
|
||||
%% Do the actual query in the database
|
||||
|
@ -401,22 +429,16 @@ on_batch_query(
|
|||
%% Transform the result to a better format
|
||||
transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL).
|
||||
|
||||
ensure_keys_are_of_type_send_message(Keys) ->
|
||||
case lists:all(fun is_send_message_atom/1, Keys) of
|
||||
ensure_channel_messages(Keys) ->
|
||||
case lists:all(fun is_binary/1, Keys) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
erlang:error(
|
||||
{unrecoverable_error,
|
||||
<<"Unexpected type for batch message (Expected send_message)">>}
|
||||
{unrecoverable_error, <<"Unexpected type for batch message (Expected channel-id)">>}
|
||||
)
|
||||
end.
|
||||
|
||||
is_send_message_atom(send_message) ->
|
||||
true;
|
||||
is_send_message_atom(_) ->
|
||||
false.
|
||||
|
||||
objects_to_sql(
|
||||
[FirstObject | RemainingObjects] = _ObjectsToInsert,
|
||||
#{
|
||||
|
|
|
@ -42,6 +42,8 @@ resource_type(influxdb) ->
|
|||
emqx_bridge_influxdb_connector;
|
||||
resource_type(cassandra) ->
|
||||
emqx_bridge_cassandra_connector;
|
||||
resource_type(clickhouse) ->
|
||||
emqx_bridge_clickhouse_connector;
|
||||
resource_type(mysql) ->
|
||||
emqx_bridge_mysql_connector;
|
||||
resource_type(pgsql) ->
|
||||
|
@ -181,6 +183,14 @@ connector_structs() ->
|
|||
required => false
|
||||
}
|
||||
)},
|
||||
{clickhouse,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_clickhouse, "config_connector")),
|
||||
#{
|
||||
desc => <<"ClickHouse Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{mysql,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
|
||||
|
@ -307,6 +317,7 @@ schema_modules() ->
|
|||
emqx_bridge_oracle,
|
||||
emqx_bridge_influxdb,
|
||||
emqx_bridge_cassandra,
|
||||
emqx_bridge_clickhouse,
|
||||
emqx_bridge_mysql,
|
||||
emqx_bridge_syskeeper_connector,
|
||||
emqx_bridge_syskeeper_proxy,
|
||||
|
@ -345,6 +356,7 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_clickhouse, <<"clickhouse">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
|
||||
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
|
||||
|
|
|
@ -142,6 +142,8 @@ connector_type_to_bridge_types(influxdb) ->
|
|||
[influxdb, influxdb_api_v1, influxdb_api_v2];
|
||||
connector_type_to_bridge_types(cassandra) ->
|
||||
[cassandra];
|
||||
connector_type_to_bridge_types(clickhouse) ->
|
||||
[clickhouse];
|
||||
connector_type_to_bridge_types(mysql) ->
|
||||
[mysql];
|
||||
connector_type_to_bridge_types(mqtt) ->
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
The bridges for ClickHouse have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.
|
|
@ -6,6 +6,16 @@ batch_value_separator.desc:
|
|||
batch_value_separator.label:
|
||||
"""Batch Value Separator"""
|
||||
|
||||
action_parameters.desc:
|
||||
"""Action specific configs."""
|
||||
action_parameters.label:
|
||||
"""Action"""
|
||||
|
||||
cassandra_action.desc:
|
||||
"""Action configs."""
|
||||
cassandra_action.label:
|
||||
"""Action"""
|
||||
|
||||
config_enable.desc:
|
||||
"""Enable or disable this bridge"""
|
||||
|
||||
|
|
Loading…
Reference in New Issue