feat(exhook): expose headers for on_messages_publish hook

This commit is contained in:
JianBo He 2021-11-05 18:32:43 +08:00 committed by JianBo He
parent ad4d3fc652
commit 6fb3ff1f9f
4 changed files with 78 additions and 7 deletions

View File

@ -358,6 +358,31 @@ message Message {
bytes payload = 6; bytes payload = 6;
uint64 timestamp = 7; uint64 timestamp = 7;
// The key of header can be:
// - username:
// * Readonly
// * The username of sender client
// * Value type: utf8 string
// - protocol:
// * Readonly
// * The protocol name of sender client
// * Value type: string enum with "mqtt", "mqtt-sn", ...
// - peerhost:
// * Readonly
// * The peerhost of sender client
// * Value type: ip address string
// - allow_publish:
// * Writable
// * Whether to allow the message to be published by emqx
// * Value type: string enum with "true", "false", default is "true"
//
// Notes: All header may be missing, which means that the message does not
// carry these headers. We can guarantee that clients coming from MQTT,
// MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
// carry these headers, but there is no guarantee that messages published
// by other means will do, e.g. messages published by HTTP-API
map<string, string> headers = 8;
} }
message Property { message Property {

View File

@ -258,17 +258,57 @@ clientinfo(ClientInfo =
cn => maybe(maps:get(cn, ClientInfo, undefined)), cn => maybe(maps:get(cn, ClientInfo, undefined)),
dn => maybe(maps:get(dn, ClientInfo, undefined))}. dn => maybe(maps:get(dn, ClientInfo, undefined))}.
message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) -> message(#message{id = Id, qos = Qos, from = From, topic = Topic,
payload = Payload, timestamp = Ts, headers = Headers}) ->
#{node => stringfy(node()), #{node => stringfy(node()),
id => emqx_guid:to_hexstr(Id), id => emqx_guid:to_hexstr(Id),
qos => Qos, qos => Qos,
from => stringfy(From), from => stringfy(From),
topic => Topic, topic => Topic,
payload => Payload, payload => Payload,
timestamp => Ts}. timestamp => Ts,
headers => headers(Headers)
}.
assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) -> headers(undefined) ->
Message#message{qos = Qos, topic = Topic, payload = Payload}. #{};
headers(Headers) ->
Ls = [username, protocol, peerhost, allow_publish],
maps:fold(
fun
(_, undefined, Acc) ->
Acc; %% Ignore undefined value
(K, V, Acc) ->
case lists:member(K, Ls) of
true ->
Acc#{atom_to_binary(K) => bin(K, V)};
_ ->
Acc
end
end, #{}, Headers).
bin(K, V) when K == username;
K == protocol;
K == allow_publish ->
bin(V);
bin(peerhost, V) ->
bin(inet:ntoa(V)).
bin(V) when is_binary(V) -> V;
bin(V) when is_atom(V) -> atom_to_binary(V);
bin(V) when is_list(V) -> iolist_to_binary(V).
assign_to_message(InMessage = #{qos := Qos, topic := Topic,
payload := Payload}, Message) ->
NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
enrich_header(maps:get(headers, InMessage, #{}), NMsg).
enrich_header(Headers, Message) ->
AllowPub = case maps:get(<<"allow_publish">>, Headers, <<"true">>) of
<<"false">> -> false;
_ -> true
end,
emqx_message:set_header(allow_publish, AllowPub, Message).
topicfilters(Tfs) when is_list(Tfs) -> topicfilters(Tfs) when is_list(Tfs) ->
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].

View File

@ -295,14 +295,14 @@ on_session_terminated(Req, Md) ->
| {error, grpc_cowboy_h:error_response()}. | {error, grpc_cowboy_h:error_response()}.
on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}), ?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), io:format(standard_error, "fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
%% some cases for testing %% some cases for testing
case From of case From of
<<"baduser">> -> <<"baduser">> ->
NMsg = Msg#{qos => 0, NMsg = deny(Msg#{qos => 0,
topic => <<"">>, topic => <<"">>,
payload => <<"">> payload => <<"">>
}, }),
{ok, #{type => 'STOP_AND_RETURN', {ok, #{type => 'STOP_AND_RETURN',
value => {message, NMsg}}, Md}; value => {message, NMsg}}, Md};
<<"gooduser">> -> <<"gooduser">> ->
@ -314,6 +314,11 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
{ok, #{type => 'IGNORE'}, Md} {ok, #{type => 'IGNORE'}, Md}
end. end.
deny(Msg) ->
NHeader = maps:put(<<"allow_publish">>, <<"false">>,
maps:get(headers, Msg, #{})),
maps:put(headers, NHeader, Msg).
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}. | {error, grpc_cowboy_h:error_response()}.

View File

@ -193,6 +193,7 @@
username => username(), username => username(),
peerhost => peerhost(), peerhost => peerhost(),
properties => properties(), properties => properties(),
allow_publish => boolean(),
atom() => term()}). atom() => term()}).
-type(banned() :: #banned{}). -type(banned() :: #banned{}).