refactor: split cassandra bridges to actions and connectors

This commit is contained in:
Shawn 2024-01-19 18:46:35 +08:00 committed by zhongwencool
parent 497e735bf4
commit 6a21766ce3
11 changed files with 332 additions and 184 deletions

View File

@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_matrix_action_info, emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info, emqx_bridge_mongodb_action_info,
emqx_bridge_influxdb_action_info, emqx_bridge_influxdb_action_info,
emqx_bridge_cassandra_action_info,
emqx_bridge_mysql_action_info, emqx_bridge_mysql_action_info,
emqx_bridge_pgsql_action_info, emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info, emqx_bridge_syskeeper_action_info,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_cassandra, [ {application, emqx_bridge_cassandra, [
{description, "EMQX Enterprise Cassandra Bridge"}, {description, "EMQX Enterprise Cassandra Bridge"},
{vsn, "0.1.6"}, {vsn, "0.2.0"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
@ -8,7 +8,7 @@
emqx_resource, emqx_resource,
ecql ecql
]}, ]},
{env, []}, {env, [{emqx_action_info_modules, [emqx_bridge_cassandra_action_info]}]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -12,11 +12,17 @@
%% schema examples %% schema examples
-export([ -export([
conn_bridge_examples/1,
values/2, values/2,
fields/2 fields/2
]). ]).
%% Examples
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
%% schema %% schema
-export([ -export([
namespace/0, namespace/0,
@ -26,10 +32,13 @@
]). ]).
-define(DEFAULT_CQL, << -define(DEFAULT_CQL, <<
"insert into mqtt_msg(topic, msgid, sender, qos, payload, arrived, retain) " "insert into mqtt_msg(msgid, topic, qos, payload, arrived) "
"values (${topic}, ${id}, ${clientid}, ${qos}, ${payload}, ${timestamp}, ${flags.retain})" "values (${id}, ${topic}, ${qos}, ${payload}, ${timestamp})"
>>). >>).
-define(CONNECTOR_TYPE, cassandra).
-define(ACTION_TYPE, cassandra).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% schema examples %% schema examples
@ -43,6 +52,41 @@ conn_bridge_examples(Method) ->
} }
]. ].
bridge_v2_examples(Method) ->
ParamsExample = #{
parameters => #{
cql => ?DEFAULT_CQL
}
},
[
#{
<<"cassandra">> => #{
summary => <<"Cassandra Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, cassandra, cassandra, ParamsExample
)
}
}
].
connector_examples(Method) ->
[
#{
<<"cassandra">> => #{
summary => <<"Cassandra Connector">>,
value => emqx_connector_schema:connector_values(
Method, cassandra, #{
servers => <<"127.0.0.1:9042">>,
keyspace => <<"mqtt">>,
username => <<"root">>,
password => <<"******">>,
pool_size => 8
}
)
}
}
].
%% no difference in get/post/put method %% no difference in get/post/put method
values(_Method, Type) -> values(_Method, Type) ->
#{ #{
@ -73,14 +117,47 @@ namespace() -> "bridge_cassa".
roots() -> []. roots() -> [].
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
emqx_bridge_cassandra_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(action) ->
{cassandra,
mk(
hoconsc:map(name, ref(?MODULE, cassandra_action)),
#{desc => <<"Cassandra Action Config">>, required => false}
)};
fields(cassandra_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(ref(?MODULE, action_parameters), #{
required => true, desc => ?DESC(action_parameters)
})
);
fields(action_parameters) ->
[
cql_field()
];
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields =
emqx_bridge_cassandra_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(cassandra_action));
fields("config") -> fields("config") ->
[ [
cql_field(),
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{cql,
mk(
binary(),
#{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
)},
{local_topic, {local_topic,
mk( mk(
binary(), binary(),
@ -99,8 +176,23 @@ fields("get") ->
fields("post", Type) -> fields("post", Type) ->
[type_field(Type), name_field() | fields("config")]. [type_field(Type), name_field() | fields("config")].
cql_field() ->
{cql,
mk(
binary(),
#{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
)}.
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(cassandra_action) ->
?DESC(cassandra_action);
desc(action_parameters) ->
?DESC(action_parameters);
desc("config_connector") ->
?DESC("desc_config");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Cassandra using `", string:to_upper(Method), "` method."]; ["Configuration for Cassandra using `", string:to_upper(Method), "` method."];
desc(_) -> desc(_) ->

View File

@ -0,0 +1,62 @@
-module(emqx_bridge_cassandra_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
-import(emqx_utils_conv, [bin/1]).
-define(SCHEMA_MODULE, emqx_bridge_cassandra).
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(cassandra_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ActionTopLevelKeys = schema_keys(cassandra_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ConnectorTopLevelKeys = schema_keys("config_connector"),
ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnConfig0
).
connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorRawConf, ActionRawConf
),
maps:without([<<"cassandra_type">>], RawConf).
bridge_v1_type_name() -> cassandra.
action_type_name() -> cassandra.
connector_type_name() -> cassandra.
schema_module() -> ?SCHEMA_MODULE.
make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
schema_keys(Name) ->
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

View File

@ -14,13 +14,17 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% schema %% schema
-export([roots/0, fields/1]). -export([roots/0, fields/1, desc/1]).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0, callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channel_status/3,
on_get_channels/1,
on_query/3, on_query/3,
on_query_async/4, on_query_async/4,
on_batch_query/3, on_batch_query/3,
@ -28,6 +32,8 @@
on_get_status/2 on_get_status/2
]). ]).
-export([transform_bridge_v1_config_to_connector_config/1]).
%% callbacks of ecpool %% callbacks of ecpool
-export([ -export([
connect/1, connect/1,
@ -39,16 +45,10 @@
-export([do_get_status/1]). -export([do_get_status/1]).
-type prepares() :: #{atom() => binary()}.
-type params_tokens() :: #{atom() => list()}.
-type state() :: -type state() ::
#{ #{
pool_name := binary(), pool_name := binary(),
prepare_cql := prepares(), channels := #{}
params_tokens := params_tokens(),
%% returned by ecql:prepare/2
prepare_statement := binary()
}. }.
-define(DEFAULT_SERVER_OPTION, #{default_port => ?CASSANDRA_DEFAULT_PORT}). -define(DEFAULT_SERVER_OPTION, #{default_port => ?CASSANDRA_DEFAULT_PORT}).
@ -62,7 +62,9 @@ roots() ->
fields(config) -> fields(config) ->
cassandra_db_fields() ++ cassandra_db_fields() ++
emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:ssl_fields() ++
emqx_connector_schema_lib:prepare_statement_fields(). emqx_connector_schema_lib:prepare_statement_fields();
fields("connector") ->
cassandra_db_fields() ++ emqx_connector_schema_lib:ssl_fields().
cassandra_db_fields() -> cassandra_db_fields() ->
[ [
@ -83,6 +85,11 @@ keyspace(desc) -> ?DESC("keyspace");
keyspace(required) -> true; keyspace(required) -> true;
keyspace(_) -> undefined. keyspace(_) -> undefined.
desc(config) ->
?DESC("config");
desc("connector") ->
?DESC("connector").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks for emqx_resource %% callbacks for emqx_resource
@ -130,10 +137,9 @@ on_start(
false -> false ->
[] []
end, end,
State = parse_prepare_cql(Config),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {ok, #{pool_name => InstId, channels => #{}}};
{error, Reason} -> {error, Reason} ->
?tp( ?tp(
cassandra_connector_start_failed, cassandra_connector_start_failed,
@ -149,23 +155,49 @@ on_stop(InstId, _State) ->
}), }),
emqx_resource_pool:stop(InstId). emqx_resource_pool:stop(InstId).
on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) ->
#{parameters := #{cql := CQL}} = ChannConf0,
{PrepareCQL, ParamsTokens} = emqx_placeholder:preproc_sql(CQL, '?'),
ParsedCql = #{
prepare_key => short_prepare_key(ChannId),
prepare_cql => PrepareCQL,
params_tokens => ParamsTokens
},
NewChanns = Channs#{ChannId => #{parsed_cql => ParsedCql, prepare_result => not_prepared}},
{ok, OldState#{channels => NewChanns}}.
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
NewState = State#{channels => maps:remove(ChannId, Channels)},
{ok, NewState}.
on_get_channel_status(InstanceId, ChannId, #{channels := Channels, pool_name := PoolName} = State) ->
case on_get_status(InstanceId, State) of
connected ->
#{parsed_cql := ParsedCql} = maps:get(ChannId, Channels),
case prepare_cql_to_cassandra(ParsedCql, PoolName) of
{ok, _} -> connected;
{error, Reason} -> {connecting, Reason}
end;
_ ->
connecting
end.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
-type request() :: -type request() ::
% emqx_bridge.erl % emqx_bridge.erl
{send_message, Params :: map()} {ChannId :: binary(), Params :: map()}
% common query % common query
| {query, SQL :: binary()} | {query, CQL :: binary()}
| {query, SQL :: binary(), Params :: map()}. | {query, CQL :: binary(), Params :: map()}.
-spec on_query( -spec on_query(
emqx_resource:resource_id(), emqx_resource:resource_id(),
request(), request(),
state() state()
) -> ok | {ok, ecql:cql_result()} | {error, {recoverable_error | unrecoverable_error, term()}}. ) -> ok | {ok, ecql:cql_result()} | {error, {recoverable_error | unrecoverable_error, term()}}.
on_query( on_query(InstId, Request, State) ->
InstId,
Request,
State
) ->
do_single_query(InstId, Request, sync, State). do_single_query(InstId, Request, sync, State).
-spec on_query_async( -spec on_query_async(
@ -174,21 +206,11 @@ on_query(
{function(), list()}, {function(), list()},
state() state()
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
on_query_async( on_query_async(InstId, Request, Callback, State) ->
InstId,
Request,
Callback,
State
) ->
do_single_query(InstId, Request, {async, Callback}, State). do_single_query(InstId, Request, {async, Callback}, State).
do_single_query( do_single_query(InstId, Request, Async, #{pool_name := PoolName} = State) ->
InstId, {Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request),
Request,
Async,
#{pool_name := PoolName} = State
) ->
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
?tp( ?tp(
debug, debug,
cassandra_connector_received_cql_query, cassandra_connector_received_cql_query,
@ -196,12 +218,12 @@ do_single_query(
connector => InstId, connector => InstId,
type => Type, type => Type,
params => Params, params => Params,
prepared_key_or_cql => PreparedKeyOrSQL, prepared_key_or_cql => PreparedKeyOrCQL,
state => State state => State
} }
), ),
{PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State), {PreparedKeyOrCQL1, Data} = proc_cql_params(Type, PreparedKeyOrCQL, Params, State),
Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrSQL1, Data), Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrCQL1, Data),
handle_result(Res). handle_result(Res).
-spec on_batch_query( -spec on_batch_query(
@ -209,11 +231,7 @@ do_single_query(
[request()], [request()],
state() state()
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
on_batch_query( on_batch_query(InstId, Requests, State) ->
InstId,
Requests,
State
) ->
do_batch_query(InstId, Requests, sync, State). do_batch_query(InstId, Requests, sync, State).
-spec on_batch_query_async( -spec on_batch_query_async(
@ -222,25 +240,15 @@ on_batch_query(
{function(), list()}, {function(), list()},
state() state()
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
on_batch_query_async( on_batch_query_async(InstId, Requests, Callback, State) ->
InstId,
Requests,
Callback,
State
) ->
do_batch_query(InstId, Requests, {async, Callback}, State). do_batch_query(InstId, Requests, {async, Callback}, State).
do_batch_query( do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) ->
InstId,
Requests,
Async,
#{pool_name := PoolName} = State
) ->
CQLs = CQLs =
lists:map( lists:map(
fun(Request) -> fun(Request) ->
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), {Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request),
proc_cql_params(Type, PreparedKeyOrSQL, Params, State) proc_cql_params(Type, PreparedKeyOrCQL, Params, State)
end, end,
Requests Requests
), ),
@ -256,26 +264,24 @@ do_batch_query(
Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs), Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
handle_result(Res). handle_result(Res).
parse_request_to_cql({send_message, Params}) -> parse_request_to_cql({query, CQL}) ->
{prepared_query, _Key = send_message, Params}; {query, CQL, #{}};
parse_request_to_cql({query, SQL}) -> parse_request_to_cql({query, CQL, Params}) ->
parse_request_to_cql({query, SQL, #{}}); {query, CQL, Params};
parse_request_to_cql({query, SQL, Params}) -> parse_request_to_cql({ChannId, Params}) ->
{query, SQL, Params}. {prepared_query, ChannId, Params}.
proc_cql_params( proc_cql_params(prepared_query, ChannId, Params, #{channels := Channs}) ->
prepared_query, #{
PreparedKey0, parsed_cql := #{
Params, prepare_key := PrepareKey,
#{prepare_statement := Prepares, params_tokens := ParamsTokens} params_tokens := ParamsTokens
) -> }
%% assert } = maps:get(ChannId, Channs),
_PreparedKey = maps:get(PreparedKey0, Prepares), {PrepareKey, assign_type_for_params(emqx_placeholder:proc_sql(ParamsTokens, Params))};
Tokens = maps:get(PreparedKey0, ParamsTokens), proc_cql_params(query, CQL, Params, _State) ->
{PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}; {CQL1, Tokens} = emqx_placeholder:preproc_sql(CQL, '?'),
proc_cql_params(query, SQL, Params, _State) -> {CQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
{SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'),
{SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
Type == query; Type == prepared_query Type == query; Type == prepared_query
@ -314,38 +320,15 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
exec(PoolName, Query) -> exec(PoolName, Query) ->
ecpool:pick_and_do(PoolName, Query, no_handover). ecpool:pick_and_do(PoolName, Query, no_handover).
on_get_status(_InstId, #{pool_name := PoolName} = State) -> on_get_status(_InstId, #{pool_name := PoolName}) ->
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
true -> true -> connected;
case do_check_prepares(State) of false -> connecting
ok ->
connected;
{ok, NState} ->
%% return new state with prepared statements
{connected, NState};
false ->
%% do not log error, it is logged in prepare_cql_to_conn
connecting
end;
false ->
connecting
end. end.
do_get_status(Conn) -> do_get_status(Conn) ->
ok == element(1, ecql:query(Conn, "SELECT cluster_name FROM system.local")). ok == element(1, ecql:query(Conn, "SELECT cluster_name FROM system.local")).
do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) ->
ok;
do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepares}}) ->
%% retry to prepare
case prepare_cql(Prepares, PoolName) of
{ok, Sts} ->
%% remove the error
{ok, State#{prepare_cql => Prepares, prepare_statement := Sts}};
_Error ->
false
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks query %% callbacks query
@ -394,88 +377,50 @@ conn_opts([Opt | Opts], Acc) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% prepare %% prepare
prepare_cql_to_cassandra(ParsedCql, PoolName) ->
%% XXX: hardcode case prepare_cql_to_cassandra(ecpool:workers(PoolName), ParsedCql, #{}) of
%% note: the `cql` param is passed by emqx_bridge_cassandra {ok, Statement} ->
parse_prepare_cql(#{cql := SQL}) ->
parse_prepare_cql([{send_message, SQL}], #{}, #{});
parse_prepare_cql(_) ->
#{prepare_cql => #{}, params_tokens => #{}}.
parse_prepare_cql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'),
parse_prepare_cql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
);
parse_prepare_cql([], Prepares, Tokens) ->
#{
prepare_cql => Prepares,
params_tokens => Tokens
}.
init_prepare(State = #{prepare_cql := Prepares, pool_name := PoolName}) ->
case maps:size(Prepares) of
0 ->
State;
_ ->
case prepare_cql(Prepares, PoolName) of
{ok, Sts} ->
State#{prepare_statement := Sts};
Error ->
?tp(
error,
cassandra_prepare_cql_failed,
#{prepares => Prepares, reason => Error}
),
%% mark the prepare_cql as failed
State#{prepare_cql => {error, Prepares}}
end
end.
prepare_cql(Prepares, PoolName) when is_map(Prepares) ->
prepare_cql(maps:to_list(Prepares), PoolName);
prepare_cql(Prepares, PoolName) ->
case do_prepare_cql(Prepares, PoolName) of
{ok, _Sts} = Ok ->
%% prepare for reconnect %% prepare for reconnect
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [Prepares]}), ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [ParsedCql]}),
Ok; {ok, Statement};
Error -> Error ->
?tp(
error,
cassandra_prepare_cql_failed,
#{parsed_cql => ParsedCql, reason => Error}
),
Error Error
end. end.
do_prepare_cql(Prepares, PoolName) -> prepare_cql_to_cassandra([{_Name, Worker} | T], ParsedCql, _LastSts) ->
do_prepare_cql(ecpool:workers(PoolName), Prepares, #{}).
do_prepare_cql([{_Name, Worker} | T], Prepares, _LastSts) ->
{ok, Conn} = ecpool_worker:client(Worker), {ok, Conn} = ecpool_worker:client(Worker),
case prepare_cql_to_conn(Conn, Prepares) of case prepare_cql_to_conn(Conn, ParsedCql) of
{ok, Sts} -> {ok, Statement} ->
do_prepare_cql(T, Prepares, Sts); prepare_cql_to_cassandra(T, ParsedCql, Statement);
Error -> Error ->
Error Error
end; end;
do_prepare_cql([], _Prepares, LastSts) -> prepare_cql_to_cassandra([], _ParsedCql, LastSts) ->
{ok, LastSts}. {ok, LastSts}.
prepare_cql_to_conn(Conn, Prepares) -> prepare_cql_to_conn(Conn, #{prepare_key := PrepareKey, prepare_cql := PrepareCQL}) when
prepare_cql_to_conn(Conn, Prepares, #{}). is_pid(Conn)
->
prepare_cql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; ?SLOG(info, #{
prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> msg => "cassandra_prepare_cql", prepare_key => PrepareKey, prepare_cql => PrepareCQL
?SLOG(info, #{msg => "cassandra_prepare_cql", name => Key, prepare_cql => SQL}), }),
case ecql:prepare(Conn, Key, SQL) of case ecql:prepare(Conn, PrepareKey, PrepareCQL) of
{ok, Statement} -> {ok, Statement} ->
prepare_cql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); {ok, Statement};
{error, Error} = Other -> {error, Reason} = Error ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "cassandra_prepare_cql_failed", msg => "cassandra_prepare_cql_failed",
worker_pid => Conn, worker_pid => Conn,
name => Key, name => PrepareKey,
prepare_cql => SQL, prepare_cql => PrepareCQL,
error => Error reason => Reason
}), }),
Other Error
end. end.
handle_result({error, disconnected}) -> handle_result({error, disconnected}) ->
@ -487,6 +432,9 @@ handle_result({error, Error}) ->
handle_result(Res) -> handle_result(Res) ->
Res. Res.
transform_bridge_v1_config_to_connector_config(_) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% utils %% utils
@ -513,3 +461,11 @@ maybe_assign_type(V) when is_integer(V) ->
maybe_assign_type(V) when is_float(V) -> {double, V}; maybe_assign_type(V) when is_float(V) -> {double, V};
maybe_assign_type(V) -> maybe_assign_type(V) ->
V. V.
short_prepare_key(Str) when is_binary(Str) ->
true = size(Str) > 0,
Sha = crypto:hash(sha, Str),
%% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
Hex = string:lowercase(binary:encode_hex(Sha)),
<<UniqueEnough:16/binary, _/binary>> = Hex,
binary_to_atom(<<"cassa_prepare_key:", UniqueEnough/binary>>).

View File

@ -301,17 +301,28 @@ send_message(Config, Payload) ->
query_resource(Config, Request) -> query_resource(Config, Request) ->
Name = ?config(cassa_name, Config), Name = ?config(cassa_name, Config),
BridgeType = ?config(cassa_bridge_type, Config), BridgeType = ?config(cassa_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). ConnectorResId = emqx_connector_resource:resource_id(
cassandra, <<"connector_emqx_bridge_cassandra_SUITE">>
),
emqx_resource:query(BridgeV2Id, Request, #{
timeout => 1_000, connector_resource_id => ConnectorResId
}).
query_resource_async(Config, Request) -> query_resource_async(Config, Request) ->
Name = ?config(cassa_name, Config), Name = ?config(cassa_name, Config),
BridgeType = ?config(cassa_bridge_type, Config), BridgeType = ?config(cassa_bridge_type, Config),
Ref = alias([reply]), Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
Return = emqx_resource:query(ResourceID, Request, #{ ConnectorResId = emqx_connector_resource:resource_id(
timeout => 500, async_reply_fun => {AsyncReplyFun, []} cassandra, <<"connector_emqx_bridge_cassandra_SUITE">>
),
Return = emqx_resource:query(BridgeV2Id, Request, #{
timeout => 500,
async_reply_fun => {AsyncReplyFun, []},
connector_resource_id => ConnectorResId,
query_mode => async
}), }),
{Return, Ref}. {Return, Ref}.

View File

@ -22,10 +22,6 @@
%% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \ %% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \
%% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE %% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE
%% Cassandra servers are defined at `.ci/docker-compose-file/docker-compose-cassandra.yaml`
%% You can change it to `127.0.0.1`, if you run this SUITE locally
-define(CASSANDRA_HOST, "cassandra").
-define(CASSANDRA_HOST_NOAUTH, "cassandra_noauth").
-define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector). -define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector).
%% Cassandra default username & password once enable `authenticator: PasswordAuthenticator` %% Cassandra default username & password once enable `authenticator: PasswordAuthenticator`

View File

@ -36,6 +36,8 @@ resource_type(mongodb) ->
emqx_bridge_mongodb_connector; emqx_bridge_mongodb_connector;
resource_type(influxdb) -> resource_type(influxdb) ->
emqx_bridge_influxdb_connector; emqx_bridge_influxdb_connector;
resource_type(cassandra) ->
emqx_bridge_cassandra_connector;
resource_type(mysql) -> resource_type(mysql) ->
emqx_bridge_mysql_connector; emqx_bridge_mysql_connector;
resource_type(pgsql) -> resource_type(pgsql) ->
@ -134,6 +136,14 @@ connector_structs() ->
required => false required => false
} }
)}, )},
{cassandra,
mk(
hoconsc:map(name, ref(emqx_bridge_cassandra, "config_connector")),
#{
desc => <<"Cassandra Connector Config">>,
required => false
}
)},
{mysql, {mysql,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
@ -217,6 +227,7 @@ schema_modules() ->
emqx_bridge_matrix, emqx_bridge_matrix,
emqx_bridge_mongodb, emqx_bridge_mongodb,
emqx_bridge_influxdb, emqx_bridge_influxdb,
emqx_bridge_cassandra,
emqx_bridge_mysql, emqx_bridge_mysql,
emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy, emqx_bridge_syskeeper_proxy,
@ -247,6 +258,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"),
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),

View File

@ -137,6 +137,8 @@ connector_type_to_bridge_types(mongodb) ->
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(influxdb) -> connector_type_to_bridge_types(influxdb) ->
[influxdb, influxdb_api_v1, influxdb_api_v2]; [influxdb, influxdb_api_v1, influxdb_api_v2];
connector_type_to_bridge_types(cassandra) ->
[cassandra];
connector_type_to_bridge_types(mysql) -> connector_type_to_bridge_types(mysql) ->
[mysql]; [mysql];
connector_type_to_bridge_types(mqtt) -> connector_type_to_bridge_types(mqtt) ->

View File

@ -1,5 +1,15 @@
emqx_bridge_cassandra { emqx_bridge_cassandra {
action_parameters.desc:
"""Action specific configs."""
action_parameters.label:
"""Action"""
cassandra_action.desc:
"""Action configs."""
cassandra_action.label:
"""Action"""
config_enable.desc: config_enable.desc:
"""Enable or disable this bridge""" """Enable or disable this bridge"""

View File

@ -1,5 +1,11 @@
emqx_bridge_cassandra_connector { emqx_bridge_cassandra_connector {
config.desc:
"""Cassandra connection config"""
config.label:
"""Connection config"""
keyspace.desc: keyspace.desc:
"""Keyspace name to connect to.""" """Keyspace name to connect to."""