Deadlock in execution #847

Open
jordanrfrazier opened this issue Nov 2, 2023 · 1 comment

@jordanrfrazier
Collaborator

jordanrfrazier commented Nov 2, 2023

The following reproduction deadlocks:

import kaskada as kd
import asyncio

kd.init_session()

data1 = "\n".join(
    [
        "time,key,m,n",
        "1996-12-19T16:39:57,A,5,10",
        "1996-12-19T16:39:58,B,24,3",
        "1996-12-19T16:39:59,A,17,6",
        "1996-12-19T16:40:00,A,,9",
        "1996-12-19T16:40:01,A,12,",
        "1996-12-19T16:40:02,A,,",
    ]
)

data2 = "\n".join(
    [
        "time,key,m,n",
        "1996-12-20T16:39:57,A,5,10",
        "1996-12-20T16:39:58,B,24,3",
        "1996-12-20T16:39:59,A,17,6",
        "1996-12-20T16:40:00,C,,9",
        "1996-12-20T16:40:01,A,12,",
        "1996-12-20T16:40:02,A,,",
    ]
)

# ------ (cell break: the code below uses top-level await, so it needs a notebook or an asyncio REPL)


source = await kd.sources.CsvString.create(data1, time_column="time", key_column="key")

execution = source.run_iter(mode="live")

async def add_more_data():
    print("waiting to send more data")
    await asyncio.sleep(0.5)
    await source.add_string(data2)
    print("sent more data")

async def stop_execution():
    print("waiting to stop execution")
    await asyncio.sleep(1.5)
    print("stopping execution", flush=True)
    await asyncio.sleep(0.001)
    execution.stop()
    print("stopped execution", flush=True)

async def output_batches():
    while True:
        try:
            print("waiting for next batch")
            async with asyncio.timeout(2):  # asyncio.timeout requires Python 3.11+
                next_batch = await execution.__anext__()
                print(next_batch)
        except StopAsyncIteration:
            print("stop async iteration")
            break
        except StopIteration:
            print("stop iteration")
            break
        except TimeoutError:
            print("timeout")
            break
        except Exception as exp:
            print(f"other exception: {exp}")


stop_task = asyncio.create_task(stop_execution())
add_task = asyncio.create_task(add_more_data())
output_task = asyncio.create_task(output_batches())


# wait for the tasks to finish
await stop_task
print("stop task complete")
await add_task
print("add task complete")
await output_task
print("output task complete")
@jordanrfrazier
Collaborator Author

A few issues:

  • Something odd may be going on with asyncio or the GIL: print statements inside the execution.stop() code were never executed (or at least never displayed).
  • The execution lock will simply not work as-is: the stop and next methods both require an owned instance of execution, so each locks the other out. Even if the first issue were fixed, this would still deadlock on execution. We need to push the mutex down, at least to the output channel, so that the stop signal can be sent concurrently.

jordanrfrazier self-assigned this on Nov 2, 2023
jordanrfrazier changed the title from "Deadlock with stop signal" to "Deadlock in execution" on Nov 2, 2023