From 08ab350fec032b41f5f437471b1018b22426cf24 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 27 Aug 2019 16:47:34 +0800 Subject: [PATCH] Move the 'pipeline' function to 'emqx_misc' module --- src/emqx_channel.erl | 41 ++++++++++++----------------------------- src/emqx_misc.erl | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 6dce540e2..e3179a9cc 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -50,9 +50,11 @@ -export([gc/3]). --import(emqx_misc, [maybe_apply/2]). - --import(emqx_access_control, [check_acl/3]). +-import(emqx_misc, + [ run_fold/3 + , pipeline/3 + , maybe_apply/2 + ]). -export_type([channel/0]). @@ -480,10 +482,10 @@ do_unsubscribe(TopicFilter, _SubOpts, %%-------------------------------------------------------------------- handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) -> - AckProps = emqx_misc:run_fold([fun enrich_caps/2, - fun enrich_server_keepalive/2, - fun enrich_assigned_clientid/2 - ], #{}, Channel), + AckProps = run_fold([fun enrich_caps/2, + fun enrich_server_keepalive/2, + fun enrich_assigned_clientid/2 + ], #{}, Channel), AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)), ok = emqx_hooks:run('client.connected', @@ -1016,7 +1018,8 @@ check_publish(Packet, Channel) -> %% Check Pub ACL check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #channel{client = Client}) -> - case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of + case is_acl_enabled(Client) andalso + emqx_access_control:check_acl(Client, publish, Topic) of false -> ok; allow -> ok; deny -> {error, ?RC_NOT_AUTHORIZED} @@ -1057,7 +1060,7 @@ check_subscribe(TopicFilter, SubOpts, Channel) -> %% Check Sub ACL check_sub_acl(TopicFilter, #channel{client = Client}) -> case is_acl_enabled(Client) andalso - check_acl(Client, subscribe, TopicFilter) of + emqx_access_control:check_acl(Client, subscribe, TopicFilter) of false -> allow; Result -> Result end. @@ -1176,26 +1179,6 @@ unmount(Client = #{mountpoint := MountPoint}, TopicOrMsg) -> emqx_mountpoint:unmount( emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg). -%%-------------------------------------------------------------------- -%% Pipeline -%%-------------------------------------------------------------------- - -pipeline([], Packet, Channel) -> - {ok, Packet, Channel}; - -pipeline([Fun|More], Packet, Channel) -> - case Fun(Packet, Channel) of - ok -> pipeline(More, Packet, Channel); - {ok, NChannel} -> - pipeline(More, Packet, NChannel); - {ok, NPacket, NChannel} -> - pipeline(More, NPacket, NChannel); - {error, ReasonCode} -> - {error, ReasonCode, Channel}; - {error, ReasonCode, NChannel} -> - {error, ReasonCode, NChannel} - end. - %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index a325dc94b..249d14005 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -21,6 +21,7 @@ -export([ merge_opts/2 , maybe_apply/2 , run_fold/3 + , pipeline/3 , start_timer/2 , start_timer/3 , cancel_timer/1 @@ -57,11 +58,29 @@ maybe_apply(_Fun, undefined) -> maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). +%% @doc RunFold run_fold([], Acc, _State) -> Acc; run_fold([Fun|More], Acc, State) -> run_fold(More, Fun(Acc, State), State). +%% @doc Pipeline +pipeline([], Input, State) -> + {ok, Input, State}; + +pipeline([Fun|More], Input, State) -> + case Fun(Input, State) of + ok -> pipeline(More, Input, State); + {ok, NState} -> + pipeline(More, Input, NState); + {ok, NInput, NState} -> + pipeline(More, NInput, NState); + {error, Reason} -> + {error, Reason, State}; + {error, Reason, NState} -> + {error, Reason, NState} + end. + -spec(start_timer(integer(), term()) -> reference()). start_timer(Interval, Msg) -> start_timer(Interval, self(), Msg).