diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index df4c904c3..b4c9841f8 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -39,7 +39,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(?CMD_REPLACE, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. diff --git a/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl index 737a089a2..b4a8f2756 100644 --- a/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_mnesia_SUITE.erl @@ -39,7 +39,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), ok. diff --git a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl index 5ad8f1f29..cf1110a2a 100644 --- a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl @@ -39,7 +39,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), ok. diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index 4fbbed84d..94d1d0995 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -110,7 +110,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index 848179687..441395282 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -48,7 +48,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. diff --git a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl index 875ff3d09..39579c1f5 100644 --- a/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mnesia_SUITE.erl @@ -36,7 +36,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), ok. diff --git a/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl index 2e3edd42a..9c9cbad17 100644 --- a/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl @@ -51,7 +51,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index a2d264fc9..2fa01898b 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -53,7 +53,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index 233701b34..9f963d926 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -51,7 +51,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 4b76bcfab..02f70c8cc 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -51,7 +51,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]), meck:unload(emqx_resource), ok. diff --git a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl index 630d42dce..8d1146ddb 100644 --- a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl @@ -41,7 +41,11 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - {ok, _} = emqx_authz:update(replace, []), + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => <<"allow">>, + <<"cache">> => #{<<"enable">> => <<"true">>}, + <<"sources">> => []}), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), ok. diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index d1d73e9b9..76d065523 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -56,6 +56,7 @@ handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx, CInfo) - #{clientid := ClientId} = CInfo, MountTopic = mount(CInfo, Topic), QOS = get_publish_qos(Msg), + %% TODO: Append message metadata into headers MQTTMsg = emqx_message:make(ClientId, QOS, MountTopic, Payload), MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg), _ = emqx_broker:publish(MQTTMsg2), diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index f59ec219e..950b0a5e0 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -603,6 +603,7 @@ send_to_mqtt(Ctx, EventType, Payload, {Topic, Qos}, proto_publish(Topic, Payload, Qos, Headers, WithContext, #session{endpoint_name = Epn} = Session) -> MountedTopic = mount(Topic, Session), + %% TODO: Append message metadata into headers Msg = emqx_message:make(Epn, Qos, MountedTopic, emqx_json:encode(Payload), #{}, Headers), WithContext(publish, [MountedTopic, Msg]), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 550f89413..9c2a15593 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -790,16 +790,26 @@ check_pub_authz({TopicName, _Flags, _Data}, end. convert_pub_to_msg({TopicName, Flags, Data}, - Channel = #channel{ - clientinfo = #{clientid := ClientId}}) -> + Channel = #channel{clientinfo = #{clientid := ClientId}}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), - Message = emqx_message:make(ClientId, NewQoS, TopicName, Data), - NMessage = emqx_message:set_flags( - #{dup => Dup, retain => Retain}, - Message - ), - {ok, NMessage, Channel}. + Message = put_message_headers( + emqx_message:make( + ClientId, NewQoS, TopicName, Data, + #{dup => Dup, retain => Retain}, #{}), Channel), + {ok, Message, Channel}. + +put_message_headers(Msg, #channel{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + username := Username, + peerhost := PeerHost}}) -> + emqx_message:set_headers( + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost}, Msg). get_corrected_qos(?QOS_NEG1) -> ?QOS_0; get_corrected_qos(QoS) -> QoS. @@ -1307,7 +1317,7 @@ ensure_disconnected(Reason, Channel = #channel{ mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> - ok = publish_will_msg(WillMsg), + ok = publish_will_msg(put_message_headers(WillMsg, Channel)), Channel#channel{will_msg = undefined}. publish_will_msg(Msg) -> diff --git a/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl index babf9d36c..85975d37b 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl @@ -107,16 +107,16 @@ t_lookup_cmd_read(Config) -> emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), timer:sleep(200), %% step 1, device register ... - test_send_coap_request( UdpSock, - post, - sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), - #coap_content{ - content_format = <<"text/plain">>, - payload = <<";rt=\"oma.lwm2m\";ct=11543," - ",,">> - }, - [], - MsgId1), + test_send_coap_request( + UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), + #coap_content{ + content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543," + ",,">>}, + [], + MsgId1), #coap_message{method = Method1} = test_recv_coap_response(UdpSock), ?assertEqual({ok,created}, Method1), @@ -192,7 +192,8 @@ t_lookup_cmd_discover(Config) -> "127.0.0.1", ?PORT, {ok, content}, - #coap_content{content_format = <<"application/link-format">>, payload = PayloadDiscover}, + #coap_content{content_format = <<"application/link-format">>, + payload = PayloadDiscover}, Request2, true), timer:sleep(200), @@ -206,13 +207,15 @@ t_read(Config) -> emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), timer:sleep(200), %% step 1, device register ... - test_send_coap_request( UdpSock, - post, - sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), - #coap_content{content_format = <<"text/plain">>, - payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>}, - [], - MsgId1), + test_send_coap_request( + UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543," + ",,">>}, + [], + MsgId1), #coap_message{method = Method1} = test_recv_coap_response(UdpSock), ?assertEqual({ok,created}, Method1), @@ -236,13 +239,15 @@ t_write(Config) -> emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), timer:sleep(200), %% step 1, device register ... - test_send_coap_request( UdpSock, - post, - sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), - #coap_content{content_format = <<"text/plain">>, - payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>}, - [], - MsgId1), + test_send_coap_request( + UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543," + ",,">>}, + [], + MsgId1), #coap_message{method = Method1} = test_recv_coap_response(UdpSock), ?assertEqual({ok,created}, Method1), @@ -268,13 +273,15 @@ t_observe(Config) -> emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), timer:sleep(200), %% step 1, device register ... - test_send_coap_request( UdpSock, - post, - sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), - #coap_content{content_format = <<"text/plain">>, - payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>}, - [], - MsgId1), + test_send_coap_request( + UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543," + ",,">>}, + [], + MsgId1), #coap_message{method = Method1} = test_recv_coap_response(UdpSock), ?assertEqual({ok,created}, Method1),