Skip to content

Commit

Permalink
ft!: change behaviour to allow algorithms with immutable state
Browse files Browse the repository at this point in the history
  • Loading branch information
hauleth committed Jan 4, 2024
1 parent 4c4a19f commit 3ff0750
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Add `:plug_caisson` to dependencies in `mix.exs`:
```elixir
def deps do
[
{:plug_caisson, "~> 0.1.1"},
{:plug_caisson, "~> 0.2.0"},
# optional, for brotli support
{:brotli, "~> 0.3.2"},
# optional, for zstd support
Expand Down
124 changes: 108 additions & 16 deletions lib/plug_caisson.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,46 @@
defmodule PlugCaisson do
@moduledoc """
Body reader for supporting compressed `Plug` requests.
## Implementing algorithm
In addition to the built in algorithms (see `read_body/2`) it is possible to
implement custom algorithms by implementing behaviour defined in this module.
### Example
```elixir
defmodule BobbyCompression do
@behaviour #{inspect(__MODULE__)}
# Initialise decompression state with whatever is needed
@impl true
def init(opts), do: {:ok, Bobby.open()}
# Gracefully close the state
@impl true
def deinit(state), do: Bobby.finish(state)
# Process read data to decompress them
@impl true
def process(state, data, _opts) do
case Bobby.decompress(state, data) do
{:finished, decompressed, new_state} ->
# All data was decompressed and there is no more data to be
# decompressed in stream
{:ok, decompressed, new_state}
{:more, decompressed, new_state} ->
# It can happen that `decompressed == ""` and this is perfectly fine
# as long as there is possibility that there will be more data in
# future
{:more, decompressed, new_state}
{:error, _} = error -> error
end
end
end
```
"""

@default %{
Expand All @@ -10,10 +50,48 @@ defmodule PlugCaisson do
"zstd" => {PlugCaisson.Zstandard, []}
}

@doc """
Initialise state for the decompression algorithm.
This callback will be called if and only if given algorithm was picked as a
suitable option for decompression. The returned state will be stored in the
`Plug.Conn.t()`. It is guaranteed that it will be called only on first call to
`read_body/2` and all subsequent calls will not call this function again.
It will receive data passed as a second value in tuple declared in the
algorithm map.
"""
@callback init(opts :: term()) :: {:ok, state :: term()} | {:error, term()}

@doc """
Cleanup for the state. This will be called in
`Plug.Conn.register_before_send/2` callback, so the same conditions as with
these apply. It is guaranteed that it will be called only once for each
connection.
"""
@callback deinit(state :: term()) :: term()

@doc """
Process chunk of data.
It receives current state, binary read from the request and list of options
passed to the `read_body/2` as a 2nd argument.
## Return value
In case of success it should return 3-ary tuple:
- `{:ok, binary(), new_state :: term()}` - wich mean that all data was read
and there is no more data left in the internal buffer.
- `{:more, binary(), new_state :: term()}` - which mean that data was
processed, but there is more data left to be read in future calls.
If error occured during processing `{:error, term()}` tuple should be returned.
"""
@callback process(state :: term(), data :: binary(), opts :: keyword()) ::
{:ok, binary()} | {:more, binary()} | {:error, term()}
{:ok, binary(), new_state :: term()}
| {:more, binary(), new_state :: term()}
| {:error, term()}

@doc """
Read `Plug.Conn` request body and decompress it if needed.
Expand All @@ -23,7 +101,7 @@ defmodule PlugCaisson do
Accepts the same set of options as `Plug.Conn.read_body/2` with one option
extra: `:algorithms` which is map containing algorithm identifier as key and
tuple containing module name for module that implements `#{inspect(__MODULE__)}`
behaviour and value that will be passed as 2nd argument to the `c:decompress/2`
behaviour and value that will be passed as 2nd argument to the `c:init/1`
callback.
By default the value is set to:
Expand Down Expand Up @@ -52,26 +130,30 @@ defmodule PlugCaisson do
def read_body(conn, opts \\ []) do
opts = Keyword.merge([length: 8_000_000], opts)

with {:ok, decoder, conn} <- fetch_decompressor(conn, opts[:algorithms] || @default) do
case Plug.Conn.read_body(conn, opts) do
{type, body, conn} when type in [:ok, :more] ->
case try_decompress(body, decoder, opts) do
{:error, _} = error -> error
{:ok, data} when type == :ok -> {:ok, data, conn}
{_, data} -> {:more, data, conn}
end

