Merge pull request #12285 from HJianBo/shorten-coap-params
feat(coap): support short param names
This commit is contained in:
commit
b16920e796
|
@ -14,6 +14,8 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-ifndef(EMQX_COAP_HRL).
|
||||||
|
|
||||||
-define(APP, emqx_coap).
|
-define(APP, emqx_coap).
|
||||||
-define(DEFAULT_COAP_PORT, 5683).
|
-define(DEFAULT_COAP_PORT, 5683).
|
||||||
-define(DEFAULT_COAPS_PORT, 5684).
|
-define(DEFAULT_COAPS_PORT, 5684).
|
||||||
|
@ -79,3 +81,14 @@
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type coap_message() :: #coap_message{}.
|
-type coap_message() :: #coap_message{}.
|
||||||
|
|
||||||
|
-define(QUERY_PARAMS_MAPPING, [
|
||||||
|
{<<"c">>, <<"clientid">>},
|
||||||
|
{<<"t">>, <<"token">>},
|
||||||
|
{<<"u">>, <<"username">>},
|
||||||
|
{<<"p">>, <<"password">>},
|
||||||
|
{<<"q">>, <<"qos">>},
|
||||||
|
{<<"r">>, <<"retain">>}
|
||||||
|
]).
|
||||||
|
|
||||||
|
-endif.
|
||||||
|
|
|
@ -386,7 +386,7 @@ check_auth_state(Msg, #channel{connection_required = true} = Channel) ->
|
||||||
true ->
|
true ->
|
||||||
call_session(handle_request, Msg, Channel);
|
call_session(handle_request, Msg, Channel);
|
||||||
false ->
|
false ->
|
||||||
URIQuery = emqx_coap_message:get_option(uri_query, Msg, #{}),
|
URIQuery = emqx_coap_message:extract_uri_query(Msg),
|
||||||
case maps:get(<<"token">>, URIQuery, undefined) of
|
case maps:get(<<"token">>, URIQuery, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
?SLOG(debug, #{msg => "token_required_in_conn_mode", message => Msg});
|
?SLOG(debug, #{msg => "token_required_in_conn_mode", message => Msg});
|
||||||
|
@ -430,7 +430,7 @@ check_token(
|
||||||
) ->
|
) ->
|
||||||
IsDeleteConn = is_delete_connection_request(Msg),
|
IsDeleteConn = is_delete_connection_request(Msg),
|
||||||
#{clientid := ClientId} = ClientInfo,
|
#{clientid := ClientId} = ClientInfo,
|
||||||
case emqx_coap_message:get_option(uri_query, Msg) of
|
case emqx_coap_message:extract_uri_query(Msg) of
|
||||||
#{
|
#{
|
||||||
<<"clientid">> := ClientId,
|
<<"clientid">> := ClientId,
|
||||||
<<"token">> := Token
|
<<"token">> := Token
|
||||||
|
@ -742,7 +742,7 @@ process_connection(
|
||||||
Channel = #channel{conn_state = idle},
|
Channel = #channel{conn_state = idle},
|
||||||
Iter
|
Iter
|
||||||
) ->
|
) ->
|
||||||
Queries = emqx_coap_message:get_option(uri_query, Req),
|
Queries = emqx_coap_message:extract_uri_query(Req),
|
||||||
case
|
case
|
||||||
emqx_utils:pipeline(
|
emqx_utils:pipeline(
|
||||||
[
|
[
|
||||||
|
@ -775,7 +775,7 @@ process_connection(
|
||||||
) when
|
) when
|
||||||
ConnState == connected
|
ConnState == connected
|
||||||
->
|
->
|
||||||
Queries = emqx_coap_message:get_option(uri_query, Req),
|
Queries = emqx_coap_message:extract_uri_query(Req),
|
||||||
ErrMsg0 =
|
ErrMsg0 =
|
||||||
case Queries of
|
case Queries of
|
||||||
#{<<"clientid">> := ClientId} ->
|
#{<<"clientid">> := ClientId} ->
|
||||||
|
@ -793,7 +793,7 @@ process_connection(
|
||||||
Channel
|
Channel
|
||||||
);
|
);
|
||||||
process_connection({close, Msg}, _, Channel, _) ->
|
process_connection({close, Msg}, _, Channel, _) ->
|
||||||
Queries = emqx_coap_message:get_option(uri_query, Msg),
|
Queries = emqx_coap_message:extract_uri_query(Msg),
|
||||||
case maps:get(<<"clientid">>, Queries, undefined) of
|
case maps:get(<<"clientid">>, Queries, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
%% Copyright (c) 2015 Petr Gotthard <petr.gotthard@centrum.cz>
|
%% Copyright (c) 2015 Petr Gotthard <petr.gotthard@centrum.cz>
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -43,6 +43,11 @@
|
||||||
set_payload_block/3, set_payload_block/4
|
set_payload_block/3, set_payload_block/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
extract_uri_query/1,
|
||||||
|
query_params_mapping_table/0
|
||||||
|
]).
|
||||||
|
|
||||||
-include("emqx_coap.hrl").
|
-include("emqx_coap.hrl").
|
||||||
|
|
||||||
request(Type, Method) ->
|
request(Type, Method) ->
|
||||||
|
@ -111,6 +116,12 @@ get_option(Option, Msg) ->
|
||||||
get_option(Option, #coap_message{options = Options}, Def) ->
|
get_option(Option, #coap_message{options = Options}, Def) ->
|
||||||
maps:get(Option, Options, Def).
|
maps:get(Option, Options, Def).
|
||||||
|
|
||||||
|
extract_uri_query(Msg = #coap_message{}) ->
|
||||||
|
expand_short_param_name(get_option(uri_query, Msg, #{})).
|
||||||
|
|
||||||
|
query_params_mapping_table() ->
|
||||||
|
?QUERY_PARAMS_MAPPING.
|
||||||
|
|
||||||
set_payload(Payload, Msg) when is_binary(Payload) ->
|
set_payload(Payload, Msg) when is_binary(Payload) ->
|
||||||
Msg#coap_message{payload = Payload};
|
Msg#coap_message{payload = Payload};
|
||||||
set_payload(Payload, Msg) when is_list(Payload) ->
|
set_payload(Payload, Msg) when is_list(Payload) ->
|
||||||
|
@ -149,3 +160,17 @@ to_options(Opts) when is_map(Opts) ->
|
||||||
Opts;
|
Opts;
|
||||||
to_options(Opts) ->
|
to_options(Opts) ->
|
||||||
maps:from_list(Opts).
|
maps:from_list(Opts).
|
||||||
|
|
||||||
|
expand_short_param_name(Queries) when is_map(Queries) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun({Short, Long}, Acc) ->
|
||||||
|
case maps:take(Short, Acc) of
|
||||||
|
error ->
|
||||||
|
Acc;
|
||||||
|
{Value, Acc1} ->
|
||||||
|
maps:put(Long, Value, Acc1)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
Queries,
|
||||||
|
?QUERY_PARAMS_MAPPING
|
||||||
|
).
|
||||||
|
|
|
@ -76,7 +76,7 @@ check_topic(Path) ->
|
||||||
|
|
||||||
get_sub_opts(Msg) ->
|
get_sub_opts(Msg) ->
|
||||||
SubOpts = maps:fold(
|
SubOpts = maps:fold(
|
||||||
fun parse_sub_opts/3, #{}, emqx_coap_message:get_option(uri_query, Msg, #{})
|
fun parse_sub_opts/3, #{}, emqx_coap_message:extract_uri_query(Msg)
|
||||||
),
|
),
|
||||||
case SubOpts of
|
case SubOpts of
|
||||||
#{qos := _} ->
|
#{qos := _} ->
|
||||||
|
@ -110,28 +110,24 @@ type_to_qos(coap, #coap_message{type = Type}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_publish_opts(Msg) ->
|
get_publish_opts(Msg) ->
|
||||||
case emqx_coap_message:get_option(uri_query, Msg) of
|
Qs = emqx_coap_message:extract_uri_query(Msg),
|
||||||
undefined ->
|
maps:fold(
|
||||||
#{};
|
fun
|
||||||
Qs ->
|
(<<"retain">>, V, Acc) ->
|
||||||
maps:fold(
|
Val = V =:= <<"true">>,
|
||||||
fun
|
Acc#{retain => Val};
|
||||||
(<<"retain">>, V, Acc) ->
|
(<<"expiry">>, V, Acc) ->
|
||||||
Val = V =:= <<"true">>,
|
Val = erlang:binary_to_integer(V),
|
||||||
Acc#{retain => Val};
|
Acc#{expiry_interval => Val};
|
||||||
(<<"expiry">>, V, Acc) ->
|
(<<"qos">>, V, Acc) ->
|
||||||
Val = erlang:binary_to_integer(V),
|
Val = erlang:binary_to_integer(V),
|
||||||
Acc#{expiry_interval => Val};
|
Acc#{qos => Val};
|
||||||
(<<"qos">>, V, Acc) ->
|
(_, _, Acc) ->
|
||||||
Val = erlang:binary_to_integer(V),
|
Acc
|
||||||
Acc#{qos => Val};
|
end,
|
||||||
(_, _, Acc) ->
|
#{},
|
||||||
Acc
|
Qs
|
||||||
end,
|
).
|
||||||
#{},
|
|
||||||
Qs
|
|
||||||
)
|
|
||||||
end.
|
|
||||||
|
|
||||||
get_publish_qos(Msg, PublishOpts) ->
|
get_publish_qos(Msg, PublishOpts) ->
|
||||||
case PublishOpts of
|
case PublishOpts of
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_gateway_coap, [
|
{application, emqx_gateway_coap, [
|
||||||
{description, "CoAP Gateway"},
|
{description, "CoAP Gateway"},
|
||||||
{vsn, "0.1.6"},
|
{vsn, "0.1.7"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, emqx, emqx_gateway]},
|
{applications, [kernel, stdlib, emqx, emqx_gateway]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -151,6 +151,30 @@ t_connection(_) ->
|
||||||
end,
|
end,
|
||||||
do(Action).
|
do(Action).
|
||||||
|
|
||||||
|
t_connection_with_short_param_name(_) ->
|
||||||
|
Action = fun(Channel) ->
|
||||||
|
%% connection
|
||||||
|
Token = connection(Channel, true),
|
||||||
|
|
||||||
|
timer:sleep(100),
|
||||||
|
?assertNotEqual(
|
||||||
|
[],
|
||||||
|
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% heartbeat
|
||||||
|
{ok, changed, _} = send_heartbeat(Token, true),
|
||||||
|
|
||||||
|
disconnection(Channel, Token, true),
|
||||||
|
|
||||||
|
timer:sleep(100),
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
do(Action).
|
||||||
|
|
||||||
t_heartbeat(Config) ->
|
t_heartbeat(Config) ->
|
||||||
Heartbeat = ?config(new_heartbeat, Config),
|
Heartbeat = ?config(new_heartbeat, Config),
|
||||||
Action = fun(Channel) ->
|
Action = fun(Channel) ->
|
||||||
|
@ -583,36 +607,67 @@ t_connectionless_pubsub(_) ->
|
||||||
%% helpers
|
%% helpers
|
||||||
|
|
||||||
send_heartbeat(Token) ->
|
send_heartbeat(Token) ->
|
||||||
HeartURI =
|
send_heartbeat(Token, false).
|
||||||
?MQTT_PREFIX ++
|
|
||||||
"/connection?clientid=client1&token=" ++
|
|
||||||
Token,
|
|
||||||
|
|
||||||
?LOGT("send heartbeat request:~ts~n", [HeartURI]),
|
send_heartbeat(Token, ShortenParamName) ->
|
||||||
er_coap_client:request(put, HeartURI).
|
Prefix = ?MQTT_PREFIX ++ "/connection",
|
||||||
|
Queries = #{
|
||||||
|
"clientid" => <<"client1">>,
|
||||||
|
"token" => Token
|
||||||
|
},
|
||||||
|
URI = compose_uri(Prefix, Queries, ShortenParamName),
|
||||||
|
?LOGT("send heartbeat request:~ts~n", [URI]),
|
||||||
|
er_coap_client:request(put, URI).
|
||||||
|
|
||||||
connection(Channel) ->
|
connection(Channel) ->
|
||||||
URI =
|
connection(Channel, false).
|
||||||
?MQTT_PREFIX ++
|
|
||||||
"/connection?clientid=client1&username=admin&password=public",
|
connection(Channel, ShortenParamName) ->
|
||||||
|
Prefix = ?MQTT_PREFIX ++ "/connection",
|
||||||
|
Queries = #{
|
||||||
|
"clientid" => <<"client1">>,
|
||||||
|
"username" => <<"admin">>,
|
||||||
|
"password" => <<"public">>
|
||||||
|
},
|
||||||
|
URI = compose_uri(Prefix, Queries, ShortenParamName),
|
||||||
Req = make_req(post),
|
Req = make_req(post),
|
||||||
{ok, created, Data} = do_request(Channel, URI, Req),
|
{ok, created, Data} = do_request(Channel, URI, Req),
|
||||||
#coap_content{payload = BinToken} = Data,
|
#coap_content{payload = BinToken} = Data,
|
||||||
binary_to_list(BinToken).
|
binary_to_list(BinToken).
|
||||||
|
|
||||||
disconnection(Channel, Token) ->
|
disconnection(Channel, Token) ->
|
||||||
%% delete
|
disconnection(Channel, Token, false).
|
||||||
URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token,
|
|
||||||
|
disconnection(Channel, Token, ShortenParamName) ->
|
||||||
|
Prefix = ?MQTT_PREFIX ++ "/connection",
|
||||||
|
Queries = #{
|
||||||
|
"clientid" => <<"client1">>,
|
||||||
|
"token" => Token
|
||||||
|
},
|
||||||
|
URI = compose_uri(Prefix, Queries, ShortenParamName),
|
||||||
Req = make_req(delete),
|
Req = make_req(delete),
|
||||||
{ok, deleted, _} = do_request(Channel, URI, Req).
|
{ok, deleted, _} = do_request(Channel, URI, Req).
|
||||||
|
|
||||||
observe(Channel, Token, true) ->
|
observe(Channel, Token, Observe) ->
|
||||||
URI = ?PS_PREFIX ++ "/coap/observe?clientid=client1&token=" ++ Token,
|
observe(Channel, Token, Observe, false).
|
||||||
|
|
||||||
|
observe(Channel, Token, true, ShortenParamName) ->
|
||||||
|
Prefix = ?PS_PREFIX ++ "/coap/observe",
|
||||||
|
Queries = #{
|
||||||
|
"clientid" => <<"client1">>,
|
||||||
|
"token" => Token
|
||||||
|
},
|
||||||
|
URI = compose_uri(Prefix, Queries, ShortenParamName),
|
||||||
Req = make_req(get, <<>>, [{observe, 0}]),
|
Req = make_req(get, <<>>, [{observe, 0}]),
|
||||||
{ok, content, _Data} = do_request(Channel, URI, Req),
|
{ok, content, _Data} = do_request(Channel, URI, Req),
|
||||||
ok;
|
ok;
|
||||||
observe(Channel, Token, false) ->
|
observe(Channel, Token, false, ShortenParamName) ->
|
||||||
URI = ?PS_PREFIX ++ "/coap/observe?clientid=client1&token=" ++ Token,
|
Prefix = ?PS_PREFIX ++ "/coap/observe",
|
||||||
|
Queries = #{
|
||||||
|
"clientid" => <<"client1">>,
|
||||||
|
"token" => Token
|
||||||
|
},
|
||||||
|
URI = compose_uri(Prefix, Queries, ShortenParamName),
|
||||||
Req = make_req(get, <<>>, [{observe, 1}]),
|
Req = make_req(get, <<>>, [{observe, 1}]),
|
||||||
{ok, nocontent, _Data} = do_request(Channel, URI, Req),
|
{ok, nocontent, _Data} = do_request(Channel, URI, Req),
|
||||||
ok.
|
ok.
|
||||||
|
@ -620,8 +675,16 @@ observe(Channel, Token, false) ->
|
||||||
pubsub_uri(Topic) when is_list(Topic) ->
|
pubsub_uri(Topic) when is_list(Topic) ->
|
||||||
?PS_PREFIX ++ "/" ++ Topic.
|
?PS_PREFIX ++ "/" ++ Topic.
|
||||||
|
|
||||||
pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
|
pubsub_uri(Topic, Token) ->
|
||||||
?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.
|
pubsub_uri(Topic, Token, false).
|
||||||
|
|
||||||
|
pubsub_uri(Topic, Token, ShortenParamName) when is_list(Topic), is_list(Token) ->
|
||||||
|
Prefix = ?PS_PREFIX ++ "/" ++ Topic,
|
||||||
|
Queries = #{
|
||||||
|
"clientid" => <<"client1">>,
|
||||||
|
"token" => Token
|
||||||
|
},
|
||||||
|
compose_uri(Prefix, Queries, ShortenParamName).
|
||||||
|
|
||||||
make_req(Method) ->
|
make_req(Method) ->
|
||||||
make_req(Method, <<>>).
|
make_req(Method, <<>>).
|
||||||
|
@ -701,3 +764,28 @@ get_field(type, #coap_message{type = Type}) ->
|
||||||
Type;
|
Type;
|
||||||
get_field(method, #coap_message{method = Method}) ->
|
get_field(method, #coap_message{method = Method}) ->
|
||||||
Method.
|
Method.
|
||||||
|
|
||||||
|
compose_uri(URI, Queries, ShortenParamName) ->
|
||||||
|
Queries1 = shorten_param_name(ShortenParamName, Queries),
|
||||||
|
case maps:size(Queries1) of
|
||||||
|
0 ->
|
||||||
|
URI;
|
||||||
|
_ ->
|
||||||
|
URI ++ "?" ++ uri_string:compose_query(maps:to_list(Queries1))
|
||||||
|
end.
|
||||||
|
|
||||||
|
shorten_param_name(false, Queries) ->
|
||||||
|
Queries;
|
||||||
|
shorten_param_name(true, Queries) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun({Short, Long}, Acc) ->
|
||||||
|
case maps:take(Long, Acc) of
|
||||||
|
error ->
|
||||||
|
Acc;
|
||||||
|
{Value, Acc1} ->
|
||||||
|
maps:put(Short, Value, Acc1)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
Queries,
|
||||||
|
emqx_coap_message:query_params_mapping_table()
|
||||||
|
).
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_coap_message_tests).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
short_name_uri_query_test() ->
|
||||||
|
UriQueryMap = #{
|
||||||
|
<<"c">> => <<"clientid_value">>,
|
||||||
|
<<"t">> => <<"token_value">>,
|
||||||
|
<<"u">> => <<"username_value">>,
|
||||||
|
<<"p">> => <<"password_value">>,
|
||||||
|
<<"q">> => 1,
|
||||||
|
<<"r">> => false
|
||||||
|
},
|
||||||
|
ParsedQuery = #{
|
||||||
|
<<"clientid">> => <<"clientid_value">>,
|
||||||
|
<<"token">> => <<"token_value">>,
|
||||||
|
<<"username">> => <<"username_value">>,
|
||||||
|
<<"password">> => <<"password_value">>,
|
||||||
|
<<"qos">> => 1,
|
||||||
|
<<"retain">> => false
|
||||||
|
},
|
||||||
|
Msg = emqx_coap_message:request(
|
||||||
|
con, put, <<"payload contents...">>, #{uri_query => UriQueryMap}
|
||||||
|
),
|
||||||
|
?assertEqual(ParsedQuery, emqx_coap_message:extract_uri_query(Msg)).
|
|
@ -0,0 +1,2 @@
|
||||||
|
The CoAP gateway supports short parameter names for slight savings in datagram size.
|
||||||
|
For example, `clientid=bar` can be written as `c=bar`.
|
Loading…
Reference in New Issue