Ecto - Using select_merge for flexible aggregates

Problem

PostgreSQL is a rock-solid database. I've used aggregate expressions for a number of features over the past few years, in a variety of languages, and they can power genuinely useful platform features. Paired with Elixir and Ecto, you get amazing flexibility in query generation. Typically the data you retrieve via a select is fairly static, or there are only one or two variations of the select you need.


@doc """
Returns an aggregation of user activity
 """
@spec user_activity_history() :: UserActivityHistory
def user_activity_history() do
query =
from(
u in User,
select: %UserActivityHistory{
captured_at: fragment("now()"),
hour_1: filter(count(u.id), fragment("? >= now() - interval '1 hour'", u.last_active)),
hour_6: filter(count(u.id), fragment("? >= now() - interval '6 hour'", u.last_active)),
hour_12: filter(count(u.id), fragment("? >= now() - interval '12 hour'", u.last_active)),
hour_24: filter(count(u.id), fragment("? >= now() - interval '24 hour'", u.last_active)),
day_1: filter(count(u.id), fragment("? >= now() - interval '1 day'", u.last_active)),
day_7: filter(count(u.id), fragment("? >= now() - interval '7 day'", u.last_active)),
day_30: filter(count(u.id), fragment("? >= now() - interval '30 day'", u.last_active))
}
)

Repo.replica().one(query)
end

The above code generates a rollup of user activity, but what if we wanted a custom interval, or only a subset of these intervals? This is a great use case for Ecto's `select_merge`, but I'm not going to dive into that example. Instead, I'll show a more advanced feature where `select_merge` shined.
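As a quick taste of what that could look like, here is a sketch of a dynamic version of the activity rollup. The `intervals` argument and its `{key, interval}` shape are hypothetical, not part of the real codebase:

```elixir
# Hypothetical sketch: build only the requested interval aggregates.
# `intervals` is a list like [{:hour_1, "1 hour"}, {:day_7, "7 day"}].
def user_activity_history(intervals) do
  base = from(u in User, select: %{captured_at: fragment("now()")})

  query =
    Enum.reduce(intervals, base, fn {key, interval}, query ->
      select_merge(query, [u], %{
        ^key =>
          filter(
            count(u.id),
            fragment("? >= now() - ?::interval", u.last_active, ^interval)
          )
      })
    end)

  Repo.replica().one(query)
end
```

Each reduction step merges one more aggregate into the select, so callers pay only for the intervals they ask for.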

At SwayDM, we have the concept of Members, Vendors and Redemptions. Members perform activities on the platform to earn SwayCash (SC). Vendors offer Redemptions for a set amount of SwayCash to Members.

Example Redemptions:
  • 20% off a Yeti cooler: (50 SC)
  • One free coffee:  (8 SC)

Vendors want to set limits on their redemptions to ensure they are not abused.

Example Redemption Limits:
  • 20% off a Yeti cooler: (50 SC)
    • A member can only redeem the Yeti cooler deal once over the life of the redemption.
    • The Vendor only wants 5 Members to use this deal per week.
    • The Vendor only wants 100 Members to use this deal overall.
  • One free coffee drink:  (8 SC)
    • A Member can redeem a free coffee once per week.
    • A Member can redeem a free coffee 10 times over the life of the redemption.
    • A Vendor wants a global limit of 50 free coffees.

Representing Limits

First, let's take a look at how the limits are represented in the database. There is a `redemptions` table, which embeds a list of `limits`.
# boilerplate code not included

schema "redemptions" do
	# ... all other redemption fields, such as name, price, etc
    embeds_many :limits, RedemptionLimit, on_replace: :delete
end

embedded_schema do
  field :limit_scope, Ecto.Enum, values: [:global, :user]
  field :limit_type, Ecto.Enum, values: [:count, :rate_limit]
  field :limit, :integer
  field :interval, :integer
  field :unit, Ecto.Enum, values: [:year, :month, :week, :day, :hour, :minute, :second]
  field :end_of_day, :boolean, default: false
end
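To make the schema concrete, the three free-coffee limits from earlier might be encoded like this (the field values are illustrative, expressed as plain maps rather than `RedemptionLimit` structs):

```elixir
# Illustrative data only: the free-coffee limits expressed
# with the embedded schema's fields.
coffee_limits = [
  # A Member can redeem a free coffee once per week.
  %{limit_scope: :user, limit_type: :rate_limit, limit: 1, interval: 1, unit: :week},
  # A Member can redeem a free coffee 10 times over the life of the redemption.
  %{limit_scope: :user, limit_type: :count, limit: 10, interval: nil, unit: nil},
  # A Vendor wants a global limit of 50 free coffees.
  %{limit_scope: :global, limit_type: :count, limit: 50, interval: nil, unit: nil}
]
```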

Storing Redemption History

We store records of successful redemptions in a `redemptions_history` table. This records the datetime, user, and redemption that occurred. The Member is the user type that redeems, and the Vendor is the user type that offers redemptions.

schema "redemptions_history" do
  belongs_to :redemption, Redemption
  belongs_to :member, Member
  belongs_to :vendor, Vendor

  timestamps(updated_at: false)
end

Showing and Evaluating Limits

We have two main scenarios where we want to show, and potentially evaluate, limits. The first is when displaying available redemptions to the user. We want them to see if a redemption they are interested in has a limit. Typically in this case we do not need to evaluate the limits.



The second is the redemption detail view, where we do want them to see the current state of the global and user-based limits.

Here we can see the member has hit their daily limit, and that they have 9 more remaining overall.


Solutions

Inefficient Solutions

Retrieve History: One potential solution would be to retrieve all redemption history records from the DB and evaluate the limits in memory. This is inefficient: every evaluation pulls the full history, and the more history records that exist for the redemption, the slower it becomes. To optimize this, we'd probably have to start caching these history records, which would be tedious to keep consistent.

Query per Limit: A better solution would be to run one query per limit. Here is an example that evaluates a global rate limit:

defmodule RedemptionLimits do

  @doc """
  Evaluate redemption limits

  # Scope
  * global - Limit is applied across all users.
  * user - Limit is applied per user.

  # Type
  * count - Limit is based on the number of redemptions.
  * rate_limit - Limit is based on the rate of redemptions.
  
  # Opts
  * member_id :: String.t() (Default nil) - If provided, user based limits will be evaluated.
  * as_of :: DateTime.t() (Default now()) - The reference point in time to evaluate rate limits.
  """
  @spec evaluate!(Redemption.t(), Keyword.t()) :: [limit_result()]
  def evaluate!(%Redemption{id: id, limits: limits}, opts \\ []) do
    query = base_query(id)

    for limit <- limits do
      evaluate_limit(limit, query, opts)
    end
  end

  # Global rate limit
  def evaluate_limit(%{id: id, limit_scope: :global, limit_type: :rate_limit, limit: limit} = l, query, opts) do
    as_of = Keyword.get(opts, :as_of, DateTime.utc_now())
    interval = l.interval
    unit = Atom.to_string(l.unit)

    query =
      from(
        [redemption: r, redemption_history: rh] in query,
        select: %{
          id: ^id,
          limit_scope: :global,
          limit_type: :rate_limit,
          interval: type(^interval, :integer),
          unit: type(^unit, :string),
          limit: type(^limit, :integer),
          blocking:
            filter(
              count(rh.id),
              rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
            ) >= type(^limit, :integer),
          count:
            filter(
              count(rh.id),
              rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
            )
        }
      )

    Repo.one(query)
  end
    
  # ... Functions for all other limit types 
    
  def evaluate_limit(_, _, _), do: %{}
end    
        
This approach is an improvement, but still requires multiple queries.
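Note that both versions build on a `base_query/1` helper that isn't shown. Here's a minimal sketch of what it could look like, assuming the named bindings (`:redemption`, `:redemption_history`) that the select clauses refer to; the actual helper may differ:

```elixir
# Hypothetical sketch of base_query/1: one redemption joined to its
# history rows, grouped so aggregates collapse to a single result row.
defp base_query(redemption_id) do
  from(r in Redemption,
    as: :redemption,
    left_join: rh in RedemptionHistory,
    as: :redemption_history,
    on: rh.redemption_id == r.id,
    where: r.id == ^redemption_id,
    group_by: r.id,
    select: %{redemption_id: r.id}
  )
end
```

Because the base select is already a map, later `select_merge` calls can keep merging per-limit results into it.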


One Query using select_merge: A solid solution, in my opinion, is to use Ecto's `select_merge`, which requires only one query. We can use the list of RedemptionLimits to generate a single query that evaluates all the limits at the DB level.



defmodule RedemptionLimits do

  @doc """
  Evaluate redemption limits

  # Scope
  * global - Limit is applied across all users.
  * user - Limit is applied per user.

  # Type
  * count - Limit is based on the number of redemptions.
  * rate_limit - Limit is based on the rate of redemptions.
  
  # Opts
  * member_id :: String.t() (Default nil) - If provided, user based limits will be evaluated.
  * as_of :: DateTime.t() (Default now()) - The reference point in time to evaluate rate limits.
  """
  @spec evaluate!(Redemption.t(), Keyword.t()) :: limit_result
  def evaluate!(%Redemption{id: id, limits: limits}, opts \\ []) do
    query = base_query(id)
    opts = Map.new(opts)
    query = Enum.reduce(limits, query, &gen_select(&1, &2, opts))
    Repo.one(query)
  end

  @doc """
  Merge a select into an existing query based on the limit configuration.

  ## Opts
  * member_id :: String.t() (Default nil) - If provided, user based limits will be evaluated.
  * as_of :: DateTime.t() (Default now()) - The reference point in time to evaluate rate limits.
  """
  @spec gen_select(limit :: RedemptionLimit.t(), query :: Ecto.Query.t(), opts :: map()) :: Ecto.Query.t()
  def gen_select(limit, query, opts \\ %{})

  # Global count limit
  def gen_select(%{id: id, limit_scope: :global, limit_type: :count, limit: limit}, query, _opts) do
    select_merge(query, [redemption: r, redemption_history: rh], %{
      ^id => %{
        id: ^id,
        limit_scope: :global,
        limit_type: :count,
        limit: type(^limit, :integer),
        blocking: count(rh.id) >= type(^limit, :integer),
        count: count(rh.id)
      }
    })
  end

  # User count limit
  def gen_select(%{id: id, limit_scope: :user, limit_type: :count, limit: limit}, query, %{member_id: member_id})
      when is_binary(member_id) do
    select_merge(query, [redemption: r, redemption_history: rh], %{
      ^id => %{
        id: ^id,
        limit_scope: :user,
        limit_type: :count,
        limit: type(^limit, :integer),
        blocking: filter(count(rh.id), rh.member_id == ^member_id) >= type(^limit, :integer),
        count: filter(count(rh.id), rh.member_id == ^member_id)
      }
    })
  end

  # Global rate limit
  def gen_select(%{id: id, limit_scope: :global, limit_type: :rate_limit, limit: limit} = l, query, opts) do
    as_of = Map.get(opts, :as_of, DateTime.utc_now())
    interval = l.interval
    unit = Atom.to_string(l.unit)

    select_merge(query, [redemption: r, redemption_history: rh], %{
      ^id => %{
        id: ^id,
        limit_scope: :global,
        limit_type: :rate_limit,
        interval: type(^interval, :integer),
        unit: type(^unit, :string),
        limit: type(^limit, :integer),
        blocking:
          filter(
            count(rh.id),
            rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
          ) >= type(^limit, :integer),
        count:
          filter(
            count(rh.id),
            rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
          )
      }
    })
  end
  
  # ... Functions for all other limit types
  
  def gen_select(_limit, query, _opts), do: query
end
              

How does this work?

When you use `select_merge`, the value being selected must be a map (or struct). Ecto generates a single query that executes all of the select expressions, then merges their results into one map.

In the example above, each limit generates a new select expression that returns a map keyed by the redemption limit's ID. This makes it very easy to pick an evaluated limit out of the result map. See the example test below.


assert %{
         :redemption_id => ^rid,
         ^global_limit_id => %{
           blocking: false,
           count: 5,
           limit: 6,
           limit_scope: :global,
           limit_type: :count
         },
         ^user_limit_id => %{
           blocking: true,
           count: 3,
           limit: 3,
           limit_scope: :user,
           limit_type: :count
         }
       } = RedemptionLimits.evaluate!(redemption, member_id: member.id)


Using this approach with aggregates is beneficial because Postgres can retrieve and aggregate the data more efficiently in a single query than across multiple queries.
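To make that concrete, the merged query for the two count limits in the test above compiles down to something shaped roughly like this (table aliases and column names are simplified; the exact SQL Ecto emits will differ):

```sql
-- Rough shape only: one scan over the joined rows, with the
-- per-limit aggregates expressed as FILTER clauses.
SELECT r.id AS redemption_id,
       count(rh.id) AS global_count,
       count(rh.id) >= 6 AS global_blocking,
       count(rh.id) FILTER (WHERE rh.member_id = $1) AS user_count,
       count(rh.id) FILTER (WHERE rh.member_id = $1) >= 3 AS user_blocking
  FROM redemptions r
  LEFT JOIN redemptions_history rh ON rh.redemption_id = r.id
 WHERE r.id = $2
 GROUP BY r.id;
```

All the aggregates share one pass over the history rows, which is where the single-query approach wins.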

Adding a new limit type is very easy: we simply add a new `gen_select` clause for it.

Evaluating a subset of limits is also very straightforward: we just filter the list before generating the query.
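For instance, to evaluate only the user-scoped limits (a sketch using plain maps in place of `RedemptionLimit` structs):

```elixir
limits = [
  %{id: "a", limit_scope: :global, limit_type: :count, limit: 50},
  %{id: "b", limit_scope: :user, limit_type: :count, limit: 10}
]

# Keep only user-scoped limits before generating the query.
user_limits = Enum.filter(limits, &(&1.limit_scope == :user))
# user_limits now contains only the limit with id "b".
```

The filtered list can then be fed into `evaluate!/2` (e.g. via `%{redemption | limits: user_limits}`), and only those selects get merged into the query.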

Conclusion

I hope you find this post useful. There are better introductions to `select_merge` out there, but this is a real-life use case that I thought others would find interesting and useful.
