Merge pull request #10521 from zhongwencool/sync-release-50-to-master

Sync release 50 to master
This commit is contained in:
zhongwencool 2023-04-26 14:27:51 +08:00 committed by GitHub
commit 62e1dbdb4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 369 additions and 214 deletions

View File

@ -7,7 +7,8 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
export EMQX_DASHBOARD_VERSION ?= v1.2.3
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6-beta.2
export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1
ifeq ($(OS),Windows_NT)

View File

@ -35,7 +35,7 @@
-define(EMQX_RELEASE_CE, "5.0.23").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.3-alpha.1").
-define(EMQX_RELEASE_EE, "5.0.3-alpha.3").
%% the HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -333,6 +333,7 @@ init_load(SchemaMod, Conf, Opts) when is_list(Conf) orelse is_binary(Conf) ->
init_load(HasDeprecatedFile, SchemaMod, RawConf, Opts).
init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) ->
ok = save_schema_mod_and_names(SchemaMod),
%% deprecated conf will be removed in 5.1
%% Merge environment variable overrides on top
RawConfWithEnvs = merge_envs(SchemaMod, RawConf),

View File

@ -58,3 +58,22 @@ t_fill_default_values(_) ->
%% ensure JSON compatible
_ = emqx_utils_json:encode(WithDefaults),
ok.
t_init_load(_Config) ->
ConfFile = "./test_emqx.conf",
ok = file:write_file(ConfFile, <<"">>),
ExpectRootNames = lists:sort(hocon_schema:root_names(emqx_schema)),
emqx_config:erase_schema_mod_and_names(),
{ok, DeprecatedFile} = application:get_env(emqx, cluster_override_conf_file),
?assertEqual(false, filelib:is_regular(DeprecatedFile), DeprecatedFile),
%% Don't has deprecated file
ok = emqx_config:init_load(emqx_schema, [ConfFile]),
?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())),
?assertMatch({ok, #{raw_config := 256}}, emqx:update_config([mqtt, max_topic_levels], 256)),
emqx_config:erase_schema_mod_and_names(),
%% Has deprecated file
ok = file:write_file(DeprecatedFile, <<"{}">>),
ok = emqx_config:init_load(emqx_schema, [ConfFile]),
?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())),
?assertMatch({ok, #{raw_config := 128}}, emqx:update_config([mqtt, max_topic_levels], 128)),
ok = file:delete(DeprecatedFile).

View File

@ -2,6 +2,4 @@ dashboard {
listeners.http {
bind = 18083
}
default_username = "admin"
default_password = "public"
}

View File

@ -141,9 +141,5 @@ ensure_disk_queue_dir_absent(ResourceId, Index) ->
ok.
ensure_worker_pool_removed(ResId) ->
try
gproc_pool:delete(ResId)
catch
error:badarg -> ok
end,
gproc_pool:force_delete(ResId),
ok.

View File

@ -26,7 +26,7 @@
"values ( ${id}, ${topic}, ${qos}, ${payload} )"
>>).
-define(DEFAULT_DRIVER, <<"ms-sqlserver-18">>).
-define(DEFAULT_DRIVER, <<"ms-sql">>).
conn_bridge_examples(Method) ->
[

View File

@ -251,7 +251,7 @@ directly_setup_dynamo() ->
directly_query(Query) ->
directly_setup_dynamo(),
emqx_ee_connector_dynamo:execute(Query, ?TABLE_BIN).
emqx_ee_connector_dynamo_client:execute(Query, ?TABLE_BIN).
directly_get_payload(Key) ->
case directly_query({get_item, {<<"id">>, Key}}) of

View File

@ -136,7 +136,7 @@ rocketmq_config(BridgeType, Config) ->
io_lib:format(
"bridges.~s.~s {\n"
" enable = true\n"
" server = ~p\n"
" servers = ~p\n"
" topic = ~p\n"
" resource_opts = {\n"
" request_timeout = 1500ms\n"

View File

@ -28,22 +28,11 @@
]).
-export([
connect/1,
do_get_status/1,
do_async_reply/2,
worker_do_query/4
connect/1
]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-define(DYNAMO_HOST_OPTIONS, #{
default_port => 8000
}).
-ifdef(TEST).
-export([execute/2]).
-endif.
%%=====================================================================
%% Hocon schema
roots() ->
@ -91,8 +80,10 @@ on_start(
config => redact(Config)
}),
{Schema, Server} = get_host_schema(to_str(Url)),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS),
{Schema, Server, DefaultPort} = get_host_info(to_str(Url)),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{
default_port => DefaultPort
}),
Options = [
{config, #{
@ -126,45 +117,39 @@ on_stop(InstanceId, #{pool_name := PoolName}) ->
emqx_resource_pool:stop(PoolName).
on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, handover, State).
do_query(InstanceId, Query, sync, State).
on_query_async(InstanceId, Query, Reply, State) ->
on_query_async(InstanceId, Query, ReplyCtx, State) ->
do_query(
InstanceId,
Query,
{handover_async, {?MODULE, do_async_reply, [Reply]}},
{async, ReplyCtx},
State
).
%% we only support batch insert
on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) ->
do_query(InstanceId, Query, handover, State);
do_query(InstanceId, Query, sync, State);
on_batch_query(_InstanceId, Query, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}.
%% we only support batch insert
on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) ->
on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) ->
do_query(
InstanceId,
Query,
{handover_async, {?MODULE, do_async_reply, [Reply]}},
{async, ReplyCtx},
State
);
on_batch_query_async(_InstanceId, Query, _Reply, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}.
on_get_status(_InstanceId, #{pool_name := PoolName}) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
on_get_status(_InstanceId, #{pool_name := Pool}) ->
Health = emqx_resource_pool:health_check_workers(
Pool, {emqx_ee_connector_dynamo_client, is_connected, []}
),
status_result(Health).
do_get_status(_Conn) ->
%% because the dynamodb driver connection process is the ecpool worker self
%% so we must call the checker function inside the worker
case erlcloud_ddb2:list_tables() of
{ok, _} -> true;
_ -> false
end.
status_result(_Status = true) -> connected;
status_result(_Status = false) -> connecting.
@ -185,8 +170,8 @@ do_query(
),
Result = ecpool:pick_and_do(
PoolName,
{?MODULE, worker_do_query, [Table, Query, Templates]},
ApplyMode
{emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]},
no_handover
),
case Result of
@ -210,47 +195,10 @@ do_query(
Result
end.
worker_do_query(_Client, Table, Query0, Templates) ->
try
Query = apply_template(Query0, Templates),
execute(Query, Table)
catch
_Type:Reason ->
{error, {unrecoverable_error, {invalid_request, Reason}}}
end.
%% some simple query commands for authn/authz or test
execute({insert_item, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute({delete_item, Key}, Table) ->
erlcloud_ddb2:delete_item(Table, Key);
execute({get_item, Key}, Table) ->
erlcloud_ddb2:get_item(Table, Key);
%% commands for data bridge query or batch query
execute({send_message, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute([{put, _} | _] = Msgs, Table) ->
%% type of batch_write_item argument :: batch_write_item_request_items()
%% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
%% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
%% batch_write_item_request() :: {put, item()} | {delete, key()}
erlcloud_ddb2:batch_write_item({Table, Msgs}).
connect(Opts) ->
#{
aws_access_key_id := AccessKeyID,
aws_secret_access_key := SecretAccessKey,
host := Host,
port := Port,
schema := Schema
} = proplists:get_value(config, Opts),
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
%% The dynamodb driver uses caller process as its connection process
%% so at here, the connection process is the ecpool worker self
{ok, self()}.
Options = proplists:get_value(config, Opts),
{ok, _Pid} = Result = emqx_ee_connector_dynamo_client:start_link(Options),
Result.
parse_template(Config) ->
Templates =
@ -276,61 +224,12 @@ to_str(List) when is_list(List) ->
to_str(Bin) when is_binary(Bin) ->
erlang:binary_to_list(Bin).
get_host_schema("http://" ++ Server) ->
{"http://", Server};
get_host_schema("https://" ++ Server) ->
{"https://", Server};
get_host_schema(Server) ->
{"http://", Server}.
apply_template({Key, Msg} = Req, Templates) ->
case maps:get(Key, Templates, undefined) of
undefined ->
Req;
Template ->
{Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)}
end;
%% now there is no batch delete, so
%% 1. we can simply replace the `send_message` to `put`
%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`,
%% so we can reduce some list map cost
apply_template([{send_message, _Msg} | _] = Msgs, Templates) ->
lists:map(
fun(Req) ->
{_, Msg} = apply_template(Req, Templates),
{put, convert_to_item(Msg)}
end,
Msgs
).
convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 ->
maps:fold(
fun
(_K, <<>>, AccIn) ->
AccIn;
(K, V, AccIn) ->
[{convert2binary(K), convert2binary(V)} | AccIn]
end,
[],
Msg
);
convert_to_item(MsgBin) when is_binary(MsgBin) ->
Msg = emqx_utils_json:decode(MsgBin),
convert_to_item(Msg);
convert_to_item(Item) ->
erlang:throw({invalid_item, Item}).
convert2binary(Value) when is_atom(Value) ->
erlang:atom_to_binary(Value, utf8);
convert2binary(Value) when is_binary(Value); is_number(Value) ->
Value;
convert2binary(Value) when is_list(Value) ->
unicode:characters_to_binary(Value);
convert2binary(Value) when is_map(Value) ->
emqx_utils_json:encode(Value).
do_async_reply(Result, {ReplyFun, [Context]}) ->
ReplyFun(Context, Result).
get_host_info("http://" ++ Server) ->
{"http://", Server, 80};
get_host_info("https://" ++ Server) ->
{"https://", Server, 443};
get_host_info(Server) ->
{"http://", Server, 80}.
redact(Data) ->
emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end).

View File

@ -0,0 +1,186 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_dynamo_client).
-behaviour(gen_server).
%% API
-export([
start_link/1,
is_connected/1,
query/5,
query/4
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
format_status/2
]).
-ifdef(TEST).
-export([execute/2]).
-endif.
%%%===================================================================
%%% API
%%%===================================================================
is_connected(Pid) ->
try
gen_server:call(Pid, is_connected)
catch
_:_ ->
false
end.
query(Pid, sync, Table, Query, Templates) ->
query(Pid, Table, Query, Templates);
query(Pid, {async, ReplyCtx}, Table, Query, Templates) ->
gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}).
query(Pid, Table, Query, Templates) ->
gen_server:call(Pid, {query, Table, Query, Templates}, infinity).
%%--------------------------------------------------------------------
%% @doc
%% Starts Bridge which transfer data to DynamoDB
%% @endn
%%--------------------------------------------------------------------
start_link(Options) ->
gen_server:start_link(?MODULE, Options, []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% Initialize dynamodb data bridge
init(#{
aws_access_key_id := AccessKeyID,
aws_secret_access_key := SecretAccessKey,
host := Host,
port := Port,
schema := Schema
}) ->
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
{ok, #{}}.
handle_call(is_connected, _From, State) ->
IsConnected =
case erlcloud_ddb2:list_tables([{limit, 1}]) of
{ok, _} ->
true;
_ ->
false
end,
{reply, IsConnected, State};
handle_call({query, Table, Query, Templates}, _From, State) ->
Result = do_query(Table, Query, Templates),
{reply, Result, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) ->
Result = do_query(Table, Query, Templates),
ReplyFun(Context, Result),
{noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-spec format_status(
Opt :: normal | terminate,
Status :: list()
) -> Status :: term().
format_status(_Opt, Status) ->
Status.
%%%===================================================================
%%% Internal functions
%%%===================================================================
do_query(Table, Query0, Templates) ->
try
Query = apply_template(Query0, Templates),
execute(Query, Table)
catch
_Type:Reason ->
{error, {unrecoverable_error, {invalid_request, Reason}}}
end.
%% some simple query commands for authn/authz or test
execute({insert_item, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute({delete_item, Key}, Table) ->
erlcloud_ddb2:delete_item(Table, Key);
execute({get_item, Key}, Table) ->
erlcloud_ddb2:get_item(Table, Key);
%% commands for data bridge query or batch query
execute({send_message, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute([{put, _} | _] = Msgs, Table) ->
%% type of batch_write_item argument :: batch_write_item_request_items()
%% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
%% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
%% batch_write_item_request() :: {put, item()} | {delete, key()}
erlcloud_ddb2:batch_write_item({Table, Msgs}).
apply_template({Key, Msg} = Req, Templates) ->
case maps:get(Key, Templates, undefined) of
undefined ->
Req;
Template ->
{Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)}
end;
%% now there is no batch delete, so
%% 1. we can simply replace the `send_message` to `put`
%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`,
%% so we can reduce some list map cost
apply_template([{send_message, _Msg} | _] = Msgs, Templates) ->
lists:map(
fun(Req) ->
{_, Msg} = apply_template(Req, Templates),
{put, convert_to_item(Msg)}
end,
Msgs
).
convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 ->
maps:fold(
fun
(_K, <<>>, AccIn) ->
AccIn;
(K, V, AccIn) ->
[{convert2binary(K), convert2binary(V)} | AccIn]
end,
[],
Msg
);
convert_to_item(MsgBin) when is_binary(MsgBin) ->
Msg = emqx_utils_json:decode(MsgBin),
convert_to_item(Msg);
convert_to_item(Item) ->
erlang:throw({invalid_item, Item}).
convert2binary(Value) when is_atom(Value) ->
erlang:atom_to_binary(Value, utf8);
convert2binary(Value) when is_binary(Value); is_number(Value) ->
Value;
convert2binary(Value) when is_list(Value) ->
unicode:characters_to_binary(Value);
convert2binary(Value) when is_map(Value) ->
emqx_utils_json:encode(Value).

View File

@ -38,12 +38,28 @@ roots() ->
fields(config) ->
[
{server, server()},
{servers, servers()},
{topic,
mk(
binary(),
#{default => <<"TopicTest">>, desc => ?DESC(topic)}
)},
{access_key,
mk(
binary(),
#{default => <<>>, desc => ?DESC("access_key")}
)},
{secret_key,
mk(
binary(),
#{default => <<>>, desc => ?DESC("secret_key")}
)},
{security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})},
{sync_timeout,
mk(
emqx_schema:duration(),
#{default => <<"3s">>, desc => ?DESC(sync_timeout)}
)},
{refresh_interval,
mk(
emqx_schema:duration(),
@ -54,39 +70,15 @@ fields(config) ->
emqx_schema:bytesize(),
#{default => <<"1024KB">>, desc => ?DESC(send_buffer)}
)},
{security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})}
| relational_fields()
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
].
add_default_username(Fields) ->
lists:map(
fun
({username, OrigUsernameFn}) ->
{username, add_default_fn(OrigUsernameFn, <<"">>)};
(Field) ->
Field
end,
Fields
).
add_default_fn(OrigFn, Default) ->
fun
(default) -> Default;
(Field) -> OrigFn(Field)
end.
server() ->
Meta = #{desc => ?DESC("server")},
servers() ->
Meta = #{desc => ?DESC("servers")},
emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
relational_fields() ->
Fields = [username, password, auto_reconnect],
Values = lists:filter(
fun({E, _}) -> lists:member(E, Fields) end,
emqx_connector_schema_lib:relational_db_fields()
),
add_default_username(Values).
%%========================================================================================
%% `emqx_resource' API
%%========================================================================================
@ -97,34 +89,35 @@ is_buffer_supported() -> false.
on_start(
InstanceId,
#{server := Server, topic := Topic} = Config1
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config
) ->
?SLOG(info, #{
msg => "starting_rocketmq_connector",
connector => InstanceId,
config => redact(Config1)
config => redact(Config)
}),
Config = maps:merge(default_security_info(), Config1),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS),
Server1 = [{Host, Port}],
Servers = lists:map(
fun(#{hostname := Host, port := Port}) -> {Host, Port} end,
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
),
ClientId = client_id(InstanceId),
ClientCfg = #{acl_info => #{}},
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Config),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config),
ProducersMapPID = create_producers_map(ClientId),
State = #{
client_id => ClientId,
topic => Topic,
topic_tokens => TopicTks,
config => Config,
sync_timeout => SyncTimeout,
templates => Templates,
producers_map_pid => ProducersMapPID,
producers_opts => ProducerOpts
},
case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
{ok, _Pid} ->
{ok, State};
{error, _Reason} = Error ->
@ -135,11 +128,21 @@ on_start(
Error
end.
on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) ->
on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
?SLOG(info, #{
msg => "stopping_rocketmq_connector",
connector => InstanceId
}),
Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}),
lists:foreach(
fun([Topic, Producer]) ->
ets:delete(ClientId, {RawTopic, Topic}),
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
end,
Producers
),
Pid ! ok,
ok = rocketmq:stop_and_delete_supervised_client(ClientId).
@ -154,12 +157,15 @@ on_batch_query(_InstanceId, Query, _State) ->
on_get_status(_InstanceId, #{client_id := ClientId}) ->
case rocketmq_client_sup:find_client(ClientId) of
{ok, _Pid} ->
connected;
{ok, Pid} ->
status_result(rocketmq_client:get_status(Pid));
_ ->
connecting
end.
status_result(_Status = true) -> connected;
status_result(_Status) -> connecting.
%%========================================================================================
%% Helper fns
%%========================================================================================
@ -171,9 +177,10 @@ do_query(
#{
templates := Templates,
client_id := ClientId,
topic := RawTopic,
topic_tokens := TopicTks,
producers_opts := ProducerOpts,
config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}}
sync_timeout := RequestTimeout
} = State
) ->
?TRACE(
@ -267,6 +274,8 @@ client_id(InstanceId) ->
redact(Msg) ->
emqx_utils:redact(Msg, fun is_sensitive_key/1).
is_sensitive_key(secret_key) ->
true;
is_sensitive_key(security_token) ->
true;
is_sensitive_key(_) ->
@ -274,14 +283,14 @@ is_sensitive_key(_) ->
make_producer_opts(
#{
username := Username,
password := Password,
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken,
send_buffer := SendBuff,
refresh_interval := RefreshInterval
}
) ->
ACLInfo = acl_info(Username, Password, SecurityToken),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
#{
tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval,
@ -290,17 +299,17 @@ make_producer_opts(
acl_info(<<>>, <<>>, <<>>) ->
#{};
acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) ->
acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) ->
#{
access_key => Username,
secret_key => Password
access_key => AccessKey,
secret_key => SecretKey
};
acl_info(Username, Password, SecurityToken) when
is_binary(Username), is_binary(Password), is_binary(SecurityToken)
acl_info(AccessKey, SecretKey, SecurityToken) when
is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken)
->
#{
access_key => Username,
secret_key => Password,
access_key => AccessKey,
secret_key => SecretKey,
security_token => SecurityToken
};
acl_info(_, _, _) ->
@ -333,6 +342,3 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) ->
ets:insert(ClientId, {TopicKey, Producers0}),
Producers0
end.
default_security_info() ->
#{username => <<>>, password => <<>>, security_token => <<>>}.

View File

@ -43,7 +43,7 @@
-export([connect/1]).
%% Internal exports used to execute code with ecpool worker
-export([do_get_status/2, worker_do_insert/3, do_async_reply/2]).
-export([do_get_status/1, worker_do_insert/3, do_async_reply/2]).
-import(emqx_plugin_libs_rule, [str/1]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
@ -304,12 +304,10 @@ on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) ->
),
do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State).
on_get_status(_InstanceId, #{pool_name := PoolName, resource_opts := ResourceOpts} = _State) ->
RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts),
on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
Health = emqx_resource_pool:health_check_workers(
PoolName,
{?MODULE, do_get_status, [RequestTimeout]},
RequestTimeout
{?MODULE, do_get_status, []}
),
status_result(Health).
@ -328,9 +326,9 @@ connect(Options) ->
Opts = proplists:get_value(options, Options, []),
odbc:connect(ConnectStr, Opts).
-spec do_get_status(connection_reference(), time_out()) -> Result :: boolean().
do_get_status(Conn, RequestTimeout) ->
case execute(Conn, <<"SELECT 1">>, RequestTimeout) of
-spec do_get_status(connection_reference()) -> Result :: boolean().
do_get_status(Conn) ->
case execute(Conn, <<"SELECT 1">>) of
{selected, [[]], [{1}]} -> true;
_ -> false
end.
@ -444,6 +442,15 @@ worker_do_insert(
{error, {unrecoverable_error, {invalid_request, Reason}}}
end.
-spec execute(pid(), sql()) ->
updated_tuple()
| selected_tuple()
| [updated_tuple()]
| [selected_tuple()]
| {error, common_reason()}.
execute(Conn, SQL) ->
odbc:sql_query(Conn, str(SQL)).
-spec execute(pid(), sql(), time_out()) ->
updated_tuple()
| selected_tuple()

View File

@ -33,7 +33,10 @@ local_topic.label:
"""Local Topic"""
template.desc:
"""Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ"""
"""Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ.<br>
The template can be any valid string with placeholders, example:<br>
- ${id}, ${username}, ${clientid}, ${timestamp}<br>
- {"id" : ${id}, "username" : ${username}}"""
template.label:
"""Template"""

View File

@ -1,11 +1,23 @@
emqx_ee_connector_rocketmq {
access_key.desc:
"""RocketMQ server `accessKey`."""
access_key.label:
"""AccessKey"""
refresh_interval.desc:
"""RocketMQ Topic Route Refresh Interval."""
refresh_interval.label:
"""Topic Route Refresh Interval"""
secret_key.desc:
"""RocketMQ server `secretKey`."""
secret_key.label:
"""SecretKey"""
security_token.desc:
"""RocketMQ Server Security Token"""
@ -18,14 +30,20 @@ send_buffer.desc:
send_buffer.label:
"""Send Buffer Size"""
server.desc:
servers.desc:
"""The IPv4 or IPv6 address or the hostname to connect to.<br/>
A host entry has the following form: `Host[:Port]`.<br/>
The RocketMQ default port 9876 is used if `[:Port]` is not specified."""
server.label:
servers.label:
"""Server Host"""
sync_timeout.desc:
"""Timeout of RocketMQ driver synchronous call."""
sync_timeout.label:
"""Sync Timeout"""
topic.desc:
"""RocketMQ Topic"""

View File

@ -32,7 +32,10 @@ local_topic.label:
"""本地 Topic"""
template.desc:
"""模板, 默认为空,为空时将会将整个消息转发给 RocketMQ"""
"""模板, 默认为空,为空时将会将整个消息转发给 RocketMQ。 <br>
模板可以是任意带有占位符的合法字符串, 例如:<br>
- ${id}, ${username}, ${clientid}, ${timestamp}<br>
- {"id" : ${id}, "username" : ${username}}"""
template.label:
"""模板"""

View File

@ -1,11 +1,23 @@
emqx_ee_connector_rocketmq {
access_key.desc:
"""RocketMQ 服务器的 `accessKey`。"""
access_key.label:
"""AccessKey"""
refresh_interval.desc:
"""RocketMQ 主题路由更新间隔。"""
refresh_interval.label:
"""主题路由更新间隔"""
secret_key.desc:
"""RocketMQ 服务器的 `secretKey`。"""
secret_key.label:
"""SecretKey"""
security_token.desc:
"""RocketMQ 服务器安全令牌"""
@ -18,14 +30,20 @@ send_buffer.desc:
send_buffer.label:
"""发送消息的缓冲区大小"""
server.desc:
servers.desc:
"""将要连接的 IPv4 或 IPv6 地址,或者主机名。<br/>
主机名具有以下形式:`Host[:Port]`。<br/>
如果未指定 `[:Port]`,则使用 RocketMQ 默认端口 9876。"""
server.label:
servers.label:
"""服务器地址"""
sync_timeout.desc:
"""RocketMQ 驱动同步调用的超时时间。"""
sync_timeout.label:
"""同步调用超时时间"""
topic.desc:
"""RocketMQ 主题"""

View File

@ -202,9 +202,9 @@ for dep in ${CT_DEPS}; do
done
if [ "$ODBC_REQUEST" = 'yes' ]; then
INSTALL_ODBC="./scripts/install-odbc-driver.sh"
INSTALL_ODBC="./scripts/install-msodbc-driver.sh"
else
INSTALL_ODBC="echo 'Driver msodbcsql driver not requested'"
INSTALL_ODBC="echo 'msodbc driver not requested'"
fi
F_OPTIONS=""