with window.externalTime(), not able to fetch events from stream #1750

Closed
Connect2naga opened this issue Sep 27, 2021 · 7 comments

Connect2naga commented Sep 27, 2021

Description:

Create a configuration like the one below:

@App:name('alarm-count-rule')
@App:description('Receive events via HTTP transport and view the output on the console')
@source(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic.list = "event_2", group.id = "newprocessor_1", threading.option = "single.thread", 
    @map(type = 'json'))
define stream FMNotification (dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long);
-- @sink(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic = "alarmCountRule", is.binary.message = "False", 
--  @map(type = 'json'))
-- --define stream OutputStream (dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long,alarmCount long);
-- define stream OutputStream (dn string, alarmCount long);
@sink(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic = "alarmCountRule", is.binary.message = "False", 
    @map(type = 'json'))
--define stream OutputStream (dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long,alarmCount long);
define stream ProessedEvents(dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long,alarmCount long);

@info(name = 'fmNotification')
from FMNotification#window.externalTime(systemNotificationTime, 5 sec) 
--from FMNotification#window.time(3 sec)
select dn,specificProblem,notificationType,presentationName,moClass,systemNotificationTime,count() as alarmCount
group by dn
insert all events into TempStream;
@info(name = 'alarmCount')
from TempStream [alarmCount > 2]
select dn, specificProblem,notificationType,presentationName,moClass,systemNotificationTime, alarmCount
insert into ProessedEvents;

Expected: once 3 events arrive within 3 seconds, all of the participating events should be emitted.
Issue: instead, only the last event is printed to the stream, and the subsequent events that satisfy the condition are printed individually as well.

Affected Siddhi Version:
5.1.1

OS, DB, other environment details and versions:

Steps to reproduce:

Related Issues:

Connect2naga (Author) commented

Could you please help with this?

senthuran16 (Member) commented

Hi @Connect2naga,
Have you tried using the externalTimeBatch window with expired events?

from InStream#window.externalTimeBatch(timestamp, 3 sec)
select a, b, c
insert expired events into OutStream;
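
For reference, a minimal runnable version of this suggestion could look as follows; the stream definitions and the log sink here are just assumptions for illustration:

@App:name("ExternalTimeBatchSketch")

define stream InStream(a string, b string, c string, timestamp long);

@sink(type = 'log', prefix = 'EXPIRED')
define stream OutStream(a string, b string, c string);

-- emits the events of each batch when they expire from the 3 sec external-time window
from InStream#window.externalTimeBatch(timestamp, 3 sec)
select a, b, c
insert expired events into OutStream;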

vankin09 commented Nov 5, 2021

Hi,

I tried using the above, but it doesn't meet the requirement. I get only the expired events, which doesn't serve any purpose.
The use case is as follows (with an example):
When there are n events within a particular time window, a notification should be raised.
Example: if and only if 3 or more events arrive within 5 seconds of each other (one field serves as the reference time), all 3 events should be put into an OutputStream.
Say there are these events:

Event 1 with fieldTime 12:00:00
Event 2 with fieldTime 12:00:03
Event 3 with fieldTime 12:00:06
Event 4 with fieldTime 12:00:09
Event 5 with fieldTime 12:00:10

In this case, only events 3, 4, and 5 should be forwarded to OutputStream.
To achieve this, I tried a different approach:

@App:name('alarm-count-rule-1')

@app:description('Receive events via kafka and put the correlated events in kafka sink')

@source(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic.list = "event_2", group.id = "newprocessor_1", threading.option = "single.thread",
@map(type = 'json'))
define stream TestNotification (id string, dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long);

define window TestNotificationWindow (id string, dn string, specificProblem string, notificationType string, presentationName string, moClass string, systemNotificationTime long) externalTime(systemNotificationTime, 5 sec);

@sink(type = 'kafka', bootstrap.servers = "0.0.0.0:9092", topic = "alarmCountRule", is.binary.message = "False",
@map(type = 'json'))
define stream corr_1 (corr_r string, corr1 string, corr2 string);

@info(name = "window")
from TestNotification
insert into TestNotificationWindow;

@info(name = "Event table")
partition with (dn of TestNotificationWindow)
begin
    from every( e1=TestNotificationWindow)
         -> e2 = TestNotificationWindow[e1.systemNotificationTime < systemNotificationTime and e1.systemNotificationTime + 5000 > systemNotificationTime and e1.id != id]< 2: >
    select e1.id as corr_r, e2[0].id as corr1, e2[1].id as corr2
    insert into corr_1
end;

With this, the idea is to get the ids of all the correlated events and push them to the output stream; the ids could then be used to fetch the data with an HTTP query.
However, the problem is that the select clause is hardcoded; if the matched events could somehow be looped over, that would help:

select e1.id as corr_r, e2[0].id as corr1, e2[1].id as corr2

senthuran16 (Member) commented

Hi @vankin09,

We can avoid the hardcoding by merging the ids of e1 and all the e2 events into a list, using functions from siddhi-execution-list. Please see the example below:

@App:name("SiddhiAppSimple")


@App:description("Description of the plan")

@sink(type='log', prefix='>>>Input')
define stream InputStream(id string, timestamp long);

@sink(type='log', prefix=">>>>>>Collected")
define stream MatchedCorrelatedIdsStream(ids object);

@sink(type='log', prefix='CORRELATED ID')
define stream CorrelatedIdsStream(id string);


from every( e1=InputStream)
         -> e2 = InputStream[e1.timestamp < timestamp and e1.timestamp + 5000 > timestamp and e1.timestamp != timestamp]< 2: >
    select list:addAll(list:create(e1.id), list:merge(e2.id)) as ids
    insert into MatchedCorrelatedIdsStream;

from MatchedCorrelatedIdsStream#list:tokenize(ids)
select value
insert into TokenizedIdsStream;

from TokenizedIdsStream#window.batch(1)
select convert(value, 'string') as id
insert into CorrelatedIdsStream;

Sample input simulated to InputStream:

event1,60000
event2,63000
event3,66000
event4,69000
event5,70000

event3, event4, and event5 will be published to CorrelatedIdsStream when simulating the above.

Output:

INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459436385, data=[event1, 60000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459437385, data=[event2, 63000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459438385, data=[event3, 66000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459439385, data=[event4, 69000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636459440385, data=[event3], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636459440385, data=[event4], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636459440385, data=[event5], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>>>>Collected : Event{timestamp=1636459440385, data=[[event3, event4, event5]], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636459440385, data=[event5, 70000], isExpired=false} (Encoded)

However, we can observe that the created lists don't seem to be cleared properly.
That is, when simulating the following events after the event1 - event5 simulation above:

event6,80000
event7,83000
event8,86000
event9,89000
event10,90000

the expected outcome would be just event8, event9, and event10, but what we observe is as follows (event4 and event5 are still present in the list):

INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460007318, data=[event6, 80000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460016239, data=[event7, 83000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460024238, data=[event8, 86000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460033711, data=[event9, 89000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event8], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event4], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event5], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event9], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - CORRELATED ID : Event{timestamp=1636460040519, data=[event10], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>>>>Collected : Event{timestamp=1636460040519, data=[[event8, event4, event5, event9, event10]], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636460040519, data=[event10, 90000], isExpired=false} (Encoded)

We will have to look into this further to determine how to address this behaviour.
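
In the meantime, a possible workaround, assuming the stale entries come from the list:merge aggregation retaining its state across matches, could be to build the list per match with list:create (which accepts multiple values in siddhi-execution-list). This reintroduces the hardcoded indices, but avoids the stateful aggregation:

-- Sketch only: builds the list per match with list:create instead of the
-- stateful list:merge aggregation, so no list state is carried across matches.
-- Note: this reintroduces the hardcoded e2[0]/e2[1] indices.
from every( e1=InputStream)
         -> e2 = InputStream[e1.timestamp < timestamp and e1.timestamp + 5000 > timestamp and e1.timestamp != timestamp]< 2: >
    select list:create(e1.id, e2[0].id, e2[1].id) as ids
    insert into MatchedCorrelatedIdsStream;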

vankin09 commented

We will also investigate from our end, on top of the solution you have provided.

Apart from this, we also had a query related to out-of-order events. The scenario is:
I have a window configured with externalTime for 5 seconds and a match count of 3 events, and I have the following sequence of events (the external time of each event is given below):

Window: 5 seconds, number of events: 3
event1, 08000
event2, 09000
event3, 16000
event4, 10000
event5, 25000
event6, 17000
event7, 18000

The expected outcome is 2 different sets, as listed below:

  • event1, event2, event4
  • event3, event6, event7

Can you let me know if the window used will produce the expected outcome?

senthuran16 (Member) commented

Hi @vankin09,

Yes, the above requirement can be achieved using a Siddhi pattern. Please see the following Siddhi application, which demonstrates this behaviour:

@App:name("PatternMatching")

@sink(type='log', prefix='>>>Input')
define stream InputStream(id string, timestamp long);

@sink(type='log', prefix='MATCHED')
define stream LogStream(e1Id string, e2Id string, e3Id string);


from every( e1=InputStream)
         -> e2 = InputStream[e1.timestamp < timestamp and e1.timestamp + 5000 > timestamp and e1.timestamp != timestamp]< 2: >
    select e1.id as e1Id, e2[0].id as e2Id, e2[1].id as e3Id
    insert into LogStream;

Input:

event1,8000
event2,9000
event3,16000
event4,10000
event5,25000
event6,17000
event7,18000

Output:

INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896589549, data=[event1, 8000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896590549, data=[event2, 9000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896591549, data=[event3, 16000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - MATCHED : Event{timestamp=1636896592549, data=[event1, event2, event4], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896592549, data=[event4, 10000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896593549, data=[event5, 25000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896594549, data=[event6, 17000], isExpired=false} (Encoded) 
INFO {io.siddhi.core.stream.output.sink.LogSink} - MATCHED : Event{timestamp=1636896595549, data=[event3, event6, event7], isExpired=false} 
INFO {io.siddhi.core.stream.output.sink.LogSink} - >>>Input : Event{timestamp=1636896595549, data=[event7, 18000], isExpired=false} (Encoded)

AnuGayan (Contributor) commented

Closing the issue, since the answer is provided above.
