diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index ed17a59e4..4dc926274 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -192,7 +192,8 @@ username => username(), peerhost => peerhost(), properties => properties(), - atom() => term()}). + allow_publish => boolean(), + atom() => term()}). -type(banned() :: #banned{}). -type(deliver() :: {deliver, topic(), message()}). diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 8f3e25686..42bd04f19 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -25,6 +25,12 @@ exhook { ## Value: false | Duration auto_reconnect = 60s + ## The process pool size for gRPC client + ## + ## Default: Equals cpu cores + ## Value: Integer + #pool_size = 16 + servers = [ # { name: "default" # url: "http://127.0.0.1:9000" diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 9f3fb4b88..ef9e7a843 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -358,6 +358,31 @@ message Message { bytes payload = 6; uint64 timestamp = 7; + + // The key of header can be: + // - username: + // * Readonly + // * The username of sender client + // * Value type: utf8 string + // - protocol: + // * Readonly + // * The protocol name of sender client + // * Value type: string enum with "mqtt", "mqtt-sn", ... + // - peerhost: + // * Readonly + // * The peerhost of sender client + // * Value type: ip address string + // - allow_publish: + // * Writable + // * Whether to allow the message to be published by emqx + // * Value type: string enum with "true", "false", default is "true" + // + // Notes: All header may be missing, which means that the message does not + // carry these headers. We can guarantee that clients coming from MQTT, + // MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will + // carry these headers, but there is no guarantee that messages published + // by other means will do, e.g. messages published by HTTP-API + map headers = 8; } message Property { diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index 89dcb20a7..afdaad084 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -5,7 +5,7 @@ ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}} ]}. {grpc, diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index d3170ebad..fea7e8b9a 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -49,6 +49,7 @@ %% Utils -export([ message/1 + , headers/1 , stringfy/1 , merge_responsed_bool/2 , merge_responsed_message/2 @@ -61,6 +62,8 @@ , call_fold/3 ]). +-elvis([{elvis_style, god_modules, disable}]). + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- @@ -257,17 +260,58 @@ clientinfo(ClientInfo = cn => maybe(maps:get(cn, ClientInfo, undefined)), dn => maybe(maps:get(dn, ClientInfo, undefined))}. -message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) -> +message(#message{id = Id, qos = Qos, from = From, topic = Topic, + payload = Payload, timestamp = Ts, headers = Headers}) -> #{node => stringfy(node()), id => emqx_guid:to_hexstr(Id), qos => Qos, from => stringfy(From), topic => Topic, payload => Payload, - timestamp => Ts}. + timestamp => Ts, + headers => headers(Headers) + }. -assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) -> - Message#message{qos = Qos, topic = Topic, payload = Payload}. +headers(Headers) -> + Ls = [username, protocol, peerhost, allow_publish], + maps:fold( + fun + (_, undefined, Acc) -> + Acc; %% Ignore undefined value + (K, V, Acc) -> + case lists:member(K, Ls) of + true -> + Acc#{atom_to_binary(K) => bin(K, V)}; + _ -> + Acc + end + end, #{}, Headers). + +bin(K, V) when K == username; + K == protocol; + K == allow_publish -> + bin(V); +bin(peerhost, V) -> + bin(inet:ntoa(V)). + +bin(V) when is_binary(V) -> V; +bin(V) when is_atom(V) -> atom_to_binary(V); +bin(V) when is_list(V) -> iolist_to_binary(V). + +assign_to_message(InMessage = #{qos := Qos, topic := Topic, + payload := Payload}, Message) -> + NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload}, + enrich_header(maps:get(headers, InMessage, #{}), NMsg). + +enrich_header(Headers, Message) -> + case maps:get(<<"allow_publish">>, Headers, undefined) of + <<"false">> -> + emqx_message:set_header(allow_publish, false, Message); + <<"true">> -> + emqx_message:set_header(allow_publish, true, Message); + _ -> + Message + end. topicfilters(Tfs) when is_list(Tfs) -> [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. @@ -298,11 +342,7 @@ merge_responsed_bool(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) when is_boolean(NewBool) -> - NReq = Req#{result => NewBool}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{result => NewBool}}; merge_responsed_bool(_Req, Resp) -> ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}), ignore. @@ -310,11 +350,10 @@ merge_responsed_bool(_Req, Resp) -> merge_responsed_message(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) -> - NReq = Req#{message => NMessage}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{message => NMessage}}; merge_responsed_message(_Req, Resp) -> ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}), ignore. + +ret('CONTINUE') -> ok; +ret('STOP_AND_RETURN') -> stop. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index a982c6505..cd2658f93 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -36,6 +36,8 @@ , server/1 , put_request_failed_action/1 , get_request_failed_action/0 + , put_pool_size/1 + , get_pool_size/0 ]). %% gen_server callbacks @@ -117,6 +119,9 @@ init([Servers, AutoReconnect, ReqOpts0]) -> put_request_failed_action( maps:get(request_failed_action, ReqOpts0, deny) ), + put_pool_size( + maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers)) + ), %% Load the hook servers ReqOpts = maps:without([request_failed_action], ReqOpts0), @@ -136,7 +141,7 @@ load_all_servers(Servers, ReqOpts) -> load_all_servers(Servers, ReqOpts, #{}, #{}). load_all_servers([], _Request, Waiting, Running) -> {Waiting, Running}; -load_all_servers([#{name := Name0} = Options0|More], ReqOpts, Waiting, Running) -> +load_all_servers([#{name := Name0} = Options0 | More], ReqOpts, Waiting, Running) -> Name = iolist_to_binary(Name0), Options = Options0#{name => Name}, {NWaiting, NRunning} = @@ -291,6 +296,14 @@ put_request_failed_action(Val) -> get_request_failed_action() -> persistent_term:get({?APP, request_failed_action}). +put_pool_size(Val) -> + persistent_term:put({?APP, pool_size}, Val). + +get_pool_size() -> + %% Avoid the scenario that the parameter is not set after + %% the hot upgrade completed. + persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)). + save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), persistent_term:put(?APP, lists:reverse([Name | Saved])), diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl index 9e988c6d8..21ca5c3f0 100644 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -49,6 +49,10 @@ fields(exhook) -> sc(hoconsc:union([false, duration()]), #{ default => "60s" })} + , {pool_size, + sc(integer(), + #{ nullable => true + })} , {servers, sc(hoconsc:array(ref(servers)), #{default => []})} diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index e667d967c..b66b30a26 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -75,6 +75,8 @@ -dialyzer({nowarn_function, [inc_metrics/2]}). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + %%-------------------------------------------------------------------- %% Load/Unload APIs %%-------------------------------------------------------------------- @@ -108,9 +110,10 @@ load(Name, Opts0, ReqOpts) -> %% @private channel_opts(Opts = #{url := URL}) -> + ClientOpts = #{pool_size => emqx_exhook_mngr:get_pool_size()}, case uri_string:parse(URL) of #{scheme := "http", host := Host, port := Port} -> - {format_http_uri("http", Host, Port), #{}}; + {format_http_uri("http", Host, Port), ClientOpts}; #{scheme := "https", host := Host, port := Port} -> SslOpts = case maps:get(ssl, Opts, undefined) of @@ -122,8 +125,12 @@ channel_opts(Opts = #{url := URL}) -> {keyfile, maps:get(keyfile, MapOpts, undefined)} ]) end, - {format_http_uri("https", Host, Port), - #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}}; + NClientOpts = ClientOpts#{ + gun_opts => + #{transport => ssl, + transport_opts => SslOpts} + }, + {format_http_uri("https", Host, Port), NClientOpts}; _ -> error(bad_server_url) end. @@ -173,16 +180,19 @@ resolve_hookspec(HookSpecs) when is_list(HookSpecs) -> case maps:get(name, HookSpec, undefined) of undefined -> Acc; Name0 -> - Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, - case lists:member(Name, AvailableHooks) of - true -> - case lists:member(Name, MessageHooks) of - true -> - Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}; - _ -> - Acc#{Name => #{}} - end; - _ -> error({unknown_hookpoint, Name}) + Name = try + binary_to_existing_atom(Name0, utf8) + catch T:R:_ -> {T,R} + end, + case {lists:member(Name, AvailableHooks), + lists:member(Name, MessageHooks)} of + {false, _} -> + error({unknown_hookpoint, Name}); + {true, false} -> + Acc#{Name => #{}}; + {true, true} -> + Acc#{Name => #{ + topics => maps:get(topics, HookSpec, [])}} end end end, #{}, HookSpecs). @@ -255,7 +265,7 @@ call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, %% @private inc_metrics(IncFun, Name) when is_function(IncFun) -> %% BACKW: e4.2.0-e4.2.2 - {env, [Prefix|_]} = erlang:fun_info(IncFun, env), + {env, [Prefix | _]} = erlang:fun_info(IncFun, env), inc_metrics(Prefix, Name); inc_metrics(Prefix, Name) when is_list(Prefix) -> emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index ea03a54f9..ca8d7c856 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -54,7 +54,8 @@ auto_reconnect() -> request_options() -> #{timeout => env(request_timeout, 5000), - request_failed_action => env(request_failed_action, deny) + request_failed_action => env(request_failed_action, deny), + pool_size => env(pool_size, erlang:system_info(schedulers)) }. env(Key, Def) -> @@ -67,7 +68,7 @@ env(Key, Def) -> -spec start_grpc_client_channel( binary(), uri_string:uri_string(), - grpc_client:options()) -> {ok, pid()} | {error, term()}. + grpc_client_sup:options()) -> {ok, pid()} | {error, term()}. start_grpc_client_channel(Name, SvrAddr, Options) -> grpc_client_sup:create_channel_pool(Name, SvrAddr, Options). diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index 0ced703d7..b1e3801b2 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -299,21 +299,31 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> %% some cases for testing case From of <<"baduser">> -> - NMsg = Msg#{qos => 0, + NMsg = deny(Msg#{qos => 0, topic => <<"">>, payload => <<"">> - }, + }), {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; <<"gooduser">> -> - NMsg = Msg#{topic => From, - payload => From}, + NMsg = allow(Msg#{topic => From, + payload => From}), {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; _ -> {ok, #{type => 'IGNORE'}, Md} end. +deny(Msg) -> + NHeader = maps:put(<<"allow_publish">>, <<"false">>, + maps:get(headers, Msg, #{})), + maps:put(headers, NHeader, Msg). + +allow(Msg) -> + NHeader = maps:put(<<"allow_publish">>, <<"true">>, + maps:get(headers, Msg, #{})), + maps:put(headers, NHeader, Msg). + -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index c7804346f..cbd7a2a2a 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -296,19 +296,24 @@ prop_message_publish() -> _ -> ExpectedOutMsg = case emqx_message:from(Msg) of <<"baduser">> -> - MsgMap = emqx_message:to_map(Msg), + MsgMap = #{headers := Headers} + = emqx_message:to_map(Msg), emqx_message:from_map( MsgMap#{qos => 0, topic => <<"">>, - payload => <<"">> + payload => <<"">>, + headers => maps:put(allow_publish, false, Headers) }); <<"gooduser">> = From -> - MsgMap = emqx_message:to_map(Msg), + MsgMap = #{headers := Headers} + = emqx_message:to_map(Msg), emqx_message:from_map( MsgMap#{topic => From, - payload => From + payload => From, + headers => maps:put(allow_publish, true, Headers) }); - _ -> Msg + _ -> + Msg end, ?assertEqual(ExpectedOutMsg, OutMsg), @@ -461,7 +466,9 @@ from_message(Msg) -> from => stringfy(emqx_message:from(Msg)), topic => emqx_message:topic(Msg), payload => emqx_message:payload(Msg), - timestamp => emqx_message:timestamp(Msg) + timestamp => emqx_message:timestamp(Msg), + headers => emqx_exhook_handler:headers( + emqx_message:get_headers(Msg)) }. %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/rebar.config b/apps/emqx_gateway/rebar.config index fe088d7d8..44f74eacf 100644 --- a/apps/emqx_gateway/rebar.config +++ b/apps/emqx_gateway/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. {deps, [ - {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} + {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}} ]}. {plugins, [