emqx/apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl

313 lines
14 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Translates LwM2M commands between their MQTT-side form (maps with binary
%% keys such as <<"msgType">> and <<"data">>) and CoAP requests/responses:
%% mqtt2coap/2 builds the CoAP request for a command, while coap2mqtt/4 and
%% ack2mqtt/1 build the response map that is sent back for it.
-module(emqx_lwm2m_cmd_handler).
-include("emqx_lwm2m.hrl").
-include_lib("lwm2m_coap/include/coap.hrl").
%% Command/response translation API.
-export([ mqtt2coap/2
, coap2mqtt/4
, ack2mqtt/1
, extract_path/1
]).
%% Path-splitting helper, exported separately from the translation API.
-export([path_list/1]).
%% Logs through the OTP logger at the given level, prefixing every message
%% with "LWM2M-CMD: ".
-define(LOG(Level, Format, Args), logger:Level("LWM2M-CMD: " ++ Format, Args)).
%% Builds the CoAP request that carries out an MQTT-side LwM2M command.
%%
%% AlternatePath is the prefix put in front of the request's URI path
%% (<<"/">> means "no prefix", see add_alternate_path_prefix/2).
%% InputCmd is the command map; its <<"msgType">> selects the clause and its
%% <<"data">> map supplies the path and, where relevant, the content.
%%
%% Returns {CoapRequest, InputCmd}; the command is handed back unchanged so it
%% can later be used as the reference when the response is translated.
%% A command with any other <<"msgType">> fails with function_clause.
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"create">>, <<"data">> := Data}) ->
    %% "create" addresses its target through <<"basePath">>, not <<"path">>.
    ObjectPath = path_list(maps:get(<<"basePath">>, Data, <<"/">>)),
    UriPath = add_alternate_path_prefix(AlternatePath, ObjectPath),
    Tlv = emqx_lwm2m_message:json_to_tlv(ObjectPath, maps:get(<<"content">>, Data)),
    Options = [{uri_path, UriPath},
               {content_format, <<"application/vnd.oma.lwm2m+tlv">>}],
    {lwm2m_coap_message:request(con, post, emqx_lwm2m_tlv:encode(Tlv), Options), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"delete">>, <<"data">> := Data}) ->
    Options = [{uri_path, mqtt2coap_uri_path(AlternatePath, Data)}],
    {lwm2m_coap_message:request(con, delete, <<>>, Options), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"read">>, <<"data">> := Data}) ->
    Options = [{uri_path, mqtt2coap_uri_path(AlternatePath, Data)}],
    {lwm2m_coap_message:request(con, get, <<>>, Options), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"write">>, <<"data">> := Data}) ->
    %% The encoding is read from the command itself, not from its data map.
    Encoding = maps:get(<<"encoding">>, InputCmd, <<"plain">>),
    Request =
        case maps:get(<<"basePath">>, Data, <<"/">>) of
            <<"/">> ->
                %% No base path: Data itself describes the single value to write.
                single_write_request(AlternatePath, Data, Encoding);
            BasePath ->
                Content = maps:get(<<"content">>, Data),
                batch_write_request(AlternatePath, BasePath, Content, Encoding)
        end,
    {Request, InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"execute">>, <<"data">> := Data}) ->
    UriPath = mqtt2coap_uri_path(AlternatePath, Data),
    Payload = mqtt2coap_execute_args(Data),
    Options = [{uri_path, UriPath}, {content_format, <<"text/plain">>}],
    {lwm2m_coap_message:request(con, post, Payload, Options), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"discover">>, <<"data">> := Data}) ->
    Options = [{uri_path, mqtt2coap_uri_path(AlternatePath, Data)},
               {'accept', ?LWM2M_FORMAT_LINK}],
    {lwm2m_coap_message:request(con, get, <<>>, Options), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"write-attr">>, <<"data">> := Data}) ->
    UriPath = mqtt2coap_uri_path(AlternatePath, Data),
    Options = [{uri_path, UriPath}, {uri_query, attr_query_list(Data)}],
    {lwm2m_coap_message:request(con, put, <<>>, Options), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"observe">>, <<"data">> := Data}) ->
    {mqtt2coap_observe_request(AlternatePath, Data, 0), InputCmd};
