test(props): cover messages headers
This commit is contained in:
parent
f132d09483
commit
e7ccd88719
|
@ -49,6 +49,7 @@
|
||||||
|
|
||||||
%% Utils
|
%% Utils
|
||||||
-export([ message/1
|
-export([ message/1
|
||||||
|
, headers/1
|
||||||
, stringfy/1
|
, stringfy/1
|
||||||
, merge_responsed_bool/2
|
, merge_responsed_bool/2
|
||||||
, merge_responsed_message/2
|
, merge_responsed_message/2
|
||||||
|
@ -301,11 +302,14 @@ assign_to_message(InMessage = #{qos := Qos, topic := Topic,
|
||||||
enrich_header(maps:get(headers, InMessage, #{}), NMsg).
|
enrich_header(maps:get(headers, InMessage, #{}), NMsg).
|
||||||
|
|
||||||
enrich_header(Headers, Message) ->
|
enrich_header(Headers, Message) ->
|
||||||
AllowPub = case maps:get(<<"allow_publish">>, Headers, <<"true">>) of
|
case maps:get(<<"allow_publish">>, Headers, undefined) of
|
||||||
<<"false">> -> false;
|
<<"false">> ->
|
||||||
_ -> true
|
emqx_message:set_header(allow_publish, false, Message);
|
||||||
end,
|
<<"true">> ->
|
||||||
emqx_message:set_header(allow_publish, AllowPub, Message).
|
emqx_message:set_header(allow_publish, true, Message);
|
||||||
|
_ ->
|
||||||
|
Message
|
||||||
|
end.
|
||||||
|
|
||||||
topicfilters(Tfs) when is_list(Tfs) ->
|
topicfilters(Tfs) when is_list(Tfs) ->
|
||||||
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
|
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
|
||||||
|
|
|
@ -306,8 +306,8 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
|
||||||
{ok, #{type => 'STOP_AND_RETURN',
|
{ok, #{type => 'STOP_AND_RETURN',
|
||||||
value => {message, NMsg}}, Md};
|
value => {message, NMsg}}, Md};
|
||||||
<<"gooduser">> ->
|
<<"gooduser">> ->
|
||||||
NMsg = Msg#{topic => From,
|
NMsg = allow(Msg#{topic => From,
|
||||||
payload => From},
|
payload => From}),
|
||||||
{ok, #{type => 'STOP_AND_RETURN',
|
{ok, #{type => 'STOP_AND_RETURN',
|
||||||
value => {message, NMsg}}, Md};
|
value => {message, NMsg}}, Md};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -319,6 +319,11 @@ deny(Msg) ->
|
||||||
maps:get(headers, Msg, #{})),
|
maps:get(headers, Msg, #{})),
|
||||||
maps:put(headers, NHeader, Msg).
|
maps:put(headers, NHeader, Msg).
|
||||||
|
|
||||||
|
allow(Msg) ->
|
||||||
|
NHeader = maps:put(<<"allow_publish">>, <<"true">>,
|
||||||
|
maps:get(headers, Msg, #{})),
|
||||||
|
maps:put(headers, NHeader, Msg).
|
||||||
|
|
||||||
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
|
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
|
||||||
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
|
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
|
||||||
| {error, grpc_cowboy_h:error_response()}.
|
| {error, grpc_cowboy_h:error_response()}.
|
||||||
|
|
|
@ -296,19 +296,24 @@ prop_message_publish() ->
|
||||||
_ ->
|
_ ->
|
||||||
ExpectedOutMsg = case emqx_message:from(Msg) of
|
ExpectedOutMsg = case emqx_message:from(Msg) of
|
||||||
<<"baduser">> ->
|
<<"baduser">> ->
|
||||||
MsgMap = emqx_message:to_map(Msg),
|
MsgMap = #{headers := Headers}
|
||||||
|
= emqx_message:to_map(Msg),
|
||||||
emqx_message:from_map(
|
emqx_message:from_map(
|
||||||
MsgMap#{qos => 0,
|
MsgMap#{qos => 0,
|
||||||
topic => <<"">>,
|
topic => <<"">>,
|
||||||
payload => <<"">>
|
payload => <<"">>,
|
||||||
|
headers => maps:put(allow_publish, false, Headers)
|
||||||
});
|
});
|
||||||
<<"gooduser">> = From ->
|
<<"gooduser">> = From ->
|
||||||
MsgMap = emqx_message:to_map(Msg),
|
MsgMap = #{headers := Headers}
|
||||||
|
= emqx_message:to_map(Msg),
|
||||||
emqx_message:from_map(
|
emqx_message:from_map(
|
||||||
MsgMap#{topic => From,
|
MsgMap#{topic => From,
|
||||||
payload => From
|
payload => From,
|
||||||
|
headers => maps:put(allow_publish, true, Headers)
|
||||||
});
|
});
|
||||||
_ -> Msg
|
_ ->
|
||||||
|
Msg
|
||||||
end,
|
end,
|
||||||
?assertEqual(ExpectedOutMsg, OutMsg),
|
?assertEqual(ExpectedOutMsg, OutMsg),
|
||||||
|
|
||||||
|
@ -461,7 +466,9 @@ from_message(Msg) ->
|
||||||
from => stringfy(emqx_message:from(Msg)),
|
from => stringfy(emqx_message:from(Msg)),
|
||||||
topic => emqx_message:topic(Msg),
|
topic => emqx_message:topic(Msg),
|
||||||
payload => emqx_message:payload(Msg),
|
payload => emqx_message:payload(Msg),
|
||||||
timestamp => emqx_message:timestamp(Msg)
|
timestamp => emqx_message:timestamp(Msg),
|
||||||
|
headers => emqx_exhook_handler:headers(
|
||||||
|
emqx_message:get_headers(Msg))
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue