At Polars I developed a new query engine which uses a hybrid of push and pull. I gave a short (and not very technical) talk about the engine at our first meetup recently, which can be viewed here: https://www.youtube.com/watch?v=Ndil-eLynh4.
Each operator is a (set of) async functions which are connected to its input(s) and output(s) through capacity-1 single-producer single-consumer (SPSC) async channels. An operator pulls input by reading from a channel, and pushes output by writing into a channel. For an oversimplified example, consider a simple select operator:
while let Ok(morsel) = input.recv().await {
    let result = select(morsel);
    if output.send(result).await.is_err() {
        break;
    }
}
Note how it has two await points: on receive and send.
The nice part about this is that Rust will automatically transform these asynchronous functions into state machines which can pause execution when either a send or receive blocks, returning control to our scheduler. In the above example the operator is able to pause both to wait for more data, and to wait until the receiver is able to consume more data. This also makes it trivial to, for example, pause execution in the middle of a hash join probe to send some partial results onwards in the computational graph.
rkerno 3 hours ago [-]
It's pretty easy in a push based model to let the 'pusher' know that no more data is required. It's just like unsubscribing from an event, or returning a 'no more' status from the callback. The push model does feel more natural to me, but perhaps that comes from familiarity with linux piping.
geocar 2 hours ago [-]
It's easy when the network is working.
If it isn't, the 'pusher' continues to fill memory buffers that can take minutes to dequeue. You need constant communication and TCP conspires against you on this. If your flow is primarily one-directional, you might be able to use keep-alives for this, but the defaults are terrible. Here's what I have used:
where 'x' is the anticipated RTT of ping+connect (usually 3-4x the measured RTT you get from TCP_INFO).
Remember: the moon is only about 1.5 seconds away, so unless you're pushing data to the moon these numbers are likely very small.
On older Windows, you've got `SIO_KEEPALIVE_VALS` which is hardcoded at 10 ticks, so you need to divide your distance by 10, but I think newer Windows supports `TCP_USER_TIMEOUT` like Linux.
Mac doesn't have these socket options. I think you can set net.inet.tcp.keep* with sysctl, but this affects/breaks everything, so I don't recommend xnu as a consumer to a high-volume push-stream.
I actually don't recommend any of this unless you have literally no control over higher-level parts of the protocol: TCP just isn't good enough for high-volume push-protocols, and so I haven't used this in my own stuff for decades now.
willvarfar 4 hours ago [-]
Interesting reading!
Does this explain a big inefficiency in BigQuery where it only ever does hash-joins? Is it because it is push so it never does merge-joins even when the inputs are all sorted or clustered etc?
Although tbh can't see why a system can't combine them both; some of the edges being push+buffer and some being buffer+pull.