emqx/apps/emqx_coap/src/emqx_coap_resource.erl

160 lines
5.9 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_resource).
-behaviour(coap_resource).
-include("emqx_coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("gen_coap/include/coap.hrl").
-logger_header("[CoAP-RES]").
-export([ coap_discover/2
, coap_get/5
, coap_post/4
, coap_put/4
, coap_delete/3
, coap_observe/5
, coap_unobserve/1
, handle_info/2
, coap_ack/2
]).
-ifdef(TEST).
-export([topic/1]).
-endif.
-define(MQTT_PREFIX, [<<"mqtt">>]).
% resource operations
coap_discover(_Prefix, _Args) ->
[{absolute, [<<"mqtt">>], []}].
coap_get(ChId, ?MQTT_PREFIX, Path, Query, _Content) ->
?LOG(debug, "coap_get() Path=~p, Query=~p~n", [Path, redact_query(Query)]),
#coap_mqtt_auth{clientid = Clientid, username = Usr, password = Passwd} = get_auth(Query),
case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of
{ok, Pid} ->
put(mqtt_client_pid, Pid),
#coap_content{};
{error, auth_failure} ->
put(mqtt_client_pid, undefined),
{error, unauthorized};
{error, bad_request} ->
put(mqtt_client_pid, undefined),
{error, bad_request};
{error, _Other} ->
put(mqtt_client_pid, undefined),
{error, internal_server_error}
end;
coap_get(ChId, Prefix, Path, Query, _Content) ->
?LOG(error, "ignore bad get request ChId=~p, Prefix=~p, Path=~p, Query=~p",
[ChId, Prefix, Path, redact_query(Query)]),
{error, bad_request}.
coap_post(_ChId, _Prefix, _Topic, _Content) ->
{error, method_not_allowed}.
coap_put(_ChId, ?MQTT_PREFIX, Topic, #coap_content{payload = Payload}) when Topic =/= [] ->
?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload);
coap_put(_ChId, Prefix, Topic, Content) ->
?LOG(error, "put has error, Prefix=~p, Topic=~p, Content=~p", [Prefix, Topic, Content]),
{error, bad_request}.
coap_delete(_ChId, _Prefix, _Topic) ->
{error, method_not_allowed}.
coap_observe(ChId, ?MQTT_PREFIX, Topic, Ack, Content) when Topic =/= [] ->
TrueTopic = topic(Topic),
?LOG(debug, "observe Topic=~p, Ack=~p", [TrueTopic, Ack]),
Pid = get(mqtt_client_pid),
case emqx_coap_mqtt_adapter:subscribe(Pid, TrueTopic) of
ok -> {ok, {state, ChId, ?MQTT_PREFIX, [TrueTopic]}, content, Content};
{error, Code} -> {error, Code}
end;
coap_observe(ChId, Prefix, Topic, Ack, _Content) ->
?LOG(error, "unknown observe request ChId=~p, Prefix=~p, Topic=~p, Ack=~p", [ChId, Prefix, Topic, Ack]),
{error, bad_request}.
coap_unobserve({state, _ChId, ?MQTT_PREFIX, Topic}) when Topic =/= [] ->
?LOG(debug, "unobserve ~p", [Topic]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:unsubscribe(Pid, topic(Topic)),
ok;
coap_unobserve({state, ChId, Prefix, Topic}) ->
?LOG(error, "ignore unknown unobserve request ChId=~p, Prefix=~p, Topic=~p", [ChId, Prefix, Topic]),
ok.
handle_info({dispatch, Topic, Payload}, State) ->
%% This clause should never be matched any more. We keep it here to handle
%% the old format messages during the release upgrade.
%% In this case the second function clause of `coap_ack/2` will be called,
%% and the ACKs is discarded.
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info({dispatch, Msg}, State) ->
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) ->
emqx_coap_mqtt_adapter:handle_info(Message, State).
coap_ack({pub, Msg}, State) ->
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
{ok, State};
coap_ack(_Ref, State) ->
?LOG(debug, "received coap ack: ~p", [_Ref]),
{ok, State}.
get_auth(Query) ->
get_auth(Query, #coap_mqtt_auth{}).
get_auth([], Auth=#coap_mqtt_auth{}) ->
Auth;
get_auth([<<$c, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{clientid = Rest});
get_auth([<<$u, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{username = Rest});
get_auth([<<$p, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{password = Rest});
get_auth([Param|T], Auth=#coap_mqtt_auth{}) ->
?LOG(error, "ignore unknown parameter ~p", [Param]),
get_auth(T, Auth).
topic(Topic) when is_binary(Topic) -> Topic;
topic([]) -> <<>>;
topic([Path | TopicPath]) ->
case topic(TopicPath) of
<<>> -> Path;
RemTopic ->
<<Path/binary, $/, RemTopic/binary>>
end.
redact_query(Auths) ->
lists:map(fun(<<$p, $=, _Rest/binary>>) ->
<<$p, $=, "******">>;
(E) ->
E
end,
Auths).