feat: Initial journalling implementation #845

Open
wants to merge 2 commits into main
Conversation

bjchambers
Collaborator

This is part of #841.

  • Persistence of batches using Arrow IPC format (a minimal encoding sketch is included below, after the future-work list).
  • Journalling and recovery of batches via the append-only okaywal crate.
  • Concatenating batches to produce checkpoints (also in IPC format).
  • Recovering from checkpoints.
  • Concatenating journal checkpoints with in-memory data for execution.

Next steps:

  • Allow adding checkpointed batches directly (e.g., for import from Parquet).
  • Allow subscribing to newly added batches (e.g., possibly replacing the existing in-memory implementation).

Future work:

  • Handle late data by merging with overlapping in-memory and possibly checkpointed batches.
  • Checkpoint to object store / read checkpoints from object stores. May lead to moving checkpoint storage to Parquet, even while using IPC for the write-ahead log.
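For the first two bullets, here is a minimal sketch of encoding a single batch to Arrow IPC bytes before appending it as a journal entry. The function name and error handling are illustrative, not this PR's actual code:

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::ArrowError;

/// Encode a batch to Arrow IPC stream bytes suitable for a single
/// write-ahead-log entry. (Illustrative sketch only.)
fn encode_ipc(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
  let mut buf = Vec::new();
  {
    // The stream writer emits the schema, any dictionary batches,
    // and the record batch itself into `buf`.
    let mut writer = StreamWriter::try_new(&mut buf, batch.schema().as_ref())?;
    writer.write(batch)?;
    writer.finish()?;
  }
  Ok(buf)
}

On recovery, the corresponding arrow_ipc StreamReader can decode each entry back into a RecordBatch.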

}

#[cfg(test)]
mod tests {
Collaborator Author

Move tests to a separate tests.rs file.

_checkpointed_entries: &mut okaywal::SegmentReader,
_wal: &okaywal::WriteAheadLog,
) -> std::io::Result<()> {
// TODO: This should use `_last_checkpointed_id` to ensure that only batches
Collaborator Author

I think this is probably critical, and it may reveal an issue with how things are structured (and how we anticipated implementing them).

Specifically:

  1. The checkpointing happens asynchronously. So we should not just use the current in-memory batches for checkpointing.
  2. This means that some batches may have been added to the new segment (and thus the "in-memory" batch vector) since we decided to checkpoint. These batches should not be part of the checkpoint (they'll be in the next one) or we'll have duplicate rows.

Possible options:

  1. De-duplicate if rows show up in the checkpoint and segment.
  2. When checkpointing, use the entries themselves, rather than the "in-memory batches" -- this wouldn't work since the in-memory batches need to be flushed at some point.
  3. Use some kind of "timestamped in-memory batches" (associated with the entry IDs). It shouldn't be difficult to associate each element in the vector with the corresponding EntryId, and only take a prefix of the vector corresponding to those entries (see the sketch after this comment).

I suspect that case 3 is the best option. While we're not trying to handle late data, it does mean that the late data strategy gets a little trickier. We have a few options:

A) When late data arrives, force a checkpoint. Simple, but in the presence of lots of late data this would lead to slow-down and excessively small checkpoints.
B) Leave the batches in memory unordered. When we need to execute on them, we can check to see if there is any overlap, and decide to do (i) concatenation if there is none or (ii) concatenation and sorting if there is or (iii) sorting and merging.
C) Something more intelligent, that manages both the entry IDs associated with batches and the late data.

I suspect B may be the best option, unless we have a way of detecting when a checkpoint is taken.
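Going back to option 3 for the checkpoint itself, a rough sketch; the names are hypothetical, and a plain u64 stands in for the WAL entry id:

// Hypothetical sketch of option 3: tag each in-memory batch with the id
// of the journal entry that produced it, so a checkpoint takes only the
// prefix of batches covered by `last_checkpointed_id`.
struct TaggedBatches<B> {
  // (entry id, batch), kept in append order.
  batches: Vec<(u64, B)>,
}

impl<B> TaggedBatches<B> {
  /// Remove and return the batches covered by the checkpoint, leaving any
  /// batches appended after the checkpoint was triggered.
  fn take_checkpointed(&mut self, last_checkpointed_id: u64) -> Vec<B> {
    let split = self
      .batches
      .partition_point(|(id, _)| *id <= last_checkpointed_id);
    self.batches.drain(..split).map(|(_, batch)| batch).collect()
  }
}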

Collaborator

Agreed, critical to do. And agreed with option 3 -- disregarding late data issues for now, the checkpointer could keep track of the timestamps for each checkpoint, and have separate vectors for each set of batches. Or, we could include a checkpoint_version alongside each batch that flows through the system:

struct WatermarkedAndCheckpointBatch {
  watermark: i64,
  checkpoint: usize,
  batch: Batch,
}

I'm not sure I understand the idea for unordered batches in memory, but that likely needs a whole conversation around late data.

Collaborator Author

Actually, a possibly cleaner way of handling this would be to use a different write-ahead log.

https://docs.rs/simple_wal/latest/simple_wal/struct.LogFile.html#method.compact

This one doesn't automatically decide to checkpoint for us, but it would let us do the following:

struct PendingCheckpoint {
  batches: VecDeque<Batch>,
  wal_offset: u64,
}

struct JournalState {
  pending_checkpoints: VecDeque<PendingCheckpoint>,
  active: VecDeque<Batch>,
  active_rows: usize,
  active_bytes: usize,
}

struct Journal {
  state: Mutex<JournalState>,
  // Note: writing requires `&mut`, so in real code the log would need
  // interior mutability (or could live inside the state mutex).
  wal: simple_wal::LogFile,
}

impl Journal {
  pub fn journal(&self, batch: Batch) -> Result<()> {
    let bytes = batch_io::encode(&batch)?;

    // Lock state while writing the journal.
    let mut state = self.state.lock();
    let wal_offset = self.wal.write(&bytes)?;

    // Append the batch, assuming no late data.
    // TODO: If there is late data, we may want to merge this with the
    // batches it overlaps if they are still in memory, to ensure the in-memory
    // set can be concatenated to create an ordered batch.
    state.active_rows += batch.num_rows();
    state.active_bytes += batch.num_bytes();
    state.active.push_back(batch);

    if state.active_rows > CHECKPOINT_ROWS || state.active_bytes > CHECKPOINT_BYTES {
      let batches = std::mem::take(&mut state.active);
      state.pending_checkpoints.push_back(PendingCheckpoint {
        batches,
        wal_offset,
      });
      state.active_rows = 0;
      state.active_bytes = 0;
      // TODO: Send the pending checkpoint to a checkpoint thread.
      // When that completes, it will call `after_checkpoint`.
    }

    Ok(())
  }

  fn read_wal(&self) -> Result<impl Stream<Item = Batch>> {
    // Reading the WAL involves reading the batches from both the "pending_checkpoints"
    // and the active set. They'll need to be merged when we allow late data in case there
    // are overlaps.
    todo!()
  }

  fn after_checkpoint(&self, wal_offset: u64) -> Result<()> {
    let mut state = self.state.lock();
    let next_checkpoint = state.pending_checkpoints.pop_front().expect("pending checkpoint");
    assert_eq!(next_checkpoint.wal_offset, wal_offset);
    Ok(())
  }
}

Specifically, 3 is solved because we use a lock to append to the journal and to complete a checkpoint. Note that the work done "with the lock" is still relatively minimal (appending a batch to the checkpoint queue, etc.)

/// Decode the batch from the given entry.
///
/// If it was completely written, will return `Some(batch)`. If it wasn't completely
/// written, will return `None`.
Collaborator

Is None a valid state? What does it mean to not completely write a batch? I didn't see any further explanation in okaywal.

up_to_time,
)));
}
}
Collaborator

This logic feels interesting. Iterating through all chunk pairs and adding the batch to the buffer, but once we peek and see we're at the last chunk pair, return a record batch?

Is it done this way because we have to set the last batch's message with the header_as_record_batch() instead of as_dict_batch()?

Collaborator Author

Yeah -- this is roughly based on what the Arrow IPC writer does under the hood. Each "dictionary column" becomes a dictionary batch, and then you have the final record batch. So you end up with:

  • Dict 1 Metadata, Dict 1 Data
  • Dict 2 Metadata, Dict 2 Data
  • ...
  • Dict N Metadata, Dict N Data
  • Batch Metadata, Batch Data

In many cases, there will be no dicts, but figured it was worth keeping.
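A minimal sketch of that peek-for-the-last-pair shape, parameterized over the decode steps so it stands alone (the real code works directly on the arrow_ipc messages; P, D, and B are placeholders for the real message, dictionary, and batch types):

/// Decode a sequence of chunk pairs: every pair before the last carries a
/// dictionary batch; the final pair carries the record batch, decoded
/// against the accumulated dictionaries. (Sketch only.)
fn decode_pairs<P, D, B>(
  pairs: Vec<P>,
  mut decode_dictionary: impl FnMut(&P) -> Option<D>,
  mut decode_batch: impl FnMut(&P, &[D]) -> Option<B>,
) -> Option<B> {
  let mut pairs = pairs.into_iter().peekable();
  let mut dictionaries = Vec::new();
  while let Some(pair) = pairs.next() {
    if pairs.peek().is_some() {
      // Not the last pair: a dictionary batch.
      dictionaries.push(decode_dictionary(&pair)?);
    } else {
      // Last pair: the record batch itself.
      return decode_batch(&pair, &dictionaries);
    }
  }
  // No pairs at all: nothing was completely written.
  None
}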

fn decode_message(buf: &[u8]) -> error_stack::Result<arrow_ipc::Message<'_>, Error> {
arrow_ipc::root_as_message(buf)
.map_err(|e| {
// We'd like to conrert the flatbuffer error directly.
Collaborator

convert

/// Manages the checkpointed files.
///
/// We could use Parquet files for check-pointing but we instead use
/// Arrow IPC files, since we believe they should be easier to read.
Collaborator

Can you elaborate on this for me? Why are Arrow IPC files > Parquet?

Collaborator Author

Arrow IPC files write data out using the same layout as Arrow's in-memory format, so they can largely be mapped into memory directly, while Parquet requires extra work like decompression and format conversion. So, we'd likely expect:

  • Arrow IPC may be faster for read/write.
  • Parquet may have wider support as an archival format (e.g., on object store)

.get("max_present_time")
.ok_or(Error::RecoveringCheckpoints)?;
let max_present_time =
RowTime::from_str(max_present_time).change_context(Error::RecoveringCheckpoints)?;
Collaborator

Sheesh. Wonder if there is a macro invocation, or a way of splitting this up into a helper or two, to decrease the number of into_report().change_context(Error::RecoveringCheckpoints) calls.
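One possible shape for such a helper (hypothetical, not code from this PR): pull a required key out of the checkpoint metadata and attach the recovery error context in a single place.

use std::collections::HashMap;

/// Look up a required metadata key, reporting Error::RecoveringCheckpoints
/// if it is missing. (Hypothetical helper.)
fn required_metadata<'a>(
  metadata: &'a HashMap<String, String>,
  key: &str,
) -> error_stack::Result<&'a str, Error> {
  metadata
    .get(key)
    .map(String::as_str)
    .ok_or_else(|| error_stack::report!(Error::RecoveringCheckpoints))
}

Callers would then only need change_context on the fallible parses (e.g., RowTime::from_str) rather than on every lookup.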

.into_report()
.change_context(Error::WriteCheckpoint)?;

tracing::info!("Checkpointed {rows} jouranl rows to {file_name}");
Collaborator

journal

/// Create a check-point from the given batches.
///
/// This method assumes the batches are non-overlapping and ordered by time.
pub fn checkpoint(
Collaborator

At this time, journals will include the "prepared" data, correct? (We have the Batch with separate time, subsort, and key columns, so I assume that must be the case.) Is there any reason we'd want to instead journal the user data directly? For example, if we support decimals in prepare, or make any other non-backwards-compatible change, we'd want to run prepare on the user data again.

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "checkpoint path ({}) is not a directory", "_0.display()")]
CheckpointDirNotDir(std::path::PathBuf),
Collaborator

Maybe just CheckpointDir? The text is descriptive


fn add_batch(&self, batch: Batch) -> error_stack::Result<(), Error> {
let mut lock = self.batches.lock();
batch.max_present_time();
Collaborator

.

