emqx/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl

475 lines
16 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2016-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_cmd).
-include_lib("emqx/include/logger.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include("src/lwm2m/include/emqx_lwm2m.hrl").
-export([
mqtt_to_coap/2,
coap_to_mqtt/4,
empty_ack_to_mqtt/1,
coap_failure_to_mqtt/2
]).
-export([path_list/1, extract_path/1]).
-define(STANDARD, 1).
%%-type msg_type() :: <<"create">>
%% | <<"delete">>
%% | <<"read">>
%% | <<"write">>
%% | <<"execute">>
%% | <<"discover">>
%% | <<"write-attr">>
%% | <<"observe">>
%% | <<"cancel-observe">>.
%%
%%-type cmd() :: #{ <<"msgType">> := msg_type()
%% , <<"data">> := maps()
%% %%%% more keys?
%% }.
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"create">>, <<"data">> := Data}) ->
{PathList, QueryList} = path_list(maps:get(<<"basePath">>, Data, <<"/">>)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
TlvData = emqx_lwm2m_message:json_to_tlv(PathList, maps:get(<<"content">>, Data)),
Payload = emqx_lwm2m_tlv:encode(TlvData),
CoapRequest = emqx_coap_message:request(
con,
post,
Payload,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{content_format, <<"application/vnd.oma.lwm2m+tlv">>}
]
),
{CoapRequest, InputCmd};
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"delete">>, <<"data">> := Data}) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
{
emqx_coap_message:request(
con,
delete,
<<>>,
[
{uri_path, FullPathList},
{uri_query, QueryList}
]
),
InputCmd
};
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"read">>, <<"data">> := Data}) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
{
emqx_coap_message:request(
con,
get,
<<>>,
[
{uri_path, FullPathList},
{uri_query, QueryList}
]
),
InputCmd
};
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"write">>, <<"data">> := Data}) ->
CoapRequest =
case maps:get(<<"basePath">>, Data, <<"/">>) of
<<"/">> ->
single_write_request(AlternatePath, Data);
BasePath ->
batch_write_request(AlternatePath, BasePath, maps:get(<<"content">>, Data))
end,
{CoapRequest, InputCmd};
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"execute">>, <<"data">> := Data}) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
Args =
case maps:get(<<"args">>, Data, <<>>) of
<<"undefined">> -> <<>>;
undefined -> <<>>;
Arg1 -> Arg1
end,
{
emqx_coap_message:request(
con,
post,
Args,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{content_format, <<"text/plain">>}
]
),
InputCmd
};
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"discover">>, <<"data">> := Data}) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
{
emqx_coap_message:request(
con,
get,
<<>>,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{'accept', ?LWM2M_FORMAT_LINK}
]
),
InputCmd
};
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"write-attr">>, <<"data">> := Data}) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
Query = attr_query_list(Data),
{
emqx_coap_message:request(
con,
put,
<<>>,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{uri_query, Query}
]
),
InputCmd
};
mqtt_to_coap(AlternatePath, InputCmd = #{<<"msgType">> := <<"observe">>, <<"data">> := Data}) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
{
emqx_coap_message:request(
con,
get,
<<>>,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{observe, 0}
]
),
InputCmd
};
mqtt_to_coap(
AlternatePath, InputCmd = #{<<"msgType">> := <<"cancel-observe">>, <<"data">> := Data}
) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
{
emqx_coap_message:request(
con,
get,
<<>>,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{observe, 1}
]
),
InputCmd
}.
coap_to_mqtt(_Method = {_, Code}, _CoapPayload, _Options, Ref = #{<<"msgType">> := <<"create">>}) ->
make_response(Code, Ref);
coap_to_mqtt(_Method = {_, Code}, _CoapPayload, _Options, Ref = #{<<"msgType">> := <<"delete">>}) ->
make_response(Code, Ref);
coap_to_mqtt(Method, CoapPayload, Options, Ref = #{<<"msgType">> := <<"read">>}) ->
read_resp_to_mqtt(Method, CoapPayload, data_format(Options), Ref);
coap_to_mqtt(Method, CoapPayload, _Options, Ref = #{<<"msgType">> := <<"write">>}) ->
write_resp_to_mqtt(Method, CoapPayload, Ref);
coap_to_mqtt(Method, _CoapPayload, _Options, Ref = #{<<"msgType">> := <<"execute">>}) ->
execute_resp_to_mqtt(Method, Ref);
coap_to_mqtt(Method, CoapPayload, _Options, Ref = #{<<"msgType">> := <<"discover">>}) ->
discover_resp_to_mqtt(Method, CoapPayload, Ref);
coap_to_mqtt(Method, CoapPayload, _Options, Ref = #{<<"msgType">> := <<"write-attr">>}) ->
writeattr_resp_to_mqtt(Method, CoapPayload, Ref);
coap_to_mqtt(Method, CoapPayload, Options, Ref = #{<<"msgType">> := <<"observe">>}) ->
observe_resp_to_mqtt(Method, CoapPayload, data_format(Options), observe_seq(Options), Ref);
coap_to_mqtt(Method, CoapPayload, Options, Ref = #{<<"msgType">> := <<"cancel-observe">>}) ->
cancel_observe_resp_to_mqtt(Method, CoapPayload, data_format(Options), Ref).
read_resp_to_mqtt({error, ErrorCode}, _CoapPayload, _Format, Ref) ->
make_response(ErrorCode, Ref);
read_resp_to_mqtt({ok, SuccessCode}, CoapPayload, Format, Ref) ->
try
Result = content_to_mqtt(CoapPayload, Format, Ref),
make_response(SuccessCode, Ref, Format, Result)
catch
error:not_implemented ->
make_response(not_implemented, Ref);
_:Ex:_ST ->
?SLOG(error, #{
msg => "bad_payload_format",
payload => CoapPayload,
reason => Ex,
stacktrace => _ST
}),
make_response(bad_request, Ref)
end.
empty_ack_to_mqtt(Ref) ->
make_base_response(maps:put(<<"msgType">>, <<"ack">>, Ref)).
coap_failure_to_mqtt(Ref, MsgType) ->
make_base_response(maps:put(<<"msgType">>, MsgType, Ref)).
content_to_mqtt(CoapPayload, <<"text/plain">>, Ref) ->
emqx_lwm2m_message:text_to_json(extract_path(Ref), CoapPayload);
content_to_mqtt(CoapPayload, <<"application/octet-stream">>, Ref) ->
emqx_lwm2m_message:opaque_to_json(extract_path(Ref), CoapPayload);
content_to_mqtt(CoapPayload, <<"application/vnd.oma.lwm2m+tlv">>, Ref) ->
emqx_lwm2m_message:tlv_to_json(extract_path(Ref), CoapPayload);
content_to_mqtt(CoapPayload, <<"application/vnd.oma.lwm2m+json">>, _Ref) ->
emqx_lwm2m_message:translate_json(CoapPayload).
write_resp_to_mqtt({ok, changed}, _CoapPayload, Ref) ->
make_response(changed, Ref);
write_resp_to_mqtt({ok, content}, CoapPayload, Ref) when CoapPayload =:= <<>> ->
make_response(method_not_allowed, Ref);
write_resp_to_mqtt({ok, content}, _CoapPayload, Ref) ->
make_response(changed, Ref);
write_resp_to_mqtt({error, Error}, _CoapPayload, Ref) ->
make_response(Error, Ref).
execute_resp_to_mqtt({ok, changed}, Ref) ->
make_response(changed, Ref);
execute_resp_to_mqtt({error, Error}, Ref) ->
make_response(Error, Ref).
discover_resp_to_mqtt({ok, content}, CoapPayload, Ref) ->
Links = binary:split(CoapPayload, <<",">>, [global]),
make_response(content, Ref, <<"application/link-format">>, Links);
discover_resp_to_mqtt({error, Error}, _CoapPayload, Ref) ->
make_response(Error, Ref).
writeattr_resp_to_mqtt({ok, changed}, _CoapPayload, Ref) ->
make_response(changed, Ref);
writeattr_resp_to_mqtt({error, Error}, _CoapPayload, Ref) ->
make_response(Error, Ref).
observe_resp_to_mqtt({error, Error}, _CoapPayload, _Format, _ObserveSeqNum, Ref) ->
make_response(Error, Ref);
observe_resp_to_mqtt({ok, content}, CoapPayload, Format, 0, Ref) ->
read_resp_to_mqtt({ok, content}, CoapPayload, Format, Ref);
observe_resp_to_mqtt({ok, content}, CoapPayload, Format, ObserveSeqNum, Ref) ->
read_resp_to_mqtt({ok, content}, CoapPayload, Format, Ref#{<<"seqNum">> => ObserveSeqNum}).
cancel_observe_resp_to_mqtt({ok, content}, CoapPayload, Format, Ref) ->
read_resp_to_mqtt({ok, content}, CoapPayload, Format, Ref);
cancel_observe_resp_to_mqtt({error, Error}, _CoapPayload, _Format, Ref) ->
make_response(Error, Ref).
make_response(Code, Ref = #{}) ->
BaseRsp = make_base_response(Ref),
make_data_response(BaseRsp, Code).
make_response(Code, Ref = #{}, _Format, Result) ->
BaseRsp = make_base_response(Ref),
make_data_response(BaseRsp, Code, _Format, Result).
%% The base response format is what included in the request:
%%
%% #{
%% <<"seqNum">> => SeqNum,
%% <<"imsi">> => maps:get(<<"imsi">>, Ref, null),
%% <<"imei">> => maps:get(<<"imei">>, Ref, null),
%% <<"requestID">> => maps:get(<<"requestID">>, Ref, null),
%% <<"cacheID">> => maps:get(<<"cacheID">>, Ref, null),
%% <<"msgType">> => maps:get(<<"msgType">>, Ref, null)
%% }
make_base_response(Ref = #{}) ->
remove_tmp_fields(Ref).
make_data_response(BaseRsp, Code) ->
BaseRsp#{
<<"data">> => #{
<<"reqPath">> => extract_path(BaseRsp),
<<"code">> => code(Code),
<<"codeMsg">> => Code
}
}.
make_data_response(BaseRsp, Code, _Format, Result) ->
BaseRsp#{
<<"data">> =>
#{
<<"reqPath">> => extract_path(BaseRsp),
<<"code">> => code(Code),
<<"codeMsg">> => Code,
<<"content">> => Result
}
}.
remove_tmp_fields(Ref) ->
maps:remove(observe_type, Ref).
-spec path_list(Path :: binary()) -> {[PathWord :: binary()], [Query :: binary()]}.
path_list(Path) ->
case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
[ObjId, ObjInsId, ResId, LastPart] ->
{ResInstId, QueryList} = query_list(LastPart),
{[ObjId, ObjInsId, ResId, ResInstId], QueryList};
[ObjId, ObjInsId, LastPart] ->
{ResId, QueryList} = query_list(LastPart),
{[ObjId, ObjInsId, ResId], QueryList};
[ObjId, LastPart] ->
{ObjInsId, QueryList} = query_list(LastPart),
{[ObjId, ObjInsId], QueryList};
[LastPart] ->
{ObjId, QueryList} = query_list(LastPart),
{[ObjId], QueryList}
end.
query_list(PathWithQuery) ->
case binary:split(PathWithQuery, [<<$?>>], []) of
[Path] -> {Path, []};
[Path, Querys] -> {Path, binary:split(Querys, [<<$&>>], [global])}
end.
attr_query_list(Data) ->
attr_query_list(Data, valid_attr_keys(), []).
attr_query_list(QueryJson = #{}, ValidAttrKeys, QueryList) ->
maps:fold(
fun
(_K, null, Acc) ->
Acc;
(K, V, Acc) ->
case lists:member(K, ValidAttrKeys) of
true ->
KV = <<K/binary, "=", V/binary>>,
Acc ++ [KV];
false ->
Acc
end
end,
QueryList,
QueryJson
).
valid_attr_keys() ->
[<<"pmin">>, <<"pmax">>, <<"gt">>, <<"lt">>, <<"st">>].
data_format(Options) ->
maps:get(content_format, Options, <<"text/plain">>).
observe_seq(Options) ->
maps:get(observe, Options, rand:uniform(1000000) + 1).
add_alternate_path_prefix(<<"/">>, PathList) ->
PathList;
add_alternate_path_prefix(AlternatePath, PathList) ->
[binary_util:trim(AlternatePath, $/) | PathList].
extract_path(Ref = #{}) ->
drop_query(
case Ref of
#{<<"data">> := Data} ->
case maps:get(<<"path">>, Data, undefined) of
undefined -> maps:get(<<"basePath">>, Data, undefined);
Path -> Path
end;
#{<<"path">> := Path} ->
Path
end
).
batch_write_request(AlternatePath, BasePath, Content) ->
{PathList, QueryList} = path_list(BasePath),
Method =
case length(PathList) of
2 -> post;
3 -> put
end,
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
TlvData = emqx_lwm2m_message:json_to_tlv(PathList, Content),
Payload = emqx_lwm2m_tlv:encode(TlvData),
emqx_coap_message:request(
con,
Method,
Payload,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{content_format, <<"application/vnd.oma.lwm2m+tlv">>}
]
).
single_write_request(AlternatePath, Data) ->
{PathList, QueryList} = path_list(maps:get(<<"path">>, Data)),
FullPathList = add_alternate_path_prefix(AlternatePath, PathList),
%% TO DO: handle write to resource instance, e.g. /4/0/1/0
TlvData = emqx_lwm2m_message:json_to_tlv(PathList, [Data]),
Payload = emqx_lwm2m_tlv:encode(TlvData),
emqx_coap_message:request(
con,
put,
Payload,
[
{uri_path, FullPathList},
{uri_query, QueryList},
{content_format, <<"application/vnd.oma.lwm2m+tlv">>}
]
).
drop_query(Path) ->
case binary:split(Path, [<<$?>>]) of
[Path] -> Path;
[PathOnly, _Query] -> PathOnly
end.
code(get) -> <<"0.01">>;
code(post) -> <<"0.02">>;
code(put) -> <<"0.03">>;
code(delete) -> <<"0.04">>;
code(created) -> <<"2.01">>;
code(deleted) -> <<"2.02">>;
code(valid) -> <<"2.03">>;
code(changed) -> <<"2.04">>;
code(content) -> <<"2.05">>;
code(continue) -> <<"2.31">>;
code(bad_request) -> <<"4.00">>;
code(unauthorized) -> <<"4.01">>;
code(bad_option) -> <<"4.02">>;
code(forbidden) -> <<"4.03">>;
code(not_found) -> <<"4.04">>;
code(method_not_allowed) -> <<"4.05">>;
code(not_acceptable) -> <<"4.06">>;
code(request_entity_incomplete) -> <<"4.08">>;
code(precondition_failed) -> <<"4.12">>;
code(request_entity_too_large) -> <<"4.13">>;
code(unsupported_content_format) -> <<"4.15">>;
code(internal_server_error) -> <<"5.00">>;
code(not_implemented) -> <<"5.01">>;
code(bad_gateway) -> <<"5.02">>;
code(service_unavailable) -> <<"5.03">>;
code(gateway_timeout) -> <<"5.04">>;
code(proxying_not_supported) -> <<"5.05">>.