diff --git a/Makefile b/Makefile index 10e6d1424..a39f44c07 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,8 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) export EMQX_DASHBOARD_VERSION ?= v1.2.3 -export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6-beta.2 + export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index ec3883c77..eff228621 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.23"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.3-alpha.1"). +-define(EMQX_RELEASE_EE, "5.0.3-alpha.3"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index c94f25ead..9561263ca 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -333,6 +333,7 @@ init_load(SchemaMod, Conf, Opts) when is_list(Conf) orelse is_binary(Conf) -> init_load(HasDeprecatedFile, SchemaMod, RawConf, Opts). init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) -> + ok = save_schema_mod_and_names(SchemaMod), %% deprecated conf will be removed in 5.1 %% Merge environment variable overrides on top RawConfWithEnvs = merge_envs(SchemaMod, RawConf), diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index b54f67f07..3b75ee4f3 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -58,3 +58,22 @@ t_fill_default_values(_) -> %% ensure JSON compatible _ = emqx_utils_json:encode(WithDefaults), ok. + +t_init_load(_Config) -> + ConfFile = "./test_emqx.conf", + ok = file:write_file(ConfFile, <<"">>), + ExpectRootNames = lists:sort(hocon_schema:root_names(emqx_schema)), + emqx_config:erase_schema_mod_and_names(), + {ok, DeprecatedFile} = application:get_env(emqx, cluster_override_conf_file), + ?assertEqual(false, filelib:is_regular(DeprecatedFile), DeprecatedFile), + %% Don't has deprecated file + ok = emqx_config:init_load(emqx_schema, [ConfFile]), + ?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())), + ?assertMatch({ok, #{raw_config := 256}}, emqx:update_config([mqtt, max_topic_levels], 256)), + emqx_config:erase_schema_mod_and_names(), + %% Has deprecated file + ok = file:write_file(DeprecatedFile, <<"{}">>), + ok = emqx_config:init_load(emqx_schema, [ConfFile]), + ?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())), + ?assertMatch({ok, #{raw_config := 128}}, emqx:update_config([mqtt, max_topic_levels], 128)), + ok = file:delete(DeprecatedFile). diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index 856779500..67e3f61ec 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -2,6 +2,4 @@ dashboard { listeners.http { bind = 18083 } - default_username = "admin" - default_password = "public" } diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index a00dcdcd2..ae30c3927 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -141,9 +141,5 @@ ensure_disk_queue_dir_absent(ResourceId, Index) -> ok. ensure_worker_pool_removed(ResId) -> - try - gproc_pool:delete(ResId) - catch - error:badarg -> ok - end, + gproc_pool:force_delete(ResId), ok. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl index e216299c2..49db815a6 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl @@ -22,11 +22,11 @@ ]). -define(DEFAULT_SQL, << - "insert into t_mqtt_msg(msgid, topic, qos, payload)" - "values (${id}, ${topic}, ${qos}, ${payload})" + "insert into t_mqtt_msg(msgid, topic, qos, payload) " + "values ( ${id}, ${topic}, ${qos}, ${payload} )" >>). --define(DEFAULT_DRIVER, <<"ms-sqlserver-18">>). +-define(DEFAULT_DRIVER, <<"ms-sql">>). conn_bridge_examples(Method) -> [ diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index 9cf7eb8f4..3b07acbe0 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -251,7 +251,7 @@ directly_setup_dynamo() -> directly_query(Query) -> directly_setup_dynamo(), - emqx_ee_connector_dynamo:execute(Query, ?TABLE_BIN). + emqx_ee_connector_dynamo_client:execute(Query, ?TABLE_BIN). directly_get_payload(Key) -> case directly_query({get_item, {<<"id">>, Key}}) of diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl index 0cb14e5c3..33a83d2d8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl @@ -136,7 +136,7 @@ rocketmq_config(BridgeType, Config) -> io_lib:format( "bridges.~s.~s {\n" " enable = true\n" - " server = ~p\n" + " servers = ~p\n" " topic = ~p\n" " resource_opts = {\n" " request_timeout = 1500ms\n" diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index f45f8ca2f..5eee882ce 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -28,22 +28,11 @@ ]). -export([ - connect/1, - do_get_status/1, - do_async_reply/2, - worker_do_query/4 + connect/1 ]). -import(hoconsc, [mk/2, enum/1, ref/2]). --define(DYNAMO_HOST_OPTIONS, #{ - default_port => 8000 -}). - --ifdef(TEST). --export([execute/2]). --endif. - %%===================================================================== %% Hocon schema roots() -> @@ -91,8 +80,10 @@ on_start( config => redact(Config) }), - {Schema, Server} = get_host_schema(to_str(Url)), - #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), + {Schema, Server, DefaultPort} = get_host_info(to_str(Url)), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{ + default_port => DefaultPort + }), Options = [ {config, #{ @@ -126,45 +117,39 @@ on_stop(InstanceId, #{pool_name := PoolName}) -> emqx_resource_pool:stop(PoolName). on_query(InstanceId, Query, State) -> - do_query(InstanceId, Query, handover, State). + do_query(InstanceId, Query, sync, State). -on_query_async(InstanceId, Query, Reply, State) -> +on_query_async(InstanceId, Query, ReplyCtx, State) -> do_query( InstanceId, Query, - {handover_async, {?MODULE, do_async_reply, [Reply]}}, + {async, ReplyCtx}, State ). %% we only support batch insert on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> - do_query(InstanceId, Query, handover, State); + do_query(InstanceId, Query, sync, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. %% we only support batch insert -on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) -> +on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) -> do_query( InstanceId, Query, - {handover_async, {?MODULE, do_async_reply, [Reply]}}, + {async, ReplyCtx}, State ); on_batch_query_async(_InstanceId, Query, _Reply, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. -on_get_status(_InstanceId, #{pool_name := PoolName}) -> - Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), +on_get_status(_InstanceId, #{pool_name := Pool}) -> + Health = emqx_resource_pool:health_check_workers( + Pool, {emqx_ee_connector_dynamo_client, is_connected, []} + ), status_result(Health). -do_get_status(_Conn) -> - %% because the dynamodb driver connection process is the ecpool worker self - %% so we must call the checker function inside the worker - case erlcloud_ddb2:list_tables() of - {ok, _} -> true; - _ -> false - end. - status_result(_Status = true) -> connected; status_result(_Status = false) -> connecting. @@ -185,8 +170,8 @@ do_query( ), Result = ecpool:pick_and_do( PoolName, - {?MODULE, worker_do_query, [Table, Query, Templates]}, - ApplyMode + {emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]}, + no_handover ), case Result of @@ -210,47 +195,10 @@ do_query( Result end. -worker_do_query(_Client, Table, Query0, Templates) -> - try - Query = apply_template(Query0, Templates), - execute(Query, Table) - catch - _Type:Reason -> - {error, {unrecoverable_error, {invalid_request, Reason}}} - end. - -%% some simple query commands for authn/authz or test -execute({insert_item, Msg}, Table) -> - Item = convert_to_item(Msg), - erlcloud_ddb2:put_item(Table, Item); -execute({delete_item, Key}, Table) -> - erlcloud_ddb2:delete_item(Table, Key); -execute({get_item, Key}, Table) -> - erlcloud_ddb2:get_item(Table, Key); -%% commands for data bridge query or batch query -execute({send_message, Msg}, Table) -> - Item = convert_to_item(Msg), - erlcloud_ddb2:put_item(Table, Item); -execute([{put, _} | _] = Msgs, Table) -> - %% type of batch_write_item argument :: batch_write_item_request_items() - %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) - %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} - %% batch_write_item_request() :: {put, item()} | {delete, key()} - erlcloud_ddb2:batch_write_item({Table, Msgs}). - connect(Opts) -> - #{ - aws_access_key_id := AccessKeyID, - aws_secret_access_key := SecretAccessKey, - host := Host, - port := Port, - schema := Schema - } = proplists:get_value(config, Opts), - erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), - - %% The dynamodb driver uses caller process as its connection process - %% so at here, the connection process is the ecpool worker self - {ok, self()}. + Options = proplists:get_value(config, Opts), + {ok, _Pid} = Result = emqx_ee_connector_dynamo_client:start_link(Options), + Result. parse_template(Config) -> Templates = @@ -276,61 +224,12 @@ to_str(List) when is_list(List) -> to_str(Bin) when is_binary(Bin) -> erlang:binary_to_list(Bin). -get_host_schema("http://" ++ Server) -> - {"http://", Server}; -get_host_schema("https://" ++ Server) -> - {"https://", Server}; -get_host_schema(Server) -> - {"http://", Server}. - -apply_template({Key, Msg} = Req, Templates) -> - case maps:get(Key, Templates, undefined) of - undefined -> - Req; - Template -> - {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} - end; -%% now there is no batch delete, so -%% 1. we can simply replace the `send_message` to `put` -%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, -%% so we can reduce some list map cost -apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> - lists:map( - fun(Req) -> - {_, Msg} = apply_template(Req, Templates), - {put, convert_to_item(Msg)} - end, - Msgs - ). - -convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> - maps:fold( - fun - (_K, <<>>, AccIn) -> - AccIn; - (K, V, AccIn) -> - [{convert2binary(K), convert2binary(V)} | AccIn] - end, - [], - Msg - ); -convert_to_item(MsgBin) when is_binary(MsgBin) -> - Msg = emqx_utils_json:decode(MsgBin), - convert_to_item(Msg); -convert_to_item(Item) -> - erlang:throw({invalid_item, Item}). - -convert2binary(Value) when is_atom(Value) -> - erlang:atom_to_binary(Value, utf8); -convert2binary(Value) when is_binary(Value); is_number(Value) -> - Value; -convert2binary(Value) when is_list(Value) -> - unicode:characters_to_binary(Value); -convert2binary(Value) when is_map(Value) -> - emqx_utils_json:encode(Value). - -do_async_reply(Result, {ReplyFun, [Context]}) -> - ReplyFun(Context, Result). +get_host_info("http://" ++ Server) -> + {"http://", Server, 80}; +get_host_info("https://" ++ Server) -> + {"https://", Server, 443}; +get_host_info(Server) -> + {"http://", Server, 80}. redact(Data) -> emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl new file mode 100644 index 000000000..0340655b4 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl @@ -0,0 +1,186 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector_dynamo_client). + +-behaviour(gen_server). + +%% API +-export([ + start_link/1, + is_connected/1, + query/5, + query/4 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%%%=================================================================== +%%% API +%%%=================================================================== +is_connected(Pid) -> + try + gen_server:call(Pid, is_connected) + catch + _:_ -> + false + end. + +query(Pid, sync, Table, Query, Templates) -> + query(Pid, Table, Query, Templates); +query(Pid, {async, ReplyCtx}, Table, Query, Templates) -> + gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}). + +query(Pid, Table, Query, Templates) -> + gen_server:call(Pid, {query, Table, Query, Templates}, infinity). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts Bridge which transfer data to DynamoDB +%% @endn +%%-------------------------------------------------------------------- +start_link(Options) -> + gen_server:start_link(?MODULE, Options, []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% Initialize dynamodb data bridge +init(#{ + aws_access_key_id := AccessKeyID, + aws_secret_access_key := SecretAccessKey, + host := Host, + port := Port, + schema := Schema +}) -> + erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), + {ok, #{}}. + +handle_call(is_connected, _From, State) -> + IsConnected = + case erlcloud_ddb2:list_tables([{limit, 1}]) of + {ok, _} -> + true; + _ -> + false + end, + {reply, IsConnected, State}; +handle_call({query, Table, Query, Templates}, _From, State) -> + Result = do_query(Table, Query, Templates), + {reply, Result, State}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) -> + Result = do_query(Table, Query, Templates), + ReplyFun(Context, Result), + {noreply, State}; +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +do_query(Table, Query0, Templates) -> + try + Query = apply_template(Query0, Templates), + execute(Query, Table) + catch + _Type:Reason -> + {error, {unrecoverable_error, {invalid_request, Reason}}} + end. + +%% some simple query commands for authn/authz or test +execute({insert_item, Msg}, Table) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Table, Item); +execute({delete_item, Key}, Table) -> + erlcloud_ddb2:delete_item(Table, Key); +execute({get_item, Key}, Table) -> + erlcloud_ddb2:get_item(Table, Key); +%% commands for data bridge query or batch query +execute({send_message, Msg}, Table) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Table, Item); +execute([{put, _} | _] = Msgs, Table) -> + %% type of batch_write_item argument :: batch_write_item_request_items() + %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) + %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} + %% batch_write_item_request() :: {put, item()} | {delete, key()} + erlcloud_ddb2:batch_write_item({Table, Msgs}). + +apply_template({Key, Msg} = Req, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + Req; + Template -> + {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + end; +%% now there is no batch delete, so +%% 1. we can simply replace the `send_message` to `put` +%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, +%% so we can reduce some list map cost +apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> + lists:map( + fun(Req) -> + {_, Msg} = apply_template(Req, Templates), + {put, convert_to_item(Msg)} + end, + Msgs + ). + +convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> + maps:fold( + fun + (_K, <<>>, AccIn) -> + AccIn; + (K, V, AccIn) -> + [{convert2binary(K), convert2binary(V)} | AccIn] + end, + [], + Msg + ); +convert_to_item(MsgBin) when is_binary(MsgBin) -> + Msg = emqx_utils_json:decode(MsgBin), + convert_to_item(Msg); +convert_to_item(Item) -> + erlang:throw({invalid_item, Item}). + +convert2binary(Value) when is_atom(Value) -> + erlang:atom_to_binary(Value, utf8); +convert2binary(Value) when is_binary(Value); is_number(Value) -> + Value; +convert2binary(Value) when is_list(Value) -> + unicode:characters_to_binary(Value); +convert2binary(Value) when is_map(Value) -> + emqx_utils_json:encode(Value). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 74fb4eedd..2e1730b52 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -38,12 +38,28 @@ roots() -> fields(config) -> [ - {server, server()}, + {servers, servers()}, {topic, mk( binary(), #{default => <<"TopicTest">>, desc => ?DESC(topic)} )}, + {access_key, + mk( + binary(), + #{default => <<>>, desc => ?DESC("access_key")} + )}, + {secret_key, + mk( + binary(), + #{default => <<>>, desc => ?DESC("secret_key")} + )}, + {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})}, + {sync_timeout, + mk( + emqx_schema:duration(), + #{default => <<"3s">>, desc => ?DESC(sync_timeout)} + )}, {refresh_interval, mk( emqx_schema:duration(), @@ -54,39 +70,15 @@ fields(config) -> emqx_schema:bytesize(), #{default => <<"1024KB">>, desc => ?DESC(send_buffer)} )}, - {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})} - | relational_fields() + + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, + {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ]. -add_default_username(Fields) -> - lists:map( - fun - ({username, OrigUsernameFn}) -> - {username, add_default_fn(OrigUsernameFn, <<"">>)}; - (Field) -> - Field - end, - Fields - ). - -add_default_fn(OrigFn, Default) -> - fun - (default) -> Default; - (Field) -> OrigFn(Field) - end. - -server() -> - Meta = #{desc => ?DESC("server")}, +servers() -> + Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). -relational_fields() -> - Fields = [username, password, auto_reconnect], - Values = lists:filter( - fun({E, _}) -> lists:member(E, Fields) end, - emqx_connector_schema_lib:relational_db_fields() - ), - add_default_username(Values). - %%======================================================================================== %% `emqx_resource' API %%======================================================================================== @@ -97,34 +89,35 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{server := Server, topic := Topic} = Config1 + #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", connector => InstanceId, - config => redact(Config1) + config => redact(Config) }), - Config = maps:merge(default_security_info(), Config1), - #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), - - Server1 = [{Host, Port}], + Servers = lists:map( + fun(#{hostname := Host, port := Port}) -> {Host, Port} end, + emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) + ), ClientId = client_id(InstanceId), - ClientCfg = #{acl_info => #{}}, TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Config), + #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), + ClientCfg = #{acl_info => AclInfo}, Templates = parse_template(Config), ProducersMapPID = create_producers_map(ClientId), State = #{ client_id => ClientId, + topic => Topic, topic_tokens => TopicTks, - config => Config, + sync_timeout => SyncTimeout, templates => Templates, producers_map_pid => ProducersMapPID, producers_opts => ProducerOpts }, - case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of + case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of {ok, _Pid} -> {ok, State}; {error, _Reason} = Error -> @@ -135,11 +128,21 @@ on_start( Error end. -on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) -> +on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) -> ?SLOG(info, #{ msg => "stopping_rocketmq_connector", connector => InstanceId }), + + Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}), + lists:foreach( + fun([Topic, Producer]) -> + ets:delete(ClientId, {RawTopic, Topic}), + _ = rocketmq:stop_and_delete_supervised_producers(Producer) + end, + Producers + ), + Pid ! ok, ok = rocketmq:stop_and_delete_supervised_client(ClientId). @@ -154,12 +157,15 @@ on_batch_query(_InstanceId, Query, _State) -> on_get_status(_InstanceId, #{client_id := ClientId}) -> case rocketmq_client_sup:find_client(ClientId) of - {ok, _Pid} -> - connected; + {ok, Pid} -> + status_result(rocketmq_client:get_status(Pid)); _ -> connecting end. +status_result(_Status = true) -> connected; +status_result(_Status) -> connecting. + %%======================================================================================== %% Helper fns %%======================================================================================== @@ -171,9 +177,10 @@ do_query( #{ templates := Templates, client_id := ClientId, + topic := RawTopic, topic_tokens := TopicTks, producers_opts := ProducerOpts, - config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}} + sync_timeout := RequestTimeout } = State ) -> ?TRACE( @@ -267,6 +274,8 @@ client_id(InstanceId) -> redact(Msg) -> emqx_utils:redact(Msg, fun is_sensitive_key/1). +is_sensitive_key(secret_key) -> + true; is_sensitive_key(security_token) -> true; is_sensitive_key(_) -> @@ -274,14 +283,14 @@ is_sensitive_key(_) -> make_producer_opts( #{ - username := Username, - password := Password, + access_key := AccessKey, + secret_key := SecretKey, security_token := SecurityToken, send_buffer := SendBuff, refresh_interval := RefreshInterval } ) -> - ACLInfo = acl_info(Username, Password, SecurityToken), + ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), #{ tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, @@ -290,17 +299,17 @@ make_producer_opts( acl_info(<<>>, <<>>, <<>>) -> #{}; -acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) -> +acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) -> #{ - access_key => Username, - secret_key => Password + access_key => AccessKey, + secret_key => SecretKey }; -acl_info(Username, Password, SecurityToken) when - is_binary(Username), is_binary(Password), is_binary(SecurityToken) +acl_info(AccessKey, SecretKey, SecurityToken) when + is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken) -> #{ - access_key => Username, - secret_key => Password, + access_key => AccessKey, + secret_key => SecretKey, security_token => SecurityToken }; acl_info(_, _, _) -> @@ -333,6 +342,3 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> ets:insert(ClientId, {TopicKey, Producers0}), Producers0 end. - -default_security_info() -> - #{username => <<>>, password => <<>>, security_token => <<>>}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 8ea4429d0..90d90cb36 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -43,7 +43,7 @@ -export([connect/1]). %% Internal exports used to execute code with ecpool worker --export([do_get_status/2, worker_do_insert/3, do_async_reply/2]). +-export([do_get_status/1, worker_do_insert/3, do_async_reply/2]). -import(emqx_plugin_libs_rule, [str/1]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -304,12 +304,10 @@ on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> ), do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -on_get_status(_InstanceId, #{pool_name := PoolName, resource_opts := ResourceOpts} = _State) -> - RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), +on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, - {?MODULE, do_get_status, [RequestTimeout]}, - RequestTimeout + {?MODULE, do_get_status, []} ), status_result(Health). @@ -328,9 +326,9 @@ connect(Options) -> Opts = proplists:get_value(options, Options, []), odbc:connect(ConnectStr, Opts). --spec do_get_status(connection_reference(), time_out()) -> Result :: boolean(). -do_get_status(Conn, RequestTimeout) -> - case execute(Conn, <<"SELECT 1">>, RequestTimeout) of +-spec do_get_status(connection_reference()) -> Result :: boolean(). +do_get_status(Conn) -> + case execute(Conn, <<"SELECT 1">>) of {selected, [[]], [{1}]} -> true; _ -> false end. @@ -444,6 +442,15 @@ worker_do_insert( {error, {unrecoverable_error, {invalid_request, Reason}}} end. +-spec execute(pid(), sql()) -> + updated_tuple() + | selected_tuple() + | [updated_tuple()] + | [selected_tuple()] + | {error, common_reason()}. +execute(Conn, SQL) -> + odbc:sql_query(Conn, str(SQL)). + -spec execute(pid(), sql(), time_out()) -> updated_tuple() | selected_tuple() diff --git a/rel/i18n/emqx_ee_bridge_rocketmq.hocon b/rel/i18n/emqx_ee_bridge_rocketmq.hocon index a545a7fca..e079220b6 100644 --- a/rel/i18n/emqx_ee_bridge_rocketmq.hocon +++ b/rel/i18n/emqx_ee_bridge_rocketmq.hocon @@ -33,7 +33,10 @@ local_topic.label: """Local Topic""" template.desc: -"""Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ""" +"""Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ.
+ The template can be any valid string with placeholders, example:
+ - ${id}, ${username}, ${clientid}, ${timestamp}
+ - {"id" : ${id}, "username" : ${username}}""" template.label: """Template""" diff --git a/rel/i18n/emqx_ee_connector_rocketmq.hocon b/rel/i18n/emqx_ee_connector_rocketmq.hocon index 672dcafce..d3d59a389 100644 --- a/rel/i18n/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/emqx_ee_connector_rocketmq.hocon @@ -1,11 +1,23 @@ emqx_ee_connector_rocketmq { +access_key.desc: +"""RocketMQ server `accessKey`.""" + +access_key.label: +"""AccessKey""" + refresh_interval.desc: """RocketMQ Topic Route Refresh Interval.""" refresh_interval.label: """Topic Route Refresh Interval""" +secret_key.desc: +"""RocketMQ server `secretKey`.""" + +secret_key.label: +"""SecretKey""" + security_token.desc: """RocketMQ Server Security Token""" @@ -18,14 +30,20 @@ send_buffer.desc: send_buffer.label: """Send Buffer Size""" -server.desc: +servers.desc: """The IPv4 or IPv6 address or the hostname to connect to.
A host entry has the following form: `Host[:Port]`.
The RocketMQ default port 9876 is used if `[:Port]` is not specified.""" -server.label: +servers.label: """Server Host""" +sync_timeout.desc: +"""Timeout of RocketMQ driver synchronous call.""" + +sync_timeout.label: +"""Sync Timeout""" + topic.desc: """RocketMQ Topic""" diff --git a/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon b/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon index 924004361..445a54232 100644 --- a/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon +++ b/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon @@ -32,7 +32,10 @@ local_topic.label: """本地 Topic""" template.desc: -"""模板, 默认为空,为空时将会将整个消息转发给 RocketMQ""" +"""模板, 默认为空,为空时将会将整个消息转发给 RocketMQ。
+ 模板可以是任意带有占位符的合法字符串, 例如:
+ - ${id}, ${username}, ${clientid}, ${timestamp}
+ - {"id" : ${id}, "username" : ${username}}""" template.label: """模板""" diff --git a/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon b/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon index d32e6ea01..58a1f7ddb 100644 --- a/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon @@ -1,11 +1,23 @@ emqx_ee_connector_rocketmq { +access_key.desc: +"""RocketMQ 服务器的 `accessKey`。""" + +access_key.label: +"""AccessKey""" + refresh_interval.desc: """RocketMQ 主题路由更新间隔。""" refresh_interval.label: """主题路由更新间隔""" +secret_key.desc: +"""RocketMQ 服务器的 `secretKey`。""" + +secret_key.label: +"""SecretKey""" + security_token.desc: """RocketMQ 服务器安全令牌""" @@ -18,14 +30,20 @@ send_buffer.desc: send_buffer.label: """发送消息的缓冲区大小""" -server.desc: +servers.desc: """将要连接的 IPv4 或 IPv6 地址,或者主机名。
主机名具有以下形式:`Host[:Port]`。
如果未指定 `[:Port]`,则使用 RocketMQ 默认端口 9876。""" -server.label: +servers.label: """服务器地址""" +sync_timeout.desc: +"""RocketMQ 驱动同步调用的超时时间。""" + +sync_timeout.label: +"""同步调用超时时间""" + topic.desc: """RocketMQ 主题""" diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 3a796821c..fec0d589c 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -202,9 +202,9 @@ for dep in ${CT_DEPS}; do done if [ "$ODBC_REQUEST" = 'yes' ]; then - INSTALL_ODBC="./scripts/install-odbc-driver.sh" + INSTALL_ODBC="./scripts/install-msodbc-driver.sh" else - INSTALL_ODBC="echo 'Driver msodbcsql driver not requested'" + INSTALL_ODBC="echo 'msodbc driver not requested'" fi F_OPTIONS="" diff --git a/scripts/install-odbc-driver.sh b/scripts/install-msodbc-driver.sh similarity index 100% rename from scripts/install-odbc-driver.sh rename to scripts/install-msodbc-driver.sh