Schedule long running Rust future on main context?

Hello,

I’m trying to integrate the asynchronous API of the zbus crate with a glib mainloop. zbus is a pure Rust implementation of DBus.

I used to extract the file descriptor out of a zbus connection to a DBus broker and add that as a source to the main context, but a recent refactoring in zbus somehow broke this (see https://gitlab.freedesktop.org/dbus/zbus/-/issues/195).

However around the same time zbus introduced an asynchronous API based on Rust futures, and the glib crate already allows me to execute futures on a MainContext, so I came up with the following code:

/// Run an object server on the given connection.
///
/// Continuously polls the connection for new messagesand dispatches them to `server`.
pub async fn run_server(mut connection: zbus::azync::Connection, mut server: zbus::ObjectServer) {
    while let Some(result) = connection.next().await {
        match result {
            Ok(message) => match server.dispatch_message(&message) {
                Ok(true) => trace!("Message dispatched to object server: {:?} ", message),
                Ok(false) => warn!("Message not handled by object server: {:?}", message),
                Err(error) => error!(
                    "Failed to dispatch message {:?} on object server: {}",
                    message, error
                ),
            },
            Err(error) => error!("Failed to receive message from bus connection: {:?}", error),
        }
    }
}

fn main() {
    let context = glib::MainContext::default();
    context.push_thread_default();

    let mainloop = glib::MainLoop::new(Some(&context), false);
    
    let connection =
        zbus::Connection::session().with_context(|| "Failed to connect to session bus")?;

    info!("Registering all search providers");
    let mut object_server = zbus::ObjectServer::new(&connection);
    register_search_providers(&connection, &mut object_server)?;

    info!("All providers registered, acquiring {}", BUSNAME);
    let object_server = object_server
        .request_name(BUSNAME)
        .with_context(|| format!("Failed to request bus name {}", BUSNAME))?;

    info!("Name acquired, starting server and main loop");
    context.spawn_local(run_server(connection.inner().clone(), object_server));

    mainloop.run();
}

This code effectively creates a long running future which continuously receives and dispatches DBus messages on a connection, and then executes that future on the main context.

Is this a good idea? I’m not very familiar with Rust’s futures, and even less so with Glib.

Hi,

As I indicated on the zbus issue, I’m currently working on a solution that will make the need for explicit action to receive messages for the ObjectServer redundant and also the dispatch_method will likely go away. I’ve spent quite many days this week to make this work and I think I’m close (assuming my changes are accepted by other contributors). After that, you’ll not need to do anything with the fd etc to integrate with the GMainLoop (or any other runtime). Right now the option is to run a thread as workaround where you continuously call ObjectServer::try_handle_next or ObjectServer::dispatch_method.

I am curious how your approach will look like. I think I do prefer to keep transfer and dispatch separate though, and I’d be sorry to see dispatch_method go away.

If zbus automatically handled everything on its own thread I’d have to care for thread safety and make sure to explicitly channel messages to the main loop for things that need to run on the main loop thread. Whereas currently I just spawn the message dispatch futures on the main loop and everything automatically runs on the main loop thread, and I don’t need to care for anything else.

Conceptually I find that simpler, and my small DBus service doesn’t need to handle that much load.

In other words, as said in the zbus issue I’d love to have a message dispatcher to which I feed incoming messages and get one or more outgoing messages in return which I can then put back into a future/stream for outgoing transfer. That’d make it quite simple to receive messages on one thread and dispatch on another.


That said right now all I’d like to know is whether the code I sketched above will work safely, or if there are any serious drawbacks in using futures like that with glib.

As said on the DBus issue spawning a separate thread for try_handle_next is not really an option for me; it’s a lot more complex and brings me back to the issue outlined above: I’d need to channel messages back to the mainloop explicitly for all code whose dispatch handler needs to run on the mainloop.

Not very different than how it’s like right now, just that ObjectServer will do it’s job automatically and you don’t have to do anything. zbus already has a thread running per connection, so it’ll leverage that to also run the ObjectServer machinery. See my changes to the test cases in the last WIP commit here for example.

I think I do prefer to keep transfer and dispatch separate though, and I’d be sorry to see dispatch_method go away.

Sorry but people’s personal preference isn’t something I can possibly cater to. What matters is the use case and how common it is. That’s especially true for high-level API like ObjectServer.

That’s actually not a bad idea/design anyway. Channels are a pretty good way of communicating between threads.

BTW, just FYI we started requiring Send+Sync from Interface implementers (i-e dbus_interface) recently.

From what I can tell (not the glib-rs expert here), that code should work well with our current API. If you really want more control and would like to receive message yourself asynchronously and dispatch, you can always implement the service manually. For simple cases, that’s not very hard actually and people have been doing that in C (which is a lot more complicated) even.

See, that’s what I find hard to believe: Right now I have a single threaded program around the Glib main loop, so I can safely call any GObject based thing inside my interface implementations without much concern for thread safety, and I can freely mutate the internal state of interface implementations to implement a stateful DBus API.

If the object server dispatch moves to a different thread my interface implementations do so as well, and now I have a multi-threaded program, with all sorts of implications around my code: What I can call safely from GLib-based libraries, everything needs to be send+sync so mutable state becomes harder, etc.

From my point of view I don’t need all that because my DBus server will never see so much load that it needs to move outside of the mainloop.

I don’t think that integration into a given message and event loop is an uncommon use case.

See, but I don’t have any threads other than those of zbus. My program is a single-threaded glib event loop, and I’d like it to remain so.

I can’t say I’m happy about this :man_shrugging:

Thanks, that’s all I needed to know.

Actually you don’t. As said before, zbus’ Connection already runs a thread (you can disable that of course but I understood that you don’t). It’s just that currently you don’t face any consequences of that on the ObjectServer API.

I didn’t say that and the future changes will actually make this integration work out of the box for all runtimes/eventloops as it will work independently of them.

The number of threads won’t change here. From your POV, it’s still “single-threaded”. Two things here:

  1. I don’t see how channels are more complicated than what you were doing before with adding new glib source through an FD. Channels are so much simpler and idiomatic in the Rust world.
  2. If you need to communicate more than just “it’s time to call it quits” from your Interface implementations to your main thread, your service isn’t as simple as you think.

BTW, there is also event-listener crate, that makes waiting for events even simpler and has both async and sync API to wait on events. If you depend on recent zbus, you already have an indirect dependency on it. This is very useful for the simple “it’s time to call it quits” case I mentioned above.

Right, that information isn’t helpful. It’d be helpful to know why it makes you unhappy. Could you point to the sources of your service implementation so I could see how this impacts you?

Look, I got my answer, which I’m grateful for, but beyond that I don’t feel that continuing this discussion here helps either of us. I’ve added another comment to the issue, and I’d like to leave it at that, so that we can return to more productive ways to spent our time. I thank you for your help, it’s much appreciated.


As a final word here I’d like to say that I’m sorry that my feedback became unhelpful, but this discussion was beginning to frustrate me: I saw my use case blatantly disregarded as mere “personal preference”, and got told that my my “service isn’t as simple as [I] think” which amounts to saying I don’t understand my own program. I guess that’s the actual misunderstanding here, and I’m sure you didn’t mean say what I heard, but nonetheless it didn’t help. I’m sorry.

:+1: However, I do feel like clarifying two things from your last post:

Another misunderstanding. :slight_smile: I’m sorry for doing a bad job of explaining myself and coming across harsh. I was in fact trying hard to understand your exact use case. I wasn’t dismissive of your use case but rather disagreeing completely with your opinion on how your use case should be satisfied. There is a difference. :slight_smile:

There was an “if” involved. I actually meant more like the other way around: your service is simple so you likely don’t need channels. Again, sorry for very bad choice of words there.

I can confirm that, there’s nothing wrong with that from the glib-rs / glib::MainContext point of view. You’d run the future on the main context, and whenever there’s a message to handle it would wake up and you’d handle it.

The only thing to keep an eye on (if that wasn’t clear anyway) is to make sure that server.dispatch_message() does not block or otherwise take a “long time”, as while it’s running it would also block GTK event handling, UI updates, etc.

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.