-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35355][State] Internal async aggregating state and corresponding state descriptor #24810
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! Overall LGTM % several minor comments.
import java.io.Serializable; | ||
|
||
/** Test implementation of Aggregate function */ | ||
public class SumAggregator implements AggregateFunction<Integer, Integer, Integer>, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest this to be an inner class if only for testing purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it was an inner class, but it is used in both AggregatingStateDescriptorTest
and InternalAggregatingStateTest
, so I created this class to reuse the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Option 1: This could be a static package-private class within AggregatingStateDescriptorTest
reused in both tests. This is common when some test reference the inner static class/variable from another test.
Option 2: Two test inner classes. This makes sense if it is private.
Also, anonymous class is better than inner class.
The reason why I'd suggest make it inner/anonymous is that, typically functional interfaces like this vary in different scenarios, so there's nothing to share. I'd prefer option 2 with anonymous class, since making these tests self contained looks better to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I made it inner class in latest commit. For AggregatingStateDescriptorTest
, I found it doesn't need a real instance, so I mocked it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems flink doesn't encourage the mock, will change it later.
public class InternalAggregatingState<K, IN, ACC, OUT> extends InternalKeyedState<K, ACC> | ||
implements AggregatingState<IN, OUT> { | ||
|
||
private final AggregateFunction<IN, ACC, OUT> aggregateFunction; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better be protected
since the sub-classes will leverage this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me, will fix it in next commit.
Hi @Zakelly, Thanks for the comments, I have added a commit for addressing it. |
544cfb5
to
a1a0d30
Compare
Hi @Zakelly, I've addressed comments, please help double review it. Thanks in advance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update! LGTM
@jectpro7 Seems the author of this commit is different from the committer? Do you mind change that? Or I can merge this as it is. |
@Zakelly, It looks weird, seems the git configuration is not correct. I will fix that. |
9f8b359
to
45fab29
Compare
@Zakelly, looks good now. Thanks for the reminder. 👍 |
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
…ng state descriptor
CI green, merging.... |
This PR defines the V2
AggreatingState
and itsStateDescriptor
. And provides the defaultInternalAggregatingState
implementation which delegates all aycn request to AEC.What is the purpose of the change
Supporting aggregate functionality regarding to FLIP-424.
Brief change log
AggregatingState
andAggregatingStateDescriptor
AggregatingState
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation