diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 818087c37..231e145e0 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -19,6 +19,7 @@ File format: management HTTPS listeners. [#8129] - Add message republish supports using placeholder variables to specify QoS and Retain values. Set `${qos}` and `${flags.retain}` use the original QoS & Retain flag. - Add supports specifying the network interface address of the cluster listener & rcp call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address. +- ExHook supports to customize the socket parameters for gRPC client. [#8314] ### Bug fixes @@ -29,6 +30,8 @@ File format: information to fill them. [#8280] - Fixed issue in Lua hook that didn't prevent a topic from being subscribed to. [#8288] +- Ensuring that exhook dispatches the client events are sequential. [#8311] +- Ensure start dashboard ok event if default_username is missing. ## v4.3.15 diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 70dba4e06..2cffbf528 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -53,3 +53,20 @@ exhook.server.default.url = http://127.0.0.1:9000 #exhook.server.default.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem #exhook.server.default.ssl.certfile = {{ platform_etc_dir }}/certs/cert.pem #exhook.server.default.ssl.keyfile = {{ platform_etc_dir }}/certs/key.pem + +## Enables/disables periodic transmission on a connected socket when no other data is exchanged. +## If the other end does not respond, the connection is considered broken and an error message is sent to the controlling process. +## +## Default: true +#exhook.server.default.socket_options.keepalive = true + +## If true, option TCP_NODELAY is turned on for the socket, which means that also small amounts of data are sent immediately. +## +## Default: true +#exhook.server.default.socket_options.nodelay = true + +## The minimum size of the receive buffer to use for the socket. +#exhook.server.default.socket_options.recbuf = 64KB + +## The minimum size of the send buffer to use for the socket. +#exhook.server.default.socket_options.sndbuf = 16KB \ No newline at end of file diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema index 277e05f0c..3612ede85 100644 --- a/apps/emqx_exhook/priv/emqx_exhook.schema +++ b/apps/emqx_exhook/priv/emqx_exhook.schema @@ -51,12 +51,42 @@ end}. {datatype, string} ]}. +{mapping, "exhook.server.$name.socket_options.keepalive", "emqx_exhook.servers", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "exhook.server.$name.socket_options.nodelay", "emqx_exhook.servers", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "exhook.server.$name.socket_options.recbuf", "emqx_exhook.servers", [ + {datatype, bytesize} +]}. + +{mapping, "exhook.server.$name.socket_options.sndbuf", "emqx_exhook.servers", [ + {datatype, bytesize} +]}. + {translation, "emqx_exhook.servers", fun(Conf) -> Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + + MkSockOpts = fun(Prefix, Conf) -> + {socket_options, + Filter([{Opt, + cuttlefish:conf_get(Prefix ++ ".socket_options." ++ atom_to_list(Opt), + Conf, + undefined)} + || Opt <- [keepalive, nodelay, recbuf, sndbuf]])} + end, + ServerOptions = fun(Prefix) -> case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".url", Conf)) of {ok, {http, _, Host, Port, _, _}} -> - [{scheme, http}, {host, Host}, {port, Port}]; + [{scheme, http}, {host, Host}, {port, Port}, + MkSockOpts(Prefix, Conf) + ]; {ok, {https, _, Host, Port, _, _}} -> [{scheme, https}, {host, Host}, {port, Port}, {ssl_options, @@ -64,10 +94,13 @@ end}. {certfile, cuttlefish:conf_get(Prefix ++ ".ssl.certfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".ssl.keyfile", Conf, undefined)}, {cacertfile, cuttlefish:conf_get(Prefix ++ ".ssl.cacertfile", Conf, undefined)} - ])}]; + ])}, + MkSockOpts(Prefix, Conf) + ]; _ -> error(invalid_server_options) end end, + [{list_to_atom(Name), ServerOptions("exhook.server." ++ Name)} || {["exhook", "server", Name, "url"], _} <- cuttlefish_variable:filter_by_prefix("exhook.server", Conf)] end}. diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 2004230be..3a51ad3ec 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.4.1"}, + {vsn, "4.4.2"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index 5a81af051..ea57059a6 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,7 +1,9 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.0", + [{"4.4.1", + [{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}]}, + {"4.4.0", [{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, @@ -9,7 +11,9 @@ {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, {update, emqx_exhook_mngr, {advanced, ["4.4.0"]}}]}, {<<".*">>,[]}], - [{"4.4.0", + [{"4.4.1", + [{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}]}, + {"4.4.0", [{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 7c1d0ff69..9e18c043b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -126,6 +126,7 @@ channel_opts(Opts) -> Host = proplists:get_value(host, Opts), Port = proplists:get_value(port, Opts), SvrAddr = format_http_uri(Scheme, Host, Port), + SockOpts = proplists:get_value(socket_options, Opts), ClientOpts = case Scheme of https -> SslOpts = lists:keydelete( @@ -135,8 +136,10 @@ channel_opts(Opts) -> ), #{gun_opts => #{transport => ssl, - transport_opts => SslOpts}}; - _ -> #{} + transport_opts => SockOpts ++ SslOpts}}; + _ -> + #{gun_opts => + #{transport_opts => SockOpts}} end, NClientOpts = ClientOpts#{pool_size => emqx_exhook_mngr:get_pool_size()}, {SvrAddr, NClientOpts}. @@ -277,7 +280,7 @@ match_topic_filter(TopicName, TopicFilter) -> -spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}. do_call(ChannName, Fun, Req, ReqOpts) -> NReq = Req#{meta => emqx_exhook:request_meta()}, - Options = ReqOpts#{channel => ChannName}, + Options = ReqOpts#{channel => ChannName, key_dispatch => key_dispatch(NReq)}, ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]), case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of {ok, Resp, Metadata} -> @@ -335,3 +338,13 @@ available_hooks() -> 'session.created', 'session.subscribed', 'session.unsubscribed', 'session.resumed', 'session.discarded', 'session.takeovered', 'session.terminated' | message_hooks()]. + +%% @doc Get dispatch_key for each request +key_dispatch(_Req = #{clientinfo := #{clientid := ClientId}}) -> + ClientId; +key_dispatch(_Req = #{conninfo := #{clientid := ClientId}}) -> + ClientId; +key_dispatch(_Req = #{message := #{from := From}}) -> + From; +key_dispatch(_Req) -> + self(). diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index daa2e25b3..a8bd34b5d 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -46,7 +46,8 @@ set_special_cfgs(emqx) -> application:set_env(emqx, allow_anonymous, false), application:set_env(emqx, enable_acl_cache, false), application:set_env(emqx, plugins_loaded_file, undefined), - application:set_env(emqx, modules_loaded_file, undefined); + application:set_env(emqx, modules_loaded_file, undefined), + application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM); set_special_cfgs(emqx_exhook) -> ok. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index e74b1ea0a..fbfc70d9c 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -140,13 +140,13 @@ list_nodes() -> lookup_node(Node) -> node_info(Node). node_info(Node) when Node =:= node() -> - Memory = emqx_vm:get_memory(), + {UsedRatio, Total} = get_sys_memory(), Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]), BrokerInfo = emqx_sys:info(), Info#{node => node(), otp_release => iolist_to_binary(otp_rel()), - memory_total => proplists:get_value(allocated, Memory), - memory_used => proplists:get_value(used, Memory), + memory_total => Total, + memory_used => erlang:round(Total * UsedRatio), process_available => erlang:system_info(process_limit), process_used => erlang:system_info(process_count), max_fds => proplists:get_value(max_fds, @@ -159,6 +159,14 @@ node_info(Node) when Node =:= node() -> node_info(Node) -> rpc_call(Node, node_info, [Node]). +get_sys_memory() -> + case os:type() of + {unix, linux} -> + load_ctl:get_sys_memory(); + _ -> + {0, 0} + end. + stopped_node_info(Node) -> #{name => Node, node_status => 'Stopped'}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index ae82d56c9..d665a0c96 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -175,7 +175,8 @@ on_action_republish(_Selected, Envs = #{ }) -> ?LOG(error, "[republish] recursively republish detected, msg topic: ~p, target topic: ~p", [Topic, ?bound_v('TargetTopic', Envs)]), - emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)); + emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)), + {badact, recursively_republish}; on_action_republish(Selected, _Envs = #{ qos := QoS, flags := Flags, timestamp := Timestamp, diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index aa0face38..ae68e79e1 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -268,13 +268,16 @@ on_action_data_to_webserver(Selected, _Envs = emqx_rule_metrics:inc_actions_success(Id); {ok, StatusCode, _} -> ?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]), - emqx_rule_metrics:inc_actions_error(Id); + emqx_rule_metrics:inc_actions_error(Id), + {badact, StatusCode}; {ok, StatusCode, _, _} -> ?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]), - emqx_rule_metrics:inc_actions_error(Id); + emqx_rule_metrics:inc_actions_error(Id), + {badact, StatusCode}; {error, Reason} -> ?LOG(error, "HTTP request failed path: ~p error: ~p", [NPath, Reason]), - emqx_rule_metrics:inc_actions_error(Id) + emqx_rule_metrics:inc_actions_error(Id), + {badact, Reason} end. format_msg([], Data) -> diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 5e5771aae..7961f91ec 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.4.5-beta.2"}). +-define(EMQX_RELEASE, {opensource, "4.4.5-beta.3"}). -else. diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl index a4504fe29..a76ed9cff 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -22,6 +22,7 @@ -include("emqx_dashboard.hrl"). -include_lib("emqx/include/logger.hrl"). +-define(DEFAULT_PASSWORD, <<"public">>). -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). @@ -206,11 +207,15 @@ is_valid_pwd(<>, Password) -> %%-------------------------------------------------------------------- init([]) -> - %% Add default admin user - {ok, _} = mnesia:subscribe({table, mqtt_admin, simple}), - PasswordHash = ensure_default_user_in_db(binenv(default_user_username)), - ok = ensure_default_user_passwd_hashed_in_pt(PasswordHash), - ok = maybe_warn_default_pwd(), + case binenv(default_user_username) of + <<>> -> ok; + UserName -> + %% Add default admin user + {ok, _} = mnesia:subscribe({table, mqtt_admin, simple}), + PasswordHash = ensure_default_user_in_db(UserName), + ok = ensure_default_user_passwd_hashed_in_pt(PasswordHash), + ok = maybe_warn_default_pwd() + end, {ok, state}. handle_call(_Req, _From, State) -> @@ -220,7 +225,7 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({mnesia_table_event, {write, Admin, _}}, State) -> - %% the password is chagned from another node, sync it to persistent_term + %% the password is changed from another node, sync it to persistent_term #mqtt_admin{username = Username, password = HashedPassword} = Admin, case binenv(default_user_username) of Username -> @@ -258,6 +263,7 @@ salt() -> binenv(Key) -> iolist_to_binary(application:get_env(emqx_dashboard, Key, <<>>)). +ensure_default_user_in_db(<<>>) -> <<>>; ensure_default_user_in_db(Username) -> F = fun() -> @@ -279,12 +285,9 @@ ensure_default_user_in_db(Username) -> initial_default_user_passwd_hashed() -> case get_default_user_passwd_hashed_from_pt() of Empty when ?EMPTY_KEY(Empty) -> - %% in case it's not set yet case binenv(default_user_passwd) of - Empty when ?EMPTY_KEY(Empty) -> - error({missing_configuration, default_user_passwd}); - Pwd -> - hash(Pwd) + Empty when ?EMPTY_KEY(Empty) -> hash(?DEFAULT_PASSWORD); + Pwd -> hash(Pwd) end; PwdHash -> PwdHash @@ -300,7 +303,7 @@ get_default_user_passwd_hashed_from_pt() -> persistent_term:get({?MODULE, default_user_passwd_hashed}, <<>>). maybe_warn_default_pwd() -> - case is_valid_pwd(initial_default_user_passwd_hashed(), <<"public">>) of + case is_valid_pwd(initial_default_user_passwd_hashed(), ?DEFAULT_PASSWORD) of true -> ?LOG(warning, "[Dashboard] Using default password for dashboard 'admin' user. " diff --git a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl index b51d29291..73f4be5be 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl @@ -45,9 +45,9 @@ load(RawRules) -> {PubRules, SubRules} = compile(RawRules), - emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), - emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), - emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}). + emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000), + emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000), + emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000). rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) -> Binds = fill_client_binds(ClientInfo), diff --git a/rebar.config b/rebar.config index 6d286552e..7aa72ebc9 100644 --- a/rebar.config +++ b/rebar.config @@ -39,12 +39,13 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps + , {redbug, "2.0.7"} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.2"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.5"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.6"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.10"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} @@ -61,7 +62,7 @@ , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} - , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.5"}}} + , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} ]}. {xref_ignores, diff --git a/rebar.config.erl b/rebar.config.erl index 5dcfb141e..dd627bc71 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -250,6 +250,7 @@ relx_apps(ReleaseType) -> , inets , compiler , runtime_tools + , redbug , cuttlefish , emqx , {mnesia, load}