WIP
This commit is contained in:
parent
fb231ad3a7
commit
57df0702e2
|
@ -0,0 +1,468 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_connector_cass).
|
||||
|
||||
-include("emqx_connector.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
%% FIXME
|
||||
%-include_lib("epgsql/include/epgsql.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-export([roots/0, fields/1]).
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
||||
%% callbacks of behaviour emqx_resource
|
||||
-export([
|
||||
callback_mode/0,
|
||||
on_start/2,
|
||||
on_stop/2,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
]).
|
||||
|
||||
-export([connect/1]).
|
||||
|
||||
-export([
|
||||
query/3,
|
||||
prepared_query/3,
|
||||
execute_batch/3
|
||||
]).
|
||||
|
||||
-export([do_get_status/1]).
|
||||
|
||||
-define(CASSA_HOST_OPTIONS, #{
|
||||
default_port => 9042
|
||||
}).
|
||||
|
||||
-type prepares() :: #{atom() => binary()}.
|
||||
-type params_tokens() :: #{atom() => list()}.
|
||||
|
||||
-type state() ::
|
||||
#{
|
||||
poolname := atom(),
|
||||
prepare_sql := prepares(),
|
||||
params_tokens := params_tokens(),
|
||||
prepare_statement := epgsql:statement()
|
||||
}.
|
||||
|
||||
%%====================================================================
|
||||
|
||||
roots() ->
|
||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||
|
||||
fields(config) ->
|
||||
cassandra_db_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields() ++
|
||||
emqx_connector_schema_lib:prepare_statement_fields().
|
||||
|
||||
cassandra_db_fields() ->
|
||||
[
|
||||
{nodes, fun nodes/1},
|
||||
{keyspace, fun keyspace/1},
|
||||
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
|
||||
{username, fun emqx_connector_schema_lib:username/1},
|
||||
{password, fun emqx_connector_schema_lib:password/1},
|
||||
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
|
||||
].
|
||||
|
||||
nodes(type) -> binary();
|
||||
nodes(desc) -> ?DESC("nodes");
|
||||
nodes(required) -> true;
|
||||
nodes(converter) -> fun convert_nodes/2;
|
||||
nodes(_) -> undefined.
|
||||
|
||||
convert_nodes(Nodes0) when is_binary(Nodes0) ->
|
||||
Nodes = binary_to_list(Nodes0),
|
||||
[
|
||||
begin
|
||||
[Host, Port] = string:tokens(S, ":"),
|
||||
{Host, list_to_integer(Port)}
|
||||
end
|
||||
|| S <- string:tokens(Nodes, ",")
|
||||
].
|
||||
|
||||
keyspace(type) -> binary();
|
||||
keyspace(desc) -> ?DESC("keyspace");
|
||||
keyspace(required) -> true;
|
||||
keyspace(_) -> undefined.
|
||||
|
||||
%%====================================================================
|
||||
|
||||
callback_mode() -> always_sync.
|
||||
|
||||
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
|
||||
on_start(
|
||||
InstId,
|
||||
#{
|
||||
nodes := Nodes,
|
||||
keyspace := Keyspace,
|
||||
username := Username,
|
||||
pool_size := PoolSize,
|
||||
ssl := SSL
|
||||
} = Config
|
||||
) ->
|
||||
{ok, _} = application:ensure_all_started(ecpool),
|
||||
{ok, _} = application:ensure_all_started(ecql),
|
||||
|
||||
?SLOG(info, #{
|
||||
msg => "starting_cassandra_connector",
|
||||
connector => InstId,
|
||||
config => emqx_misc:redact(Config)
|
||||
}),
|
||||
|
||||
Options = [
|
||||
{nodes, Nodes},
|
||||
{username, Username},
|
||||
{password, emqx_secret:wrap(maps:get(password, Config, ""))},
|
||||
{keyspace, Keyspace},
|
||||
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
|
||||
{pool_size, PoolSize}
|
||||
],
|
||||
|
||||
SslOpts =
|
||||
case maps:get(enable, SSL) of
|
||||
true ->
|
||||
[
|
||||
%% note: this is converted to `required' in
|
||||
%% `conn_opts/2', and there's a boolean guard
|
||||
%% there; if this is set to `required' here,
|
||||
%% that'll require changing `conn_opts/2''s guard.
|
||||
{ssl, true},
|
||||
{ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
|
||||
];
|
||||
false ->
|
||||
[{ssl, false}]
|
||||
end,
|
||||
|
||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||
Prepares = parse_prepare_sql(Config),
|
||||
InitState = #{poolname => PoolName, prepare_statement => #{}},
|
||||
State = maps:merge(InitState, Prepares),
|
||||
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
||||
ok ->
|
||||
{ok, init_prepare(State)};
|
||||
{error, Reason} ->
|
||||
?tp(
|
||||
cassandra_connector_start_failed,
|
||||
#{error => Reason}
|
||||
),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
on_stop(InstId, #{poolname := PoolName}) ->
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_cassandra_connector",
|
||||
connector => InstId
|
||||
}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
on_query(InstId, {TypeOrKey, NameOrSQL}, #{poolname := _PoolName} = State) ->
|
||||
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
|
||||
on_query(
|
||||
InstId,
|
||||
{TypeOrKey, NameOrSQL, Params},
|
||||
#{poolname := PoolName} = State
|
||||
) ->
|
||||
?SLOG(debug, #{
|
||||
msg => "postgresql connector received sql query",
|
||||
connector => InstId,
|
||||
type => TypeOrKey,
|
||||
sql => NameOrSQL,
|
||||
state => State
|
||||
}),
|
||||
Type = pgsql_query_type(TypeOrKey),
|
||||
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
|
||||
Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
|
||||
handle_result(Res).
|
||||
|
||||
pgsql_query_type(sql) ->
|
||||
query;
|
||||
pgsql_query_type(query) ->
|
||||
query;
|
||||
pgsql_query_type(prepared_query) ->
|
||||
prepared_query;
|
||||
%% for bridge
|
||||
pgsql_query_type(_) ->
|
||||
pgsql_query_type(prepared_query).
|
||||
|
||||
on_batch_query(
|
||||
InstId,
|
||||
BatchReq,
|
||||
#{poolname := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
|
||||
) ->
|
||||
case BatchReq of
|
||||
[{Key, _} = Request | _] ->
|
||||
BinKey = to_bin(Key),
|
||||
case maps:get(BinKey, Tokens, undefined) of
|
||||
undefined ->
|
||||
Log = #{
|
||||
connector => InstId,
|
||||
first_request => Request,
|
||||
state => State,
|
||||
msg => "batch prepare not implemented"
|
||||
},
|
||||
?SLOG(error, Log),
|
||||
{error, {unrecoverable_error, batch_prepare_not_implemented}};
|
||||
TokenList ->
|
||||
{_, Datas} = lists:unzip(BatchReq),
|
||||
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
|
||||
St = maps:get(BinKey, Sts),
|
||||
case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
|
||||
{error, _Error} = Result ->
|
||||
handle_result(Result);
|
||||
{_Column, Results} ->
|
||||
handle_batch_result(Results, 0)
|
||||
end
|
||||
end;
|
||||
_ ->
|
||||
Log = #{
|
||||
connector => InstId,
|
||||
request => BatchReq,
|
||||
state => State,
|
||||
msg => "invalid request"
|
||||
},
|
||||
?SLOG(error, Log),
|
||||
{error, {unrecoverable_error, invalid_request}}
|
||||
end.
|
||||
|
||||
proc_sql_params(query, SQLOrKey, Params, _State) ->
|
||||
{SQLOrKey, Params};
|
||||
proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
|
||||
{SQLOrKey, Params};
|
||||
proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
|
||||
Key = to_bin(TypeOrKey),
|
||||
case maps:get(Key, ParamsTokens, undefined) of
|
||||
undefined ->
|
||||
{SQLOrData, Params};
|
||||
Tokens ->
|
||||
{Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
|
||||
end.
|
||||
|
||||
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
|
||||
try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
|
||||
{error, Reason} = Result ->
|
||||
?tp(
|
||||
pgsql_connector_query_return,
|
||||
#{error => Reason}
|
||||
),
|
||||
?SLOG(error, #{
|
||||
msg => "postgresql connector do sql query failed",
|
||||
connector => InstId,
|
||||
type => Type,
|
||||
sql => NameOrSQL,
|
||||
reason => Reason
|
||||
}),
|
||||
Result;
|
||||
Result ->
|
||||
?tp(
|
||||
pgsql_connector_query_return,
|
||||
#{result => Result}
|
||||
),
|
||||
Result
|
||||
catch
|
||||
error:function_clause:Stacktrace ->
|
||||
?SLOG(error, #{
|
||||
msg => "postgresql connector do sql query failed",
|
||||
connector => InstId,
|
||||
type => Type,
|
||||
sql => NameOrSQL,
|
||||
reason => function_clause,
|
||||
stacktrace => Stacktrace
|
||||
}),
|
||||
{error, {unrecoverable_error, invalid_request}}
|
||||
end.
|
||||
|
||||
on_get_status(_InstId, #{poolname := Pool} = State) ->
|
||||
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
|
||||
true ->
|
||||
case do_check_prepares(State) of
|
||||
ok ->
|
||||
connected;
|
||||
{ok, NState} ->
|
||||
%% return new state with prepared statements
|
||||
{connected, NState};
|
||||
false ->
|
||||
%% do not log error, it is logged in prepare_sql_to_conn
|
||||
connecting
|
||||
end;
|
||||
false ->
|
||||
connecting
|
||||
end.
|
||||
|
||||
do_get_status(Conn) ->
|
||||
ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
|
||||
|
||||
do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
|
||||
ok;
|
||||
do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) ->
|
||||
%% retry to prepare
|
||||
case prepare_sql(Prepares, PoolName) of
|
||||
{ok, Sts} ->
|
||||
%% remove the error
|
||||
{ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
|
||||
_Error ->
|
||||
false
|
||||
end.
|
||||
|
||||
%% ===================================================================
|
||||
|
||||
connect(Opts) ->
|
||||
Host = proplists:get_value(host, Opts),
|
||||
Username = proplists:get_value(username, Opts),
|
||||
Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
|
||||
case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
|
||||
{ok, _Conn} = Ok ->
|
||||
Ok;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
query(Conn, SQL, Params) ->
|
||||
epgsql:equery(Conn, SQL, Params).
|
||||
|
||||
prepared_query(Conn, Name, Params) ->
|
||||
epgsql:prepared_query2(Conn, Name, Params).
|
||||
|
||||
execute_batch(Conn, Statement, Params) ->
|
||||
epgsql:execute_batch(Conn, Statement, Params).
|
||||
|
||||
conn_opts(Opts) ->
|
||||
conn_opts(Opts, []).
|
||||
conn_opts([], Acc) ->
|
||||
Acc;
|
||||
conn_opts([Opt = {database, _} | Opts], Acc) ->
|
||||
conn_opts(Opts, [Opt | Acc]);
|
||||
conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
|
||||
Flag =
|
||||
case Bool of
|
||||
true -> required;
|
||||
false -> false
|
||||
end,
|
||||
conn_opts(Opts, [{ssl, Flag} | Acc]);
|
||||
conn_opts([Opt = {port, _} | Opts], Acc) ->
|
||||
conn_opts(Opts, [Opt | Acc]);
|
||||
conn_opts([Opt = {timeout, _} | Opts], Acc) ->
|
||||
conn_opts(Opts, [Opt | Acc]);
|
||||
conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
|
||||
conn_opts(Opts, [Opt | Acc]);
|
||||
conn_opts([_Opt | Opts], Acc) ->
|
||||
conn_opts(Opts, Acc).
|
||||
|
||||
parse_prepare_sql(Config) ->
|
||||
SQL =
|
||||
case maps:get(prepare_statement, Config, undefined) of
|
||||
undefined ->
|
||||
case maps:get(sql, Config, undefined) of
|
||||
undefined -> #{};
|
||||
Template -> #{<<"send_message">> => Template}
|
||||
end;
|
||||
Any ->
|
||||
Any
|
||||
end,
|
||||
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
||||
|
||||
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
|
||||
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'),
|
||||
parse_prepare_sql(
|
||||
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
|
||||
);
|
||||
parse_prepare_sql([], Prepares, Tokens) ->
|
||||
#{
|
||||
prepare_sql => Prepares,
|
||||
params_tokens => Tokens
|
||||
}.
|
||||
|
||||
init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) ->
|
||||
case maps:size(Prepares) of
|
||||
0 ->
|
||||
State;
|
||||
_ ->
|
||||
case prepare_sql(Prepares, PoolName) of
|
||||
{ok, Sts} ->
|
||||
State#{prepare_statement := Sts};
|
||||
Error ->
|
||||
LogMeta = #{
|
||||
msg => <<"PostgreSQL init prepare statement failed">>, error => Error
|
||||
},
|
||||
?SLOG(error, LogMeta),
|
||||
%% mark the prepare_sqlas failed
|
||||
State#{prepare_sql => {error, Prepares}}
|
||||
end
|
||||
end.
|
||||
|
||||
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
|
||||
prepare_sql(maps:to_list(Prepares), PoolName);
|
||||
prepare_sql(Prepares, PoolName) ->
|
||||
case do_prepare_sql(Prepares, PoolName) of
|
||||
{ok, _Sts} = Ok ->
|
||||
%% prepare for reconnect
|
||||
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
|
||||
Ok;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
do_prepare_sql(Prepares, PoolName) ->
|
||||
do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
|
||||
|
||||
do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
|
||||
{ok, Conn} = ecpool_worker:client(Worker),
|
||||
case prepare_sql_to_conn(Conn, Prepares) of
|
||||
{ok, Sts} ->
|
||||
do_prepare_sql(T, Prepares, PoolName, Sts);
|
||||
Error ->
|
||||
Error
|
||||
end;
|
||||
do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
|
||||
{ok, LastSts}.
|
||||
|
||||
prepare_sql_to_conn(Conn, Prepares) ->
|
||||
prepare_sql_to_conn(Conn, Prepares, #{}).
|
||||
|
||||
prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
|
||||
prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
|
||||
LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, prepare_sql => SQL},
|
||||
?SLOG(info, LogMeta),
|
||||
case epgsql:parse2(Conn, Key, SQL, []) of
|
||||
{ok, Statement} ->
|
||||
prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
|
||||
{error, Error} = Other ->
|
||||
?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}),
|
||||
Other
|
||||
end.
|
||||
|
||||
to_bin(Bin) when is_binary(Bin) ->
|
||||
Bin;
|
||||
to_bin(Atom) when is_atom(Atom) ->
|
||||
erlang:atom_to_binary(Atom).
|
||||
|
||||
handle_result({error, disconnected}) ->
|
||||
{error, {recoverable_error, disconnected}};
|
||||
handle_result({error, Error}) ->
|
||||
{error, {unrecoverable_error, Error}};
|
||||
handle_result(Res) ->
|
||||
Res.
|
||||
|
||||
handle_batch_result([{ok, Count} | Rest], Acc) ->
|
||||
handle_batch_result(Rest, Acc + Count);
|
||||
handle_batch_result([{error, Error} | _Rest], _Acc) ->
|
||||
{error, {unrecoverable_error, Error}};
|
||||
handle_batch_result([], Acc) ->
|
||||
{ok, Acc}.
|
|
@ -0,0 +1,129 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ee_bridge_cassa).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-export([
|
||||
conn_bridge_examples/1,
|
||||
values/2,
|
||||
fields/2
|
||||
]).
|
||||
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1
|
||||
]).
|
||||
|
||||
-define(DEFAULT_SQL, <<
|
||||
"insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) "
|
||||
"values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
|
||||
>>).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% api
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"cassa">> => #{
|
||||
summary => <<"Cassandra Bridge">>,
|
||||
value => values(Method, cassa)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
values(get, Type) ->
|
||||
maps:merge(values(post, Type), ?METRICS_EXAMPLE);
|
||||
values(post, Type) ->
|
||||
#{
|
||||
enable => true,
|
||||
type => Type,
|
||||
name => <<"foo">>,
|
||||
server => <<"127.0.0.1:5432">>,
|
||||
database => <<"mqtt">>,
|
||||
pool_size => 8,
|
||||
username => <<"root">>,
|
||||
password => <<"public">>,
|
||||
sql => ?DEFAULT_SQL,
|
||||
local_topic => <<"local/topic/#">>,
|
||||
resource_opts => #{
|
||||
worker_pool_size => 8,
|
||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||
batch_time => ?DEFAULT_BATCH_TIME,
|
||||
query_mode => async,
|
||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
||||
}
|
||||
};
|
||||
values(put, Type) ->
|
||||
values(post, Type).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Hocon Schema Definitions
|
||||
namespace() -> "bridge_cassa".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{sql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||
)},
|
||||
{local_topic,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("local_topic"), default => undefined}
|
||||
)},
|
||||
{resource_opts,
|
||||
mk(
|
||||
ref(?MODULE, "creation_opts"),
|
||||
#{
|
||||
required => false,
|
||||
default => #{},
|
||||
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
|
||||
}
|
||||
)}
|
||||
] ++
|
||||
(emqx_connector_cassa:fields(config) --
|
||||
emqx_connector_schema_lib:prepare_statement_fields());
|
||||
fields("creation_opts") ->
|
||||
emqx_resource_schema:fields("creation_opts_sync_only");
|
||||
fields("post") ->
|
||||
fields("post", cassa);
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||
|
||||
fields("post", Type) ->
|
||||
[type_field(Type), name_field() | fields("config")].
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for Cassandra using `", string:to_upper(Method), "` method."];
|
||||
desc("creation_opts" = Name) ->
|
||||
emqx_resource_schema:desc(Name);
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
||||
type_field(Type) ->
|
||||
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||
|
||||
name_field() ->
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
|
@ -1,4 +1,4 @@
|
|||
#!/usr/bin/env elixir
|
||||
#! /usr/bin/env elixir
|
||||
|
||||
defmodule CheckElixirApplications do
|
||||
alias EMQXUmbrella.MixProject
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#!/usr/bin/env elixir
|
||||
#! /usr/bin/env elixir
|
||||
|
||||
# ensure we have a fresh rebar.lock
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#!/usr/bin/env elixir
|
||||
#! /usr/bin/env elixir
|
||||
|
||||
defmodule CheckElixirEMQXMachineBootDiscrepancies do
|
||||
alias EMQXUmbrella.MixProject
|
||||
|
|
Loading…
Reference in New Issue