diff --git a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl index 53ca022bb..1d89f237a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl @@ -71,8 +71,8 @@ subscribe(_Bindings, Params) -> publish(_Bindings, Params) -> logger:debug("API publish Params:~p", [Params]), - {ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params), - case do_publish(ClientId, Topic, Qos, Retain, Payload) of + {ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params), + case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of {ok, MsgIds} -> case proplists:get_value(<<"return">>, Params, undefined) of undefined -> minirest:return(ok); @@ -114,7 +114,8 @@ loop_subscribe([Params | ParamsN], Acc) -> {_, Code0, _Reason} -> Code0 end, Result = #{clientid => ClientId, - topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)), + topic => resp_topic(proplists:get_value(<<"topic">>, Params), + proplists:get_value(<<"topics">>, Params, <<"">>)), code => Code}, loop_subscribe(ParamsN, [Result | Acc]). @@ -123,12 +124,13 @@ loop_publish(Params) -> loop_publish([], Result) -> lists:reverse(Result); loop_publish([Params | ParamsN], Acc) -> - {ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params), - Code = case do_publish(ClientId, Topic, Qos, Retain, Payload) of + {ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params), + Code = case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of {ok, _} -> 0; {_, Code0, _} -> Code0 end, - Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)), + Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params), + proplists:get_value(<<"topics">>, Params, <<"">>)), code => Code}, loop_publish(ParamsN, [Result | Acc]). @@ -143,7 +145,8 @@ loop_unsubscribe([Params | ParamsN], Acc) -> {_, Code0, _} -> Code0 end, Result = #{clientid => ClientId, - topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)), + topic => resp_topic(proplists:get_value(<<"topic">>, Params), + proplists:get_value(<<"topics">>, Params, <<"">>)), code => Code}, loop_unsubscribe(ParamsN, [Result | Acc]). @@ -158,14 +161,17 @@ do_subscribe(ClientId, Topics, QoS) -> _ -> ok end. -do_publish(ClientId, _Topics, _Qos, _Retain, _Payload) when not (is_binary(ClientId) or (ClientId =:= undefined)) -> +do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps) + when not (is_binary(ClientId) or (ClientId =:= undefined)) -> {ok, ?ERROR8, <<"bad clientid: must be string">>}; -do_publish(_ClientId, [], _Qos, _Retain, _Payload) -> +do_publish(_ClientId, [], _Qos, _Retain, _Payload, _UserProps) -> {ok, ?ERROR15, bad_topic}; -do_publish(ClientId, Topics, Qos, Retain, Payload) -> +do_publish(ClientId, Topics, Qos, Retain, Payload, UserProps) -> MsgIds = lists:map(fun(Topic) -> Msg = emqx_message:make(ClientId, Qos, Topic, Payload), - _ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}}), + UserProps1 = #{'User-Property' => UserProps}, + _ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}, + headers = #{properties => UserProps1}}), emqx_guid:to_hexstr(Msg#message.id) end, Topics), {ok, MsgIds}. @@ -185,19 +191,22 @@ do_unsubscribe(ClientId, Topic) -> parse_subscribe_params(Params) -> ClientId = proplists:get_value(<<"clientid">>, Params), - Topics = topics(filter, proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)), + Topics = topics(filter, proplists:get_value(<<"topic">>, Params), + proplists:get_value(<<"topics">>, Params, <<"">>)), QoS = proplists:get_value(<<"qos">>, Params, 0), {ClientId, Topics, QoS}. parse_publish_params(Params) -> - Topics = topics(name, proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)), - ClientId = proplists:get_value(<<"clientid">>, Params), - Payload = decode_payload(proplists:get_value(<<"payload">>, Params, <<>>), - proplists:get_value(<<"encoding">>, Params, <<"plain">>)), - Qos = proplists:get_value(<<"qos">>, Params, 0), - Retain = proplists:get_value(<<"retain">>, Params, false), - Payload1 = maybe_maps_to_binary(Payload), - {ClientId, Topics, Qos, Retain, Payload1}. + Topics = topics(name, proplists:get_value(<<"topic">>, Params), + proplists:get_value(<<"topics">>, Params, <<"">>)), + ClientId = proplists:get_value(<<"clientid">>, Params), + Payload = decode_payload(proplists:get_value(<<"payload">>, Params, <<>>), + proplists:get_value(<<"encoding">>, Params, <<"plain">>)), + Qos = proplists:get_value(<<"qos">>, Params, 0), + Retain = proplists:get_value(<<"retain">>, Params, false), + Payload1 = maybe_maps_to_binary(Payload), + UserProps = check_user_props(proplists:get_value(<<"user_properties">>, Params, [])), + {ClientId, Topics, Qos, Retain, Payload1, UserProps}. parse_unsubscribe_params(Params) -> ClientId = proplists:get_value(<<"clientid">>, Params), @@ -251,3 +260,8 @@ maybe_maps_to_binary(Payload) -> _C : _E : S -> error({encode_payload_fail, S}) end. + +check_user_props(UserProps) when is_list(UserProps) -> + UserProps; +check_user_props(UserProps) -> + error({user_properties_type_error, UserProps}). diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index a3eea2ab3..a97381220 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -401,7 +401,7 @@ t_pubsub(_) -> ClientId = <<"client1">>, Options = #{clientid => ClientId, - proto_ver => 5}, + proto_ver => v5}, Topic = <<"mytopic">>, {ok, C1} = emqtt:start_link(Options), {ok, _} = emqtt:connect(C1), @@ -536,11 +536,29 @@ t_pubsub(_) -> api_path(["mqtt/unsubscribe_batch"]), [], auth_header_(), Body3), loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data3), [return_maps]))), + {ok, _, [1]} = emqtt:subscribe(C1, <<"mytopic">>, qos1), + timer:sleep(50), + + %% user properties + {ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(), + #{<<"clientid">> => ClientId, + <<"topic">> => <<"mytopic">>, + <<"qos">> => 1, + <<"payload">> => <<"hello world">>, + <<"user_properties">> => #{<<"porp_1">> => <<"porp_1">>}}), + ?assert(receive + {publish, #{payload := <<"hello world">>, + properties := #{'User-Property' := [{<<"porp_1">>,<<"porp_1">>}]}}} -> + true + after 100 -> + false + end), + ok = emqtt:disconnect(C1), - ?assertEqual(3, emqx_metrics:val('messages.qos1.received') - Qos1Received), + ?assertEqual(4, emqx_metrics:val('messages.qos1.received') - Qos1Received), ?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received), - ?assertEqual(5, emqx_metrics:val('messages.received') - Received). + ?assertEqual(6, emqx_metrics:val('messages.received') - Received). loop([]) -> []; diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 97e40439d..72f8345bf 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -454,7 +454,8 @@ columns_with_exam('message.publish') -> , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} , {<<"flags">>, #{}} - , {<<"headers">>, undefined} + , {<<"headers">>, #{<<"properties">> => #{<<"User-Property">> => + #{'prop_key' => <<"prop_val">>}}}} , {<<"publish_received_at">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 760205d62..747de87d7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -98,21 +98,8 @@ sql_test_action() -> fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context). -envs_examp(<<"$events/", _/binary>> = EVENT_TOPIC) -> +envs_examp(EVENT_TOPIC) -> EventName = emqx_rule_events:event_name(EVENT_TOPIC), emqx_rule_maps:atom_key_map( maps:from_list( - emqx_rule_events:columns_with_exam(EventName))); -envs_examp(_) -> - #{id => emqx_guid:to_hexstr(emqx_guid:gen()), - clientid => <<"c_emqx">>, - username => <<"u_emqx">>, - payload => <<"{\"id\": 1, \"name\": \"ha\"}">>, - peerhost => <<"127.0.0.1">>, - topic => <<"t/a">>, - qos => 1, - flags => #{sys => true, event => true}, - publish_received_at => emqx_rule_utils:now_ms(), - timestamp => emqx_rule_utils:now_ms(), - node => node() - }. + emqx_rule_events:columns_with_exam(EventName))).