Partitioned: Interpolation of Merge outputs #834

Open
jordanrfrazier opened this issue Oct 27, 2023 · 0 comments

Summary

Merge uses a special spread kernel that keeps track of past values for each column being merged. The behavior may be latched or unlatched, depending on whether the column is interpolated as continuous or discrete.

The new partitioned code attempts to simplify the output types by using

Batch { time: TimestampNanosecondArray, subsort: UInt64Array, key_hash: UInt64Array, data: StructArray }

where data holds the individual columns. This lets us work with a single DataType rather than passing a separate Schema and individual columns around. It also simplifies accessing columns of a merge output -- rather than naming each column and looking it up by name, we keep each side's columns in its own output struct and access them via that side and index. This avoids naming collisions.
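For concreteness, a minimal sketch of that layout, assuming arrow-rs; the field and method names here are illustrative, not the exact ones in the codebase:

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, StructArray, TimestampNanosecondArray, UInt64Array};

/// Sketch of the partitioned batch layout (illustrative names).
struct Batch {
    time: TimestampNanosecondArray,
    subsort: UInt64Array,
    key_hash: UInt64Array,
    data: StructArray,
}

impl Batch {
    /// Access column `index` within the struct for merge side `side`,
    /// e.g. `batch.merged_column(0, 1)` for the second column of the
    /// first merged input. No name lookup, so no collisions.
    fn merged_column(&self, side: usize, index: usize) -> ArrayRef {
        let side_struct = self
            .data
            .column(side)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("each merge side is a struct column");
        Arc::clone(side_struct.column(index))
    }
}
```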

However, a consequence is that we can no longer (with the current implementation) interpolate each column individually. Since the values now live in a struct array, every column is interpreted as part of the struct value at that time, meaning we cannot support latched and unlatched values within the same input.

For example, if we have {a: 5} at 10:00 AM and {a: 6} at 11:00 AM on one input, and {c: 4} at 10:30 AM on the other, with a_sum the continuous (latched) sum of a, we likely expect:

{a: 5, a_sum: 5, c: null}
{a: null, a_sum: 5, c: 4}
{a: 6, a_sum: 11, c: null}

But interpolating each struct value as a whole, we would instead see:

{a: 5, a_sum: 5, c: null}
{a: null, a_sum: null, c: 4}
{a: 6, a_sum: 11, c: null}
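To make the difference concrete, here is a minimal sketch of a latched spread applied to one column at a time, assuming arrow-rs and using nulls to mark rows where the side had no input (the function name and the null convention are assumptions):

```rust
use arrow_array::Int64Array;

/// Latch each null to the most recent non-null value in the column.
fn spread_latched(values: &Int64Array) -> Int64Array {
    let mut last: Option<i64> = None;
    values
        .iter()
        .map(|v| {
            if v.is_some() {
                last = v;
            }
            last
        })
        .collect()
}

fn main() {
    // a_sum from the example above: rows at 10:00, 10:30, and 11:00 AM,
    // with no input on this side at 10:30.
    let a_sum = Int64Array::from(vec![Some(5), None, Some(11)]);
    // Latched per column, the 10:30 row sees the running sum: [5, 5, 11].
    assert_eq!(
        spread_latched(&a_sum),
        Int64Array::from(vec![Some(5), Some(5), Some(11)])
    );
}
```

Applied per column, a_sum can latch while a stays unlatched; applied to the whole struct, the 10:30 AM row nulls out both.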

Ideas

Fastest to parity:

The likely fastest way to get back to parity with the old merge behavior is to flatten the data back into individual columns. Some refactoring may be needed to work with named columns rather than indices, but it is preferable to revert to existing behavior we believe works.
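As a rough sketch of that flattening, assuming arrow-rs (the prefixing scheme is hypothetical, just one way to keep names collision-free once columns are named again):

```rust
use arrow_array::{ArrayRef, StructArray};

/// Unpack one merge side's struct into `(name, column)` pairs,
/// prefixed with the side's name so the two inputs cannot collide.
fn flatten_side(side_name: &str, side: &StructArray) -> Vec<(String, ArrayRef)> {
    side.column_names()
        .into_iter()
        .zip(side.columns().iter().cloned())
        .map(|(name, column)| (format!("{side_name}.{name}"), column))
        .collect()
}
```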

Future ideas:

The logical -> physical conversion may be able to detect interpolation of merge columns, and add a projection after the merge that uses the spread kernel:

Step 0: Source0

Step 1: Project(Step 0)
  { a: step0.a,
    a_sum: step0.a.sum(),
  }

Step 2: Source1

Step 3: Merge (Step 1, Step 2)

Step 4: Project(Step 3)
  { 
    a: step3.step1.a,
    a_sum: step3.step1.a_sum.spread(???),
    c: step3.step2.c
  }

The benefits of this approach are:

  • Merge is now stateless, meaning we can perform concurrent merging
  • Spread is separated out of merge, significantly reducing merge pipeline complexity
  • Unlatched spread is the default behavior, so no extra functionality is needed for it
  • We can visualize the spread functionality within the plan, rather than it being an opaque attribute of merge

Open questions:

  • The implementation of spread(???) needs to determine "should I latch here?"
    • It needs to distinguish between (a) null because there was no Step 1 row at this time (a row from Step 2 caused a row to be created during merge), and (b) null because the aggregation cleared itself (through windowing). See the sketch after this list.
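One possible shape for such a kernel, purely as a sketch: merge emits an explicit per-side presence mask, letting spread tell case (a) from case (b). The signature and the mask are assumptions, not the existing kernel's API.

```rust
use arrow_array::{BooleanArray, Int64Array};

/// Hypothetical latched spread driven by a presence mask from merge.
fn spread_with_presence(values: &Int64Array, present: &BooleanArray) -> Int64Array {
    let mut last: Option<i64> = None;
    values
        .iter()
        .zip(present.iter())
        .map(|(value, was_present)| {
            if was_present == Some(true) {
                // Case (b) or a real value: this side had a row here, so
                // take its value, even if windowing produced a null.
                last = value;
                value
            } else {
                // Case (a): the row came from the other side; latch the
                // previous value instead.
                last
            }
        })
        .collect()
}
```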