Merge pull request #12330 from emqx/bridge-v2-cassandra
refactor: split cassandra bridges to actions and connectors
This commit is contained in:
commit
aafb683ec7
|
@ -39,6 +39,10 @@ services:
|
|||
- 19042:9042
|
||||
# Cassandra TLS
|
||||
- 19142:9142
|
||||
# Cassandra No Auth
|
||||
- 19043:9043
|
||||
# Cassandra TLS No Auth
|
||||
- 19143:9143
|
||||
# S3
|
||||
- 19000:19000
|
||||
# S3 TLS
|
||||
|
|
|
@ -96,6 +96,18 @@
|
|||
"upstream": "cassandra:9142",
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "cassa_no_auth_tcp",
|
||||
"listen": "0.0.0.0:9043",
|
||||
"upstream": "cassandra_noauth:9042",
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "cassa_no_auth_tls",
|
||||
"listen": "0.0.0.0:9143",
|
||||
"upstream": "cassandra_noauth:9142",
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "sqlserver",
|
||||
"listen": "0.0.0.0:1433",
|
||||
|
|
|
@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
|
|||
emqx_bridge_matrix_action_info,
|
||||
emqx_bridge_mongodb_action_info,
|
||||
emqx_bridge_influxdb_action_info,
|
||||
emqx_bridge_cassandra_action_info,
|
||||
emqx_bridge_mysql_action_info,
|
||||
emqx_bridge_pgsql_action_info,
|
||||
emqx_bridge_syskeeper_action_info,
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
{erl_opts, [debug_info]}.
|
||||
{deps, [
|
||||
{ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.2"}}},
|
||||
{ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.6.0"}}},
|
||||
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
||||
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_bridge_cassandra, [
|
||||
{description, "EMQX Enterprise Cassandra Bridge"},
|
||||
{vsn, "0.1.6"},
|
||||
{vsn, "0.2.0"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
@ -8,7 +8,7 @@
|
|||
emqx_resource,
|
||||
ecql
|
||||
]},
|
||||
{env, []},
|
||||
{env, [{emqx_action_info_modules, [emqx_bridge_cassandra_action_info]}]},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
]}.
|
||||
|
|
|
@ -12,11 +12,17 @@
|
|||
|
||||
%% schema examples
|
||||
-export([
|
||||
conn_bridge_examples/1,
|
||||
values/2,
|
||||
fields/2
|
||||
]).
|
||||
|
||||
%% Examples
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
conn_bridge_examples/1,
|
||||
connector_examples/1
|
||||
]).
|
||||
|
||||
%% schema
|
||||
-export([
|
||||
namespace/0,
|
||||
|
@ -26,10 +32,13 @@
|
|||
]).
|
||||
|
||||
-define(DEFAULT_CQL, <<
|
||||
"insert into mqtt_msg(topic, msgid, sender, qos, payload, arrived, retain) "
|
||||
"values (${topic}, ${id}, ${clientid}, ${qos}, ${payload}, ${timestamp}, ${flags.retain})"
|
||||
"insert into mqtt_msg(msgid, topic, qos, payload, arrived) "
|
||||
"values (${id}, ${topic}, ${qos}, ${payload}, ${timestamp})"
|
||||
>>).
|
||||
|
||||
-define(CONNECTOR_TYPE, cassandra).
|
||||
-define(ACTION_TYPE, cassandra).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% schema examples
|
||||
|
||||
|
@ -43,6 +52,41 @@ conn_bridge_examples(Method) ->
|
|||
}
|
||||
].
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
ParamsExample = #{
|
||||
parameters => #{
|
||||
cql => ?DEFAULT_CQL
|
||||
}
|
||||
},
|
||||
[
|
||||
#{
|
||||
<<"cassandra">> => #{
|
||||
summary => <<"Cassandra Action">>,
|
||||
value => emqx_bridge_v2_schema:action_values(
|
||||
Method, cassandra, cassandra, ParamsExample
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"cassandra">> => #{
|
||||
summary => <<"Cassandra Connector">>,
|
||||
value => emqx_connector_schema:connector_values(
|
||||
Method, cassandra, #{
|
||||
servers => <<"127.0.0.1:9042">>,
|
||||
keyspace => <<"mqtt">>,
|
||||
username => <<"root">>,
|
||||
password => <<"******">>,
|
||||
pool_size => 8
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
%% no difference in get/post/put method
|
||||
values(_Method, Type) ->
|
||||
#{
|
||||
|
@ -73,14 +117,47 @@ namespace() -> "bridge_cassa".
|
|||
|
||||
roots() -> [].
|
||||
|
||||
fields("config_connector") ->
|
||||
emqx_connector_schema:common_fields() ++
|
||||
emqx_bridge_cassandra_connector:fields("connector") ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||
fields(action) ->
|
||||
{cassandra,
|
||||
mk(
|
||||
hoconsc:map(name, ref(?MODULE, cassandra_action)),
|
||||
#{desc => <<"Cassandra Action Config">>, required => false}
|
||||
)};
|
||||
fields(cassandra_action) ->
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
mk(ref(?MODULE, action_parameters), #{
|
||||
required => true, desc => ?DESC(action_parameters)
|
||||
})
|
||||
);
|
||||
fields(action_parameters) ->
|
||||
[
|
||||
cql_field()
|
||||
];
|
||||
fields(connector_resource_opts) ->
|
||||
emqx_connector_schema:resource_opts_fields();
|
||||
fields(Field) when
|
||||
Field == "get_connector";
|
||||
Field == "put_connector";
|
||||
Field == "post_connector"
|
||||
->
|
||||
Fields =
|
||||
emqx_bridge_cassandra_connector:fields("connector") ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
|
||||
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
|
||||
fields(Field) when
|
||||
Field == "get_bridge_v2";
|
||||
Field == "post_bridge_v2";
|
||||
Field == "put_bridge_v2"
|
||||
->
|
||||
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(cassandra_action));
|
||||
fields("config") ->
|
||||
[
|
||||
cql_field(),
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{cql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
|
||||
)},
|
||||
{local_topic,
|
||||
mk(
|
||||
binary(),
|
||||
|
@ -99,8 +176,23 @@ fields("get") ->
|
|||
fields("post", Type) ->
|
||||
[type_field(Type), name_field() | fields("config")].
|
||||
|
||||
cql_field() ->
|
||||
{cql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
|
||||
)}.
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(cassandra_action) ->
|
||||
?DESC(cassandra_action);
|
||||
desc(action_parameters) ->
|
||||
?DESC(action_parameters);
|
||||
desc("config_connector") ->
|
||||
?DESC("desc_config");
|
||||
desc(connector_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, "resource_opts");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for Cassandra using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
-module(emqx_bridge_cassandra_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-export([
|
||||
bridge_v1_config_to_action_config/2,
|
||||
bridge_v1_config_to_connector_config/1,
|
||||
connector_action_config_to_bridge_v1_config/2,
|
||||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0
|
||||
]).
|
||||
|
||||
-import(emqx_utils_conv, [bin/1]).
|
||||
|
||||
-define(SCHEMA_MODULE, emqx_bridge_cassandra).
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||
ActionTopLevelKeys = schema_keys(cassandra_action),
|
||||
ActionParametersKeys = schema_keys(action_parameters),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
|
||||
ActionConfig#{<<"connector">> => ConnectorName}
|
||||
).
|
||||
|
||||
bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
||||
ActionTopLevelKeys = schema_keys(cassandra_action),
|
||||
ActionParametersKeys = schema_keys(action_parameters),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ConnectorTopLevelKeys = schema_keys("config_connector"),
|
||||
ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
|
||||
ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_connector_schema:project_to_connector_resource_opts/1,
|
||||
ConnConfig0
|
||||
).
|
||||
|
||||
connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
|
||||
RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config(
|
||||
ConnectorRawConf, ActionRawConf
|
||||
),
|
||||
maps:without([<<"cassandra_type">>], RawConf).
|
||||
|
||||
bridge_v1_type_name() -> cassandra.
|
||||
|
||||
action_type_name() -> cassandra.
|
||||
|
||||
connector_type_name() -> cassandra.
|
||||
|
||||
schema_module() -> ?SCHEMA_MODULE.
|
||||
|
||||
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||
Conf0 = maps:with(PickKeys, Config),
|
||||
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
|
||||
|
||||
schema_keys(Name) ->
|
||||
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].
|
|
@ -14,13 +14,17 @@
|
|||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
%% schema
|
||||
-export([roots/0, fields/1]).
|
||||
-export([roots/0, fields/1, desc/1]).
|
||||
|
||||
%% callbacks of behaviour emqx_resource
|
||||
-export([
|
||||
callback_mode/0,
|
||||
on_start/2,
|
||||
on_stop/2,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channel_status/3,
|
||||
on_get_channels/1,
|
||||
on_query/3,
|
||||
on_query_async/4,
|
||||
on_batch_query/3,
|
||||
|
@ -28,6 +32,8 @@
|
|||
on_get_status/2
|
||||
]).
|
||||
|
||||
-export([transform_bridge_v1_config_to_connector_config/1]).
|
||||
|
||||
%% callbacks of ecpool
|
||||
-export([
|
||||
connect/1,
|
||||
|
@ -39,16 +45,10 @@
|
|||
|
||||
-export([do_get_status/1]).
|
||||
|
||||
-type prepares() :: #{atom() => binary()}.
|
||||
-type params_tokens() :: #{atom() => list()}.
|
||||
|
||||
-type state() ::
|
||||
#{
|
||||
pool_name := binary(),
|
||||
prepare_cql := prepares(),
|
||||
params_tokens := params_tokens(),
|
||||
%% returned by ecql:prepare/2
|
||||
prepare_statement := binary()
|
||||
channels := #{}
|
||||
}.
|
||||
|
||||
-define(DEFAULT_SERVER_OPTION, #{default_port => ?CASSANDRA_DEFAULT_PORT}).
|
||||
|
@ -62,7 +62,9 @@ roots() ->
|
|||
fields(config) ->
|
||||
cassandra_db_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields() ++
|
||||
emqx_connector_schema_lib:prepare_statement_fields().
|
||||
emqx_connector_schema_lib:prepare_statement_fields();
|
||||
fields("connector") ->
|
||||
cassandra_db_fields() ++ emqx_connector_schema_lib:ssl_fields().
|
||||
|
||||
cassandra_db_fields() ->
|
||||
[
|
||||
|
@ -83,6 +85,11 @@ keyspace(desc) -> ?DESC("keyspace");
|
|||
keyspace(required) -> true;
|
||||
keyspace(_) -> undefined.
|
||||
|
||||
desc(config) ->
|
||||
?DESC("config");
|
||||
desc("connector") ->
|
||||
?DESC("connector").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% callbacks for emqx_resource
|
||||
|
||||
|
@ -130,10 +137,9 @@ on_start(
|
|||
false ->
|
||||
[]
|
||||
end,
|
||||
State = parse_prepare_cql(Config),
|
||||
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
|
||||
ok ->
|
||||
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
|
||||
{ok, #{pool_name => InstId, channels => #{}}};
|
||||
{error, Reason} ->
|
||||
?tp(
|
||||
cassandra_connector_start_failed,
|
||||
|
@ -149,23 +155,49 @@ on_stop(InstId, _State) ->
|
|||
}),
|
||||
emqx_resource_pool:stop(InstId).
|
||||
|
||||
on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) ->
|
||||
#{parameters := #{cql := CQL}} = ChannConf0,
|
||||
{PrepareCQL, ParamsTokens} = emqx_placeholder:preproc_sql(CQL, '?'),
|
||||
ParsedCql = #{
|
||||
prepare_key => make_prepare_key(ChannId),
|
||||
prepare_cql => PrepareCQL,
|
||||
params_tokens => ParamsTokens
|
||||
},
|
||||
NewChanns = Channs#{ChannId => #{parsed_cql => ParsedCql, prepare_result => not_prepared}},
|
||||
{ok, OldState#{channels => NewChanns}}.
|
||||
|
||||
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
|
||||
NewState = State#{channels => maps:remove(ChannId, Channels)},
|
||||
{ok, NewState}.
|
||||
|
||||
on_get_channel_status(InstanceId, ChannId, #{channels := Channels, pool_name := PoolName} = State) ->
|
||||
case on_get_status(InstanceId, State) of
|
||||
connected ->
|
||||
#{parsed_cql := ParsedCql} = maps:get(ChannId, Channels),
|
||||
case prepare_cql_to_cassandra(ParsedCql, PoolName) of
|
||||
{ok, _} -> connected;
|
||||
{error, Reason} -> {connecting, Reason}
|
||||
end;
|
||||
_ ->
|
||||
connecting
|
||||
end.
|
||||
|
||||
on_get_channels(InstanceId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(InstanceId).
|
||||
|
||||
-type request() ::
|
||||
% emqx_bridge.erl
|
||||
{send_message, Params :: map()}
|
||||
{ChannId :: binary(), Params :: map()}
|
||||
% common query
|
||||
| {query, SQL :: binary()}
|
||||
| {query, SQL :: binary(), Params :: map()}.
|
||||
| {query, CQL :: binary()}
|
||||
| {query, CQL :: binary(), Params :: map()}.
|
||||
|
||||
-spec on_query(
|
||||
emqx_resource:resource_id(),
|
||||
request(),
|
||||
state()
|
||||
) -> ok | {ok, ecql:cql_result()} | {error, {recoverable_error | unrecoverable_error, term()}}.
|
||||
on_query(
|
||||
InstId,
|
||||
Request,
|
||||
State
|
||||
) ->
|
||||
on_query(InstId, Request, State) ->
|
||||
do_single_query(InstId, Request, sync, State).
|
||||
|
||||
-spec on_query_async(
|
||||
|
@ -174,21 +206,11 @@ on_query(
|
|||
{function(), list()},
|
||||
state()
|
||||
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
|
||||
on_query_async(
|
||||
InstId,
|
||||
Request,
|
||||
Callback,
|
||||
State
|
||||
) ->
|
||||
on_query_async(InstId, Request, Callback, State) ->
|
||||
do_single_query(InstId, Request, {async, Callback}, State).
|
||||
|
||||
do_single_query(
|
||||
InstId,
|
||||
Request,
|
||||
Async,
|
||||
#{pool_name := PoolName} = State
|
||||
) ->
|
||||
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
|
||||
do_single_query(InstId, Request, Async, #{pool_name := PoolName} = State) ->
|
||||
{Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request),
|
||||
?tp(
|
||||
debug,
|
||||
cassandra_connector_received_cql_query,
|
||||
|
@ -196,12 +218,12 @@ do_single_query(
|
|||
connector => InstId,
|
||||
type => Type,
|
||||
params => Params,
|
||||
prepared_key_or_cql => PreparedKeyOrSQL,
|
||||
prepared_key_or_cql => PreparedKeyOrCQL,
|
||||
state => State
|
||||
}
|
||||
),
|
||||
{PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State),
|
||||
Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrSQL1, Data),
|
||||
{PreparedKeyOrCQL1, Data} = proc_cql_params(Type, PreparedKeyOrCQL, Params, State),
|
||||
Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrCQL1, Data),
|
||||
handle_result(Res).
|
||||
|
||||
-spec on_batch_query(
|
||||
|
@ -209,11 +231,7 @@ do_single_query(
|
|||
[request()],
|
||||
state()
|
||||
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
|
||||
on_batch_query(
|
||||
InstId,
|
||||
Requests,
|
||||
State
|
||||
) ->
|
||||
on_batch_query(InstId, Requests, State) ->
|
||||
do_batch_query(InstId, Requests, sync, State).
|
||||
|
||||
-spec on_batch_query_async(
|
||||
|
@ -222,25 +240,15 @@ on_batch_query(
|
|||
{function(), list()},
|
||||
state()
|
||||
) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
|
||||
on_batch_query_async(
|
||||
InstId,
|
||||
Requests,
|
||||
Callback,
|
||||
State
|
||||
) ->
|
||||
on_batch_query_async(InstId, Requests, Callback, State) ->
|
||||
do_batch_query(InstId, Requests, {async, Callback}, State).
|
||||
|
||||
do_batch_query(
|
||||
InstId,
|
||||
Requests,
|
||||
Async,
|
||||
#{pool_name := PoolName} = State
|
||||
) ->
|
||||
do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) ->
|
||||
CQLs =
|
||||
lists:map(
|
||||
fun(Request) ->
|
||||
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
|
||||
proc_cql_params(Type, PreparedKeyOrSQL, Params, State)
|
||||
{Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request),
|
||||
proc_cql_params(Type, PreparedKeyOrCQL, Params, State)
|
||||
end,
|
||||
Requests
|
||||
),
|
||||
|
@ -256,26 +264,24 @@ do_batch_query(
|
|||
Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
|
||||
handle_result(Res).
|
||||
|
||||
parse_request_to_cql({send_message, Params}) ->
|
||||
{prepared_query, _Key = send_message, Params};
|
||||
parse_request_to_cql({query, SQL}) ->
|
||||
parse_request_to_cql({query, SQL, #{}});
|
||||
parse_request_to_cql({query, SQL, Params}) ->
|
||||
{query, SQL, Params}.
|
||||
parse_request_to_cql({query, CQL}) ->
|
||||
{query, CQL, #{}};
|
||||
parse_request_to_cql({query, CQL, Params}) ->
|
||||
{query, CQL, Params};
|
||||
parse_request_to_cql({ChannId, Params}) ->
|
||||
{prepared_query, ChannId, Params}.
|
||||
|
||||
proc_cql_params(
|
||||
prepared_query,
|
||||
PreparedKey0,
|
||||
Params,
|
||||
#{prepare_statement := Prepares, params_tokens := ParamsTokens}
|
||||
) ->
|
||||
%% assert
|
||||
_PreparedKey = maps:get(PreparedKey0, Prepares),
|
||||
Tokens = maps:get(PreparedKey0, ParamsTokens),
|
||||
{PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))};
|
||||
proc_cql_params(query, SQL, Params, _State) ->
|
||||
{SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'),
|
||||
{SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
|
||||
proc_cql_params(prepared_query, ChannId, Params, #{channels := Channs}) ->
|
||||
#{
|
||||
parsed_cql := #{
|
||||
prepare_key := PrepareKey,
|
||||
params_tokens := ParamsTokens
|
||||
}
|
||||
} = maps:get(ChannId, Channs),
|
||||
{PrepareKey, assign_type_for_params(emqx_placeholder:proc_sql(ParamsTokens, Params))};
|
||||
proc_cql_params(query, CQL, Params, _State) ->
|
||||
{CQL1, Tokens} = emqx_placeholder:preproc_sql(CQL, '?'),
|
||||
{CQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
|
||||
|
||||
exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
|
||||
Type == query; Type == prepared_query
|
||||
|
@ -314,38 +320,15 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
|
|||
exec(PoolName, Query) ->
|
||||
ecpool:pick_and_do(PoolName, Query, no_handover).
|
||||
|
||||
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
|
||||
on_get_status(_InstId, #{pool_name := PoolName}) ->
|
||||
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
|
||||
true ->
|
||||
case do_check_prepares(State) of
|
||||
ok ->
|
||||
connected;
|
||||
{ok, NState} ->
|
||||
%% return new state with prepared statements
|
||||
{connected, NState};
|
||||
false ->
|
||||
%% do not log error, it is logged in prepare_cql_to_conn
|
||||
connecting
|
||||
end;
|
||||
false ->
|
||||
connecting
|
||||
true -> connected;
|
||||
false -> connecting
|
||||
end.
|
||||
|
||||
do_get_status(Conn) ->
|
||||
ok == element(1, ecql:query(Conn, "SELECT cluster_name FROM system.local")).
|
||||
|
||||
do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) ->
|
||||
ok;
|
||||
do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepares}}) ->
|
||||
%% retry to prepare
|
||||
case prepare_cql(Prepares, PoolName) of
|
||||
{ok, Sts} ->
|
||||
%% remove the error
|
||||
{ok, State#{prepare_cql => Prepares, prepare_statement := Sts}};
|
||||
_Error ->
|
||||
false
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% callbacks query
|
||||
|
||||
|
@ -394,88 +377,50 @@ conn_opts([Opt | Opts], Acc) ->
|
|||
|
||||
%%--------------------------------------------------------------------
|
||||
%% prepare
|
||||
|
||||
%% XXX: hardcode
|
||||
%% note: the `cql` param is passed by emqx_bridge_cassandra
|
||||
parse_prepare_cql(#{cql := SQL}) ->
|
||||
parse_prepare_cql([{send_message, SQL}], #{}, #{});
|
||||
parse_prepare_cql(_) ->
|
||||
#{prepare_cql => #{}, params_tokens => #{}}.
|
||||
|
||||
parse_prepare_cql([{Key, H} | T], Prepares, Tokens) ->
|
||||
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'),
|
||||
parse_prepare_cql(
|
||||
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
|
||||
);
|
||||
parse_prepare_cql([], Prepares, Tokens) ->
|
||||
#{
|
||||
prepare_cql => Prepares,
|
||||
params_tokens => Tokens
|
||||
}.
|
||||
|
||||
init_prepare(State = #{prepare_cql := Prepares, pool_name := PoolName}) ->
|
||||
case maps:size(Prepares) of
|
||||
0 ->
|
||||
State;
|
||||
_ ->
|
||||
case prepare_cql(Prepares, PoolName) of
|
||||
{ok, Sts} ->
|
||||
State#{prepare_statement := Sts};
|
||||
Error ->
|
||||
?tp(
|
||||
error,
|
||||
cassandra_prepare_cql_failed,
|
||||
#{prepares => Prepares, reason => Error}
|
||||
),
|
||||
%% mark the prepare_cql as failed
|
||||
State#{prepare_cql => {error, Prepares}}
|
||||
end
|
||||
end.
|
||||
|
||||
prepare_cql(Prepares, PoolName) when is_map(Prepares) ->
|
||||
prepare_cql(maps:to_list(Prepares), PoolName);
|
||||
prepare_cql(Prepares, PoolName) ->
|
||||
case do_prepare_cql(Prepares, PoolName) of
|
||||
{ok, _Sts} = Ok ->
|
||||
prepare_cql_to_cassandra(ParsedCql, PoolName) ->
|
||||
case prepare_cql_to_cassandra(ecpool:workers(PoolName), ParsedCql, #{}) of
|
||||
{ok, Statement} ->
|
||||
%% prepare for reconnect
|
||||
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [Prepares]}),
|
||||
Ok;
|
||||
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [ParsedCql]}),
|
||||
{ok, Statement};
|
||||
Error ->
|
||||
?tp(
|
||||
error,
|
||||
cassandra_prepare_cql_failed,
|
||||
#{parsed_cql => ParsedCql, reason => Error}
|
||||
),
|
||||
Error
|
||||
end.
|
||||
|
||||
do_prepare_cql(Prepares, PoolName) ->
|
||||
do_prepare_cql(ecpool:workers(PoolName), Prepares, #{}).
|
||||
|
||||
do_prepare_cql([{_Name, Worker} | T], Prepares, _LastSts) ->
|
||||
prepare_cql_to_cassandra([{_Name, Worker} | T], ParsedCql, _LastSts) ->
|
||||
{ok, Conn} = ecpool_worker:client(Worker),
|
||||
case prepare_cql_to_conn(Conn, Prepares) of
|
||||
{ok, Sts} ->
|
||||
do_prepare_cql(T, Prepares, Sts);
|
||||
case prepare_cql_to_conn(Conn, ParsedCql) of
|
||||
{ok, Statement} ->
|
||||
prepare_cql_to_cassandra(T, ParsedCql, Statement);
|
||||
Error ->
|
||||
Error
|
||||
end;
|
||||
do_prepare_cql([], _Prepares, LastSts) ->
|
||||
prepare_cql_to_cassandra([], _ParsedCql, LastSts) ->
|
||||
{ok, LastSts}.
|
||||
|
||||
prepare_cql_to_conn(Conn, Prepares) ->
|
||||
prepare_cql_to_conn(Conn, Prepares, #{}).
|
||||
|
||||
prepare_cql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
|
||||
prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
|
||||
?SLOG(info, #{msg => "cassandra_prepare_cql", name => Key, prepare_cql => SQL}),
|
||||
case ecql:prepare(Conn, Key, SQL) of
|
||||
prepare_cql_to_conn(Conn, #{prepare_key := PrepareKey, prepare_cql := PrepareCQL}) when
|
||||
is_pid(Conn)
|
||||
->
|
||||
?SLOG(info, #{
|
||||
msg => "cassandra_prepare_cql", prepare_key => PrepareKey, prepare_cql => PrepareCQL
|
||||
}),
|
||||
case ecql:prepare(Conn, PrepareKey, PrepareCQL) of
|
||||
{ok, Statement} ->
|
||||
prepare_cql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
|
||||
{error, Error} = Other ->
|
||||
{ok, Statement};
|
||||
{error, Reason} = Error ->
|
||||
?SLOG(error, #{
|
||||
msg => "cassandra_prepare_cql_failed",
|
||||
worker_pid => Conn,
|
||||
name => Key,
|
||||
prepare_cql => SQL,
|
||||
error => Error
|
||||
name => PrepareKey,
|
||||
prepare_cql => PrepareCQL,
|
||||
reason => Reason
|
||||
}),
|
||||
Other
|
||||
Error
|
||||
end.
|
||||
|
||||
handle_result({error, disconnected}) ->
|
||||
|
@ -487,6 +432,9 @@ handle_result({error, Error}) ->
|
|||
handle_result(Res) ->
|
||||
Res.
|
||||
|
||||
transform_bridge_v1_config_to_connector_config(_) ->
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% utils
|
||||
|
||||
|
@ -513,3 +461,6 @@ maybe_assign_type(V) when is_integer(V) ->
|
|||
maybe_assign_type(V) when is_float(V) -> {double, V};
|
||||
maybe_assign_type(V) ->
|
||||
V.
|
||||
|
||||
make_prepare_key(ChannId) ->
|
||||
ChannId.
|
||||
|
|
|
@ -11,6 +11,14 @@
|
|||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
%% To run this test locally:
|
||||
%% ./scripts/ct/run.sh --app apps/emqx_bridge_cassandra --only-up
|
||||
%% PROFILE=emqx-enterprise PROXY_HOST=localhost CASSA_TLS_HOST=localhost \
|
||||
%% CASSA_TLS_PORT=19142 CASSA_TCP_HOST=localhost CASSA_TCP_NO_AUTH_HOST=localhost \
|
||||
%% CASSA_TCP_PORT=19042 CASSA_TCP_NO_AUTH_PORT=19043 \
|
||||
%% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \
|
||||
%% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE
|
||||
|
||||
% SQL definitions
|
||||
-define(SQL_BRIDGE,
|
||||
"insert into mqtt_msg_test(topic, payload, arrived) "
|
||||
|
@ -293,17 +301,24 @@ send_message(Config, Payload) ->
|
|||
query_resource(Config, Request) ->
|
||||
Name = ?config(cassa_name, Config),
|
||||
BridgeType = ?config(cassa_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
|
||||
BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
|
||||
ConnectorResId = emqx_connector_resource:resource_id(BridgeType, Name),
|
||||
emqx_resource:query(BridgeV2Id, Request, #{
|
||||
timeout => 1_000, connector_resource_id => ConnectorResId
|
||||
}).
|
||||
|
||||
query_resource_async(Config, Request) ->
|
||||
Name = ?config(cassa_name, Config),
|
||||
BridgeType = ?config(cassa_bridge_type, Config),
|
||||
Ref = alias([reply]),
|
||||
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
Return = emqx_resource:query(ResourceID, Request, #{
|
||||
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
|
||||
BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name),
|
||||
ConnectorResId = emqx_connector_resource:resource_id(BridgeType, Name),
|
||||
Return = emqx_resource:query(BridgeV2Id, Request, #{
|
||||
timeout => 500,
|
||||
async_reply_fun => {AsyncReplyFun, []},
|
||||
connector_resource_id => ConnectorResId,
|
||||
query_mode => async
|
||||
}),
|
||||
{Return, Ref}.
|
||||
|
||||
|
|
|
@ -14,19 +14,15 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
%% Cassandra servers are defined at `.ci/docker-compose-file/docker-compose-cassandra.yaml`
|
||||
%% You can change it to `127.0.0.1`, if you run this SUITE locally
|
||||
-define(CASSANDRA_HOST, "cassandra").
|
||||
-define(CASSANDRA_HOST_NOAUTH, "cassandra_noauth").
|
||||
-define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector).
|
||||
%% To run this test locally:
|
||||
%% ./scripts/ct/run.sh --app apps/emqx_bridge_cassandra --only-up
|
||||
%% PROFILE=emqx-enterprise PROXY_HOST=localhost CASSA_TLS_HOST=localhost \
|
||||
%% CASSA_TLS_PORT=9142 CASSA_TCP_HOST=localhost CASSA_TCP_NO_AUTH_HOST=localhost \
|
||||
%% CASSA_TCP_PORT=19042 CASSA_TCP_NO_AUTH_PORT=19043 \
|
||||
%% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \
|
||||
%% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE
|
||||
|
||||
%% This test SUITE requires a running cassandra instance. If you don't want to
|
||||
%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
|
||||
%% you can create a cassandra instance with the following command (execute it
|
||||
%% from root of the EMQX directory.). You also need to set ?CASSANDRA_HOST and
|
||||
%% ?CASSANDRA_PORT to appropriate values.
|
||||
%%
|
||||
%% sudo docker run --rm -d --name cassandra --network host cassandra:3.11.14
|
||||
-define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector).
|
||||
|
||||
%% Cassandra default username & password once enable `authenticator: PasswordAuthenticator`
|
||||
%% in cassandra config
|
||||
|
@ -45,14 +41,14 @@ groups() ->
|
|||
{noauth, [t_lifecycle]}
|
||||
].
|
||||
|
||||
cassandra_servers(CassandraHost) ->
|
||||
cassandra_servers(CassandraHost, CassandraPort) ->
|
||||
lists:map(
|
||||
fun(#{hostname := Host, port := Port}) ->
|
||||
{Host, Port}
|
||||
end,
|
||||
emqx_schema:parse_servers(
|
||||
iolist_to_binary([CassandraHost, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
|
||||
#{default_port => ?CASSANDRA_DEFAULT_PORT}
|
||||
iolist_to_binary([CassandraHost, ":", erlang:integer_to_list(CassandraPort)]),
|
||||
#{default_port => CassandraPort}
|
||||
)
|
||||
).
|
||||
|
||||
|
@ -63,25 +59,30 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
init_per_group(Group, Config) ->
|
||||
{CassandraHost, AuthOpts} =
|
||||
{CassandraHost, CassandraPort, AuthOpts} =
|
||||
case Group of
|
||||
auth ->
|
||||
{?CASSANDRA_HOST, [{username, ?CASSA_USERNAME}, {password, ?CASSA_PASSWORD}]};
|
||||
TcpHost = os:getenv("CASSA_TCP_HOST", "toxiproxy"),
|
||||
TcpPort = list_to_integer(os:getenv("CASSA_TCP_PORT", "9042")),
|
||||
{TcpHost, TcpPort, [{username, ?CASSA_USERNAME}, {password, ?CASSA_PASSWORD}]};
|
||||
noauth ->
|
||||
{?CASSANDRA_HOST_NOAUTH, []}
|
||||
TcpHost = os:getenv("CASSA_TCP_NO_AUTH_HOST", "toxiproxy"),
|
||||
TcpPort = list_to_integer(os:getenv("CASSA_TCP_NO_AUTH_PORT", "9043")),
|
||||
{TcpHost, TcpPort, []}
|
||||
end,
|
||||
case emqx_common_test_helpers:is_tcp_server_available(CassandraHost, ?CASSANDRA_DEFAULT_PORT) of
|
||||
case emqx_common_test_helpers:is_tcp_server_available(CassandraHost, CassandraPort) of
|
||||
true ->
|
||||
%% keyspace `mqtt` must be created in advance
|
||||
{ok, Conn} =
|
||||
ecql:connect([
|
||||
{nodes, cassandra_servers(CassandraHost)},
|
||||
{nodes, cassandra_servers(CassandraHost, CassandraPort)},
|
||||
{keyspace, "mqtt"}
|
||||
| AuthOpts
|
||||
]),
|
||||
ecql:close(Conn),
|
||||
[
|
||||
{cassa_host, CassandraHost},
|
||||
{cassa_port, CassandraPort},
|
||||
{cassa_auth_opts, AuthOpts}
|
||||
| Config
|
||||
];
|
||||
|
@ -212,6 +213,7 @@ create_local_resource(ResourceId, CheckedConfig) ->
|
|||
|
||||
cassandra_config(Config) ->
|
||||
Host = ?config(cassa_host, Config),
|
||||
Port = ?config(cassa_port, Config),
|
||||
AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)),
|
||||
CassConfig =
|
||||
AuthOpts#{
|
||||
|
@ -223,7 +225,7 @@ cassandra_config(Config) ->
|
|||
"~s:~b",
|
||||
[
|
||||
Host,
|
||||
?CASSANDRA_DEFAULT_PORT
|
||||
Port
|
||||
]
|
||||
)
|
||||
)
|
||||
|
|
|
@ -36,6 +36,8 @@ resource_type(mongodb) ->
|
|||
emqx_bridge_mongodb_connector;
|
||||
resource_type(influxdb) ->
|
||||
emqx_bridge_influxdb_connector;
|
||||
resource_type(cassandra) ->
|
||||
emqx_bridge_cassandra_connector;
|
||||
resource_type(mysql) ->
|
||||
emqx_bridge_mysql_connector;
|
||||
resource_type(pgsql) ->
|
||||
|
@ -134,6 +136,14 @@ connector_structs() ->
|
|||
required => false
|
||||
}
|
||||
)},
|
||||
{cassandra,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_cassandra, "config_connector")),
|
||||
#{
|
||||
desc => <<"Cassandra Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{mysql,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
|
||||
|
@ -217,6 +227,7 @@ schema_modules() ->
|
|||
emqx_bridge_matrix,
|
||||
emqx_bridge_mongodb,
|
||||
emqx_bridge_influxdb,
|
||||
emqx_bridge_cassandra,
|
||||
emqx_bridge_mysql,
|
||||
emqx_bridge_syskeeper_connector,
|
||||
emqx_bridge_syskeeper_proxy,
|
||||
|
@ -247,6 +258,7 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
|
||||
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
|
||||
|
|
|
@ -137,6 +137,8 @@ connector_type_to_bridge_types(mongodb) ->
|
|||
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
|
||||
connector_type_to_bridge_types(influxdb) ->
|
||||
[influxdb, influxdb_api_v1, influxdb_api_v2];
|
||||
connector_type_to_bridge_types(cassandra) ->
|
||||
[cassandra];
|
||||
connector_type_to_bridge_types(mysql) ->
|
||||
[mysql];
|
||||
connector_type_to_bridge_types(mqtt) ->
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
The bridges for Cassandra have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API.
|
|
@ -1,5 +1,15 @@
|
|||
emqx_bridge_cassandra {
|
||||
|
||||
action_parameters.desc:
|
||||
"""Action specific configs."""
|
||||
action_parameters.label:
|
||||
"""Action"""
|
||||
|
||||
cassandra_action.desc:
|
||||
"""Action configs."""
|
||||
cassandra_action.label:
|
||||
"""Action"""
|
||||
|
||||
config_enable.desc:
|
||||
"""Enable or disable this bridge"""
|
||||
|
||||
|
|
|
@ -1,5 +1,11 @@
|
|||
emqx_bridge_cassandra_connector {
|
||||
|
||||
config.desc:
|
||||
"""Cassandra connection config"""
|
||||
|
||||
config.label:
|
||||
"""Connection config"""
|
||||
|
||||
keyspace.desc:
|
||||
"""Keyspace name to connect to."""
|
||||
|
||||
|
|
Loading…
Reference in New Issue