In DataFlow threaded execution variables passed to UDF in event filter when Filter-Optimizable setting is on evaluating to null #224

Open
icholy opened this issue Dec 3, 2020 · 4 comments

icholy commented Dec 3, 2020

Variables passed to UDFs in event filters evaluate to null if the event originates from a dataflow operator. The EPL below (query.epl) and the Java program that follows it reproduce the problem.

create schema Empty ();

create dataflow InputOutput
  BeaconSource -> events<Empty> {}
  EventBusSink(events) {};

create variable String X = String.valueOf(1);

select * from Empty(AssertNotNull(X));

import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.common.client.dataflow.core.EPDataFlowInstance;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.EPRuntimeProvider;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;

public class Main {

    public static boolean AssertNotNull(Object v) throws Exception {
        if (v == null) {
            throw new Exception("assertion failure: got null value");
        }
        return true;
    }

    public static void main(String[] args) throws Exception {

        // read epl
        var epl = Files.readString(Path.of("query.epl"), StandardCharsets.US_ASCII);

        // setup configuration
        var configuration = new Configuration();
        configuration.getCompiler().addPlugInSingleRowFunction("AssertNotNull", Main.class.getCanonicalName(), "AssertNotNull");

        // compile it
        var compiler = EPCompilerProvider.getCompiler();
        var arguments = new CompilerArguments(configuration);
        var module = compiler.parseModule(epl);
        var compiled = compiler.compile(module, arguments);

        // deploy it
        var epRuntime = EPRuntimeProvider.getRuntime("", configuration);
        epRuntime.getDeploymentService().deploy(compiled);

        // start all dataflows
        var epDataFlow = epRuntime.getDataFlowService();
        var instances = new ArrayList<EPDataFlowInstance>();
        for (var dataflow: epDataFlow.getDataFlows()) {
            var instance = epDataFlow.instantiate(dataflow.getDeploymentId(), dataflow.getName());
            instances.add(instance);
            instance.start();
        }

        // wait for dataflows to complete
        for (var instance : instances) {
            instance.join();
        }
    }
}
@icholy icholy changed the title Variables in On-Select filter-criteria evaluating to null Variables passed to UDF in filter-criteria evaluating to null Dec 3, 2020
@icholy icholy changed the title Variables passed to UDF in filter-criteria evaluating to null Variables passed to UDF in event filter evaluating to null Dec 3, 2020
bernhardttom commented Jan 22, 2021

Use ConfigurationCompilerPlugInSingleRowFunction.FilterOptimizable.DISABLED when registering the function.

For reference:

            
String epl = "@name('schema') @public @buseventtype create schema Empty ();\n" +
        "create dataflow InputOutput\n" +
        "  BeaconSource -> events {}\n" +
        "  EventBusSink(events) {};\n" +
        "create variable String X = String.valueOf(1);\n" +
        "@name('s0') select * from Empty(localAssertNotNull(X));\n";
env.compileDeploy(epl).addListener("s0");

EPDataFlowInstance instance = env.runtime().getDataFlowService().instantiate(env.deploymentId("schema"), "InputOutput");
instance.start();

try {
    Thread.sleep(100);
} catch (InterruptedException e) {
    e.printStackTrace();
}
env.assertListenerInvoked("s0");

instance.cancel();

env.undeployAll();
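
For anyone applying the FilterOptimizable.DISABLED suggestion to the original reproduction, the registration might look roughly like the sketch below. It is untested and rests on assumptions: the class name FilterOptimizableConfig and the helper method build are made up for illustration, the import path for ConfigurationCompilerPlugInSingleRowFunction is what I believe Esper 8 uses, and the four-argument addPlugInSingleRowFunction overload should be checked against the Javadoc of your Esper version. It needs the Esper jars on the classpath.

```java
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.common.client.configuration.compiler.ConfigurationCompilerPlugInSingleRowFunction;

// Hypothetical helper, not part of Esper or of the reproduction above.
public class FilterOptimizableConfig {

    // Registers AssertNotNull from the given class with filter optimization
    // disabled for that function. Assumes udfClass declares a public static
    // method named AssertNotNull, as Main does in the reproduction.
    public static Configuration build(Class<?> udfClass) {
        Configuration configuration = new Configuration();
        configuration.getCompiler().addPlugInSingleRowFunction(
                "AssertNotNull",
                udfClass.getCanonicalName(),
                "AssertNotNull",
                ConfigurationCompilerPlugInSingleRowFunction.FilterOptimizable.DISABLED);
        return configuration;
    }
}
```

In the reproduction this would replace the three-argument registration call, e.g. `var configuration = FilterOptimizableConfig.build(Main.class);`. FilterOptimizable is a separate setting from ValueCache, so changing one does not change the other.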

@bernhardttom bernhardttom changed the title Variables passed to UDF in event filter evaluating to null Variables passed to UDF in event filter when Filter-Optimizable setting is on evaluating to null Jan 22, 2021
@bernhardttom

However, this could be improved: Esper could detect the variable in this case or throw an exception.

@bernhardttom bernhardttom reopened this Jan 22, 2021
@bernhardttom bernhardttom changed the title Variables passed to UDF in event filter when Filter-Optimizable setting is on evaluating to null In DataFlow threaded execution variables passed to UDF in event filter when Filter-Optimizable setting is on evaluating to null Jan 22, 2021
icholy commented Jan 25, 2021

If a constant is used instead of a variable, null is not passed.

icholy commented Jan 25, 2021

@bernhardttom btw, I already have ConfigurationCompilerPlugInSingleRowFunction.ValueCache.DISABLED set.
