384 lines
15 KiB
Erlang
384 lines
15 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_lwm2m_coap_resource).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
-include_lib("lwm2m_coap/include/coap.hrl").
|
|
|
|
-behaviour(lwm2m_coap_resource).
|
|
|
|
-export([ coap_discover/2
|
|
, coap_get/5
|
|
, coap_post/5
|
|
, coap_put/5
|
|
, coap_delete/4
|
|
, coap_observe/5
|
|
, coap_unobserve/1
|
|
, coap_response/7
|
|
, coap_ack/3
|
|
, handle_info/2
|
|
, handle_call/3
|
|
, handle_cast/2
|
|
, terminate/2
|
|
]).
|
|
|
|
-export([parse_object_list/1]).
|
|
|
|
-include("emqx_lwm2m.hrl").
|
|
|
|
-define(PREFIX, <<"rd">>).
|
|
|
|
-define(LOG(Level, Format, Args), logger:Level("LWM2M-RESOURCE: " ++ Format, Args)).
|
|
|
|
-dialyzer([{nowarn_function, [coap_discover/2]}]).
|
|
% we use {'absolute', list(binary()), [{atom(), binary()}]} as coap_uri()
|
|
% https://github.com/emqx/lwm2m-coap/blob/258e9bd3762124395e83c1e68a1583b84718230f/src/lwm2m_coap_resource.erl#L61
|
|
% resource operations
|
|
coap_discover(_Prefix, _Args) ->
|
|
[{absolute, [<<"mqtt">>], []}].
|
|
|
|
coap_get(ChId, [?PREFIX], Query, Content, Lwm2mState) ->
|
|
?LOG(debug, "~p ~p GET Query=~p, Content=~p", [self(),ChId, Query, Content]),
|
|
{ok, #coap_content{}, Lwm2mState};
|
|
coap_get(ChId, Prefix, Query, Content, Lwm2mState) ->
|
|
?LOG(error, "ignore bad put request ChId=~p, Prefix=~p, Query=~p, Content=~p", [ChId, Prefix, Query, Content]),
|
|
{error, bad_request, Lwm2mState}.
|
|
|
|
% LWM2M REGISTER COMMAND
|
|
coap_post(ChId, [?PREFIX], Query, Content = #coap_content{uri_path = [?PREFIX]}, Lwm2mState) ->
|
|
?LOG(debug, "~p ~p REGISTER command Query=~p, Content=~p", [self(), ChId, Query, Content]),
|
|
case parse_options(Query) of
|
|
{error, {bad_opt, _CustomOption}} ->
|
|
?LOG(error, "Reject REGISTER from ~p due to wrong option", [ChId]),
|
|
{error, bad_request, Lwm2mState};
|
|
{ok, LwM2MQuery} ->
|
|
process_register(ChId, LwM2MQuery, Content#coap_content.payload, Lwm2mState)
|
|
end;
|
|
|
|
% LWM2M UPDATE COMMAND
|
|
coap_post(ChId, [?PREFIX], Query, Content = #coap_content{uri_path = LocationPath}, Lwm2mState) ->
|
|
?LOG(debug, "~p ~p UPDATE command location=~p, Query=~p, Content=~p", [self(), ChId, LocationPath, Query, Content]),
|
|
case parse_options(Query) of
|
|
{error, {bad_opt, _CustomOption}} ->
|
|
?LOG(error, "Reject UPDATE from ~p due to wrong option, Query=~p", [ChId, Query]),
|
|
{error, bad_request, Lwm2mState};
|
|
{ok, LwM2MQuery} ->
|
|
process_update(ChId, LwM2MQuery, LocationPath, Content#coap_content.payload, Lwm2mState)
|
|
end;
|
|
|
|
coap_post(ChId, Prefix, Query, Content, Lwm2mState) ->
|
|
?LOG(error, "bad post request ChId=~p, Prefix=~p, Query=~p, Content=~p", [ChId, Prefix, Query, Content]),
|
|
{error, bad_request, Lwm2mState}.
|
|
|
|
coap_put(_ChId, Prefix, Query, Content, Lwm2mState) ->
|
|
?LOG(error, "put has error, Prefix=~p, Query=~p, Content=~p", [Prefix, Query, Content]),
|
|
{error, bad_request, Lwm2mState}.
|
|
|
|
% LWM2M DE-REGISTER COMMAND
|
|
coap_delete(ChId, [?PREFIX], #coap_content{uri_path = Location}, Lwm2mState) ->
|
|
LocationPath = binary_util:join_path(Location),
|
|
?LOG(debug, "~p ~p DELETE command location=~p", [self(), ChId, LocationPath]),
|
|
case get(lwm2m_context) of
|
|
#lwm2m_context{location = LocationPath} ->
|
|
lwm2m_coap_responder:stop(deregister),
|
|
{ok, Lwm2mState};
|
|
undefined ->
|
|
?LOG(error, "Reject DELETE from ~p, Location: ~p not found", [ChId, Location]),
|
|
{error, forbidden, Lwm2mState};
|
|
TrueLocation ->
|
|
?LOG(error, "Reject DELETE from ~p, Wrong Location: ~p, registered location record: ~p", [ChId, Location, TrueLocation]),
|
|
{error, not_found, Lwm2mState}
|
|
end;
|
|
coap_delete(_ChId, _Prefix, _Content, Lwm2mState) ->
|
|
{error, forbidden, Lwm2mState}.
|
|
|
|
coap_observe(ChId, Prefix, Name, Ack, Lwm2mState) ->
|
|
?LOG(error, "unsupported observe request ChId=~p, Prefix=~p, Name=~p, Ack=~p", [ChId, Prefix, Name, Ack]),
|
|
{error, method_not_allowed, Lwm2mState}.
|
|
|
|
coap_unobserve(Lwm2mState) ->
|
|
?LOG(error, "unsupported unobserve request: ~p", [Lwm2mState]),
|
|
{ok, Lwm2mState}.
|
|
|
|
coap_response(ChId, Ref, CoapMsgType, CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Lwm2mState) ->
|
|
?LOG(info, "~p, RCV CoAP response, CoapMsgType: ~p, CoapMsgMethod: ~p, CoapMsgPayload: ~p,
|
|
CoapMsgOpts: ~p, Ref: ~p",
|
|
[ChId, CoapMsgType, CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ref]),
|
|
MqttPayload = emqx_lwm2m_cmd_handler:coap2mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ref),
|
|
Lwm2mState2 = emqx_lwm2m_protocol:send_ul_data(maps:get(<<"msgType">>, MqttPayload), MqttPayload, Lwm2mState),
|
|
{noreply, Lwm2mState2}.
|
|
|
|
coap_ack(_ChId, Ref, Lwm2mState) ->
|
|
?LOG(info, "~p, RCV CoAP Empty ACK, Ref: ~p", [_ChId, Ref]),
|
|
AckRef = maps:put(<<"msgType">>, <<"ack">>, Ref),
|
|
MqttPayload = emqx_lwm2m_cmd_handler:ack2mqtt(AckRef),
|
|
Lwm2mState2 = emqx_lwm2m_protocol:send_ul_data(maps:get(<<"msgType">>, MqttPayload), MqttPayload, Lwm2mState),
|
|
{ok, Lwm2mState2}.
|
|
|
|
%% Batch deliver
|
|
handle_info({deliver, Topic, Msgs}, Lwm2mState) when is_list(Msgs) ->
|
|
{noreply, lists:foldl(fun(Msg, NewState) ->
|
|
element(2, handle_info({deliver, Topic, Msg}, NewState))
|
|
end, Lwm2mState, Msgs)};
|
|
%% Handle MQTT Message
|
|
handle_info({deliver, _Topic, MqttMsg}, Lwm2mState) ->
|
|
Lwm2mState2 = emqx_lwm2m_protocol:deliver(MqttMsg, Lwm2mState),
|
|
{noreply, Lwm2mState2};
|
|
|
|
%% Deliver Coap Message to Device
|
|
handle_info({deliver_to_coap, CoapRequest, Ref}, Lwm2mState) ->
|
|
{send_request, CoapRequest, Ref, Lwm2mState};
|
|
|
|
handle_info({'EXIT', _Pid, Reason}, Lwm2mState) ->
|
|
?LOG(info, "~p, received exit from: ~p, reason: ~p, quit now!", [self(), _Pid, Reason]),
|
|
{stop, Reason, Lwm2mState};
|
|
|
|
handle_info(post_init, Lwm2mState) ->
|
|
Lwm2mState2 = emqx_lwm2m_protocol:post_init(Lwm2mState),
|
|
{noreply, Lwm2mState2};
|
|
|
|
handle_info(auto_observe, Lwm2mState) ->
|
|
Lwm2mState2 = emqx_lwm2m_protocol:auto_observe(Lwm2mState),
|
|
{noreply, Lwm2mState2};
|
|
|
|
handle_info({life_timer, expired}, Lwm2mState) ->
|
|
?LOG(debug, "lifetime expired, shutdown", []),
|
|
{stop, life_timer_expired, Lwm2mState};
|
|
|
|
handle_info({shutdown, Error}, Lwm2mState) ->
|
|
{stop, Error, Lwm2mState};
|
|
|
|
handle_info({shutdown, conflict, {ClientId, NewPid}}, Lwm2mState) ->
|
|
?LOG(warning, "lwm2m '~s' conflict with ~p, shutdown", [ClientId, NewPid]),
|
|
{stop, conflict, Lwm2mState};
|
|
|
|
handle_info({suback, _MsgId, [_GrantedQos]}, Lwm2mState) ->
|
|
{noreply, Lwm2mState};
|
|
|
|
handle_info(emit_stats, Lwm2mState) ->
|
|
{noreply, Lwm2mState};
|
|
|
|
handle_info(Message, Lwm2mState) ->
|
|
?LOG(error, "Unknown Message ~p", [Message]),
|
|
{noreply, Lwm2mState}.
|
|
|
|
|
|
handle_call(info, _From, Lwm2mState) ->
|
|
{Info, Lwm2mState2} = emqx_lwm2m_protocol:get_info(Lwm2mState),
|
|
{reply, Info, Lwm2mState2};
|
|
|
|
handle_call(stats, _From, Lwm2mState) ->
|
|
{Stats, Lwm2mState2} = emqx_lwm2m_protocol:get_stats(Lwm2mState),
|
|
{reply, Stats, Lwm2mState2};
|
|
|
|
handle_call(kick, _From, Lwm2mState) ->
|
|
{stop, kick, Lwm2mState};
|
|
|
|
handle_call({set_rate_limit, _Rl}, _From, Lwm2mState) ->
|
|
?LOG(error, "set_rate_limit is not support", []),
|
|
{reply, ok, Lwm2mState};
|
|
|
|
handle_call(get_rate_limit, _From, Lwm2mState) ->
|
|
?LOG(error, "get_rate_limit is not support", []),
|
|
{reply, ok, Lwm2mState};
|
|
|
|
handle_call(session, _From, Lwm2mState) ->
|
|
?LOG(error, "get_session is not support", []),
|
|
{reply, ok, Lwm2mState};
|
|
|
|
handle_call(Request, _From, Lwm2mState) ->
|
|
?LOG(error, "adapter unexpected call ~p", [Request]),
|
|
{reply, ok, Lwm2mState}.
|
|
|
|
handle_cast(Msg, Lwm2mState) ->
|
|
?LOG(error, "unexpected cast ~p", [Msg]),
|
|
{noreply, Lwm2mState, hibernate}.
|
|
|
|
terminate(Reason, Lwm2mState) ->
|
|
emqx_lwm2m_protocol:terminate(Reason, Lwm2mState).
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%
|
|
%% Internal Functions
|
|
%%%%%%%%%%%%%%%%%%%%%%
|
|
process_register(ChId, LwM2MQuery, LwM2MPayload, Lwm2mState) ->
|
|
Epn = maps:get(<<"ep">>, LwM2MQuery, undefined),
|
|
LifeTime = maps:get(<<"lt">>, LwM2MQuery, undefined),
|
|
Ver = maps:get(<<"lwm2m">>, LwM2MQuery, undefined),
|
|
case check_lwm2m_version(Ver) of
|
|
false ->
|
|
?LOG(error, "Reject REGISTER from ~p due to unsupported version: ~p", [ChId, Ver]),
|
|
lwm2m_coap_responder:stop(invalid_version),
|
|
{error, precondition_failed, Lwm2mState};
|
|
true ->
|
|
case check_epn(Epn) andalso check_lifetime(LifeTime) of
|
|
true ->
|
|
init_lwm2m_emq_client(ChId, LwM2MQuery, LwM2MPayload, Lwm2mState);
|
|
false ->
|
|
?LOG(error, "Reject REGISTER from ~p due to wrong parameters, epn=~p, lifetime=~p", [ChId, Epn, LifeTime]),
|
|
lwm2m_coap_responder:stop(invalid_query_params),
|
|
{error, bad_request, Lwm2mState}
|
|
end
|
|
end.
|
|
|
|
process_update(ChId, LwM2MQuery, Location, LwM2MPayload, Lwm2mState) ->
|
|
LocationPath = binary_util:join_path(Location),
|
|
case get(lwm2m_context) of
|
|
#lwm2m_context{location = LocationPath} ->
|
|
RegInfo = append_object_list(LwM2MQuery, LwM2MPayload),
|
|
Lwm2mState2 = emqx_lwm2m_protocol:update_reg_info(RegInfo, Lwm2mState),
|
|
?LOG(info, "~p, UPDATE Success, assgined location: ~p", [ChId, LocationPath]),
|
|
{ok, changed, #coap_content{}, Lwm2mState2};
|
|
undefined ->
|
|
?LOG(error, "Reject UPDATE from ~p, Location: ~p not found", [ChId, Location]),
|
|
{error, forbidden, Lwm2mState};
|
|
TrueLocation ->
|
|
?LOG(error, "Reject UPDATE from ~p, Wrong Location: ~p, registered location record: ~p", [ChId, Location, TrueLocation]),
|
|
{error, not_found, Lwm2mState}
|
|
end.
|
|
|
|
init_lwm2m_emq_client(ChId, LwM2MQuery = #{<<"ep">> := Epn}, LwM2MPayload, _Lwm2mState = undefined) ->
|
|
RegInfo = append_object_list(LwM2MQuery, LwM2MPayload),
|
|
case emqx_lwm2m_protocol:init(self(), Epn, ChId, RegInfo) of
|
|
{ok, Lwm2mState} ->
|
|
LocationPath = assign_location_path(Epn),
|
|
?LOG(info, "~p, REGISTER Success, assgined location: ~p", [ChId, LocationPath]),
|
|
{ok, created, #coap_content{location_path = LocationPath}, Lwm2mState};
|
|
{error, Error} ->
|
|
lwm2m_coap_responder:stop(Error),
|
|
?LOG(error, "~p, REGISTER Failed, error: ~p", [ChId, Error]),
|
|
{error, forbidden, undefined}
|
|
end;
|
|
init_lwm2m_emq_client(ChId, LwM2MQuery = #{<<"ep">> := Epn}, LwM2MPayload, Lwm2mState) ->
|
|
RegInfo = append_object_list(LwM2MQuery, LwM2MPayload),
|
|
LocationPath = assign_location_path(Epn),
|
|
?LOG(info, "~p, RE-REGISTER Success, location: ~p", [ChId, LocationPath]),
|
|
Lwm2mState2 = emqx_lwm2m_protocol:replace_reg_info(RegInfo, Lwm2mState),
|
|
{ok, created, #coap_content{location_path = LocationPath}, Lwm2mState2}.
|
|
|
|
append_object_list(LwM2MQuery, <<>>) when map_size(LwM2MQuery) == 0 -> #{};
|
|
append_object_list(LwM2MQuery, <<>>) -> LwM2MQuery;
|
|
append_object_list(LwM2MQuery, LwM2MPayload) when is_binary(LwM2MPayload) ->
|
|
{AlterPath, ObjList} = parse_object_list(LwM2MPayload),
|
|
LwM2MQuery#{
|
|
<<"alternatePath">> => AlterPath,
|
|
<<"objectList">> => ObjList
|
|
}.
|
|
|
|
parse_options(InputQuery) ->
|
|
parse_options(InputQuery, maps:new()).
|
|
|
|
parse_options([], Query) -> {ok, Query};
|
|
parse_options([<<"ep=", Epn/binary>>|T], Query) ->
|
|
parse_options(T, maps:put(<<"ep">>, Epn, Query));
|
|
parse_options([<<"lt=", Lt/binary>>|T], Query) ->
|
|
parse_options(T, maps:put(<<"lt">>, binary_to_integer(Lt), Query));
|
|
parse_options([<<"lwm2m=", Ver/binary>>|T], Query) ->
|
|
parse_options(T, maps:put(<<"lwm2m">>, Ver, Query));
|
|
parse_options([<<"b=", Binding/binary>>|T], Query) ->
|
|
parse_options(T, maps:put(<<"b">>, Binding, Query));
|
|
parse_options([CustomOption|T], Query) ->
|
|
case binary:split(CustomOption, <<"=">>) of
|
|
[OptKey, OptValue] when OptKey =/= <<>> ->
|
|
?LOG(debug, "non-standard option: ~p", [CustomOption]),
|
|
parse_options(T, maps:put(OptKey, OptValue, Query));
|
|
_BadOpt ->
|
|
?LOG(error, "bad option: ~p", [CustomOption]),
|
|
{error, {bad_opt, CustomOption}}
|
|
end.
|
|
|
|
parse_object_list(<<>>) -> {<<"/">>, <<>>};
|
|
parse_object_list(ObjLinks) when is_binary(ObjLinks) ->
|
|
parse_object_list(binary:split(ObjLinks, <<",">>, [global]));
|
|
|
|
parse_object_list(FullObjLinkList) when is_list(FullObjLinkList) ->
|
|
case drop_attr(FullObjLinkList) of
|
|
{<<"/">>, _} = RootPrefixedLinks ->
|
|
RootPrefixedLinks;
|
|
{AlterPath, ObjLinkList} ->
|
|
LenAlterPath = byte_size(AlterPath),
|
|
WithOutPrefix =
|
|
lists:map(
|
|
fun
|
|
(<<Prefix:LenAlterPath/binary, Link/binary>>) when Prefix =:= AlterPath ->
|
|
trim(Link);
|
|
(Link) -> Link
|
|
end, ObjLinkList),
|
|
{AlterPath, WithOutPrefix}
|
|
end.
|
|
|
|
drop_attr(LinkList) ->
|
|
lists:foldr(
|
|
fun(Link, {AlternatePath, LinkAcc}) ->
|
|
{MainLink, LinkAttrs} = parse_link(Link),
|
|
case is_alternate_path(LinkAttrs) of
|
|
false -> {AlternatePath, [MainLink | LinkAcc]};
|
|
true -> {MainLink, LinkAcc}
|
|
end
|
|
end, {<<"/">>, []}, LinkList).
|
|
|
|
is_alternate_path(#{<<"rt">> := ?OMA_ALTER_PATH_RT}) -> true;
|
|
is_alternate_path(_) -> false.
|
|
|
|
parse_link(Link) ->
|
|
[MainLink | Attrs] = binary:split(trim(Link), <<";">>, [global]),
|
|
{delink(trim(MainLink)), parse_link_attrs(Attrs)}.
|
|
|
|
parse_link_attrs(LinkAttrs) when is_list(LinkAttrs) ->
|
|
lists:foldl(
|
|
fun(Attr, Acc) ->
|
|
case binary:split(trim(Attr), <<"=">>) of
|
|
[AttrKey, AttrValue] when AttrKey =/= <<>> ->
|
|
maps:put(AttrKey, AttrValue, Acc);
|
|
_BadAttr -> throw({bad_attr, _BadAttr})
|
|
end
|
|
end, maps:new(), LinkAttrs).
|
|
|
|
trim(Str)-> binary_util:trim(Str, $ ).
|
|
delink(Str) ->
|
|
Ltrim = binary_util:ltrim(Str, $<),
|
|
binary_util:rtrim(Ltrim, $>).
|
|
|
|
check_lwm2m_version(<<"1">>) -> true;
|
|
check_lwm2m_version(<<"1.", _PatchVerNum/binary>>) -> true;
|
|
check_lwm2m_version(_) -> false.
|
|
|
|
check_epn(undefined) -> false;
|
|
check_epn(_) -> true.
|
|
|
|
check_lifetime(undefined) -> false;
|
|
check_lifetime(LifeTime) when is_integer(LifeTime) ->
|
|
Max = proplists:get_value(lifetime_max, lwm2m_coap_responder:options(), 315360000),
|
|
Min = proplists:get_value(lifetime_min, lwm2m_coap_responder:options(), 0),
|
|
if
|
|
LifeTime >= Min, LifeTime =< Max ->
|
|
true;
|
|
true ->
|
|
false
|
|
end;
|
|
check_lifetime(_) -> false.
|
|
|
|
|
|
assign_location_path(Epn) ->
|
|
%Location = list_to_binary(io_lib:format("~.16B", [rand:uniform(65535)])),
|
|
%LocationPath = <<"/rd/", Location/binary>>,
|
|
Location = [<<"rd">>, Epn],
|
|
put(lwm2m_context, #lwm2m_context{epn = Epn, location = binary_util:join_path(Location)}),
|
|
Location.
|