Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Esper EPL pattern operators inside dataflows #184

Open
juanboubeta opened this issue Feb 21, 2020 · 2 comments
Open

Allow Esper EPL pattern operators inside dataflows #184

juanboubeta opened this issue Feb 21, 2020 · 2 comments

Comments

@juanboubeta
Copy link

Dear all,

During the last 10 years, I've been working with the Esper CEP engine. In the last versions of the engine, I find the support of dataflows very powerful. However, according to the Esper documentation (http://esper.espertech.com/release-8.3.0/reference-esper/html/dataflow.html):

Only filter-based streams are allowed in the from clause and patterns or named windows are not supported. Also not allowed are the insert into clause, the irstream keyword and subselects.

I'd need to use pattern operators such as every, every-distinct, repeat, until, followed-by, etc. to define these types of conditions inside a dataflow. Please could you recommend how I could make use of these types of operators with dataflows?

As an example, I'd like to include the following pattern into the dataflow:

@Name('NO2_Avg')
@Priority(3)
insert into NO2_Avg
select a1.stationId.toString() || current_timestamp().toString() as id,
  current_timestamp() as timestamp, a1.stationId as stationId,
  avg(a1.no2) as value
from pattern [every (a1 = AirMeasurement -> a2 = AirMeasurement(a2.no2 > a1.no2))].win:time(3600 milliseconds)
group by a1.stationId

String epl = "create dataflow PollutantDataFlow\n" +
"create schema AirQualityType(timestamp long, stationId integer, no2 double),\n" +
"FileSource -> airqualitystream<AirQualityType>{\n" +
" file: 'sensor_events.csv', \n" +
" propertyNames: ['timestamp','stationId','no2'] \n" +
"}\n" +
"Select(airqualitystream) -> no2_avg{\n" +
" select: (select a1.stationId.toString() || current_timestamp().toString() as id,\r\n" +
"   current_timestamp() as timestamp, a1.stationId as stationId, \r\n" +
"   avg(a1.no2) as value \r\n" +
" from [every (a1 = AirMeasurement -> a2 = AirMeasurement(a2.no2 > a1.no2))].win:time(3600000 milliseconds) group by a1.stationId) }\r\n" +
" \r\n" +
" LogSink(no2_good) {\r\n" +
" layout : '%t [%e]',\r\n" +
"    log : false,\r\n" +
"    linefeed : true,\r\n" +
"    title : 'Input:'}\r\n";

Thank you very much for your time and help.

Best regards,

Juan

@bernhardttom bernhardttom changed the title How to use Esper EPL pattern operators inside dataflows? Allow Esper EPL pattern operators inside dataflows Feb 24, 2020
@bernhardttom
Copy link
Contributor

Match-recognize would seem to solve this as well

@DavidCorral94
Copy link

I would like to suggest this temporal solution:

According to Esper Docs EventBusSink:

The EventBusSink operator send events received from a data flow into the event bus.

So, would it be possible to create a dataflow that collects events using any kind of input operator (AMQP, File, etc.), publish them into the Event Bus and, then, deploy these required patterns against this new input stream?

Instead of using AirMeasurement in the from pattern clause, we will be using, for example, AirMeasurementStream.

I think that this solution results into duplication of information, right? Because we are inserting two times the same event: first when it is collected within the dataflow input operator and then when it is published into the Event Bus by the dataflow sink operator.

What are your thoughts about this @bernhardttom?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants