Removal of the last stored event in the InMemory table leads to the broken references chain #1822

Open
ydidukh opened this issue Dec 4, 2023 · 0 comments


Description:
Removing the last stored event from an InMemory table breaks the table's internal reference chain and leaves the table in an inconsistent state. Subsequent inserts are then appended to the removed event instead of the live chain, leading to unexpected results.
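The failure mode described above can be illustrated with a minimal sketch of a singly linked queue that keeps `first` and `last` pointers. This is a simplified, hypothetical model and not the Siddhi `SnapshotableStreamEventQueue` code: the names `TailPointerSketch`, `EventQueue`, `Node`, `fixTailOnRemove`, and `replay` are all assumptions made for illustration. It assumes the breakage comes from a stale tail pointer: when the tail node is unlinked but `last` is not moved back, the next `add` links the new node behind the already removed one.

```java
import java.util.ArrayList;
import java.util.List;

public class TailPointerSketch {

    /** One stored event; only the symbol is modelled here. */
    static final class Node {
        final String value;
        Node next;

        Node(String value) {
            this.value = value;
        }
    }

    /** Hypothetical linked queue, NOT the actual Siddhi implementation. */
    static final class EventQueue {
        private Node first;
        private Node last;
        private final boolean fixTailOnRemove;

        EventQueue(boolean fixTailOnRemove) {
            this.fixTailOnRemove = fixTailOnRemove;
        }

        void add(String value) {
            Node node = new Node(value);
            if (first == null) {
                first = node;
            } else {
                // If 'last' is stale, this links the node to a removed event.
                last.next = node;
            }
            last = node;
        }

        void remove(String value) {
            Node prev = null;
            Node cur = first;
            while (cur != null) {
                if (cur.value.equals(value)) {
                    if (prev == null) {
                        first = cur.next;
                    } else {
                        prev.next = cur.next;
                    }
                    // The step the buggy variant skips: move the tail back.
                    if (cur == last && fixTailOnRemove) {
                        last = prev;
                    }
                } else {
                    prev = cur;
                }
                cur = cur.next;
            }
        }

        /** Walks the chain from 'first', as a table lookup would. */
        List<String> values() {
            List<String> out = new ArrayList<>();
            for (Node n = first; n != null; n = n.next) {
                out.add(n.value);
            }
            return out;
        }
    }

    /** Replays the insert/delete/insert sequence from the reproduction steps. */
    static List<String> replay(boolean fixTailOnRemove) {
        EventQueue queue = new EventQueue(fixTailOnRemove);
        queue.add("WSO2");
        queue.add("IBM");
        queue.remove("IBM");   // removes the last stored event
        queue.add("WSO2");     // lost in the buggy variant
        return queue.values();
    }

    public static void main(String[] args) {
        System.out.println("stale tail:   " + replay(false));
        System.out.println("tail updated: " + replay(true));
    }
}
```

In this sketch the stale-tail variant ends up with a single reachable `WSO2` entry, while the variant that moves `last` back keeps both, which corresponds to the count of 2 that the test asserts.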

Affected Siddhi Version:

4.1.27 and higher. The problem was introduced in version 4.1.27, when support for incremental snapshots was added (the SnapshotableStreamEventQueue class).

OS, DB, other environment details and versions: N/A

Steps to reproduce:

    @Test
    public void deleteFromTableTest6() throws InterruptedException {
        log.info("deleteFromTableTest6");

        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "" +
                "define stream StockStream (symbol string, price float, vol long); " +
                "define stream DeleteStockStream (symbol string, price float, vol long); " +
                "define stream CountStockStream (symbol string); " +
                "define table StockTable (symbol string, price float, volume long); ";
        String query = "" +
                "@info(name = 'query1') " +
                "from StockStream " +
                "select symbol, price, vol as volume " +
                "insert into StockTable ;" +
                "" +
                "@info(name = 'query2') " +
                "from DeleteStockStream[vol>=100] " +
                "delete StockTable " +
                "   on StockTable.symbol==symbol ;" +
                "" +
                "@info(name = 'query3') " +
                "from CountStockStream#window.length(0) join StockTable" +
                " on CountStockStream.symbol==StockTable.symbol " +
                "select CountStockStream.symbol as symbol " +
                "insert into CountResultsStream ;";

        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);

        InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream");
        InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream");
        InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream");

        siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                EventPrinter.print(events);
                inEventCount += events.length;
            }
        });
        siddhiAppRuntime.start();

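        // Populate the table with two events; IBM is the last one stored
        stockStream.send(new Object[]{"WSO2", 55.6f, 100L});
        stockStream.send(new Object[]{"IBM", 75.6f, 100L});
        // Remove the last event in the StockTable
        deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L});

        // Insert after the delete; this event should be stored alongside the first WSO2 event
        stockStream.send(new Object[]{"WSO2", 57.6f, 100L});
        // The join should match both WSO2 events in the table
        countStockStream.send(new Object[]{"WSO2"});

        Thread.sleep(500);
        // Expected: 2 matches (inEventCount is a field of the test class)
        AssertJUnit.assertEquals(2, inEventCount);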
        siddhiAppRuntime.shutdown();

    }

Related Issues:
N/A
