
How to use the openPDC adapter to pass data into a specified partition of a Kafka topic? #160

Open
snail526 opened this issue May 26, 2023 · 4 comments


@snail526

Hi, community,

I am having the following problem with the Kafka adapter that comes with openPDC:
I have created a Kafka topic with 2 partitions and am using openPDC's KafkaAdapter to write data into them. Since the topic has 2 partitions and the data appears to be distributed across them in turn (polled), what do I need to do to send a particular set of data to a specified partition? Do I need to change some configuration parameters in the adapter?

I look forward to your reply. Thanks!

@StephenCWills
Member

You can specify Partitions=2 in the connection string of the adapter. It looks like the adapter uses the ID field of the input measurements to determine which partition they should go into, and it's fairly arbitrary. Measurements with even IDs would go into partition 0 and measurements with odd IDs would go into partition 1.
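A minimal Python sketch of the mapping described above, assuming the adapter computes the partition as the measurement ID modulo the configured Partitions value. The real logic is C# code in TimeSeriesProducer.cs; `kafka_partition` is a hypothetical name used only for illustration.

```python
def kafka_partition(measurement_id: int, partitions: int) -> int:
    """Return the partition for a measurement ID.

    Assumes the adapter uses ID modulo the configured Partitions value.
    """
    if partitions < 1:
        raise ValueError("partitions must be at least 1")
    return measurement_id % partitions


# With Partitions=2, even IDs go to partition 0 and odd IDs to partition 1.
for point_id in (1, 2, 3, 4):
    print(f"ID {point_id} -> partition {kafka_partition(point_id, 2)}")
```

Under this assumption the partition depends only on the ID, which is why the assignment looks arbitrary from the point of view of which device or signal a measurement belongs to.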

https://github.com/GridProtectionAlliance/gsf/blob/42929a71f25b663bcffc12625739695ef14153fb/Source/Libraries/Adapters/Kafka/TimeSeriesProducer.cs#L495

@snail526
Author

Thank you for your reply.

I also have a follow-up question about using the Kafka adapter:
As shown in line 495 of the TimeSeriesProducer.cs code:
[Screenshot: TimeSeriesProducer.cs, line 495]

To determine the partition ID, I first need to know the value of measurement.Key.ID. So how do I get or modify the measurement.Key.ID of a measurement in openPDC, using the test device SHELBY as an example?
[Screenshot: SHELBY measurements in openPDC, including the ID column]

Once I have measurement.Key.ID, I can choose the number of partitions based on the number of groups of measurement data under the same Kafka topic, and thus send each group's data to the partition I want.
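This plan only works if every ID in a group maps to the same partition. A small Python sketch to check that, assuming partition = ID % Partitions as described in the earlier reply; `partitions_for_groups`, the group names, and the IDs are hypothetical.

```python
from __future__ import annotations


def partitions_for_groups(groups: dict[str, list[int]], partitions: int) -> dict[str, set[int]]:
    """Map each group name to the set of partitions its measurement IDs fall into,
    assuming partition = ID % partitions."""
    result: dict[str, set[int]] = {}
    for name, ids in groups.items():
        result[name] = {point_id % partitions for point_id in ids}
    return result


# Hypothetical groups of measurement IDs (PointIDs).
groups = {"group_a": [2, 4, 6], "group_b": [3, 5, 8]}
for name, parts in partitions_for_groups(groups, 2).items():
    status = "single partition" if len(parts) == 1 else "split across partitions"
    print(name, sorted(parts), status)
```

Here `group_a` stays in partition 0, while `group_b` is split because ID 8 is even. With a modulo mapping, a group only lands in one partition if its IDs happen to line up, so choosing the partition count alone may not be enough.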

So, I think my question comes down to how to obtain measurement.Key.ID.

These are my questions, and I look forward to your response. Thank you!

@StephenCWills
Member

The ID column in your screenshot is what is referred to as the measurement key in the code. So measurement.Key.ID for the selected frequency in your screenshot would be 2, and measurement.Key.ID for the selected frequency delta measurement would be 4.

The SQLite definition of the Measurement table in the database looks like this:

```sql
CREATE TABLE Measurement(
    PointID INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    SignalID NCHAR(36) NULL,
    HistorianID INTEGER NULL,
    DeviceID INTEGER NULL,
    PointTag VARCHAR(200) NOT NULL,
    AlternateTag TEXT NULL,
    SignalTypeID INTEGER NOT NULL,
    PhasorSourceIndex INTEGER NULL,
    SignalReference VARCHAR(200) NOT NULL,
    Adder DOUBLE NOT NULL DEFAULT 0.0,
    Multiplier DOUBLE NOT NULL DEFAULT 1.0,
    Description TEXT NULL,
    Subscribed BOOLEAN NOT NULL DEFAULT 0,
    Internal BOOLEAN NOT NULL DEFAULT 1,
    Enabled BOOLEAN NOT NULL DEFAULT 0,
    CreatedOn DATETIME NOT NULL DEFAULT '',
    CreatedBy VARCHAR(200) NOT NULL DEFAULT '',
    UpdatedOn DATETIME NOT NULL DEFAULT '',
    UpdatedBy VARCHAR(200) NOT NULL DEFAULT '',
    CONSTRAINT IX_Measurement UNIQUE (PointID ASC),
    CONSTRAINT FK_Measurement_Device FOREIGN KEY(DeviceID) REFERENCES Device (ID) ON DELETE CASCADE,
    CONSTRAINT FK_Measurement_SignalType FOREIGN KEY(SignalTypeID) REFERENCES SignalType (ID)
);
```

In this definition, the PointID field is the one that is referred to as measurement.Key.ID in the code. So you would need to update that field in order to change the partition manually.
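To look up the PointID (measurement.Key.ID) values for a device such as SHELBY, you could query the configuration database directly. A minimal sketch using Python's built-in sqlite3 module against an in-memory database. The two trimmed tables, the assumed `Device.Acronym` column, the point tags, and the sample rows (IDs 2 and 4, matching the frequency and frequency delta measurements mentioned earlier) are hypothetical stand-ins, not the full openPDC schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Trimmed, hypothetical versions of the Device and Measurement tables.
    CREATE TABLE Device(ID INTEGER PRIMARY KEY, Acronym VARCHAR(200) NOT NULL);
    CREATE TABLE Measurement(
        PointID INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
        DeviceID INTEGER NULL REFERENCES Device(ID),
        PointTag VARCHAR(200) NOT NULL
    );
    INSERT INTO Device(ID, Acronym) VALUES (1, 'SHELBY'), (2, 'OTHER');
    INSERT INTO Measurement(PointID, DeviceID, PointTag) VALUES
        (2, 1, 'SHELBY:FREQ'),
        (4, 1, 'SHELBY:DFDT'),
        (5, 2, 'OTHER:FREQ');
""")


def point_ids_for_device(conn, acronym, partitions):
    """Return (PointID, PointTag, partition) for a device's measurements,
    assuming partition = PointID % partitions."""
    rows = conn.execute(
        """
        SELECT m.PointID, m.PointTag
        FROM Measurement AS m
        JOIN Device AS d ON d.ID = m.DeviceID
        WHERE d.Acronym = ?
        ORDER BY m.PointID
        """,
        (acronym,),
    ).fetchall()
    return [(pid, tag, pid % partitions) for pid, tag in rows]


for pid, tag, part in point_ids_for_device(conn, "SHELBY", 2):
    print(pid, tag, "-> partition", part)
```

Against a real openPDC SQLite database you would connect to its file instead of `":memory:"` and skip the setup script. In this sample, both SHELBY measurements have even IDs, so with Partitions=2 they would both land in partition 0.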

The field is an autoincrementing integer, sometimes used as a primary key, and it's referenced by foreign keys in other tables, so you may have some difficulty changing it. You might be better off making a change to the Kafka producer adapter that would enable you to explicitly specify which partition each measurement goes to.
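One way to approach that change, sketched in Python rather than the adapter's C#: keep an explicit map from measurement ID to partition and fall back to ID % Partitions for anything not listed. `PartitionSelector` and the override map are hypothetical; how the map would be configured in the adapter (for example, through a new connection string setting) is left open.

```python
from __future__ import annotations


class PartitionSelector:
    """Hypothetical sketch: choose a Kafka partition per measurement ID.

    Explicit assignments take priority; otherwise fall back to
    ID % partitions, the mapping the current adapter appears to use.
    """

    def __init__(self, partitions: int, overrides: dict[int, int] | None = None):
        if partitions < 1:
            raise ValueError("partitions must be at least 1")
        self.partitions = partitions
        self.overrides = dict(overrides or {})
        # Reject assignments to partitions the topic does not have.
        for point_id, partition in self.overrides.items():
            if not 0 <= partition < partitions:
                raise ValueError(f"partition {partition} for ID {point_id} is out of range")

    def partition_for(self, point_id: int) -> int:
        return self.overrides.get(point_id, point_id % self.partitions)


# Hypothetical: pin measurement ID 2 to partition 1; other IDs use the modulo fallback.
selector = PartitionSelector(2, overrides={2: 1})
for point_id in (2, 4, 5):
    print(point_id, "->", selector.partition_for(point_id))
```

Falling back to the modulo mapping keeps the existing behavior for every measurement you have not explicitly assigned, so the change would not disturb current deployments.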

@snail526
Author

Thank you.
