Streaming Examples
Any library that supports structured streaming outputs should work with the struct-strm library. However, these use cases aren't widely documented yet, so I'll add integration examples here.
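One pattern worth noting up front (my reading of the examples below, not a documented requirement of struct-strm): the target schemas give every field a default value, which is what lets partially filled instances validate while the response is still streaming. A hypothetical schema following that pattern:

```python
from pydantic import BaseModel

# hypothetical schema, just to illustrate the defaults pattern
class Recipe(BaseModel):
    # defaults mean a partially parsed response still validates,
    # e.g. Recipe(title="Pancakes") before "steps" has streamed in
    title: str = ""
    steps: list[str] = []
```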
OpenAI
First we use the async OpenAI client with a structured response format to create a stream. Examples of creating the async stream with the OpenAI client can also be found in the OpenAI docs.
```python
from openai import AsyncOpenAI
from pydantic import BaseModel

client = AsyncOpenAI(api_key="your token here")

# Set up all of the required info for the query
class DefaultListItem(BaseModel):
    item: str = ""

class DefaultListStruct(BaseModel):
    # mostly just for testing
    items: list[DefaultListItem] = []

# The few-shot example is optional
few_shot_examples = DefaultListStruct(
    items=[
        DefaultListItem(
            item="The Hugging Face Transformers library is an open-source Python library that provides access to a vast collection of pre-trained Transformer models for various machine learning tasks. While initially focused on Natural Language Processing (NLP), its capabilities have expanded to include computer vision, audio processing, and multimodal applications."
        )
    ]
).model_dump_json()

query = "Create list describing 10 open source llm tools"
model = "gpt-4.1-mini"

# create the messages
messages = []
messages.append({"role": "system", "content": f"example response: {few_shot_examples}"})
messages.append({"role": "user", "content": query})

# note: `await` must run inside an async context (e.g. an async function)
stream = await client.chat.completions.parse(
    model=model,
    messages=messages,
    response_format=DefaultListStruct,
    temperature=0.0,
)
```
Now that we have the OpenAI completion stream, we can wrap it with the struct-strm library to get back structures instead of strings.
```python
from struct_strm import parse_openai_stream
import asyncio

# you'll want to use a function to handle the async generator
async def your_streamed_response_function(stream, DefaultListStruct):
    structured_response_stream = parse_openai_stream(stream, DefaultListStruct)
    async for structure in structured_response_stream:
        async for list_struct in structure:
            # do whatever you want with these results
            print(list_struct)

# you would probably do something with this function,
# but we'll just run it for example purposes
asyncio.run(your_streamed_response_function(stream, DefaultListStruct))
```
Fully formed Python objects are returned:
```
>>> DefaultListStruct(items=[DefaultListItem(item="")])
>>> DefaultListStruct(items=[DefaultListItem(item="Pytorch")])
>>> DefaultListStruct(items=[DefaultListItem(item="Pytorch is")])
>>> etc...
```
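Each yielded object is a complete, validated instance, so downstream code can use it directly. As one example of how you might consume these snapshots, here's a small helper (my own sketch, not part of struct-strm, and it assumes each snapshot is cumulative, as the output above suggests) that prints only the text appended since the previous snapshot:

```python
# flatten each cumulative snapshot and emit only the newly streamed suffix
async def print_new_text(structured_response_stream):
    previous = ""
    async for structure in structured_response_stream:
        async for list_struct in structure:
            # join the current snapshot's items into one string
            current = " ".join(item.item for item in list_struct.items)
            # print just what was added since the last snapshot
            print(current[len(previous):], end="", flush=True)
            previous = current
```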
Hugging Face
For an open source approach, we can use Hugging Face (or other libraries) together with XGrammar, an open-source library for efficient, flexible, and portable structured generation. You can read more about XGrammar here, but I'll assume you have already read its docs for this example.
There is a bit of setup we need to do, since we will modify the logits of a foundation model to enable grammar-constrained decoding that matches our provided Pydantic (or JSON) schema.
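To make that idea concrete, here's a toy illustration (my own, not XGrammar's actual implementation) of what a grammar-aware logits processor does at each decoding step: any token that would violate the grammar gets its logit masked to negative infinity, so sampling or argmax can only pick grammar-conforming tokens.

```python
import torch

# toy logits processor: tokens outside the grammar's allowed set get -inf
def mask_disallowed(logits: torch.Tensor, allowed_ids: list[int]) -> torch.Tensor:
    masked = torch.full_like(logits, float("-inf"))
    masked[allowed_ids] = logits[allowed_ids]
    return masked

# only token ids 0, 3, and 5 survive the mask
vocab_logits = torch.randn(8)
print(mask_disallowed(vocab_logits, allowed_ids=[0, 3, 5]))
```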
First we'll import everything we need and set up the initial model configs.
```python
from threading import Thread
from typing import Generator, AsyncGenerator
import asyncio

import torch
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForCausalLM,
)
import xgrammar as xgr
from transformers import AsyncTextIteratorStreamer
from pydantic import BaseModel
from struct_strm.llm_wrappers import parse_hf_stream

def init_model():
    device = "cpu"
    # tokenizer info:
    model_name = "deepseek-ai/deepseek-coder-1.3b-instruct"
    model = AutoModelForCausalLM.from_pretrained(
        model_name, torch_dtype=torch.float32, device_map=device
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    config = AutoConfig.from_pretrained(model_name)
    return model, tokenizer, config
```
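A side note on the config above: pinning the model to CPU with float32 keeps the example reproducible anywhere, but it will be slow. A hypothetical variant (`init_model_auto` is my name, not from the docs) that picks a GPU when one is available:

```python
import torch
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

def init_model_auto(model_name: str = "deepseek-ai/deepseek-coder-1.3b-instruct"):
    # prefer a GPU with half precision; fall back to the CPU/float32 setup above
    device = "cuda" if torch.cuda.is_available() else "cpu"
    dtype = torch.float16 if device == "cuda" else torch.float32
    model = AutoModelForCausalLM.from_pretrained(
        model_name, torch_dtype=dtype, device_map=device
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    config = AutoConfig.from_pretrained(model_name)
    return model, tokenizer, config
```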
The bulk of the logic is in this next section. Again, most of this is outlined in the XGrammar docs, but basically we need to:
- Create a compiled grammar based on our target structure with our model's tokenizer + XGrammar
- Create a logits processor based on the grammar to filter the response tokens to our specified grammar
- Start a new thread to support streaming the Hugging Face model response
```python
async def test_async_query(
    user_query: str, Schema: BaseModel
) -> AsyncGenerator[str, None]:
    model, tokenizer, config = init_model()
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": user_query},
    ]
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    model_inputs = tokenizer(
        text,
        return_tensors="pt",
    ).to(model.device)

    # compile a grammar for the target schema using the model's tokenizer
    full_vocab_size = config.vocab_size
    tokenizer_info = xgr.TokenizerInfo.from_huggingface(
        tokenizer,
        vocab_size=full_vocab_size,
    )
    compiler = xgr.GrammarCompiler(tokenizer_info)
    compiled_grammar = compiler.compile_json_schema(Schema)

    # the logits processor masks any token that would violate the grammar
    xgr_logits_processor = xgr.contrib.hf.LogitsProcessor(compiled_grammar)

    async_streamer = AsyncTextIteratorStreamer(
        tokenizer,
        timeout=60.0,
        skip_prompt=True,
        skip_special_tokens=True,
    )
    generation_kwargs = dict(
        **model_inputs,
        max_new_tokens=20,
        logits_processor=[xgr_logits_processor],
        streamer=async_streamer,
    )

    # run generation in a background thread so we can consume the stream here
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    try:
        async for token in async_streamer:
            yield token
    finally:
        thread.join()
```
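One caveat worth flagging: as written, `init_model()` and the grammar compilation run on every query, which is expensive. A possible refactor (my suggestion, not from the struct-strm docs) caches the heavy pieces so only the per-schema grammar compile remains:

```python
from functools import lru_cache

# cache the model, tokenizer, and grammar compiler across queries
@lru_cache(maxsize=1)
def get_cached_runtime():
    model, tokenizer, config = init_model()
    tokenizer_info = xgr.TokenizerInfo.from_huggingface(
        tokenizer, vocab_size=config.vocab_size
    )
    compiler = xgr.GrammarCompiler(tokenizer_info)
    return model, tokenizer, compiler

# each schema then only needs a (comparatively cheap) grammar compile:
# model, tokenizer, compiler = get_cached_runtime()
# compiled_grammar = compiler.compile_json_schema(Schema)
```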
Finally, we can take the stream generated by the model + XGrammar and pass it to the struct-strm Hugging Face wrapper.
```python
async def test_hf_stream(
    user_query: str,
    Schema: BaseModel,
) -> None:
    """
    Test the Hugging Face stream with a given user query and schema.
    """
    stream = test_async_query(user_query, Schema)
    structured_response_stream = parse_hf_stream(stream, Schema)
    async for structure in structured_response_stream:
        async for schema_struct in structure:
            print(f"Struct: {type(schema_struct)} - {schema_struct}")

# run the example
class TestPerson(BaseModel):
    name: str = ""
    age: str = ""

user_query = "Introduce yourself in JSON with two fields: name and age."
asyncio.run(test_hf_stream(user_query, TestPerson))
```
Fully formed Python objects are returned:
```
>>> TestPerson(name="", age="")
>>> TestPerson(name="bilbo", age="")
>>> TestPerson(name="bilbo baggins", age="")
>>> TestPerson(name="bilbo baggins", age="111")
```