emqx/apps/emqx_gateway_coap/src/emqx_coap_pubsub_handler.erl

188 lines
6.3 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% a coap to mqtt adapter with a retained topic message database
-module(emqx_coap_pubsub_handler).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_access_control.hrl").
-include("emqx_coap.hrl").
-export([handle_request/4]).
-import(emqx_coap_message, [response/2, response/3]).
-import(emqx_coap_medium, [reply/2, reply/3]).
-import(emqx_coap_channel, [run_hooks/3]).
-define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}).
-define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}).
-define(SUBOPTS, #{qos => 0, rh => 1, rap => 0, nl => 0, is_new => false}).
%% TODO maybe can merge this code into emqx_coap_session, simplify the call chain
handle_request(Path, #coap_message{method = Method} = Msg, Ctx, CInfo) ->
case check_topic(Path) of
{ok, Topic} ->
handle_method(Method, Topic, Msg, Ctx, CInfo);
_ ->
reply({error, bad_request}, <<"invalid topic">>, Msg)
end.
handle_method(get, Topic, Msg, Ctx, CInfo) ->
case emqx_coap_message:get_option(observe, Msg) of
0 ->
subscribe(Msg, Topic, Ctx, CInfo);
1 ->
unsubscribe(Msg, Topic, Ctx, CInfo);
_ ->
reply({error, bad_request}, <<"invalid observe value">>, Msg)
end;
handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx, CInfo) ->
PublishOpts = get_publish_opts(Msg),
Qos = get_publish_qos(Msg, PublishOpts),
Action = ?AUTHZ_PUBLISH(Qos, get_publish_retain(PublishOpts)),
case emqx_coap_channel:validator(Action, Topic, Ctx, CInfo) of
allow ->
#{clientid := ClientId} = CInfo,
MountTopic = mount(CInfo, Topic),
%% TODO: Append message metadata into headers
MQTTMsg = emqx_message:make(ClientId, Qos, MountTopic, Payload),
MQTTMsg2 = apply_publish_opts(PublishOpts, MQTTMsg),
_ = emqx_broker:publish(MQTTMsg2),
reply({ok, changed}, Msg);
_ ->
reply({error, unauthorized}, Msg)
end;
handle_method(_, _, Msg, _, _) ->
reply({error, method_not_allowed}, Msg).
check_topic([]) ->
error;
check_topic(Path) ->
{ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}.
get_sub_opts(Msg) ->
SubOpts = maps:fold(
fun parse_sub_opts/3, #{}, emqx_coap_message:extract_uri_query(Msg)
),
case SubOpts of
#{qos := _} ->
maps:merge(?SUBOPTS, SubOpts);
_ ->
CfgType = emqx_conf:get([gateway, coap, subscribe_qos], ?QOS_0),
maps:merge(?SUBOPTS#{qos => type_to_qos(CfgType, Msg)}, SubOpts)
end.
parse_sub_opts(<<"qos">>, V, Opts) ->
Opts#{qos => erlang:binary_to_integer(V)};
parse_sub_opts(<<"nl">>, V, Opts) ->
Opts#{nl => erlang:binary_to_integer(V)};
parse_sub_opts(<<"rh">>, V, Opts) ->
Opts#{rh => erlang:binary_to_integer(V)};
parse_sub_opts(_, _, Opts) ->
Opts.
type_to_qos(qos0, _) ->
?QOS_0;
type_to_qos(qos1, _) ->
?QOS_1;
type_to_qos(qos2, _) ->
?QOS_2;
type_to_qos(coap, #coap_message{type = Type}) ->
case Type of
non ->
?QOS_0;
_ ->
?QOS_1
end.
get_publish_opts(Msg) ->
Qs = emqx_coap_message:extract_uri_query(Msg),
maps:fold(
fun
(<<"retain">>, V, Acc) ->
Val = V =:= <<"true">>,
Acc#{retain => Val};
(<<"expiry">>, V, Acc) ->
Val = erlang:binary_to_integer(V),
Acc#{expiry_interval => Val};
(<<"qos">>, V, Acc) ->
Val = erlang:binary_to_integer(V),
Acc#{qos => Val};
(_, _, Acc) ->
Acc
end,
#{},
Qs
).
get_publish_qos(Msg, PublishOpts) ->
case PublishOpts of
#{qos := Qos} ->
Qos;
_ ->
CfgType = emqx_conf:get([gateway, coap, publish_qos], ?QOS_0),
type_to_qos(CfgType, Msg)
end.
get_publish_retain(PublishOpts) ->
maps:get(retain, PublishOpts, false).
apply_publish_opts(Opts, MQTTMsg) ->
maps:fold(
fun
(retain, Val, Acc) ->
emqx_message:set_flag(retain, Val, Acc);
(expiry, Val, Acc) ->
Props = emqx_message:get_header(properties, Acc),
emqx_message:set_header(
properties,
Props#{'Message-Expiry-Interval' => Val},
Acc
);
(_, _, Acc) ->
Acc
end,
MQTTMsg,
Opts
).
subscribe(#coap_message{token = <<>>} = Msg, _, _, _) ->
reply({error, bad_request}, <<"observe without token">>, Msg);
subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) ->
#{qos := Qos} = SubOpts = get_sub_opts(Msg),
Action = ?AUTHZ_SUBSCRIBE(Qos),
case emqx_coap_channel:validator(Action, Topic, Ctx, CInfo) of
allow ->
#{clientid := ClientId} = CInfo,
MountTopic = mount(CInfo, Topic),
emqx_broker:subscribe(MountTopic, ClientId, SubOpts),
run_hooks(Ctx, 'session.subscribed', [CInfo, MountTopic, SubOpts]),
?SUB(MountTopic, Token, Msg);
_ ->
reply({error, unauthorized}, Msg)
end.
unsubscribe(Msg, Topic, Ctx, CInfo) ->
MountTopic = mount(CInfo, Topic),
emqx_broker:unsubscribe(MountTopic),
run_hooks(Ctx, 'session.unsubscribed', [CInfo, Topic, ?SUBOPTS]),
?UNSUB(MountTopic, Msg).
mount(#{mountpoint := Mountpoint}, Topic) ->
<<Mountpoint/binary, Topic/binary>>.