· 6 min read

cdxker

Streaming LLM assistant completions with the OpenAI API and Rust Actix-Web

Guide on how we were able to stream LLM assistant completions in real time using Actix-Web.

When deciding what to build Arguflow AI with, we were tired of using JavaScript for our backend services. We wanted something better, something faster, something safer, something rusty. Our main motivation for choosing Rust was the learning experience.

Why Actix

When looking at frameworks, we had two choices: actix_web or rocket. We chose actix and actix_web because we had heard from benchmarks that it was faster than rocket.

Our Naive Solution

Streaming data with Actix

The first approach we found for streaming data in actix_web used tokio's mpsc channels (tokio::sync::mpsc), with the receiver wrapped in a ReceiverStream from tokio_stream.

use actix_web::{web, web::Bytes, HttpResponse};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

pub type StreamItem = Result<Bytes, actix_web::Error>;

pub async fn stream_response(
    messages: Vec<Message>,
    pool: web::Data<Pool>,
) -> Result<HttpResponse, actix_web::Error> {
    let (tx, rx) = mpsc::channel::<StreamItem>(1000);
    let receiver_stream: ReceiverStream<StreamItem> = ReceiverStream::new(rx);
    ...
    // Send data at some point
    let _ = tx.send(Ok("data".into())).await;
    ...

    // Return Result<HttpResponse, actix_web::Error> to client
    Ok(HttpResponse::Ok().streaming(receiver_stream))
}

Getting Completions from openAI

For streaming we used the openai_dive crate. Ensure you enable the stream feature flag.

According to their documentation, you can stream data like this:

...
    let client = Client::new("sk-xxxxxxxxxxxxxxxx");

    let parameters = ChatCompletionParameters {
        messages: vec![
            ChatMessage {
                role: Role::User,
                content: "Hello!".to_string(),
                name: None,
            },
            ...
        ],
        ...
    };
    let mut stream = client.chat().create_stream(parameters).await.unwrap();
    while let Some(response) = stream.next().await {
        match response {
            Ok(chat_response) => chat_response.choices.iter().for_each(|choice| {
                if let Some(content) = choice.delta.content.as_ref() {
                    print!("{}", content);
                }
            }),
            Err(e) => eprintln!("{}", e),
        }
    }
...

Bringing both together

Streamed data is not saved for us by OpenAI, so as we iterate through the chunks we should keep track of the full message and store it in the database when streaming finishes. That gave us this as our final function:

pub async fn stream_response(
    messages: Vec<Message>,
    pool: web::Data<Pool>
) -> Result<HttpResponse, actix_web::Error> {

    let (tx, rx) = mpsc::channel::<StreamItem>(1000);
    let receiver_stream: ReceiverStream<StreamItem> = ReceiverStream::new(rx);

    let open_ai_messages: Vec<ChatMessage> = messages
        .iter()
        .map(|message| ChatMessage::from(message.clone()))
        .collect();
    let client = Client::new("sk-xxxxxxxxxxxxxxxx");

    let parameters = ChatCompletionParameters {
        messages: open_ai_messages,
        ...
    };

    let mut response_content = String::new();
    let mut completion_tokens = 0;
    let mut stream = client.chat().create_stream(parameters).await.unwrap();

    while let Some(response) = stream.next().await {
        match response {
            Ok(chat_response) => {
                completion_tokens += 1;

                log::info!("Got chat completion: {:?}", chat_response);

                let chat_content = chat_response.choices[0].delta.content.clone();
                if chat_content.is_none() {
                    log::error!("Chat content is none");
                    continue;
                }
                let chat_content = chat_content.unwrap();

                let multi_use_chat_content = chat_content.clone();
                let _ = tx.send(Ok(chat_content.into())).await;
                response_content.push_str(multi_use_chat_content.clone().as_str());
            }
            Err(e) => log::error!("Error getting chat completion: {}", e),
        }
    }

    let completion_message = Message::from_details(
        response_content,
        ...
    );

    // Let's hope this finishes in the background
    let _ = web::block(move || {
        create_message_query(completion_message, &pool)
    }).await?;

    Ok(HttpResponse::Ok().streaming(receiver_stream))
}

This had issues, though. When we ran it, the response only reached the client after the whole completion had finished, not as a stream.

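The big issue we didn't see is that awaiting the stream in a loop inside the handler keeps the handler from returning until the stream has fully resolved. The HttpResponse, and the receiver stream inside it, is only handed to Actix after every chunk has already been pushed into the channel.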
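A side effect of this shape is worth spelling out. The channel is bounded (1000 items) and nobody reads from it until the handler returns, so everything the loop sends has to fit in the buffer. The sketch below is only an illustration: it uses std's sync_channel as a dependency-free stand-in for tokio's bounded channel, and sends_before_full is a hypothetical helper, not part of our code.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a bounded channel that nobody is reading yet and reports how many
/// sends were accepted before the buffer was full.
fn sends_before_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` stays alive but unread, like a receiver stream that has not
    // been returned to the framework yet.
    let (tx, _rx) = sync_channel::<String>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(format!("chunk {}", i)) {
            Ok(()) => accepted += 1,
            // A waiting `send` would stall here, because the only reader
            // is created after the loop has finished.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 3, sends_before_full(3, 10) returns 3: the fourth chunk has nowhere to go until something starts reading.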

This led us down a rabbit hole of trying to move the while loop onto a different thread. This proved difficult because we wanted to spawn an asynchronous function on a separate thread, while Actix is much better suited to spawning synchronous functions on separate threads.

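This had other, larger issues: the two halves of our mpsc::channel did not implement the Send trait, which is required to move them across threads. A likely cause is the item type, since tokio's channel halves are Send only when the item is, and actix_web::Error is not Send. Frankly, we didn't have the knowledge to work around that at the time.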
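Whether a channel can cross threads depends on what it carries. The following is a minimal sketch of that rule using std::sync::mpsc rather than tokio, on the assumption that the same item-type rule applies to both. NotSendError is a hypothetical stand-in for an error type that is not Send, and assert_send is an illustrative helper.

```rust
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for an error type that is not `Send`
// (an `Rc` can never be moved to another thread).
type NotSendError = Rc<str>;

// Compiles only when `T` is `Send`; calling it is a compile-time check.
fn assert_send<T: Send>(_value: &T) {}

/// Moves a sender into a new thread and returns what the receiver collected.
fn send_from_thread(chunks: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<Result<String, String>>();
    assert_send(&tx); // OK: the item type is `Send`, so the sender is too

    let worker = thread::spawn(move || {
        for chunk in chunks {
            tx.send(Ok(chunk)).expect("receiver dropped");
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    worker.join().expect("worker panicked");
    rx.iter().filter_map(Result::ok).collect()
}

/// The same channel with a non-`Send` error type has to stay on one thread.
fn not_send_channel_is_local() -> usize {
    let (tx, rx) = mpsc::channel::<Result<String, NotSendError>>();
    // assert_send(&tx); // would not compile: `Rc<str>` is not `Send`
    tx.send(Err(Rc::from("boom"))).expect("receiver dropped");
    drop(tx);
    rx.iter().count()
}
```

Seen this way, Send is not something to implement by hand. The compiler derives it from the types involved, so changing the item type (for example to a plain String) is what makes the channel movable.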

What went wrong

Eventually we went back a few steps and looked at it a different way. Instead of first working out how to stream to the client and then how to put the OpenAI messages into that stream, we looked at how to push the OpenAI stream straight to the client, and only then at how to get that data onto the server. Worst-case scenario, we would have the client send the completed message back to the server.

Looking at tokio_stream, I found the StreamExt trait, which lets you map a stream as if it were an iterator. This gave us a much more concise function that looks very aesthetic (if you ask me).
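What makes mapping work where the loop did not is laziness: a mapped stream does nothing until the consumer pulls the next item. The sketch below shows the same property with a plain Iterator as a synchronous stand-in for a Stream. It is an analogy under that assumption, and event_order is a hypothetical helper that records the order in which chunks are produced and delivered.

```rust
use std::cell::RefCell;

/// Records the order of "produce" and "deliver" events for two chunks,
/// either pulling lazily through `map` or buffering everything first.
fn event_order(lazy_mode: bool) -> Vec<String> {
    let log: RefCell<Vec<String>> = RefCell::new(Vec::new());
    let source = vec!["a", "b"];

    // Stand-in for turning an upstream chunk into bytes for the client.
    let produce = |chunk: &str| {
        log.borrow_mut().push(format!("produce {}", chunk));
        chunk.to_uppercase()
    };

    if lazy_mode {
        // Like `stream.map(...)`: each chunk is produced when it is pulled.
        for item in source.into_iter().map(produce) {
            log.borrow_mut().push(format!("deliver {}", item));
        }
    } else {
        // Like the while loop: everything is produced before anything is delivered.
        let buffered: Vec<String> = source.into_iter().map(produce).collect();
        for item in buffered {
            log.borrow_mut().push(format!("deliver {}", item));
        }
    }
    log.into_inner()
}
```

In lazy mode the events interleave (produce a, deliver A, produce b, deliver B). In buffered mode both chunks are produced before the first one is delivered, which is the behaviour we saw from our first handler.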

How to properly stream

pub async fn stream_response(
    messages: Vec<models::Message>,
    pool: web::Data<Pool>
) -> Result<HttpResponse, actix_web::Error> {

    let open_ai_messages: Vec<ChatMessage> = messages
        .iter()
        .map(|message| ChatMessage::from(message.clone()))
        .collect();

    let client = Client::new("sk-xxxxxxxxxxxxxxxx");

    let parameters = ChatCompletionParameters {
        messages: open_ai_messages,
        ...
    };

    let stream = client.chat().create_stream(parameters).await.unwrap();

    Ok(HttpResponse::Ok().streaming(
        stream.map(|response| -> Result<Bytes, actix_web::Error> {
            if let Ok(response) = response {
                let chat_content = response.choices[0].delta.content.clone();
                return Ok(Bytes::from(chat_content.unwrap_or("".to_string())));
            }
            log::error!("Something bad happened");
            Err(ServiceError::InternalServerError.into())
        })
    ))
}

Great, now it's streaming, but we removed the core functionality that pushed the completed message to the database. We need to process the data while also sending it to the client. So… we went back to channels.

Actix Arbiter

For this to truly work how we want, we need to bring in another actix primitive: Arbiters. An Arbiter runs on its own thread and lets us spawn futures there. On this separate thread we receive the completion chunks via a channel and write to the database once the full response has arrived.

Arbiter::new().spawn(async move {
    // Do stuff on other thread
});

We could not send our mpsc channel across threads, so we used a different crate instead, crossbeam-channel, whose channels can be sent across threads.

This time when we looked at channels, we had the main thread transmit data over to the child thread. This was easier to do while mapping the stream before data gets sent down to the client.
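The hand-off described above can be sketched without any web framework. In this minimal, std-only illustration, std::sync::mpsc stands in for crossbeam_channel, a plain thread stands in for the Arbiter, and an iterator stands in for the OpenAI stream. All three substitutions are assumptions made to keep the example self-contained, and stream_and_collect is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

/// Forwards each chunk to the "client" while copying it to a worker thread
/// that assembles the full completion once the stream ends.
fn stream_and_collect(chunks: Vec<String>) -> (Vec<String>, String) {
    let (sending_chan, receiving_chan) = mpsc::channel::<String>();

    // Stand-in for the Arbiter: waits on the channel until every sender
    // has been dropped, then joins the chunks into one message.
    let worker = thread::spawn(move || {
        let chunk_v: Vec<String> = receiving_chan.iter().collect();
        chunk_v.join("")
    });

    // Stand-in for `stream.map(...)`: the closure owns the sender.
    let sent_to_client: Vec<String> = chunks
        .into_iter()
        .map(move |chunk| {
            sending_chan.send(chunk.clone()).expect("worker hung up");
            chunk
        })
        .collect();
    // The `map` adapter, and the sender inside it, has been dropped by now,
    // which closes the channel and lets the worker finish.

    let completion = worker.join().expect("worker panicked");
    (sent_to_client, completion)
}
```

The design hinges on the sender living inside the mapping closure. The worker only sees the end of the channel when that closure is dropped, so the database write is tied to the lifetime of the response stream rather than to the handler.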

Proper solution

pub async fn stream_response(
    messages: Vec<models::Message>,
    pool: web::Data<Pool>,
) -> Result<HttpResponse, actix_web::Error> {

    let open_ai_messages: Vec<ChatMessage> = messages
        .iter()
        .map(|message| ChatMessage::from(message.clone()))
        .collect();

    let open_ai_api_key = std::env::var("OPENAI_API_KEY")
        .expect("OPENAI_API_KEY must be set");

    let client = Client::new(open_ai_api_key);

    let parameters = ChatCompletionParameters {
        messages: open_ai_messages,
        ...
    };

    let (sending_chan, receiving_chan) = crossbeam_channel::unbounded::<String>();
    let stream = client.chat().create_stream(parameters).await.unwrap();

    Arbiter::new().spawn(async move {
        let chunk_v: Vec<String> = receiving_chan.iter().collect();
        let completion = chunk_v.join("");

        let new_message = models::Message::from_details(
            completion,
            ...
        );

        // Write to the database; we are on the arbiter's thread, so no need for web::block
        let _ = create_message_query(new_message, &pool);
    });

    Ok(HttpResponse::Ok().streaming(stream.map(
        move |response| -> Result<Bytes, actix_web::Error> {
            if let Ok(response) = response {
                let chat_content = response.choices[0].delta.content.clone();
                if let Some(message) = chat_content.clone() {
                    // Send data to arbiter
                    sending_chan.send(message).unwrap();
                }
                return Ok(Bytes::from(chat_content.unwrap_or("".to_string())));
            }
            Err(ServiceError::InternalServerError.into())
        },
    )))
}

Conclusion

There are two major lessons we took from this experience.

The first is that in Rust it's best to think of a solution that uses functional programming first, and only then move on to a more imperative one.

The second is that Rust still has very few resources on what is best to use. We got the idea to use crossbeam_channel from a random forum comment that was two years old. We were very skeptical of even using it because it felt like a random third-party hack that might not work with tokio's runtime. We only chose it out of desperation. That's the main motivator behind this blog post.

The current code has a few edits, but is essentially the same. You can find it at the link below.

Good Luck Rustaceans and Happy Hacking!

GITHUB: https://github.com/devflowinc/trieve

Specific File: Here
