mix ecto.ch.schema fails for columns with type AggregateFunction
#90
Comments
How would I define a schema for those fields as well?
and just using
|
I haven't used `AggregateFunction` columns before, but maybe something like this:

```elixir
# uses `AggregateFunction(argMin, Decimal(18, 4), DateTime)` in the header
# uses `Decimal(18, 4)` for encoding and decoding
field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"
```

@Zarathustra2 what do you think? It would also be able to replace a custom type in Plausible:

```elixir
field :is_bounce, Ch, type: "UInt8", as: :boolean
```

PR: #91 |
Or maybe it can be a `name:` option:

```elixir
field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
field :is_bounce, Ch, type: :boolean, name: "UInt8"
```

|
It's funny, for me the intuitive naming convention would be the other way around:

```elixir
field :is_bounce, Ch, type: :boolean, as: "UInt8"
```

I suppose it depends on which side you approach it from. Looking at it from the ClickHouse side, it's a `UInt8`. Since Ecto lives in the context of our application, it's more intuitive for me that the `type` option refers to the application-side type. |
@ruslandoga can you also add a test for inserting the data? I am wondering if we will run into the same issue as ClickHouse/clickhouse-java#1232. I personally prefer |
@Zarathustra2 what kind of test do you have in mind? Something like this?

```elixir
test "insert AggregateFunction", %{conn: conn} do
  Ch.query!(conn, """
  CREATE TABLE test_insert_aggregate_function (
    uid Int16,
    updated SimpleAggregateFunction(max, DateTime),
    name AggregateFunction(argMax, String, DateTime)
  ) ENGINE AggregatingMergeTree ORDER BY uid
  """)

  rows = [
    [1, ~N[2020-01-02 00:00:00], "b"],
    [1, ~N[2020-01-01 00:00:00], "a"]
  ]

  assert %{num_rows: 2} =
           Ch.query!(
             conn,
             """
             INSERT INTO test_insert_aggregate_function
             SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
             FROM input('uid Int16, updated DateTime, name String')
             FORMAT RowBinary\
             """,
             rows,
             types: ["Int16", "DateTime", "String"]
           )

  assert Ch.query!(conn, """
         SELECT uid, max(updated) AS updated, argMaxMerge(name)
         FROM test_insert_aggregate_function
         GROUP BY uid
         """).rows == [[1, ~N[2020-01-02 00:00:00], "b"]]
end
```

But note that |
Sorry, bad phrasing. More like: how would I even insert into this column with Ecto?

```elixir
defmodule SomeSchema do
  use Ecto.Schema

  @primary_key false
  schema "table" do
    field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
  end
end
```

How would I insert into this column, since I need to insert not |
How would you insert into that column with |
for my example above? |
Yes, since I haven't used it myself:

```sql
create table table_test_agg (foo AggregateFunction(argMin, UInt8, DateTime)) Engine = Memory;
insert into table_test_agg (foo) Values (arrayReduce('argMinState', [1], [now()]));
```
on a different note: |
This query won't work in "streaming" formats like native or rowbinary, since they don't do any evaluation of the rows. You can use the same query with |
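On the read side, the stored states have to be finalized with the matching `-Merge` combinator before they become plain values. A sketch against the `table_test_agg` table from above (this assumes a locally running ClickHouse; `Ch.start_link/1` and `Ch.query!/2` are the driver's entry points, the `default` database is an assumption):

```elixir
{:ok, conn} = Ch.start_link(database: "default")

# `foo` holds opaque argMinState blobs; argMinMerge collapses them
# into the final argMin result
%{rows: rows} = Ch.query!(conn, "SELECT argMinMerge(foo) FROM table_test_agg")
```

Selecting the raw `foo` column without `argMinMerge` would return the serialized state bytes, not a `UInt8`. |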
I am wondering whether the data can be encoded in a different way, because the Java issue states:

So it seems serializing a state of an aggregate function is possible, but it is not documented? Maybe we have to ping someone on the Slack? |
We probably won't be supporting undocumented APIs in this driver. However, it would be possible to support them in a separate library, since the driver accepts already-encoded payloads:

```elixir
encoded = YourRowBinaryEncoder.encode_rows(...)
Ch.query(conn, "insert into ... format RowBinary", encoded, encode: false)
```

|
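Such an external encoder can be quite small for simple column types. A toy sketch (the module name `MiniRowBinary` is hypothetical, not part of `ch`) that encodes rows of `(UInt8, String)` in RowBinary, where string lengths are unsigned LEB128 varints:

```elixir
defmodule MiniRowBinary do
  import Bitwise

  # Unsigned LEB128 varint; RowBinary uses it for string lengths
  defp varint(n) when n < 0x80, do: <<n>>

  defp varint(n) do
    byte = (n &&& 0x7F) ||| 0x80
    <<byte, varint(n >>> 7)::binary>>
  end

  # One row of (UInt8, String): the integer as a single byte,
  # then the string as varint-length-prefixed bytes
  def encode_row([u8, s]) when u8 in 0..255 and is_binary(s) do
    [<<u8>>, varint(byte_size(s)), s]
  end

  # Returns iodata for the whole payload
  def encode_rows(rows), do: Enum.map(rows, &encode_row/1)
end
```

`MiniRowBinary.encode_rows([[1, "a"], [0, "bb"]])` could then be passed as the pre-encoded payload with `encode: false`, as in the snippet above. |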
It seems to me right now that we won't be needing `as:` at all.

```elixir
field :is_bounce, Ch, type: :boolean, as: "UInt8"
```

can already be covered with a custom Ecto type, which is more typing but more conventional and possibly easier to understand. And

```elixir
field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"
```

doesn't seem to be possible, since we can't just take a decimal and insert it into an aggregate function type. |
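For reference, such a custom Ecto type for the boolean case might look like this; a minimal sketch using the standard `Ecto.Type` behaviour (the module name `MyApp.Types.UInt8Bool` is hypothetical):

```elixir
defmodule MyApp.Types.UInt8Bool do
  @moduledoc "Booleans in the application, UInt8 (0/1) in ClickHouse."
  use Ecto.Type

  @impl true
  def type, do: :integer

  # Accept booleans from changesets/params
  @impl true
  def cast(value) when is_boolean(value), do: {:ok, value}
  def cast(_), do: :error

  # Application -> database
  @impl true
  def dump(true), do: {:ok, 1}
  def dump(false), do: {:ok, 0}
  def dump(_), do: :error

  # Database -> application
  @impl true
  def load(1), do: {:ok, true}
  def load(0), do: {:ok, false}
  def load(_), do: :error
end
```

It would then be used in a schema as `field :is_bounce, MyApp.Types.UInt8Bool`. |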
Yeah, I don't think `as:` is needed. Do you think creating insert queries such as

```sql
INSERT INTO test_insert_aggregate_function
SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
FROM input('uid Int16, updated DateTime, name String')
FORMAT RowBinary
```

would be possible in ecto_ch with a macro or something based on a given schema?

EDIT: Actually, this is kinda hard, as we will never catch all edge cases (some may insert states computed from values that are not fields of the table). |
I'll add tests that use the approaches shown in https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/. It seems like all of them can already be supported with the current functionality. |
yeah all of them can be used via |
That SQL statement can be constructed without macros. |
Something like this:

```elixir
defmodule Schema do
  use Ecto.Schema

  @primary_key false
  schema "test_insert_aggregate_function" do
    field :uid, Ch, type: "Int16"
    field :updated, :naive_datetime
    field :name, :string
  end
end

table = Schema.__schema__(:source)
fields = Schema.__schema__(:fields)
types = Enum.map(fields, fn field -> Schema.__schema__(:type, field) |> Ecto.Type.type() end)
structure = Enum.zip(fields, types) |> Enum.map(fn {f, t} -> "#{f} #{t}" end) |> Enum.join(", ")

select =
  from i in fragment("input(?)", literal(^structure)),
    select: %{
      uid: i.uid,
      updated: i.updated,
      name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)
    }

{select, _no_params = []} = Repo.to_sql(:all, select)
Repo.query!(["insert into ", table, ?\s, select, " format RowBinary"], rows, types: types)
```

|
Oh that is sick, thanks @ruslandoga <3 Should this maybe be added to the readme of ecto_ch? I bet someone else will run into this as well :D |
I'll try to find a way to "automate" this. I'm thinking about doing something like:

```elixir
import Ecto.Adapters.ClickHouse.API, only: [input: 1]

input =
  from i in input(uid: "Int16", updated: "DateTime", name: "String"), # or input(Schema)
    select: %{
      uid: i.uid,
      updated: i.updated,
      name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)
    }

rows = [
  [uid: 1231, updated: ~N[2020-01-02 00:00:00], name: "Jane"],
  [uid: 1231, updated: ~N[2020-01-01 00:00:00], name: "John"]
]

TestRepo.insert_all("users", rows, input: input)
```

WIP: plausible/ecto_ch#79 |
@Zarathustra2 I've merged plausible/ecto_ch#79 and I wonder if it solves your use-case.

```diff
 def deps do
   [
-    {:ecto_ch, "~> 0.1.0"},
+    {:ecto_ch, github: "plausible/ecto_ch"}
   ]
 end
```

I'm not releasing it yet since it might need some more work depending on your test-drive. |
Oh that is sick!!! Let me test that today/tomorrow, I will ping you. Just from reading over the code it looks pretty incredibly easy to work with! <3 |
Likely not getting to it today, but should be getting to it tomorrow. Sorry about that! |
Currently getting |
You might be able to use the tests as a guide: https://github.com/plausible/ecto_ch/blob/fb995abad0207b1751898c237a9d89c93a2154e7/test/ecto/integration/aggregate_function_type_test.exs#L78-L152 |
Works now, I had a silly mistake on my side. Super awesome! @ruslandoga <3 |
Ah, yes. I still don't know what the API should be. I considered both of your approaches (and picked the current one since we are still inserting rows, but "via" an input). And I also thought about |
I mean, I did a silly mistake so that is that :D
|
Getting this error:

when running

```
mix ecto.ch.schema default.<TABLE>
```

The table has the columns: