chore(gw-sn): append messge headers

This commit is contained in:
JianBo He 2021-11-15 16:03:18 +08:00
parent d0bdf27e0c
commit ad2dbb5a49
15 changed files with 115 additions and 52 deletions

View File

@ -39,7 +39,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -39,7 +39,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
ok.

View File

@ -39,7 +39,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
ok.

View File

@ -110,7 +110,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -48,7 +48,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -36,7 +36,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
ok.

View File

@ -51,7 +51,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -53,7 +53,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -51,7 +51,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -51,7 +51,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]),
meck:unload(emqx_resource),
ok.

View File

@ -41,7 +41,11 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
{ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
ok.

View File

@ -56,6 +56,7 @@ handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx, CInfo) -
#{clientid := ClientId} = CInfo,
MountTopic = mount(CInfo, Topic),
QOS = get_publish_qos(Msg),
%% TODO: Append message metadata into headers
MQTTMsg = emqx_message:make(ClientId, QOS, MountTopic, Payload),
MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg),
_ = emqx_broker:publish(MQTTMsg2),

View File

@ -603,6 +603,7 @@ send_to_mqtt(Ctx, EventType, Payload, {Topic, Qos},
proto_publish(Topic, Payload, Qos, Headers, WithContext,
#session{endpoint_name = Epn} = Session) ->
MountedTopic = mount(Topic, Session),
%% TODO: Append message metadata into headers
Msg = emqx_message:make(Epn, Qos, MountedTopic,
emqx_json:encode(Payload), #{}, Headers),
WithContext(publish, [MountedTopic, Msg]),

View File

@ -790,16 +790,26 @@ check_pub_authz({TopicName, _Flags, _Data},
end.
convert_pub_to_msg({TopicName, Flags, Data},
Channel = #channel{
clientinfo = #{clientid := ClientId}}) ->
Channel = #channel{clientinfo = #{clientid := ClientId}}) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS),
Message = emqx_message:make(ClientId, NewQoS, TopicName, Data),
NMessage = emqx_message:set_flags(
#{dup => Dup, retain => Retain},
Message
),
{ok, NMessage, Channel}.
Message = put_message_headers(
emqx_message:make(
ClientId, NewQoS, TopicName, Data,
#{dup => Dup, retain => Retain}, #{}), Channel),
{ok, Message, Channel}.
put_message_headers(Msg, #channel{
conninfo = #{proto_ver := ProtoVer},
clientinfo = #{
protocol := Protocol,
username := Username,
peerhost := PeerHost}}) ->
emqx_message:set_headers(
#{proto_ver => ProtoVer,
protocol => Protocol,
username => Username,
peerhost => PeerHost}, Msg).
get_corrected_qos(?QOS_NEG1) -> ?QOS_0;
get_corrected_qos(QoS) -> QoS.
@ -1307,7 +1317,7 @@ ensure_disconnected(Reason, Channel = #channel{
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
ok = publish_will_msg(WillMsg),
ok = publish_will_msg(put_message_headers(WillMsg, Channel)),
Channel#channel{will_msg = undefined}.
publish_will_msg(Msg) ->

View File

@ -107,16 +107,16 @@ t_lookup_cmd_read(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{
content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
"</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>
},
[],
MsgId1),
test_send_coap_request(
UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{
content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
"</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
@ -192,7 +192,8 @@ t_lookup_cmd_discover(Config) ->
"127.0.0.1",
?PORT,
{ok, content},
#coap_content{content_format = <<"application/link-format">>, payload = PayloadDiscover},
#coap_content{content_format = <<"application/link-format">>,
payload = PayloadDiscover},
Request2,
true),
timer:sleep(200),
@ -206,13 +207,15 @@ t_read(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
test_send_coap_request(
UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
"</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
@ -236,13 +239,15 @@ t_write(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
test_send_coap_request(
UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
"</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
@ -268,13 +273,15 @@ t_observe(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
test_send_coap_request(
UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
"</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),