mqtt2coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"cancel-observe">>, <<"data">> := Data}) ->
    {mqtt2coap_observe_request(AlternatePath, Data, 1), InputCmd}.

%% URI path for commands that address their target through <<"path">>.
mqtt2coap_uri_path(AlternatePath, Data) ->
    add_alternate_path_prefix(AlternatePath, path_list(maps:get(<<"path">>, Data))).

%% Payload of an "execute" request: the <<"args">> value, with a missing value,
%% the atom undefined and the binary <<"undefined">> all meaning "no arguments".
mqtt2coap_execute_args(Data) ->
    case maps:get(<<"args">>, Data, <<>>) of
        <<"undefined">> -> <<>>;
        undefined -> <<>>;
        Args -> Args
    end.

%% GET request carrying the observe option; "observe" passes 0 and
%% "cancel-observe" passes 1.
mqtt2coap_observe_request(AlternatePath, Data, ObserveValue) ->
    UriPath = mqtt2coap_uri_path(AlternatePath, Data),
    lwm2m_coap_message:request(con, get, <<>>, [{uri_path, UriPath}, {observe, ObserveValue}]).
%% Translates a CoAP response into the MQTT-side response map.
%%
%% Method is the decoded response code ({ok, Code} | {error, Code}),
%% CoapPayload and Options are the response's payload and option list, and Ref
%% is the original command map, whose <<"msgType">> decides how the response is
%% interpreted. A Ref with any other <<"msgType">> fails with function_clause.
coap2mqtt({_, Code}, _Payload, _Options, Ref = #{<<"msgType">> := MsgType})
  when MsgType =:= <<"create">>; MsgType =:= <<"delete">> ->
    %% Create and delete report the code only, whether it is a success or an error.
    make_response(Code, Ref);
coap2mqtt(Method, Payload, Options, Ref = #{<<"msgType">> := <<"read">>}) ->
    Format = data_format(Options),
    coap_read_to_mqtt(Method, Payload, Format, Ref);
coap2mqtt(Method, _Payload, _Options, Ref = #{<<"msgType">> := <<"write">>}) ->
    coap_write_to_mqtt(Method, Ref);
coap2mqtt(Method, _Payload, _Options, Ref = #{<<"msgType">> := <<"execute">>}) ->
    coap_execute_to_mqtt(Method, Ref);
coap2mqtt(Method, Payload, _Options, Ref = #{<<"msgType">> := <<"discover">>}) ->
    coap_discover_to_mqtt(Method, Payload, Ref);
coap2mqtt(Method, Payload, _Options, Ref = #{<<"msgType">> := <<"write-attr">>}) ->
    coap_writeattr_to_mqtt(Method, Payload, Ref);
coap2mqtt(Method, Payload, Options, Ref = #{<<"msgType">> := <<"observe">>}) ->
    coap_observe_to_mqtt(Method, Payload, data_format(Options), observe_seq(Options), Ref);
coap2mqtt(Method, Payload, Options, Ref = #{<<"msgType">> := <<"cancel-observe">>}) ->
    Format = data_format(Options),
    coap_cancel_observe_to_mqtt(Method, Payload, Format, Ref).
%% Builds the response for a read (also reused for observe and cancel-observe).
%%
%% An error code is reported as-is. On success the payload is converted
%% according to Format; any failure while converting or while building the
%% response is logged and reported to the caller as bad_request.
coap_read_to_mqtt({error, ErrorCode}, _Payload, _Format, Ref) ->
    make_response(ErrorCode, Ref);
coap_read_to_mqtt({ok, SuccessCode}, Payload, Format, Ref) ->
    try
        Content = coap_content_to_mqtt_payload(Payload, Format, Ref),
        make_response(SuccessCode, Ref, Format, Content)
    catch
        %% Deliberate rejection thrown by the conversion code.
        throw:{bad_request, Why} ->
            ?LOG(error, "bad_request, reason=~p, payload=~p", [Why, Payload]),
            make_response(bad_request, Ref);
        %% Anything else (e.g. an unsupported content format) is also reported
        %% as bad_request, with the stacktrace logged for diagnosis.
        Class:Error:Stacktrace ->
            ?LOG(error, "bad_request, error=~p, stacktrace=~p~npayload=~p", [{Class, Error}, Stacktrace, Payload]),
            make_response(bad_request, Ref)
    end.
%% Response for a bare acknowledgement: the original command map without its
%% temporary fields and without a <<"data">> section of its own.
ack2mqtt(Request) ->
    make_base_response(Request).
%% Converts a CoAP payload to its JSON-ready form, choosing the converter by
%% content format. The text, opaque and TLV converters also get the request
%% path taken from Ref; the LwM2M JSON converter works on the payload alone.
%% Any other content format fails with function_clause.
coap_content_to_mqtt_payload(Payload, <<"text/plain">>, Ref) ->
    ReqPath = extract_path(Ref),
    emqx_lwm2m_message:text_to_json(ReqPath, Payload);
coap_content_to_mqtt_payload(Payload, <<"application/octet-stream">>, Ref) ->
    ReqPath = extract_path(Ref),
    emqx_lwm2m_message:opaque_to_json(ReqPath, Payload);
coap_content_to_mqtt_payload(Payload, <<"application/vnd.oma.lwm2m+tlv">>, Ref) ->
    ReqPath = extract_path(Ref),
    emqx_lwm2m_message:tlv_to_json(ReqPath, Payload);
coap_content_to_mqtt_payload(Payload, <<"application/vnd.oma.lwm2m+json">>, _Ref) ->
    emqx_lwm2m_message:translate_json(Payload).
%% Response for a write: only {ok, changed} is accepted as success; an error
%% code is passed through. Any other result fails with function_clause.
coap_write_to_mqtt({ok, changed = Code}, Ref) ->
    make_response(Code, Ref);
coap_write_to_mqtt({error, Reason}, Ref) ->
    make_response(Reason, Ref).
%% Response for an execute: only {ok, changed} is accepted as success; an
%% error code is passed through. Any other result fails with function_clause.
coap_execute_to_mqtt({ok, changed = Code}, Ref) ->
    make_response(Code, Ref);
coap_execute_to_mqtt({error, Reason}, Ref) ->
    make_response(Reason, Ref).
%% Response for a discover.
%%
%% On {ok, content} the link-format payload is split into one binary per link
%% and returned as the response content. The split is global: without that
%% option binary:split/2 cuts only at the first comma, so a payload with three
%% or more links would come back as two elements with the rest glued together.
%% NOTE(review): the split is purely textual, so a comma inside a quoted
%% attribute value would also be treated as a separator.
%% An error code is passed through unchanged.
coap_discover_to_mqtt({ok, content}, CoapPayload, Ref) ->
    Links = binary:split(CoapPayload, <<",">>, [global]),
    make_response(content, Ref, <<"application/link-format">>, Links);
coap_discover_to_mqtt({error, Error}, _CoapPayload, Ref) ->
    make_response(Error, Ref).
%% Response for a write-attr: the payload is ignored; only {ok, changed} is
%% accepted as success and an error code is passed through.
coap_writeattr_to_mqtt({ok, changed = Code}, _Payload, Ref) ->
    make_response(Code, Ref);
coap_writeattr_to_mqtt({error, Reason}, _Payload, Ref) ->
    make_response(Reason, Ref).
%% Response for an observe.
%%
%% Errors are passed through. A content response with sequence number 0 is
%% reported like a plain read, under the original message type. Any other
%% sequence number is reported as a <<"notify">> message carrying that number
%% in <<"seqNum">>.
coap_observe_to_mqtt({error, Reason}, _Payload, _Format, _SeqNum, Ref) ->
    make_response(Reason, Ref);
coap_observe_to_mqtt({ok, content} = Method, Payload, Format, 0, Ref) ->
    coap_read_to_mqtt(Method, Payload, Format, Ref);
coap_observe_to_mqtt({ok, content} = Method, Payload, Format, SeqNum, Ref) ->
    NotifyRef = Ref#{<<"seqNum">> => SeqNum, <<"msgType">> => <<"notify">>},
    coap_read_to_mqtt(Method, Payload, Format, NotifyRef).
%% Response for a cancel-observe: a content response is reported like a plain
%% read, an error code is passed through.
coap_cancel_observe_to_mqtt({error, Reason}, _Payload, _Format, Ref) ->
    make_response(Reason, Ref);
coap_cancel_observe_to_mqtt({ok, content} = Method, Payload, Format, Ref) ->
    coap_read_to_mqtt(Method, Payload, Format, Ref).
%% make_response/2: response without content, i.e. the base response for Ref
%% plus a <<"data">> section describing Code.
make_response(Code, Ref = #{}) ->
    make_data_response(make_base_response(Ref), Code).

%% make_response/4: same as make_response/2, with Result added as the
%% <<"content">> of the <<"data">> section. Format is only forwarded.
make_response(Code, Ref = #{}, Format, Result) ->
    make_data_response(make_base_response(Ref), Code, Format, Result).
%% The base response is the request map itself with its temporary fields
%% removed, so whatever the request carried is echoed back unchanged, e.g.:
%%
%% #{
%%   <<"seqNum">> => SeqNum,
%%   <<"requestID">> => RequestID,
%%   <<"cacheID">> => CacheID,
%%   <<"msgType">> => MsgType
%% }
%%
%% Keys absent from the request stay absent; nothing is defaulted here.
make_base_response(Request = #{}) ->
    remove_tmp_fields(Request).
%% make_data_response/2: sets the <<"data">> section of BaseRsp (replacing any
%% existing one) to the request path, the dotted CoAP code and the code atom.
make_data_response(BaseRsp, Code) ->
    Data = #{
        <<"reqPath">> => extract_path(BaseRsp),
        <<"code">> => code(Code),
        <<"codeMsg">> => Code
    },
    BaseRsp#{<<"data">> => Data}.

%% make_data_response/4: same as make_data_response/2, plus Result stored
%% under <<"content">>. The format argument is accepted but not used.
make_data_response(BaseRsp, Code, _Format, Result) ->
    Rsp = #{<<"data">> := Data} = make_data_response(BaseRsp, Code),
    Rsp#{<<"data">> := Data#{<<"content">> => Result}}.
%% Drops the internal observe_type key (an atom key, unlike the binary keys of
%% the command itself) so it never reaches the outgoing response.
remove_tmp_fields(Request) ->
    maps:remove(observe_type, Request).
%% Splits a slash-separated path binary into its segments after trimming the
%% surrounding slashes. Paths of one to four segments are accepted; a longer
%% path fails with case_clause.
path_list(Path) ->
    Trimmed = binary_util:trim(Path, $/),
    %% binary:split/3 always yields at least one element, so bounding the
    %% length from above accepts exactly the 1..4 segment cases.
    case binary:split(Trimmed, [<<$/>>], [global]) of
        Segments when length(Segments) =< 4 -> Segments
    end.
%% attr_query_list/1: builds the URI query list for a write-attr request from
%% the command's data map, keeping only the keys in valid_attr_keys/0.
attr_query_list(Data) ->
    attr_query_list(Data, valid_attr_keys(), []).

%% attr_query_list/3: appends one <<"key=value">> binary to QueryList for every
%% entry of QueryJson whose key is in ValidAttrKeys and whose value is not null.
%% Values are rendered with bin/1. Entries are collected by prepending and
%% reversed once at the end, rather than appending to the accumulator on every
%% step; the resulting order is the same as the fold order.
attr_query_list(QueryJson = #{}, ValidAttrKeys, QueryList) ->
    Collected = maps:fold(
        fun
            (_Key, null, Acc) ->
                Acc;
            (Key, Value, Acc) ->
                case lists:member(Key, ValidAttrKeys) of
                    true ->
                        ValueBin = bin(Value),
                        [<<Key/binary, "=", ValueBin/binary>> | Acc];
                    false ->
                        Acc
                end
        end, [], QueryJson),
    QueryList ++ lists:reverse(Collected).
%% Attribute names that may be forwarded in a write-attr query; every other key
%% of the command's data map is ignored.
valid_attr_keys() ->
    [
        <<"pmin">>,
        <<"pmax">>,
        <<"gt">>,
        <<"lt">>,
        <<"st">>
    ].
%% Content format of a CoAP response, taken from its content_format option;
%% <<"text/plain">> is assumed when the option is absent.
data_format(Options) ->
    case proplists:lookup(content_format, Options) of
        {_, Format} -> Format;
        _ -> <<"text/plain">>
    end.
%% Observe sequence number of a CoAP response.
%%
%% Returns the value of the observe option when present. When it is absent a
%% random integer in 2..1000001 is returned, which can never be 0 and so is
%% always treated as a notification by coap_observe_to_mqtt/5.
%% The random fallback is computed only when it is needed; passing it as the
%% default of proplists:get_value/3 drew from the process's random generator
%% on every call, even when the option was present.
observe_seq(Options) ->
    case proplists:lookup(observe, Options) of
        {_, SeqNum} -> SeqNum;
        _ -> rand:uniform(1000000) + 1
    end.
%% Puts the alternate path in front of a list of path segments.
%% An alternate path of exactly <<"/">> means "no prefix" and leaves the list
%% untouched; any other value is stripped of its surrounding slashes and added
%% as a single leading segment.
add_alternate_path_prefix(AlternatePath, PathList) ->
    case AlternatePath of
        <<"/">> -> PathList;
        _ -> [binary_util:trim(AlternatePath, $/) | PathList]
    end.
%% Returns the path a command or response refers to.
%%
%% When the map has a <<"data">> section, the path is that section's <<"path">>,
%% falling back to its <<"basePath">> and finally to undefined. Otherwise the
%% top-level <<"path">> is used. A map with neither key fails with case_clause.
extract_path(Ref = #{}) ->
    case Ref of
        #{<<"data">> := Data} -> extract_path_from_data(Data);
        #{<<"path">> := Path} -> Path
    end.

%% Path stored in a <<"data">> section. The atom nil is the internal "no path"
%% marker, so a path that is literally nil also falls back to <<"basePath">>.
extract_path_from_data(Data) ->
    case maps:get(<<"path">>, Data, nil) of
        nil -> maps:get(<<"basePath">>, Data, undefined);
        Path -> Path
    end.
%% Builds the TLV write request for several values below BasePath.
%%
%% The CoAP method depends on the depth of BasePath: two segments use POST,
%% three segments use PUT, and any other depth fails with case_clause.
%% Content is decoded according to Encoding before being converted to TLV.
batch_write_request(AlternatePath, BasePath, Content, Encoding) ->
    BaseSegments = path_list(BasePath),
    Method =
        case length(BaseSegments) of
            2 -> post;
            3 -> put
        end,
    UriPath = add_alternate_path_prefix(AlternatePath, BaseSegments),
    Decoded = decoding(Content, Encoding),
    Tlv = emqx_lwm2m_message:json_to_tlv(BaseSegments, Decoded),
    Options = [{uri_path, UriPath},
               {content_format, <<"application/vnd.oma.lwm2m+tlv">>}],
    lwm2m_coap_message:request(con, Method, emqx_lwm2m_tlv:encode(Tlv), Options).
%% Builds the TLV write request (always a PUT) for a single value.
%% Data is both the source of the target <<"path">> and the one item that is
%% decoded according to Encoding and converted to TLV.
single_write_request(AlternatePath, Data, Encoding) ->
    Segments = path_list(maps:get(<<"path">>, Data)),
    UriPath = add_alternate_path_prefix(AlternatePath, Segments),
    Items = decoding([Data], Encoding),
    Tlv = emqx_lwm2m_message:json_to_tlv(Segments, Items),
    Options = [{uri_path, UriPath},
               {content_format, <<"application/vnd.oma.lwm2m+tlv">>}],
    lwm2m_coap_message:request(con, put, emqx_lwm2m_tlv:encode(Tlv), Options).
%% Maps a CoAP method or response-code atom to its dotted "class.detail" form.
%% An unknown atom fails with function_clause.
%%
%% 4.01 is accepted under both spellings: the original misspelt `uauthorized`
%% is kept so existing callers keep working (presumably it matches the atom
%% produced by the CoAP library; confirm before removing it), and the correctly
%% spelt `unauthorized` no longer crashes the response path.
code(get) -> <<"0.01">>;
code(post) -> <<"0.02">>;
code(put) -> <<"0.03">>;
code(delete) -> <<"0.04">>;
code(created) -> <<"2.01">>;
code(deleted) -> <<"2.02">>;
code(valid) -> <<"2.03">>;
code(changed) -> <<"2.04">>;
code(content) -> <<"2.05">>;
code(continue) -> <<"2.31">>;
code(bad_request) -> <<"4.00">>;
code(unauthorized) -> <<"4.01">>;
code(uauthorized) -> <<"4.01">>;
code(bad_option) -> <<"4.02">>;
code(forbidden) -> <<"4.03">>;
code(not_found) -> <<"4.04">>;
code(method_not_allowed) -> <<"4.05">>;
code(not_acceptable) -> <<"4.06">>;
code(request_entity_incomplete) -> <<"4.08">>;
code(precondition_failed) -> <<"4.12">>;
code(request_entity_too_large) -> <<"4.13">>;
code(unsupported_content_format) -> <<"4.15">>;
code(internal_server_error) -> <<"5.00">>;
code(not_implemented) -> <<"5.01">>;
code(bad_gateway) -> <<"5.02">>;
code(service_unavailable) -> <<"5.03">>;
code(gateway_timeout) -> <<"5.04">>;
code(proxying_not_supported) -> <<"5.05">>.
%% Renders an attribute value as a binary for use in a URI query.
%% Binaries are returned as-is; strings and integers are converted directly.
%% Floats are written in plain decimal notation with at most 10 decimals and
%% no trailing zeros (1.5 -> <<"1.5">>). The one-argument float_to_binary/1
%% produces scientific notation with 20 digits (<<"1.50000000000000000000e+00">>),
%% which would end up verbatim in the query string sent to the device.
%% Any other term fails with function_clause.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Int) when is_integer(Int) -> integer_to_binary(Int);
bin(Float) when is_float(Float) -> float_to_binary(Float, [{decimals, 10}, compact]).
%% Decodes the <<"value">> of every item according to the command's encoding.
%% With <<"hex">> each value is converted from a hex string to a binary, and an
%% item without a <<"value">> key fails with function_clause. With any other
%% encoding the items are returned untouched.
decoding(Items, <<"hex">>) ->
    HexToBin =
        fun(Item = #{<<"value">> := Hex}) ->
            Item#{<<"value">> => emqx_misc:hexstr2bin(Hex)}
        end,
    lists:map(HexToBin, Items);
decoding(Items, _Encoding) ->
    Items.