{:error, _} = error ->
error
end
with {:ok, decoder, conn} <- fetch_decompressor(conn, opts[:algorithms] || @default),
{read_return, body, conn} <- Plug.Conn.read_body(conn, opts),
{return, data, new_state} <- try_decompress(body, decoder, opts) do
{return(return, read_return), data, Plug.Conn.put_private(conn, __MODULE__, new_state)}
end
end

# If there is no more data in body and no more data in decompression stream,
# then return `:ok`
defp return(:ok, :ok), do: :ok
defp return(_, _), do: :more

# If the decompressor is already initialised, then return current
# implementation and its state
defp fetch_decompressor(%Plug.Conn{private: %{__MODULE__ => {mod, state}}} = conn, _types) do
{:ok, {mod, state}, conn}
end

defp fetch_decompressor(conn, types) do
# XXX: Theoretically we should parse `content-encoding` header to split the
# algorithms by comma, as `gzip, br` is correct value there, but as double
# compression makes almost sense and spec explicitly disallows values
# like `identity, gzip` or `gzip, idenity`, then we simply can ignore
# parsing and use value as is
case Plug.Conn.get_req_header(conn, "content-encoding") do
[] ->
{:ok, :raw, conn}
Expand All @@ -95,10 +177,20 @@ defmodule PlugCaisson do
end
end

defp try_decompress(data, :raw, _), do: {:ok, data}
# Special case for `identity` case as well case when there is no compression
# algorithm defined at all
defp try_decompress(data, :raw, _), do: {:ok, data, :raw}

defp try_decompress(data, {mod, state}, opts), do: mod.process(state, data, opts)
defp try_decompress(data, {mod, state}, opts) do
with {result, data, new_state} when result in [:ok, :more] <- mod.process(state, data, opts) do
# Add `mod` to the returned state to simplify `read_body/2`
{result, data, {mod, new_state}}
end
end

# Setup `Plug.Conn.t()` to contain call to `mod.deinit/1` and set the
# decompressor state in the private section of `Plug.Conn.t()` for future
# reference
defp set_state(conn, mod, state) do
conn
|> Plug.Conn.put_private(__MODULE__, {mod, state})
Expand Down
5 changes: 3 additions & 2 deletions lib/plug_caisson/brotli.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ defmodule PlugCaisson.Brotli do

@impl true
def process(decoder, data, _opts) do
with :error <- :brotli_decoder.stream(decoder, data) do
{:error, :decompression_error}
case :brotli_decoder.stream(decoder, data) do
{result, data} when result in [:ok, :more] -> {result, data, decoder}
:error -> {:error, :decompression_error}
end
end
else
Expand Down
9 changes: 5 additions & 4 deletions lib/plug_caisson/zlib.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ defmodule PlugCaisson.Zlib do
## Options
- `:window_bits` - size of the decompression window - `:type` - either `:gzip`
or `:deflate` that will set proper `:window_bits` according to each algorithm
- `:window_bits` - size of the decompression window
- `:type` - either `:gzip` or `:deflate` that will set proper `:window_bits`
according to each algorithm
It is preferred to use `:type` over `:window_bits`, but if both are specified,
then `:window_bits` take precedence.
Expand Down Expand Up @@ -44,8 +45,8 @@ defmodule PlugCaisson.Zlib do
length = opts[:length]

case chunked_inflate(state, data, length) do
{:finished, data} -> {:ok, IO.iodata_to_binary(data)}
{:more, data} -> {:more, IO.iodata_to_binary(data)}
{:finished, data} -> {:ok, IO.iodata_to_binary(data), state}
{:more, data} -> {:more, IO.iodata_to_binary(data), state}
{:need_dictionary, _, _} -> {:error, :no_dictionary}
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/plug_caisson/zstandard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ defmodule PlugCaisson.Zstandard do
def deinit(_state), do: :ok

@impl true
def process(_state, data, _opts) do
def process(state, data, _opts) do
case :ezstd.decompress(data) do
{:error, _} = error -> error
decompressed -> {:ok, decompressed}
decompressed -> {:ok, decompressed, state}
end
end
else
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule PlugCaisson.MixProject do
def project do
[
app: :plug_caisson,
version: "0.1.2",
version: "0.2.0",
elixir: "~> 1.15",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/plug_caisson_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule PlugCaissonTest do
def deinit(pid), do: send(pid, :deinit)

@impl true
def process(_state, data, _opts), do: {:ok, data}
def process(state, data, _opts), do: {:ok, data, state}
end

test "deinit callback is called" do
Expand Down

0 comments on commit 3ff0750

Please sign in to comment.