From 20ba9d285fd85b1c1f4ca889aa954700a709154b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 5 Nov 2021 18:32:43 +0800 Subject: [PATCH 1/9] feat(exhook): expose headers for on_messages_publish hook --- apps/emqx/src/emqx_types.erl | 3 +- apps/emqx_exhook/priv/protos/exhook.proto | 25 ++++++++++ apps/emqx_exhook/src/emqx_exhook_handler.erl | 48 +++++++++++++++++-- .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 11 +++-- 4 files changed, 79 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index ed17a59e4..4dc926274 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -192,7 +192,8 @@ username => username(), peerhost => peerhost(), properties => properties(), - atom() => term()}). + allow_publish => boolean(), + atom() => term()}). -type(banned() :: #banned{}). -type(deliver() :: {deliver, topic(), message()}). diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 9f3fb4b88..ef9e7a843 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -358,6 +358,31 @@ message Message { bytes payload = 6; uint64 timestamp = 7; + + // The key of header can be: + // - username: + // * Readonly + // * The username of sender client + // * Value type: utf8 string + // - protocol: + // * Readonly + // * The protocol name of sender client + // * Value type: string enum with "mqtt", "mqtt-sn", ... + // - peerhost: + // * Readonly + // * The peerhost of sender client + // * Value type: ip address string + // - allow_publish: + // * Writable + // * Whether to allow the message to be published by emqx + // * Value type: string enum with "true", "false", default is "true" + // + // Notes: All header may be missing, which means that the message does not + // carry these headers. We can guarantee that clients coming from MQTT, + // MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will + // carry these headers, but there is no guarantee that messages published + // by other means will do, e.g. messages published by HTTP-API + map headers = 8; } message Property { diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index d3170ebad..eaf0cd0e0 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -257,17 +257,57 @@ clientinfo(ClientInfo = cn => maybe(maps:get(cn, ClientInfo, undefined)), dn => maybe(maps:get(dn, ClientInfo, undefined))}. -message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) -> +message(#message{id = Id, qos = Qos, from = From, topic = Topic, + payload = Payload, timestamp = Ts, headers = Headers}) -> #{node => stringfy(node()), id => emqx_guid:to_hexstr(Id), qos => Qos, from => stringfy(From), topic => Topic, payload => Payload, - timestamp => Ts}. + timestamp => Ts, + headers => headers(Headers) + }. -assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) -> - Message#message{qos = Qos, topic = Topic, payload = Payload}. +headers(undefined) -> + #{}; +headers(Headers) -> + Ls = [username, protocol, peerhost, allow_publish], + maps:fold( + fun + (_, undefined, Acc) -> + Acc; %% Ignore undefined value + (K, V, Acc) -> + case lists:member(K, Ls) of + true -> + Acc#{atom_to_binary(K) => bin(K, V)}; + _ -> + Acc + end + end, #{}, Headers). + +bin(K, V) when K == username; + K == protocol; + K == allow_publish -> + bin(V); +bin(peerhost, V) -> + bin(inet:ntoa(V)). + +bin(V) when is_binary(V) -> V; +bin(V) when is_atom(V) -> atom_to_binary(V); +bin(V) when is_list(V) -> iolist_to_binary(V). + +assign_to_message(InMessage = #{qos := Qos, topic := Topic, + payload := Payload}, Message) -> + NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload}, + enrich_header(maps:get(headers, InMessage, #{}), NMsg). + +enrich_header(Headers, Message) -> + AllowPub = case maps:get(<<"allow_publish">>, Headers, <<"true">>) of + <<"false">> -> false; + _ -> true + end, + emqx_message:set_header(allow_publish, AllowPub, Message). topicfilters(Tfs) when is_list(Tfs) -> [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index 0ced703d7..0bc4680c9 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -295,14 +295,14 @@ on_session_terminated(Req, Md) -> | {error, grpc_cowboy_h:error_response()}. on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + io:format(standard_error, "fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), %% some cases for testing case From of <<"baduser">> -> - NMsg = Msg#{qos => 0, + NMsg = deny(Msg#{qos => 0, topic => <<"">>, payload => <<"">> - }, + }), {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; <<"gooduser">> -> @@ -314,6 +314,11 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> {ok, #{type => 'IGNORE'}, Md} end. +deny(Msg) -> + NHeader = maps:put(<<"allow_publish">>, <<"false">>, + maps:get(headers, Msg, #{})), + maps:put(headers, NHeader, Msg). + -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. From 262fb13a7199765ecbd700f00cae700d26f177a1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 8 Nov 2021 11:11:16 +0800 Subject: [PATCH 2/9] chore(exhook): fix diaylzer warnings --- apps/emqx_exhook/src/emqx_exhook_handler.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index eaf0cd0e0..bb2380e9b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -269,8 +269,6 @@ message(#message{id = Id, qos = Qos, from = From, topic = Topic, headers => headers(Headers) }. -headers(undefined) -> - #{}; headers(Headers) -> Ls = [username, protocol, peerhost, allow_publish], maps:fold( From 2b7c311807429cae9138e18d625a8a791d62c92c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 8 Nov 2021 14:15:32 +0800 Subject: [PATCH 3/9] chore: upgrade grpc to 0.6.4 --- apps/emqx_exhook/rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index 89dcb20a7..afdaad084 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -5,7 +5,7 @@ ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}} ]}. {grpc, From 143c685452bf2b673899f8cd684b5b75a487baf6 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 8 Nov 2021 14:16:26 +0800 Subject: [PATCH 4/9] feat(exhook): expose process pool_size for grpc client --- apps/emqx_exhook/etc/emqx_exhook.conf | 6 ++++++ apps/emqx_exhook/src/emqx_exhook_mngr.erl | 11 +++++++++++ apps/emqx_exhook/src/emqx_exhook_server.erl | 11 ++++++++--- apps/emqx_exhook/src/emqx_exhook_sup.erl | 5 +++-- 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 8f3e25686..42bd04f19 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -25,6 +25,12 @@ exhook { ## Value: false | Duration auto_reconnect = 60s + ## The process pool size for gRPC client + ## + ## Default: Equals cpu cores + ## Value: Integer + #pool_size = 16 + servers = [ # { name: "default" # url: "http://127.0.0.1:9000" diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index a982c6505..9df1e19ef 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -36,6 +36,8 @@ , server/1 , put_request_failed_action/1 , get_request_failed_action/0 + , put_pool_size/1 + , get_pool_size/0 ]). %% gen_server callbacks @@ -117,6 +119,9 @@ init([Servers, AutoReconnect, ReqOpts0]) -> put_request_failed_action( maps:get(request_failed_action, ReqOpts0, deny) ), + put_pool_size( + maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers)) + ), %% Load the hook servers ReqOpts = maps:without([request_failed_action], ReqOpts0), @@ -291,6 +296,12 @@ put_request_failed_action(Val) -> get_request_failed_action() -> persistent_term:get({?APP, request_failed_action}). +put_pool_size(Val) -> + persistent_term:put({?APP, pool_size}, Val). + +get_pool_size() -> + persistent_term:get({?APP, pool_size}). + save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), persistent_term:put(?APP, lists:reverse([Name | Saved])), diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index e667d967c..088720cbd 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -108,9 +108,10 @@ load(Name, Opts0, ReqOpts) -> %% @private channel_opts(Opts = #{url := URL}) -> + ClientOpts = #{pool_size => emqx_exhook_mngr:get_pool_size()}, case uri_string:parse(URL) of #{scheme := "http", host := Host, port := Port} -> - {format_http_uri("http", Host, Port), #{}}; + {format_http_uri("http", Host, Port), ClientOpts}; #{scheme := "https", host := Host, port := Port} -> SslOpts = case maps:get(ssl, Opts, undefined) of @@ -122,8 +123,12 @@ channel_opts(Opts = #{url := URL}) -> {keyfile, maps:get(keyfile, MapOpts, undefined)} ]) end, - {format_http_uri("https", Host, Port), - #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}}; + NClientOpts = ClientOpts#{ + gun_opts => + #{transport => ssl, + transport_opts => SslOpts} + }, + {format_http_uri("https", Host, Port), NClientOpts}; _ -> error(bad_server_url) end. diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index ea03a54f9..ca8d7c856 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -54,7 +54,8 @@ auto_reconnect() -> request_options() -> #{timeout => env(request_timeout, 5000), - request_failed_action => env(request_failed_action, deny) + request_failed_action => env(request_failed_action, deny), + pool_size => env(pool_size, erlang:system_info(schedulers)) }. env(Key, Def) -> @@ -67,7 +68,7 @@ env(Key, Def) -> -spec start_grpc_client_channel( binary(), uri_string:uri_string(), - grpc_client:options()) -> {ok, pid()} | {error, term()}. + grpc_client_sup:options()) -> {ok, pid()} | {error, term()}. start_grpc_client_channel(Name, SvrAddr, Options) -> grpc_client_sup:create_channel_pool(Name, SvrAddr, Options). From f132d0948304bd6a27cb7210e4d2f9a9d7e55316 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 15 Nov 2021 11:15:34 +0800 Subject: [PATCH 5/9] chore(exhook): parse pool_size --- apps/emqx_exhook/src/emqx_exhook_schema.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl index 9e988c6d8..21ca5c3f0 100644 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -49,6 +49,10 @@ fields(exhook) -> sc(hoconsc:union([false, duration()]), #{ default => "60s" })} + , {pool_size, + sc(integer(), + #{ nullable => true + })} , {servers, sc(hoconsc:array(ref(servers)), #{default => []})} From e7ccd88719b83f178dc5d74383db74a70fc8d1ab Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 8 Nov 2021 15:05:54 +0800 Subject: [PATCH 6/9] test(props): cover messages headers --- apps/emqx_exhook/src/emqx_exhook_handler.erl | 14 +++++++++----- .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 9 +++++++-- .../test/props/prop_exhook_hooks.erl | 19 +++++++++++++------ 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index bb2380e9b..cb0f2467e 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -49,6 +49,7 @@ %% Utils -export([ message/1 + , headers/1 , stringfy/1 , merge_responsed_bool/2 , merge_responsed_message/2 @@ -301,11 +302,14 @@ assign_to_message(InMessage = #{qos := Qos, topic := Topic, enrich_header(maps:get(headers, InMessage, #{}), NMsg). enrich_header(Headers, Message) -> - AllowPub = case maps:get(<<"allow_publish">>, Headers, <<"true">>) of - <<"false">> -> false; - _ -> true - end, - emqx_message:set_header(allow_publish, AllowPub, Message). + case maps:get(<<"allow_publish">>, Headers, undefined) of + <<"false">> -> + emqx_message:set_header(allow_publish, false, Message); + <<"true">> -> + emqx_message:set_header(allow_publish, true, Message); + _ -> + Message + end. topicfilters(Tfs) when is_list(Tfs) -> [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index 0bc4680c9..aa1d02a15 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -306,8 +306,8 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; <<"gooduser">> -> - NMsg = Msg#{topic => From, - payload => From}, + NMsg = allow(Msg#{topic => From, + payload => From}), {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; _ -> @@ -319,6 +319,11 @@ deny(Msg) -> maps:get(headers, Msg, #{})), maps:put(headers, NHeader, Msg). +allow(Msg) -> + NHeader = maps:put(<<"allow_publish">>, <<"true">>, + maps:get(headers, Msg, #{})), + maps:put(headers, NHeader, Msg). + -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index c7804346f..cbd7a2a2a 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -296,19 +296,24 @@ prop_message_publish() -> _ -> ExpectedOutMsg = case emqx_message:from(Msg) of <<"baduser">> -> - MsgMap = emqx_message:to_map(Msg), + MsgMap = #{headers := Headers} + = emqx_message:to_map(Msg), emqx_message:from_map( MsgMap#{qos => 0, topic => <<"">>, - payload => <<"">> + payload => <<"">>, + headers => maps:put(allow_publish, false, Headers) }); <<"gooduser">> = From -> - MsgMap = emqx_message:to_map(Msg), + MsgMap = #{headers := Headers} + = emqx_message:to_map(Msg), emqx_message:from_map( MsgMap#{topic => From, - payload => From + payload => From, + headers => maps:put(allow_publish, true, Headers) }); - _ -> Msg + _ -> + Msg end, ?assertEqual(ExpectedOutMsg, OutMsg), @@ -461,7 +466,9 @@ from_message(Msg) -> from => stringfy(emqx_message:from(Msg)), topic => emqx_message:topic(Msg), payload => emqx_message:payload(Msg), - timestamp => emqx_message:timestamp(Msg) + timestamp => emqx_message:timestamp(Msg), + headers => emqx_exhook_handler:headers( + emqx_message:get_headers(Msg)) }. %%-------------------------------------------------------------------- From 34cc8cc7f5f54f377f5b02213b3bf482cfa5e515 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 18:40:40 +0800 Subject: [PATCH 7/9] chore: put the pool_size default value to avoid hot upgrade failure --- apps/emqx_exhook/src/emqx_exhook_mngr.erl | 4 +++- apps/emqx_exhook/test/emqx_exhook_demo_svr.erl | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index 9df1e19ef..d91652904 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -300,7 +300,9 @@ put_pool_size(Val) -> persistent_term:put({?APP, pool_size}, Val). get_pool_size() -> - persistent_term:get({?APP, pool_size}). + %% Avoid the scenario that the parameter is not set after + %% the hot upgrade completed. + persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)). save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index aa1d02a15..b1e3801b2 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -295,7 +295,7 @@ on_session_terminated(Req, Md) -> | {error, grpc_cowboy_h:error_response()}. on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> ?MODULE:in({?FUNCTION_NAME, Req}), - io:format(standard_error, "fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), %% some cases for testing case From of <<"baduser">> -> From 5d4604701cdcc59d67036ea2934eb48842222a61 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 15 Nov 2021 17:00:28 +0800 Subject: [PATCH 8/9] chore: fix elvis warnings --- apps/emqx_exhook/src/emqx_exhook_handler.erl | 17 +++++------- apps/emqx_exhook/src/emqx_exhook_mngr.erl | 2 +- apps/emqx_exhook/src/emqx_exhook_server.erl | 27 ++++++++++++-------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index cb0f2467e..fea7e8b9a 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -62,6 +62,8 @@ , call_fold/3 ]). +-elvis([{elvis_style, god_modules, disable}]). + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- @@ -340,11 +342,7 @@ merge_responsed_bool(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) when is_boolean(NewBool) -> - NReq = Req#{result => NewBool}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{result => NewBool}}; merge_responsed_bool(_Req, Resp) -> ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}), ignore. @@ -352,11 +350,10 @@ merge_responsed_bool(_Req, Resp) -> merge_responsed_message(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) -> - NReq = Req#{message => NMessage}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{message => NMessage}}; merge_responsed_message(_Req, Resp) -> ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}), ignore. + +ret('CONTINUE') -> ok; +ret('STOP_AND_RETURN') -> stop. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index d91652904..cd2658f93 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -141,7 +141,7 @@ load_all_servers(Servers, ReqOpts) -> load_all_servers(Servers, ReqOpts, #{}, #{}). load_all_servers([], _Request, Waiting, Running) -> {Waiting, Running}; -load_all_servers([#{name := Name0} = Options0|More], ReqOpts, Waiting, Running) -> +load_all_servers([#{name := Name0} = Options0 | More], ReqOpts, Waiting, Running) -> Name = iolist_to_binary(Name0), Options = Options0#{name => Name}, {NWaiting, NRunning} = diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 088720cbd..b66b30a26 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -75,6 +75,8 @@ -dialyzer({nowarn_function, [inc_metrics/2]}). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + %%-------------------------------------------------------------------- %% Load/Unload APIs %%-------------------------------------------------------------------- @@ -178,16 +180,19 @@ resolve_hookspec(HookSpecs) when is_list(HookSpecs) -> case maps:get(name, HookSpec, undefined) of undefined -> Acc; Name0 -> - Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, - case lists:member(Name, AvailableHooks) of - true -> - case lists:member(Name, MessageHooks) of - true -> - Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}; - _ -> - Acc#{Name => #{}} - end; - _ -> error({unknown_hookpoint, Name}) + Name = try + binary_to_existing_atom(Name0, utf8) + catch T:R:_ -> {T,R} + end, + case {lists:member(Name, AvailableHooks), + lists:member(Name, MessageHooks)} of + {false, _} -> + error({unknown_hookpoint, Name}); + {true, false} -> + Acc#{Name => #{}}; + {true, true} -> + Acc#{Name => #{ + topics => maps:get(topics, HookSpec, [])}} end end end, #{}, HookSpecs). @@ -260,7 +265,7 @@ call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, %% @private inc_metrics(IncFun, Name) when is_function(IncFun) -> %% BACKW: e4.2.0-e4.2.2 - {env, [Prefix|_]} = erlang:fun_info(IncFun, env), + {env, [Prefix | _]} = erlang:fun_info(IncFun, env), inc_metrics(Prefix, Name); inc_metrics(Prefix, Name) when is_list(Prefix) -> emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). From 159ba635309faa4aac968a8fad49a895eb31a152 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 16 Nov 2021 09:16:26 +0800 Subject: [PATCH 9/9] chore: upgrade grpc-erl to 0.6.4 --- apps/emqx_gateway/rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_gateway/rebar.config b/apps/emqx_gateway/rebar.config index fe088d7d8..44f74eacf 100644 --- a/apps/emqx_gateway/rebar.config +++ b/apps/emqx_gateway/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. {deps, [ - {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} + {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}} ]}. {plugins, [