
Welcome to the Instructor Blog

The goal of the blog is to capture some content that does not neatly fit within documentation or the cookbooks.

Advanced Topics

  1. What is Query Understanding, how does it go beyond embeddings?
  2. How can one achieve GPT-4 level summaries using GPT-3.5-turbo?
  3. What are the basics of Guardrails and Validation in AI models?
  4. How does one validate citations in AI-generated content?
  5. What are the methods and benefits of fine-tuning and distillation in AI models?


Introduction to Caching in Python

Instructor makes working with language models easy, but they are still computationally expensive.

Today, we're diving into optimizing instructor code while maintaining the excellent DX offered by Pydantic models. We'll tackle the challenges of caching Pydantic models, typically incompatible with pickle, and explore solutions that use decorators like functools.cache. Then, we'll craft custom decorators with diskcache and redis to support persistent caching and distributed systems.

Let's first consider our canonical example, using the OpenAI Python client to extract user details.

import instructor
from openai import OpenAI
from pydantic import BaseModel

# Enables `response_model`
client = instructor.patch(OpenAI())

class UserDetail(BaseModel):
    name: str
    age: int

def extract(data) -> UserDetail:
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": data},
        ],
    )

Now imagine batch processing data, running tests or experiments, or simply calling extract multiple times over a workflow. We'll quickly run into performance issues, as the function may be called repeatedly, and the same data will be processed over and over again, costing us time and money.

1. functools.cache for Simple In-Memory Caching

When to Use: Ideal for functions with immutable arguments, called repeatedly with the same parameters in small to medium-sized applications. This makes sense when we might be reusing the same data within a single session or in an application where we don't need to persist the cache between sessions.

import functools

@functools.cache
def extract(data):
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": data},
        ]
    )

Changing the Model does not Invalidate the Cache

Note that changing the model does not invalidate the cache. This is because the cache key is based on the function's name and arguments, not the model. This means that if we change the model, the cache will still return the old result.
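
If you do need the cache to distinguish between models, one simple option (a sketch, not something instructor does for you) is to make the model name an explicit argument so it becomes part of functools.cache's key:

@functools.cache
def extract(data: str, model: str = "gpt-3.5-turbo") -> UserDetail:
    # Because `model` is now an argument, calls with different models
    # get separate cache entries.
    return client.chat.completions.create(
        model=model,
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": data},
        ],
    )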

Now we can call extract multiple times with the same argument, and the result will be cached in memory for faster access.

import time

start = time.perf_counter() # (1)
model = extract("Extract jason is 25 years old")
print(f"Time taken: {time.perf_counter() - start}")

start = time.perf_counter()
model = extract("Extract jason is 25 years old") # (2)
print(f"Time taken: {time.perf_counter() - start}")

>>> Time taken: 0.9267581660533324
>>> Time taken: 1.2080417945981026e-06 # (3)
  1. Using time.perf_counter() to measure the time taken to run the function is better than using time.time() because it's more accurate and less susceptible to system clock changes.
  2. The second time we call extract, the result is returned from the cache, and the function is not called.
  3. The second call to extract is much faster because the result is returned from the cache!

Benefits: Easy to implement, provides fast access due to in-memory storage, and requires no additional libraries.

What is a decorator?

A decorator is a function that takes another function and extends the behavior of the latter function without explicitly modifying it. In Python, decorators are functions that take a function as an argument and return a closure.

def decorator(func):
    def wrapper(*args, **kwargs):
        print("Do something before") # (1)
        result = func(*args, **kwargs)
        print("Do something after") # (2)
        return result
    return wrapper

@decorator
def say_hello():
    print("Hello!")

say_hello()
>>> "Do something before"
>>> "Hello!"
>>> "Do something after"
  1. The code is executed before the function is called
  2. The code is executed after the function is called

2. diskcache for Persistent, Large Data Caching

Copy Caching Code

We'll be using the same instructor_cache decorator for both diskcache and redis caching. You can copy the code below and use it for both examples.

import functools
import inspect
import diskcache

from pydantic import BaseModel

cache = diskcache.Cache('./my_cache_directory') # (1)

def instructor_cache(func):
    """Cache a function that returns a Pydantic model"""
    return_type = inspect.signature(func).return_annotation
    if not issubclass(return_type, BaseModel): # (2)
        raise ValueError("The return type must be a Pydantic model")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"
        # Check if the result is already cached
        if (cached := cache.get(key)) is not None:
            # Deserialize from JSON based on the return type
            return return_type.model_validate_json(cached)

        # Call the function and cache its result
        result = func(*args, **kwargs)
        serialized_result = result.model_dump_json()
        cache.set(key, serialized_result)

        return result

    return wrapper
  1. We create a new diskcache.Cache instance to store the cached data. This will create a new directory called my_cache_directory in the current working directory.
  2. We only want to cache functions that return a Pydantic model to simplify serialization and deserialization logic in this example code

Remember that you can change this code to support non-Pydantic models, or to use a different caching backend. Moreover, don't forget that this cache does not invalidate when the model changes, so you might want to encode Model.model_json_schema() as part of the key.
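
As a rough sketch of that last idea (the helper name is ours, not part of instructor), you could build the key from a hash of the model's schema so that editing the Pydantic model automatically busts the cache:

import hashlib

def make_cache_key(func, return_type, args, kwargs):
    # Hashing the schema means the key changes whenever the Pydantic model changes
    schema_hash = hashlib.md5(repr(return_type.model_json_schema()).encode()).hexdigest()
    return f"{func.__name__}-{schema_hash}-{functools._make_key(args, kwargs, typed=False)}"

Inside the wrapper above, key = make_cache_key(func, return_type, args, kwargs) would then replace the existing key line.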

When to Use: Suitable for applications needing cache persistence between sessions or dealing with large datasets. This is useful when we want to reuse the same data across multiple sessions, or when we need to store large amounts of data!

import functools
import inspect
import instructor
import diskcache

from openai import OpenAI
from pydantic import BaseModel

client = instructor.patch(OpenAI())
cache = diskcache.Cache('./my_cache_directory')


def instructor_cache(func):
    """Cache a function that returns a Pydantic model"""
    return_type = inspect.signature(func).return_annotation # (4)
    if not issubclass(return_type, BaseModel): # (1)
        raise ValueError("The return type must be a Pydantic model")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}" #  (2)
        # Check if the result is already cached
        if (cached := cache.get(key)) is not None:
            # Deserialize from JSON based on the return type (3)
            return return_type.model_validate_json(cached)

        # Call the function and cache its result
        result = func(*args, **kwargs)
        serialized_result = result.model_dump_json()
        cache.set(key, serialized_result)

        return result

    return wrapper

class UserDetail(BaseModel):
    name: str
    age: int

@instructor_cache
def extract(data) -> UserDetail:
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": data},
        ]
    )
  1. We only want to cache functions that return a Pydantic model to simplify serialization and deserialization logic
  2. We use functools' _make_key to generate a unique key based on the function's name and arguments. This is important because we want to cache the result of each function call separately.
  3. We use Pydantic's model_validate_json to deserialize the cached result into a Pydantic model.
  4. We use inspect.signature to get the function's return type annotation, which we use to validate the cached result.

Benefits: Reduces computation time for heavy data processing, provides disk-based caching for persistence.

3. Redis Caching Decorator for Distributed Systems

Copy Caching Code

We'll be using the same instructor_cache decorator for both diskcache and redis caching. You can copy the code below and use it for both examples.

import functools
import inspect
import redis

from pydantic import BaseModel

cache = redis.Redis("localhost")

def instructor_cache(func):
    """Cache a function that returns a Pydantic model"""
    return_type = inspect.signature(func).return_annotation
    if not issubclass(return_type, BaseModel):
        raise ValueError("The return type must be a Pydantic model")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"
        # Check if the result is already cached
        if (cached := cache.get(key)) is not None:
            # Deserialize from JSON based on the return type
            return return_type.model_validate_json(cached)

        # Call the function and cache its result
        result = func(*args, **kwargs)
        serialized_result = result.model_dump_json()
        cache.set(key, serialized_result)

        return result

    return wrapper

Remember that you can change this code to support non-Pydantic models, or to use a different caching backend. Moreover, don't forget that this cache does not invalidate when the model changes, so you might want to encode Model.model_json_schema() as part of the key.

When to Use: Recommended for distributed systems where multiple processes need to access the cached data, or for applications requiring fast read/write access and handling complex data structures.

import redis
import functools
import inspect
import json
import instructor

from pydantic import BaseModel
from openai import OpenAI

client = instructor.patch(OpenAI())
cache = redis.Redis("localhost")

def instructor_cache(func):
    """Cache a function that returns a Pydantic model"""
    return_type = inspect.signature(func).return_annotation
    if not issubclass(return_type, BaseModel): # (1)
        raise ValueError("The return type must be a Pydantic model")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}" # (2)
        # Check if the result is already cached
        if (cached := cache.get(key)) is not None:
            # Deserialize from JSON based on the return type
            return return_type.model_validate_json(cached)

        # Call the function and cache its result
        result = func(*args, **kwargs)
        serialized_result = result.model_dump_json()
        cache.set(key, serialized_result)

        return result

    return wrapper


class UserDetail(BaseModel):
    name: str
    age: int

@instructor_cache
def extract(data) -> UserDetail:
    # Assuming client.chat.completions.create returns a UserDetail instance
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": data},
        ]
    )
  1. We only want to cache functions that return a Pydantic model to simplify serialization and deserialization logic
  2. We use functools' _make_key to generate a unique key based on the function's name and arguments. This is important because we want to cache the result of each function call separately.

Benefits: Scalable for large-scale systems, supports fast in-memory data storage and retrieval, and is versatile for various data types.

Looking carefully

If you look carefully at the code above you'll notice that we're using the same instructor_cache decorator as before. The implementation is the same, but we're using a different caching backend!

Conclusion

Choosing the right caching strategy depends on your application's specific needs, such as the size and type of data, the need for persistence, and the system's architecture. Whether it's optimizing a function's performance in a small application or managing large datasets in a distributed environment, Python offers robust solutions to improve efficiency and reduce computational overhead.

If you'd like to use this code, try sending it to ChatGPT to understand it better and to add any features that matter to you. For example, the cache isn't invalidated when your BaseModel changes, so you might want to encode Model.model_json_schema() as part of the key.

If you like the content, check out our GitHub, give us a star, and check out the library.

Generators and LLM Streaming

Latency is crucial, especially in e-commerce and newer chat applications like ChatGPT. Streaming lets us enhance the user experience without needing faster response times.

And what makes streaming possible? Generators!

In this post, we're going to dive into the cool world of Python generators — these tools are more than just a coding syntax trick. We'll explore Python generators from the ground up and then delve into LLM streaming using the Instructor library.

Python Generators: An Efficient Approach to Iterables

Generators in Python are a game-changer for handling large data sets and stream processing. They allow functions to yield values one at a time, pausing and resuming their state, which is a faster and more memory-efficient approach compared to traditional collections that store all elements in memory.

The Basics: Yielding Values

A generator function in Python uses the yield keyword. It yields values one at a time, allowing the function to pause and resume its state.

def count_to_3():
    yield 1
    yield 2
    yield 3

for num in count_to_3():
    print(num)
1
2
3

Advantages Over Traditional Collections

  • Lazy Evaluation & reduced latency: The time to get the first element (or time-to-first-token in LLM land) from a generator is significantly lower. Generators only produce one value at a time, whereas accessing the first element of a collection will require that the whole collection be created first.
  • Memory Efficiency: Only one item is in memory at a time.
  • Maintain State: Automatically maintains state between executions.

Let's see how much faster generators are and where they really shine:

import time

def expensive_func(x):
    """Simulate an expensive operation."""
    time.sleep(1)
    return x ** 2

def calculate_time_for_first_result_with_list(func_input, func):
    """Calculate using a list comprehension and return the first result with its computation time."""
    start_perf = time.perf_counter()
    result = [func(x) for x in func_input][0]
    end_perf = time.perf_counter()
    print(f"Time for first result (list): {end_perf - start_perf:.2f} seconds")
    return result

def calculate_time_for_first_result_with_generator(func_input, func):
    """Calculate using a generator and return the first result with its computation time."""
    start_perf = time.perf_counter()
    result = next(func(x) for x in func_input)
    end_perf = time.perf_counter()
    print(f"Time for first result (generator): {end_perf - start_perf:.2f} seconds")
    return result

# Prepare inputs for the function
numbers = [1, 2, 3, 4, 5]

# Benchmarking
first_result_list = calculate_time_for_first_result_with_list(numbers, expensive_func)
first_result_gen = calculate_time_for_first_result_with_generator(numbers, expensive_func)
Time for first result (list): 5.02 seconds
Time for first result (generator): 1.01 seconds

The generator computes one expensive operation and returns the first result immediately, while the list comprehension computes the expensive operation for all elements in the list before returning the first result.

Generator Expressions: A Shortcut

Python also allows creating generators in a single line of code, known as generator expressions. They are syntactically similar to list comprehensions but use parentheses.

squares = (x*x for x in range(10))

Use Cases in Real-World Applications

Generators shine in scenarios like reading large files, data streaming (eg. llm token streaming), and pipeline creation for data processing.
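
For example, here is a minimal sketch of the large-file case (the file name is just a placeholder): only one line is ever held in memory at a time.

def read_large_file(path: str):
    """Yield one line at a time instead of loading the whole file."""
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")

for line in read_large_file("server.log"):  # placeholder path
    if "ERROR" in line:
        print(line)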

LLM Streaming

If you've used ChatGPT, you'll see that the tokens are streamed out one by one, instead of the full response being shown at the end (can you imagine waiting for the full response??). This is made possible by generators.

Here's how a vanilla openai generator looks:

from openai import OpenAI

# Set your OpenAI API key
client = OpenAI(
    api_key="My API Key",
)

response_generator = client.chat.completions.create(
    model='gpt-3.5-turbo',
    messages=[
        {'role': 'user', 'content': "What are some good reasons to smile?"}
    ],
    temperature=0,
    stream=True
)

for chunk in response_generator:
    print(chunk.choices[0].delta.content, end="")

This is great, but what if we want to do some structured extraction on this stream? For instance, we might want to render frontend components based on product rankings that are streamed out by an LLM.

Should we wait for the entire stream to finish before extracting & validating the list of components or can we extract & validate the components in real time as they are streamed?

In e-commerce, every millisecond matters: time-to-first-render can be the difference between a successful and a not-so-successful e-commerce store (and I know how a failing e-commerce store feels :/).

Let's see how we can use Instructor to handle extraction from this real time stream!

E-commerce Product Ranking

Scenario

Imagine an e-commerce platform where we have:

a customer profile: this includes a detailed history of purchases, browsing behavior, product ratings, preferences in various categories, search history, and even responses to previous recommendations. This extensive data is crucial for generating highly personalized and relevant product suggestions.

a list of candidate products: these could be some shortlisted products we think the customer would like.

Our goal is to re-rank these candidate products for the best conversion, and we'll use an LLM!

Stream Processing

User Data:

Let's assume we have the following user profile:

profile_data = """
Customer ID: 12345
Recent Purchases: [Laptop, Wireless Headphones, Smart Watch]
Frequently Browsed Categories: [Electronics, Books, Fitness Equipment]
Product Ratings: {Laptop: 5 stars, Wireless Headphones: 4 stars}
Recent Search History: [best budget laptops 2023, latest sci-fi books, yoga mats]
Preferred Brands: [Apple, AllBirds, Bench]
Responses to Previous Recommendations: {Philips: Not Interested, Adidas: Not Interested}
Loyalty Program Status: Gold Member
Average Monthly Spend: $500
Preferred Shopping Times: Weekend Evenings
...
"""

We want to rank the following products for this user:

products = [
    {"product_id": 1, "product_name": "Apple MacBook Air (2023) - Latest model, high performance, portable"},
    {"product_id": 2, "product_name": "Sony WH-1000XM4 Wireless Headphones - Noise-canceling, long battery life"},
    {"product_id": 3, "product_name": "Apple Watch Series 7 - Advanced fitness tracking, seamless integration with Apple ecosystem"},
    {"product_id": 4, "product_name": "Kindle Oasis - Premium e-reader with adjustable warm light"},
    {"product_id": 5, "product_name": "AllBirds Wool Runners - Comfortable, eco-friendly sneakers"},
    {"product_id": 6, "product_name": "Manduka PRO Yoga Mat - High-quality, durable, eco-friendly"},
    {"product_id": 7, "product_name": "Bench Hooded Jacket - Stylish, durable, suitable for outdoor activities"},
    {"product_id": 8, "product_name": "GoPro HERO9 Black - 5K video, waterproof, for action photography"},
    {"product_id": 9, "product_name": "Nespresso Vertuo Next Coffee Machine - Quality coffee, easy to use, compact design"},
    {"product_id": 10, "product_name": "Project Hail Mary by Andy Weir - Latest sci-fi book from a renowned author"}
]

Let's now define our models for structured extraction. Note: instructor will conveniently let us use Iterable to model an iterable of our class. In this case, once we define our product recommendation model, we can slap on Iterable to define what we ultimately want - a (ranked) list of product recommendations.

import instructor
from openai import OpenAI
from typing import Iterable
from pydantic import BaseModel

client = instructor.patch(OpenAI(), mode=instructor.function_calls.Mode.JSON)

class ProductRecommendation(BaseModel):
    product_id: str
    product_name: str

Recommendations = Iterable[ProductRecommendation]

Now let's use our instructor patch. Since we don't want to wait for all the tokens to finish, we'll set stream to True and process each product recommendation as it comes in:

prompt = f"Based on the following user profile:\n{profile_data}\nRank the following products from most relevant to least relevant:\n" + '\n'.join(f"{product['product_id']} {product['product_name']}" for product in products)

start_perf = time.perf_counter()
recommendations_stream = client.chat.completions.create(
    model="gpt-3.5-turbo-1106",
    temperature=0.1,
    response_model=Iterable[ProductRecommendation],
    stream=True,
    messages=[
        {"role": "system", "content": "Generate product recommendations based on the customer profile. Return in order of highest recommended first."},
        {"role": "user", "content": prompt}
    ]
)
for product in recommendations_stream:
    print(product)
    end_perf = time.perf_counter()
    print(f"Time for first result (generator): {end_perf - start_perf:.2f} seconds")
    break
product_id='1' product_name='Apple MacBook Air (2023)'
Time for first result (generator): 4.33 seconds

recommendations_stream is a generator! It yields the extracted products as it's processing the stream in real-time. Now let's get the same response without streaming and see how they compare.

start_perf = time.perf_counter()
recommendations_list = client.chat.completions.create(
    model="gpt-3.5-turbo-1106",
    temperature=0.1,
    response_model=Iterable[ProductRecommendation],
    stream=False,
    messages=[
        {"role": "system", "content": "Generate product recommendations based on the customer profile. Return in order of highest recommended first."},
        {"role": "user", "content": prompt}
    ]
)
print(recommendations_list[0])
end_perf = time.perf_counter()
print(f"Time for first result (list): {end_perf - start_perf:.2f} seconds")
product_id='1' product_name='Apple MacBook Air (2023)'
Time for first result (list): 8.63 seconds

Our web application now displays results faster. Even a 100ms improvement can lead to a 1% increase in revenue.

FastAPI

We can also take this and set up a streaming LLM API endpoint using FastAPI. Check out our docs on using FastAPI here!
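
If you're curious what that could look like, here is a rough sketch (not the documented example; the route name and the newline-delimited JSON framing are our own choices) that reuses the patched client and the ProductRecommendation model from above:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/recommendations")
def recommendations(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo-1106",
        response_model=Iterable[ProductRecommendation],
        stream=True,
        messages=[{"role": "user", "content": prompt}],
    )

    def generate():
        # Emit each product as one JSON line as soon as it is extracted
        for product in stream:
            yield product.model_dump_json() + "\n"

    return StreamingResponse(generate(), media_type="application/x-ndjson")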

Key Takeaways

To summarize, we looked at:

• Generators in Python: A powerful feature that allows for efficient data handling with reduced latency

• LLM Streaming: LLMs provide us generators to stream tokens and Instructor can let us validate and extract data from this stream. Real-time data validation ftw!

Don't forget to check our GitHub for more resources and give us a star if you find the library helpful!


If you have any questions or need further clarifications, feel free to reach out or dive into the Instructor library's documentation for more detailed information. Happy coding!

Verifying LLM Citations with Pydantic

Ensuring the accuracy of information is crucial. This blog post explores how Pydantic's powerful and flexible validators can enhance data accuracy through citation verification.

We'll start with using a simple substring check to verify citations. Then we'll use instructor itself to power an LLM to verify citations and align answers with the given citations. Finally, we'll explore how we can use these techniques to generate a dataset of accurate responses.

Example 1: Simple Substring Check

In this example, we use the Statements class to verify if a given substring quote exists within a text chunk. If the substring is not found, an error is raised.

Code Example:

from typing import List, Optional
from openai import OpenAI
from pydantic import BaseModel, Field, ValidationError, ValidationInfo, field_validator, model_validator
import instructor

client = instructor.patch(OpenAI())

class Statements(BaseModel):
    body: str
    substring_quote: str

    @field_validator("substring_quote")
    @classmethod
    def substring_quote_exists(cls, v: str, info: ValidationInfo):
        context = info.context.get("text_chunks", None)

        for text_chunk in context.values():
            if v in text_chunk: # (1)
                return v
        raise ValueError("Could not find substring_quote `{v}` in contexts")


class AnswerWithCitaton(BaseModel):
    question: str
    answer: List[Statements]
  1. While we use a simple substring check in this example, we can use more complex techniques like regex or Levenshtein distance.

Once the class is defined, we can use it to validate the context and raise an error if the substring is not found.

try:
    AnswerWithCitaton.model_validate(
        {
            "question": "What is the capital of France?",
            "answer": [
                {"body": "Paris", "substring_quote": "Paris is the capital of France"},
            ],
        },
        context={
            "text_chunks": {
                1: "Jason is a pirate",
                2: "Paris is not the capital of France",
                3: "Irrelevant data",
            }
        },
    )
except ValidationError as e:
    print(e)

Error Message Example:

answer.0.substring_quote
  Value error, Could not find substring_quote `Paris is the capital of France` in contexts [type=value_error, input_value='Paris is the capital of France', input_type=str]
    For further information visit https://errors.pydantic.dev/2.4/v/value_error

Pydantic raises a validation error when the substring_quote attribute does not exist in the context. This approach can be used to validate more complex data using techniques like regex or Levenshtein distance.
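
As one hedged illustration of a fuzzier check, the validator's exact `in` test could be loosened with difflib from the standard library; the 0.9 threshold is an arbitrary choice:

from difflib import SequenceMatcher

def fuzzy_quote_in_chunk(quote: str, chunk: str, threshold: float = 0.9) -> bool:
    # Count how many characters of the quote align with the chunk and accept
    # the citation if enough of it matches (tolerating small typos).
    blocks = SequenceMatcher(None, quote, chunk).get_matching_blocks()
    matched = sum(block.size for block in blocks)
    return matched / max(len(quote), 1) >= threshold

Inside substring_quote_exists, the check if v in text_chunk would then become if fuzzy_quote_in_chunk(v, text_chunk).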

Example 2: Using LLM for Verification

This approach leverages OpenAI's LLM to validate citations. If the citation does not exist in the context, the LLM returns an error message.

Code Example:

class Validation(BaseModel):
    is_valid: bool
    error_messages: Optional[str] = Field(None, description="Error messages if any")


class Statements(BaseModel):
    body: str
    substring_quote: str

    @model_validator(mode="after")
    def substring_quote_exists(self, info: ValidationInfo):
        context = info.context.get("text_chunks", None)

        resp: Validation = client.chat.completions.create(
            response_model=Validation,
            messages=[
                {
                    "role": "user",
                    "content": f"Does the following citation exist in the following context?\n\nCitation: {self.substring_quote}\n\nContext: {context}",
                }
            ],
            model="gpt-3.5-turbo",
        )

        if resp.is_valid:
            return self

        raise ValueError(resp.error_messages)


class AnswerWithCitaton(BaseModel):
    question: str
    answer: List[Statements]

Now when we use a correct citation, the LLM returns a valid response.

resp = AnswerWithCitaton.model_validate(
    {
        "question": "What is the capital of France?",
        "answer": [
            {"body": "Paris", "substring_quote": "Paris is the capital of France"},
        ],
    },
    context={
        "text_chunks": {
            1: "Jason is a pirate",
            2: "Paris is the capital of France",
            3: "Irrelevant data",
        }
    },
)
print(resp.model_dump_json(indent=2))

Result:

{
  "question": "What is the capital of France?",
  "answer": [
    {
      "body": "Paris",
      "substring_quote": "Paris is the capital of France"
    }
  ]
}

When we have citations that don't exist in the context, the LLM returns an error message.

try:
    AnswerWithCitaton.model_validate(
        {
            "question": "What is the capital of France?",
            "answer": [
                {"body": "Paris", "substring_quote": "Paris is the capital of France"},
            ],
        },
        context={
            "text_chunks": {
                1: "Jason is a pirate",
                2: "Paris is not the capital of France",
                3: "Irrelevant data",
            }
        },
    )
except ValidationError as e:
    print(e)

Error Message Example:

1 validation error for AnswerWithCitaton
answer.0
  Value error, Citation not found in context [type=value_error, input_value={'body': 'Paris', 'substr... the capital of France'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.4/v/value_error

Example 3: Aligning Citations and Answers

In this example, we ensure that the provided answers are aligned with the given citations and context. The LLM is used to verify the alignment.

We use the same Statements model as above, but we add a new model for the answer that also verifies the alignment of citations.

Code Example:

class AnswerWithCitaton(BaseModel):
    question: str
    answer: List[Statements]

    @model_validator(mode="after")
    def validate_answer(self, info: ValidationInfo):
        context = info.context.get("text_chunks", None)

        resp: Validation = client.chat.completions.create(
            response_model=Validation,
            messages=[
                {
                    "role": "user",
                    "content": f"Does the following answers match the question and the context?\n\nQuestion: {self.question}\n\nAnswer: {self.answer}\n\nContext: {context}",
                }
            ],
            model="gpt-3.5-turbo",
        )

        if resp.is_valid:
            return self

        raise ValueError(resp.error_messages)

When we have a mismatch between the answer and the citation, the LLM returns an error message.

try:
    AnswerWithCitaton.model_validate(
        {
            "question": "What is the capital of France?",
            "answer": [
                {"body": "Texas", "substring_quote": "Paris is the capital of France"},
            ],
        },
        context={
            "text_chunks": {
                1: "Jason is a pirate",
                2: "Paris is the capital of France",
                3: "Irrelevant data",
            }
        },
    )
except ValidationError as e:
    print(e)

Error Message Example:

1 validation error for AnswerWithCitaton
  Value error, The answer does not match the question and context [type=value_error, input_value={'question': 'What is the...he capital of France'}]}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.4/v/value_error

Conclusion

These examples demonstrate the potential of using Pydantic and OpenAI to enhance data accuracy through citation verification. While the LLM-based approach may not be efficient for runtime operations, it has exciting implications for generating a dataset of accurate responses. By leveraging this method during data generation, we can fine-tune a model that excels in citation accuracy, similar to our last post on fine-tuning a better summarizer.

If you like the content, check out our GitHub, give us a star, and check out the library.

Introduction to Batch Processing using asyncio and Instructor

Today, I will introduce you to various approaches for using asyncio in Python. We will apply this to batch process data using instructor and learn how to use asyncio.gather and asyncio.as_completed for concurrent data processing. Additionally, we will explore how to limit the number of concurrent requests to a server using asyncio.Semaphore.

Github Example

If you want to run the code examples in this article, you can find them on jxnl/instructor

We will start by defining an async function that calls openai to extract data, and then examine four different ways to execute it. We will discuss the pros and cons of each approach and analyze the results of running them on a small batch.

Understanding asyncio

asyncio is a Python library that enables writing concurrent code using the async/await syntax. It is particularly useful for IO-bound and structured network code. If you are familiar with OpenAI's SDK, you might have encountered two classes: OpenAI() and AsyncOpenAI(). Today, we will be using the AsyncOpenAI() class, which processes data asynchronously.

By utilizing these tools in web applications or batch processing, we can significantly improve performance by handling multiple requests concurrently instead of sequentially.

Understanding async and await

We will be using the async and await keywords to define asynchronous functions. The async keyword is used to define a function that returns a coroutine object. The await keyword is used to wait for the result of a coroutine object.

If you want to understand the deeper details of asyncio, I recommend reading this article by Real Python.

Understanding gather vs as_completed

In this post we'll show two ways to run tasks concurrently: asyncio.gather and asyncio.as_completed. gather runs multiple tasks concurrently and returns all the results as a list once they have finished, while as_completed returns an iterable that yields each result as soon as its task completes. Another great resource on the differences between the two can be found here.
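
To make the difference concrete before we bring OpenAI into the picture, here is a small self-contained sketch (the sleep durations are arbitrary):

import asyncio

async def work(n: int) -> int:
    await asyncio.sleep(n)
    return n

async def main():
    # gather: results come back together, in the order the tasks were passed in
    print(await asyncio.gather(work(3), work(1), work(2)))  # [3, 1, 2] after ~3s

    # as_completed: results arrive one by one, in completion order
    for future in asyncio.as_completed([work(3), work(1), work(2)]):
        print(await future)  # prints 1, then 2, then 3

asyncio.run(main())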

Example: Batch Processing

In this example, we will demonstrate how to use asyncio for batch processing tasks, specifically for extracting and processing data concurrently. The script will extract data from a list of texts and process it concurrently using asyncio.

import instructor
from pydantic import BaseModel
from openai import AsyncOpenAI

# Enables `response_model` in `create` method
client = instructor.apatch(AsyncOpenAI()) # (1)!

class Person(BaseModel):
    name: str
    age: int


async def extract_person(text: str) -> Person:
    return await client.chat.completions.create( # (2)!
        model="gpt-3.5-turbo",
        messages=[
            {"role": "user", "content": text},
        ],
        response_model=Person,
    )
  1. We use instructor.apatch to patch the create method of AsyncOpenAI to accept a response_model argument. This is because the create method of AsyncOpenAI does not accept a response_model argument without this patch.
  2. We use await here to wait for the response from the server before we return the result. This is because create returns a coroutine object, not the result of the coroutine.

Notice that now there are async and await keywords in the function definition. This is because we're using the asyncio library to run the function concurrently. Now let's define a batch of texts to process.

dataset = [
    "My name is John and I am 20 years old",
    "My name is Mary and I am 21 years old",
    "My name is Bob and I am 22 years old",
    "My name is Alice and I am 23 years old",
    "My name is Jane and I am 24 years old",
    "My name is Joe and I am 25 years old",
    "My name is Jill and I am 26 years old",
]

for loop: Running tasks sequentially.

persons = []
for text in dataset:
    person = await extract_person(text)
    persons.append(person)

Even though there is an await keyword, we still have to wait for each task to finish before starting the next one. This is because we're using a for loop to iterate over the dataset. This method, which uses a for loop, will be the slowest among the four methods discussed today.

asyncio.gather: Running tasks concurrently.

import asyncio

async def gather():
    tasks_get_persons = [extract_person(text) for text in dataset]
    all_persons = await asyncio.gather(*tasks_get_persons) # (1)!
  1. We use await here to wait for all the tasks to finish before assigning the result to all_persons. This is because asyncio.gather returns an awaitable, not the results themselves. Alternatively, we can use asyncio.as_completed to achieve the same result.

Using asyncio.gather allows us to return all the results at once. It is an effective way to speed up our code, but it's not the only way. Particularly, if we have a large dataset, we might not want to wait for everything to finish before starting to process the results. This is where asyncio.as_completed comes into play.

asyncio.as_completed: Handling tasks as they complete.

async def as_completed():
    all_persons = []
    tasks_get_persons = [extract_person(text) for text in dataset]
    for person in asyncio.as_completed(tasks_get_persons):
        all_persons.append(await person) # (1)!
  1. We use await here to wait for each task to complete before appending it to the list. This is because as_completed returns a coroutine object, not the result of the coroutine. Alternatively, we can use asyncio.gather to achieve the same result.

This method is a great way to handle large datasets. We can start processing the results as they come in, especially if we are streaming data back to a client.

However, these methods aim to complete as many tasks as possible as quickly as possible. This can be problematic if we want to be considerate to the server we're making requests to. This is where rate limiting comes into play. While there are libraries available to assist with rate limiting, for our initial defense, we will use a semaphore to limit the number of concurrent requests we make.

Ordering of results

It's important to note that the order of the results will not be the same as the order of the dataset. This is because the tasks are completed in the order they finish, not the order they were started. If you need to preserve the order of the results, you can use asyncio.gather instead.

Rate-Limited Gather: Using semaphores to limit concurrency.

sem = asyncio.Semaphore(2)

async def rate_limited_extract_person(text: str, sem: asyncio.Semaphore) -> Person:
    async with sem: # (1)!
        return await extract_person(text)

async def rate_limited_gather(sem: asyncio.Semaphore):
    tasks_get_persons = [rate_limited_extract_person(text, sem) for text in dataset]
    resp = await asyncio.gather(*tasks_get_persons)
  1. We use a semaphore to limit the number of concurrent requests to 2. This approach strikes a balance between speed and being considerate to the server we're making requests to.

Rate-Limited As Completed: Using semaphores to limit concurrency.

sem = asyncio.Semaphore(2)

async def rate_limited_extract_person(text: str, sem: asyncio.Semaphore) -> Person:
    async with sem: # (1)!
        return await extract_person(text)

async def rate_limited_as_completed(sem: asyncio.Semaphore):
    all_persons = []
    tasks_get_persons = [rate_limited_extract_person(text, sem) for text in dataset]
    for person in asyncio.as_completed(tasks_get_persons):
        all_persons.append(await person) # (2)!
  1. We use a semaphore to limit the number of concurrent requests to 2. This approach strikes a balance between speed and being considerate to the server we're making requests to.

  2. We use await here to wait for each task to complete before appending it to the list. This is because as_completed returns a coroutine object, not the result of the coroutine. Alternatively, we can use asyncio.gather to achieve the same result.

Now that we have seen the code, let's examine the results of processing 7 texts. As the prompts become longer or if we use GPT-4, the differences between these methods will become more pronounced.

Other Options

It's also important to note that here we are using a semaphore to limit the number of concurrent requests. However, there are other ways to limit concurrency, especially since we get rate limit information back from the OpenAI response. You could imagine using a library like ratelimit to cap the number of requests per second, or catching rate limit exceptions and using tenacity to retry the request after a certain amount of time.
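
As a hedged sketch of the tenacity option (the backoff and attempt counts are arbitrary, not recommendations):

import openai
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type(openai.RateLimitError),
    wait=wait_exponential(min=1, max=30),
    stop=stop_after_attempt(5),
)
async def extract_person_with_retries(text: str) -> Person:
    # Retries with exponentially increasing waits (capped at 30s),
    # giving up after 5 attempts.
    return await extract_person(text)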

Results

As you can see, the for loop is the slowest, while asyncio.as_completed and asyncio.gather are the fastest without any rate limiting.

Method                  Execution Time    Rate Limited (Semaphore)
For Loop                6.17 seconds      -
Asyncio.gather          0.85 seconds      -
Asyncio.as_completed    0.95 seconds      -
Asyncio.gather          3.04 seconds      2
Asyncio.as_completed    3.26 seconds      2

Practical implications of batch processing

The choice of approach depends on the task's nature and the desired balance between speed and resource utilization.

Here are some guidelines to consider:

  • Use asyncio.gather for handling multiple independent tasks quickly.
  • Apply asyncio.as_completed for large datasets to process tasks as they complete.
  • Implement rate-limiting to avoid overwhelming servers or API endpoints.

If you find the content helpful or want to try out Instructor, please visit our GitHub page and give us a star!

Smarter Summaries w/ Finetuning GPT-3.5 and Chain of Density

Discover how to distil an iterative method like Chain Of Density into a single finetuned model using Instructor

In this article, we'll guide you through implementing the original Chain of Density method using Instructor, then show how to distil a GPT-3.5 model so that it matches GPT-4's iterative summarization capabilities. Using these methods, we were able to decrease latency by 20x, reduce costs by 50x, and maintain entity density.

By the end, you'll have a GPT-3.5 model, fine-tuned using Instructor's tooling, capable of producing summaries that rival the effectiveness of Chain of Density [Adams et al. (2023)]. As always, all code is readily available in the examples/chain-of-density folder of our repo for your reference.

Datasets and Colab Notebook

We've also uploaded all our generated data to Hugging Face here for you to use if you'd like to try reproducing these experiments. We've also added a Colab Instance for you to check our generated values.

Part 1) Chain of Density

Summarizing extensive texts with AI can be challenging, and often relies on inconsistent techniques. Chain Of Density prompting, the novel method introduced by the paper's authors, enhances AI-based text summarization and can outperform human-generated summaries.

Initially, an AI produces a summary, then refines it through multiple iterations, adding missing article entities. Each iteration adds new article entities to the summary, keeping length consistent, leading to an entity-dense, informative summary called Chain Of Density.

The method was first introduced in the paper From Sparse to Dense: GPT-4 Summarization with Chain of Density Prompting, whose authors found that it consistently beats similar summaries written by human annotators.

Implementation Details

Note that our implementation enforces a minimum length for the rewritten summary with a validator rather than through the prompt. We also perform just 3 rounds of rewrites instead of 5, resulting in a lower final entity density.

Original Prompt

We can break the original process down into smaller API calls. This allows us to introduce validation at each step to ensure that we're getting the results that we want.

Original Chain of Density Prompt
Article: {{ARTICLE}}

You will generate increasingly concise, entity-dense summaries of the
above Article.

Repeat the following 2 steps 5 times.

Step 1. Identify 1-3 informative Entities (";" delimited) from the
Article which are missing from the previously generated summary.
Step 2. Write a new, denser summary of identical length which covers
every entity and detail from the previous summary plus the Missing
Entities.

A Missing Entity is:
- Relevant: to the main story.
- Specific: descriptive yet concise (5 words or fewer).
- Novel; not in the previous summary.
- Faithful: present in the Article.
- Anywhere: located anywhere in the Article.

Guidelines:
- The first summary should be long (4-5 sentences, ~80 words) yet
highly non-specific, containing little information beyond the
entities marked as missing. Use overly verbose language and fillers
(e.g., "this article discusses") to reach ~80 words.
- Make every word count: re-write the previous summary to improve
flow and make space for additional entities.
- Make space with fusion, compression, and removal of uninformative
phrases like "the article discusses"
- The summaries should become highly dense and concise yet
self-contained, e.g., easily understood without the Article.
- Missing entities can appear anywhere in the new summary.
- Never drop entities from the previous summary. If space cannot be
made, add fewer new entities.

Remember, use the exact same number of words for each summary.

Answer in JSON. The JSON should be a list (length 5) of dictionaries
whose keys are "Missing_Entities" and "Denser_Summary"


Improved process with Instructor

Data Modelling

Before we begin modelling the data, let's make sure we install all of our dependencies

pip install instructor aiohttp rich

Initial Summary

Let's start by walking through some of the data models that we'll be using as the response_model for our OpenAI function calls.

Firstly, we'll need a data model for the initial summary that we will be generating. We'll take the description of this class straight from the original prompt. It's important to note that these docstrings serve a purpose: they are directly used by the LLM when generating the outputs.

A quick note on Docstrings

Under the hood, Instructor parses the response_model that you give us into a function call for OpenAI to execute. This means that the final output will be closely linked to the Pydantic model you specify.

For instance, this simple model that we later use in fine-tuning.

class GeneratedSummary(BaseModel):
    """
    This represents a highly concise summary that includes as many entities as possible from the original source article.

    An Entity is a real-world object that's assigned a name - for example, a person, country a product or a book title.

    Guidelines
    - Make every word count
    - The new summary should be highly dense and concise yet self-contained, eg., easily understood without the Article.
    - Make space with fusion, compression, and removal of uninformative phrases like "the article discusses"
    """

    summary: str = Field(
        ...,
        description="This represents the final summary generated that captures the meaning of the original article which is as concise as possible. ",
    )

We eventually transform it into an OpenAI function call as seen below.

{
"functions": [
    {
    "name": "GeneratedSummary",
    "description": "This represents a highly concise summary that includes as many entities as possible from the original source article.\n\nAn Entity is a real-world object that's assigned a name - for example, a person, country a product or a book title.\n\nGuidelines\n- Make every word count\n- The new summary should be highly dense and concise yet self-contained, eg., easily understood without the Article.\n- Make space with fusion, compression, and removal of uninformative phrases like \"the article discusses\"",
    "parameters": {
        "properties": {
        "summary": {
            "description": "This represents the final summary generated that captures the meaning of the original article which is as concise as possible. ",
            "title": "Summary",
            "type": "string"
        }
        },
        "required": [
        "summary"
        ],
        "type": "object"
    }
    }
]
}

This means that the more elaborate and detailed your descriptions are, the better the outputs you will get back. But we don't stop there: since it's all Pydantic under the hood, you can validate and parse the resulting output to make sure it is exactly what you specified. It's Python all the way down.

class InitialSummary(BaseModel):
    """
    This is an initial summary which should be long ( 4-5 sentences, ~80 words)
    yet highly non-specific, containing little information beyond the entities marked as missing.
    Use overly verbose language and fillers (Eg. This article discusses) to reach ~80 words.
    """

    summary: str = Field(
        ...,
        description="This is a summary of the article provided which is overly verbose and uses fillers. It should be roughly 80 words in length",
    )

Rewritten Summary

We'll also need one additional class to help model the rewritten schema

class RewrittenSummary(BaseModel):
    """
    This is a new, denser summary of identical length which covers every entity
    and detail from the previous summary plus the Missing Entities.

    Guidelines
    - Make every word count : Rewrite the previous summary to improve flow and make space for additional entities
    - Never drop entities from the previous summary. If space cannot be made, add fewer new entities.
    - The new summary should be highly dense and concise yet self-contained, eg., easily understood without the Article.
    - Make space with fusion, compression, and removal of uninformative phrases like "the article discusses"
    - Missing entities can appear anywhere in the new summary

    An Entity is a real-world object that's assigned a name - for example, a person, country a product or a book title.
    """

    summary: str = Field(
        ...,
        description="This is a new, denser summary of identical length which covers every entity and detail from the previous summary plus the Missing Entities. It should have the same length ( ~ 80 words ) as the previous summary and should be easily understood without the Article",
    )
    absent: List[str] = Field(
        default_factory=list,
        description="this is a list of Entities found absent from the new summary that were present in the previous summary",
    )
    missing: List[str] = Field(
        default_factory=list,
        description="This is a list of 1-3 informative Entities from the Article that are missing from the new summary which should be included in the next generated summary.",
    )

Using Pydantic Validators with Instructor

For a more in-depth walkthrough on how to use Pydantic validators with the Instructor library, we recommend checking out our previous article on LLM validation - Good LLM Validation is just Good Validation

Ideally, we'd like for Missing to have a length between 1 and 3, Absent to be an empty list and for our rewritten summaries to keep a minimum entity density. With Instructor, we can implement this logic using native Pydantic validators that are simply declared as part of the class itself.

import nltk
import spacy
from typing import List
from pydantic import field_validator

nlp = spacy.load("en_core_web_sm")

@field_validator("summary")
def min_length(cls, v: str):
    tokens = nltk.word_tokenize(v) #(1)!
    num_tokens = len(tokens)
    if num_tokens < 60:
        raise ValueError(
            "The current summary is too short. Please make sure that you generate a new summary that is around 80 words long."
        )
    return v

@field_validator("missing")
def has_missing_entities(cls, missing_entities: List[str]):
    if len(missing_entities) == 0:
        raise ValueError(
            "You must identify 1-3 informative Entities from the Article which are missing from the previously generated summary to be used in a new summary"
        )
    return missing_entities

@field_validator("absent")
def has_no_absent_entities(cls, absent_entities: List[str]):
    absent_entity_string = ",".join(absent_entities)
    if len(absent_entities) > 0:
        print(f"Detected absent entities of {absent_entity_string}")
        raise ValueError(
            f"Do not omit the following Entities {absent_entity_string} from the new summary"
        )
    return absent_entities

@field_validator("summary")
    def min_entity_density(cls, v: str):
        tokens = nltk.word_tokenize(v)
        num_tokens = len(tokens)

        # Extract Entities
        doc = nlp(v) #(2)!
        num_entities = len(doc.ents)

        density = num_entities / num_tokens
        if density < 0.08: #(3)!
            raise ValueError(
                f"The summary of {v} has too few entities. Please regenerate a new summary with more new entities added to it. Remember that new entities can be added at any point of the summary."
            )

        return v
  1. Similar to the original paper, we utilize the NLTK word tokenizer to count the number of tokens within our generated sentences. We aim for at least 60 tokens in our generated summary so that we don't lose information.

  2. We also use the spaCy library to calculate the entity density of the generated summary.

  3. We also implement a minimum entity density so that we stay within a given range. 0.08 is arbitrarily chosen in this case

Putting it all Together

Now that we have our models and the rough flow figured out, let's implement a function to summarize a piece of text using Chain Of Density summarization.

from openai import OpenAI
import instructor

client = instructor.patch(OpenAI()) #(1)!

def summarize_article(article: str, summary_steps: int = 3):
    summary_chain = []
    # We first generate an initial summary
    summary: InitialSummary = client.chat.completions.create(  # (2)!
        model="gpt-4-0613",
        response_model=InitialSummary,
        messages=[
            {
                "role": "system",
                "content": "Write a summary about the article that is long (4-5 sentences) yet highly non-specific. Use overly, verbose language and fillers(eg.,'this article discusses') to reach ~80 words",
            },
            {"role": "user", "content": f"Here is the Article: {article}"},
            {
                "role": "user",
                "content": "The generated summary should be about 80 words.",
            },
        ],
        max_retries=2,
    )
    prev_summary = None
    summary_chain.append(summary.summary)
    for i in range(summary_steps):
        missing_entity_message = (
            []
            if prev_summary is None
            else [
                {
                    "role": "user",
                    "content": f"Please include these Missing Entities: {','.join(prev_summary.missing)}",
                },
            ]
        )
        new_summary: RewrittenSummary = client.chat.completions.create( # (3)!
            model="gpt-4-0613",
            messages=[
                {
                    "role": "system",
                    "content": """
                You are going to generate an increasingly concise, entity-dense summary of the following article.

                Perform the following two tasks
                - Identify 1-3 informative entities from the following article which is missing from the previous summary
                - Write a new denser summary of identical length which covers every entity and detail from the previous summary plus the Missing Entities

                Guidelines
                - Make every word count: re-write the previous summary to improve flow and make space for additional entities
                - Make space with fusion, compression, and removal of uninformative phrases like "the article discusses".
                - The summaries should become highly dense and concise yet self-contained, e.g., easily understood without the Article.
                - Missing entities can appear anywhere in the new summary
                - Never drop entities from the previous summary. If space cannot be made, add fewer new entities.
                """,
                },
                {"role": "user", "content": f"Here is the Article: {article}"},
                {
                    "role": "user",
                    "content": f"Here is the previous summary: {summary_chain[-1]}",
                },
                *missing_entity_message,
            ],
            max_retries=3, #(4)!
            max_tokens=1000,
            response_model=RewrittenSummary,
        )
        summary_chain.append(new_summary.summary)
        prev_summary = new_summary

    return summary_chain
  1. We need to apply a patch function on the OpenAI client for us to get all of the benefits that Instructor provides. With a simple patch, we can get automatic type coercion of our outputs and automatic retries for invalid outputs out of the box!

  2. We first generate an initial summary. Note here that we explicitly ask in the system prompt for a summary that is around 80 words long and lengthy, with overly verbose fillers

  3. We slightly modify the original system prompt used in the original paper to perform a rewrite of the summary. Using Instructor, we also get validation of the generated output with our field_validators that we defined above

  4. If you've chosen a minimum density threshold larger than 0.08, make sure to increase this max_retries value in case multiple rewrites are needed
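
Here's a small usage sketch; the file path is a placeholder for any raw article text you want to summarize.

article = open("article.txt").read()  # placeholder path; any raw article text works

summaries = summarize_article(article, summary_steps=3)
print(summaries[0])   # the initial, verbose summary
print(summaries[-1])  # the densest rewrite after three passes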

This summarization function yields a result which triples the number of entities while maintaining the same number of tokens. We can also see that stylistically, the summary is a lot more natural.

First Iteration

This article discusses the highly-anticipated boxing match between Manny Pacquiao and Floyd Mayweather. The article revolves around Manny Pacquiao's statements about his upcoming fight and his preparations for the same. A portion of the article provides details about the financial stipulations of the match and its significance in the sporting arena. Quotes from Pacquiao illustrating his determination and his battle strategy are highlighted. The tone of the article is largely centered around creating a build-up to the upcoming mega event.

Final Iteration

Manny Pacquiao, the Filipino boxer, anticipates the forthcoming May 2 showdown at the MGM Grand as the fight of his life, against the undefeated American Floyd Mayweather, in a $300m bout. Despite being seen as the underdog in this high-stakes Las Vegas match, Pacquiao is confident, promising a warrior's spirit and assuring the fans who have been awaiting this encounter for a decade, that it will indeed be the biggest sporting spectacle in history worthy of their anticipation

Part 2) Fine-Tuning

In this section, we'll look at how to fine-tune a GPT-3.5 model so that it performs at a level equivalent to a GPT-4 model. We'll then compare the performance of our model against that of GPT-4 to see how it stacks up.

Creating a Training Set

In order to prevent any contamination of data during testing, we randomly sampled 120 articles from the griffin/chain-of-density dataset and split these articles into a train.csv and a test.csv file, which we uploaded to Hugging Face. Now, we just need to import the Instructions module from the Instructor package, which allows you to generate a nicely formatted .jsonl file to be used for fine-tuning.

from typing import List
from chain_of_density import summarize_article #(1)!
import csv
import logging
import instructor
from pydantic import BaseModel, Field
from openai import OpenAI

client = instructor.patch(OpenAI()) # (2)!

logging.basicConfig(level=logging.INFO) #(3)!

instructions = instructor.Instructions( #(4)!
    name="Chain Of Density",
    finetune_format="messages",
    # log handler is used to save the data to a file
    # you can imagine saving it to a database or other storage
    # based on your needs!
    log_handlers=[logging.FileHandler("generated.jsonl")],
    openai_client=client,
)

class GeneratedSummary(BaseModel):
    """
    This represents a highly concise summary that includes as many entities as possible from the original source article.

    An Entity is a real-world object that's assigned a name - for example, a person, country a product or a book title.

    Guidelines
    - Make every word count
    - The new summary should be highly dense and concise yet self-contained, eg., easily understood without the Article.
    - Make space with fusion, compression, and removal of uninformative phrases like "the article discusses"
    """

    summary: str = Field(
        ...,
        description="This represents the final summary generated that captures the meaning of the original article which is as concise as possible. ",
    )

@instructions.distil #(5)!
def distil_summarization(text: str) -> GeneratedSummary:
    summary_chain: List[str] = summarize_article(text)
    return GeneratedSummary(summary=summary_chain[-1]) #(6)!

with open("train.csv", "r") as file:
    reader = csv.reader(file)
    next(reader)  # Skip the header
    for article, summary in reader:
        # Run distillation to generate the fine-tuning data
        distil_summarization(article)
  1. In this example, we're using the summarize_article that we defined up above. We saved it in a local file called chain_of_density.py, hence the import

  2. We patch the default OpenAI client so that we can use the Instructor library with it

  3. We also need to configure logging at the INFO level. This is very important; if logging is not configured, your .jsonl output will not be generated.

  4. We instantiate an Instructions object which handles converting our function calls into a valid .jsonl file. We also define the name of the .jsonl file via the log_handlers parameter

  5. We add the instructions.distil decorator so that we automatically capture the input and output of the function we'd like our fine-tuned model to imitate

  6. We return a Pydantic object which matches the return annotation of our function. Note that a Pydantic object must be returned when using the instructions.distil decorator

Rate Limiting

We recommend running this script on a small subset of the dataset first to verify everything is configured correctly. Don't forget to add rate-limit error handling with tenacity and to set the OPENAI_API_KEY shell environment variable before running any subsequent commands.
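
A minimal sketch of such a retry wrapper, assuming tenacity is installed (the wait and stop values below are arbitrary choices, not part of the original script):

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(wait=wait_exponential(min=1, max=60), stop=stop_after_attempt(5))
def distil_summarization_with_retry(text: str) -> GeneratedSummary:
    # Retries distil_summarization with exponential backoff on any exception,
    # which covers transient rate-limit errors from the OpenAI API.
    return distil_summarization(text)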

Creating Fine-Tuning Jobs

Once we run this script, we'll have a new file called generated.jsonl in our local repository. Now all that's left is to run the command below to start fine-tuning your first model!

instructor jobs create-from-file generated.jsonl
Finetuning Reference

Check out our Finetuning CLI to learn about other hyperparameters that you can tune to improve your model's performance.

Once the job is complete, all we need to do is update the distil decorator on distil_summarization in our original file above to start using our new model.

@instructions.distil(model='gpt-3.5-turbo:finetuned-123', mode="dispatch") #(1)!
def distil_summarization(text: str) -> GeneratedSummary:
    summary_chain: List[str] = summarize_article(text)
    return GeneratedSummary(summary=summary_chain[-1])
  1. Don't forget to replace this with your new model id. OpenAI identifies fine-tuned models with an id of ft:gpt-3.5-turbo-0613:personal::<id> under the Fine-tuning tab on their dashboard

With that, you've now got your own fine-tuned model ready to go and serve data in production. We've seen how Instructor can make your life easier, from fine-tuning to distillation.

Results and Benchmarks

We'll be comparing the following models in 3 ways using 20 articles that were not used for fine-tuning.

  • Entity Density: entities per token; the higher, the denser the summary.
  • Latency: time to the last generated token, in seconds.
  • Costs: total cost to generate outputs; we break down the cost into training and inference costs for easy reference.

3.5 Finetuned (n)

This is a GPT-3.5 model that we fine-tuned on n examples. Each model was fine-tuned for 4-5 epochs (this was automatically decided by the OpenAI scheduler).

GPT-4 (COD)

This is a GPT-4 model to which we applied 3 rounds of Chain Of Density rewrites to generate a summary, using the methodology above.

GPT-3.5 (Vanilla)

This is a GPT-3.5 model that we asked to generate entity-dense summaries which were concise. Summaries were generated in a single pass, targeting about 80-90 tokens.

Model              | Mean Latency (s) | Mean Entity Density
-------------------|------------------|--------------------
3.5 Finetuned (20) | 2.1              | 0.15
3.5 Finetuned (50) | 2.1              | 0.14
3.5 Finetuned (76) | 2.1              | 0.14
GPT-3.5 (Vanilla)  | 16.8             | 0.12
GPT-4 (COD)        | 49.5             | 0.15

Finetuning Datasets

For our finetuned models, we made a few optimisations to improve performance.

We only included summaries with a minimum entity density of 0.15, took the summary in each chain with the highest density as the final one, forced every regenerated summary to have a minimum density of 0.12, and regenerated summaries up to three times if they didn't meet this threshold. This is a much more expensive strategy and can cost 2.5x or more than what we do in this tutorial.
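
A rough sketch of that filtering, assuming each summary in a chain has already been scored with its entity density (entities per token, computed with e.g. an NER model; that scoring step is not shown here):

from typing import List, Optional, Tuple

def best_summary(scored_chain: List[Tuple[float, str]], min_density: float = 0.15) -> Optional[str]:
    # Keep the densest summary in the chain, but only if it clears the threshold;
    # chains that never reach the minimum density are dropped from the dataset.
    density, summary = max(scored_chain)
    return summary if density >= min_density else None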

This resulted in the total cost of $63.46 to generate just 75 examples due to the stringent requirements, translating to about $0.85 per generated summary example.

Using the OpenAI Usage Dashboard, we can calculate the cost of generating 20 summaries as seen below.

Model              | Training Cost ($) | Inference Cost ($) | Tokens Used | Total Cost ($)
-------------------|-------------------|--------------------|-------------|---------------
GPT-3.5 (Vanilla)  | -                 | 0.20               | 51,162      | 0.2
3.5 Finetuned (20) | 0.7               | 0.20               | 56,573      | 0.8
3.5 Finetuned (50) | 1.4               | 0.17               | 49,057      | 1.3
3.5 Finetuned (76) | 1.8               | 0.17               | 51,583      | 2.5
GPT-4 (COD)        | -                 | 12.9               | 409,062     | 12.9

Here, we can see that GPT-4 has an approximate inference cost of $0.65 per summary, while our finetuned models have an inference cost of roughly $0.0091 per summary, which is ~72x cheaper.
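
One way those per-summary figures work out, assuming the fine-tuned inference cost is averaged across the three fine-tuned models and spread across the 20 test articles:

gpt4_per_summary = 12.9 / 20                            # ~$0.65
finetuned_per_summary = (0.20 + 0.17 + 0.17) / 3 / 20   # ~$0.009
print(round(gpt4_per_summary / finetuned_per_summary))  # ~72x cheaper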

Interestingly, the model finetuned with the fewest examples seems to outperform the others. While the reason for this is unknown, a few potential explanations are that we didn't train for sufficient epochs (we used the default of 5 epochs), or that with a larger variety of samples the models started imitating other behaviour, such as more abstract writing styles, resulting in a decrease in entity density.

Conclusions

Finetuning this iterative method gave us a model that was 20-40x faster while maintaining or improving overall performance, resulting in massive efficiency gains from distilling the capability into a specialized model.

We've seen how Instructor can make your life easier, from data modeling to distillation and finetuning. If you enjoy the content or want to try out instructor check out the github and don't forget to give us a star!

AI Engineer Keynote: Pydantic is all you need

Pydantic is all you need

Click here to watch the full talk

Last month, I ventured back onto the speaking circuit at the inaugural AI Engineer Summit, sharing insights on leveraging Pydantic for effective prompt engineering. I dove deeper into the topics covered in our documentation and standard blog posts.

I'd genuinely appreciate any feedback on the talk – every bit helps in refining the art. So, take a moment to check out the full talk here, and let's continue pushing the boundaries of what's possible.

Good LLM Validation is Just Good Validation

What if your validation logic could learn and adapt like a human, but operate at the speed of software? This is the future of validation and it's already here.

Validation is the backbone of reliable software. But traditional methods are static, rule-based, and can't adapt to new challenges. This post looks at how to bring dynamic, machine learning-driven validation into your software stack using Python libraries like Pydantic and Instructor. We validate these outputs using a validation function which conforms to the structure seen below.

def validation_function(value):
    if condition(value):
        raise ValueError("Value is not valid")
    return mutation(value)

What is Instructor?

Instructor helps ensure you get the exact response type you're looking for when using OpenAI's function calling API. Once you've defined the Pydantic model for your desired response, Instructor handles all the complicated logic in between, from parsing and validating the response to automatically retrying on invalid responses. This means that we can build in validators 'for free' and have a clear separation of concerns between the prompt and the code that calls OpenAI.

from openai import OpenAI
import instructor # pip install instructor
from pydantic import BaseModel

# This enables response_model keyword
# from client.chat.completions.create
client = instructor.patch(OpenAI()) # (1)!

class UserDetail(BaseModel):
    name: str
    age: int


user: UserDetail = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=UserDetail,
    messages=[
        {"role": "user", "content": "Extract Jason is 25 years old"},
    ],
    max_retries=3 # (2)!
)

assert user.name == "Jason" # (3)!
assert user.age == 25
  1. To simplify your work with OpenAI models and streamline the extraction of Pydantic objects from prompts, we offer a patching mechanism for the ChatCompletion class.

  2. Invalid responses that fail to be validated successfully will trigger up to as many reattempts as you define.

  3. As long as you pass in a response_model parameter to the ChatCompletion api call, the returned object will always be a validated Pydantic object.

In this post, we'll explore how to evolve from static, rule-based validation methods to dynamic, machine learning-driven ones. You'll learn to use Pydantic and Instructor to leverage language models and dive into advanced topics like content moderation, validating chain of thought reasoning, and contextual validation.

Let's examine these approaches with an example. Imagine that you run a software company that wants to ensure you never serve hateful or racist content. This isn't an easy job, since the language around these topics changes quickly and frequently.

Software 1.0: Introduction to Validations in Pydantic

A simple method could be to compile a list of different words that are often associated with hate speech. For simplicity, let's assume that we've found that the words Steal and Rob are good predictors of hateful speech from our database. We can modify our validation structure above to accommodate this.
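
Adapted to the validation-function shape from above, that check might look like the function below (we'll reuse the same function in the Annotated example later):

def message_cannot_have_blacklisted_words(value: str) -> str:
    # Raise if any word in the message appears in our small blacklist
    for word in value.split():
        if word.lower() in {'rob', 'steal'}:
            raise ValueError(f"`{word}` was found in the message `{value}`")
    return value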

This will throw an error if we pass in a string like Let's rob the bank! or We should steal from the supermarkets.

Pydantic offers two approaches for this validation: using the field_validator decorator or the Annotated hints.

Using field_validator decorator

We can use the field_validator decorator to define a validator for a field in Pydantic. Here's a quick example of how we might be able to do so.

from pydantic import BaseModel, ValidationError, field_validator
from pydantic.fields import Field

class UserMessage(BaseModel):
    message: str

    @field_validator('message')
    def message_cannot_have_blacklisted_words(cls, v: str) -> str:
        for word in v.split(): # (1)!
            if word.lower() in {'rob','steal'}:
                raise ValueError(f"`{word}` was found in the message `{v}`")
        return v

try:
    UserMessage(message="This is a lovely day")
    UserMessage(message="We should go and rob a bank")
except ValidationError as e:
    print(e)
  1. We split the sentence into its individual words and iterate through each of them. We then check whether any of these words appear in our blacklist, which in this case is just rob and steal

Since the message This is a lovely day does not have any blacklisted words, no errors are thrown. However, in the given example above, the validation fails for the message We should go and rob a bank due to the presence of the word rob and the corresponding error message is displayed.

1 validation error for UserMessage
message
  Value error, `rob` was found in the message `We should go and rob a bank` [type=value_error, input_value='We should go and rob a bank', input_type=str]
    For further information visit https://errors.pydantic.dev/2.4/v/value_error

Using Annotated

Alternatively, you can use the Annotated type hint to perform the same validation. Here's an example where we utilise the same function we started with.

from pydantic import BaseModel, ValidationError
from typing import Annotated
from pydantic.functional_validators import AfterValidator


def message_cannot_have_blacklisted_words(value:str):
    for word in value.split():
        if word.lower() in {'rob','steal'}:
            raise ValueError(f"`{word}` was found in the message `{value}`")
    return value

class UserMessage(BaseModel):
    message: Annotated[str, AfterValidator(message_cannot_have_blacklisted_words)]

try:
    UserMessage(message="This is a lovely day")
    UserMessage(message="We should go and rob a bank")
except ValidationError as e:
    print(e)

This code snippet achieves the same validation result. If the user message contains any of the words in the blacklist, a ValueError is raised and the corresponding error message is displayed.

1 validation error for UserMessage
message
  Value error, `rob` was found in the message `We should go and rob a bank` [type=value_error, input_value='We should go and rob a bank', input_type=str]
    For further information visit https://errors.pydantic.dev/2.4/v/value_error

Validation is a fundamental concept in software development and remains the same when applied to AI systems. Existing programming concepts should be leveraged when possible instead of introducing new terms and standards. The underlying principles of validation remain unchanged.

Suppose now that we've gotten a new message - Violence is always acceptable, as long as we silence the witness. Our original validator wouldn't throw any errors when passed this new message, since it contains neither the word rob nor steal. However, it's clearly not a message that should be published. How can we ensure that our validation logic can adapt to new challenges?

Software 3.0: Validation for LLMs or powered by LLMs

Building upon the understanding of simple field validators, let's delve into probabilistic validation in software 3.0 (prompt engineering). We'll introduce an LLM-powered validator called llm_validator that uses a statement to verify the value.

We can get around this by using the built-in llm_validator from Instructor.

from instructor import llm_validator
from pydantic import BaseModel, ValidationError
from typing import Annotated
from pydantic.functional_validators import AfterValidator

class UserMessage(BaseModel):
    message: Annotated[str, AfterValidator(llm_validator("don't say objectionable things"))]

try:
    UserMessage(message="Violence is always acceptable, as long as we silence the witness")
except ValidationError as e:
    print(e)

This produces the following error message as seen below

1 validation error for UserMessage
message
  Assertion failed, The statement promotes violence, which is objectionable. [type=assertion_error, input_value='Violence is always accep... we silence the witness', input_type=str]
    For further information visit https://errors.pydantic.dev/2.4/v/assertion_error

The error message is generated by the language model (LLM) rather than the code itself, making it helpful for re-asking the model in a later section. To better understand this approach, let's see how to build an llm_validator from scratch.

Creating Your Own Field Level llm_validator

Building your own llm_validator can be a valuable exercise to get started with Instructor and create custom validators.

Before we continue, let's review the anatomy of a validator:

def validation_function(value):
    if condition(value):
        raise ValueError("Value is not valid")
    return value

As we can see, a validator is simply a function that takes in a value and returns a value. If the value is not valid, it raises a ValueError. We can represent this using the following structure:

from typing import Optional
from pydantic import BaseModel, Field

class Validation(BaseModel):
    is_valid: bool = Field(..., description="Whether the value is valid based on the rules")
    error_message: Optional[str] = Field(..., description="The error message if the value is not valid, to be used for re-asking the model")

Using this structure, we can implement the same logic as before and utilize Instructor to generate the validation.

import instructor
from openai import OpenAI

# Enables `response_model` and `max_retries` parameters
client = instructor.patch(OpenAI())

def validator(v):
    statement = "don't say objectionable things"
    resp = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are a validator. Determine if the value is valid for the statement. If it is not, explain why.",
            },
            {
                "role": "user",
                "content": f"Does `{v}` follow the rules: {statement}",
            },
        ],
        # this comes from client = instructor.patch(OpenAI())
        response_model=Validation, # (1)!
    )
    if not resp.is_valid:
        raise ValueError(resp.error_message)
    return v
  1. The new parameter of response_model comes from client = instructor.patch(OpenAI()) and does not exist in the original OpenAI SDK. This allows us to pass in the Pydantic model that we want as a response.

Now we can use this validator in the same way we used the llm_validator from Instructor.

class UserMessage(BaseModel):
    message: Annotated[str, AfterValidator(validator)]
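
As a quick check, passing the same objectionable message as before raises a ValidationError:

try:
    UserMessage(message="Violence is always acceptable, as long as we silence the witness")
except ValidationError as e:
    print(e)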

Writing more complex validations

Validating Chain of Thought

A popular way of prompting large language models nowadays is known as chain of thought. This involves getting a model to generate reasons and explanations for an answer to a prompt.

We can utilise Pydantic and Instructor to perform a validation that checks whether the reasoning is sound, given both the answer and the chain of thought. To do this we can't use a field validator, since we need access to multiple fields in the model. Instead, we can use a model validator.

def validate_chain_of_thought(values):
    chain_of_thought = values["chain_of_thought"]
    answer = values["answer"]
    resp = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are a validator. Determine if the value is valid for the statement. If it is not, explain why.",
            },
            {
                "role": "user",
                "content": f"Verify that `{answer}` follows the chain of thought: {chain_of_thought}",
            },
        ],
        # this comes from client = instructor.patch(OpenAI())
        response_model=Validation,
    )
    if not resp.is_valid:
        raise ValueError(resp.error_message)
    return values

We can then take advantage of the model_validator decorator to perform a validation on a subset of the model's data.

We're defining a model validator here which runs before Pydantic parses the input into its respective fields. That's why we pass mode='before' to the model_validator decorator.

from typing import Any
from pydantic import BaseModel, model_validator

class AIResponse(BaseModel):
    chain_of_thought: str
    answer: str

    @model_validator(mode='before')
    @classmethod
    def chain_of_thought_makes_sense(cls, data: Any) -> Any:
        # here we assume data is the dict representation of the model
        # since we use 'before' mode.
        return validate_chain_of_thought(data)

Now, when you create an AIResponse instance, the chain_of_thought_makes_sense validator will be invoked. Here's an example:

try:
    resp = AIResponse(
        chain_of_thought="1 + 1 = 2", answer="The meaning of life is 42"
    )
except ValidationError as e:
    print(e)

If we create an AIResponse instance with an answer that does not follow the chain of thought, we will get an error.

1 validation error for AIResponse
    Value error, The statement 'The meaning of life is 42' does not follow the chain of thought: 1 + 1 = 2.
    [type=value_error, input_value={'chain_of_thought': '1 +... meaning of life is 42'}, input_type=dict]

Validating Citations From Original Text

Let's see a more concrete example. Let's say that we've asked our model a question about some text source and we want to validate that the generated answer is supported by the source. This would allow us to minimize hallucinations and prevent statements that are not backed by the original text. While we could verify this by looking up the original source manually, a more scalable approach is to use a validator to do this automatically.

We can pass in additional context to our validation functions using the model_validate function in Pydantic so that our models have more information to work with when performing validation. This context is a normal python dictionary and can be accessed inside the info argument in our validator functions.

from pydantic import ValidationInfo,BaseModel,field_validator

class AnswerWithCitation(BaseModel):
    answer: str
    citation: str

    @field_validator('citation')
    @classmethod
    def citation_exists(cls, v: str, info: ValidationInfo): # (1)!
        context = info.context
        if context:
            context = context.get('text_chunk')
            if v not in context:
                raise ValueError(f"Citation `{v}` not found in text chunks")
        return v
  1. This info object corresponds to the value of context that we pass into the model_validate function as seen below.

We can then take our original example and test it against our new model

try:
    AnswerWithCitation.model_validate(
        {"answer": "Jason is a cool guy", "citation": "Jason is cool"},
        context={"text_chunk": "Jason is just a guy"}, # (1)!
    )
except ValidationError as e:
    print(e)
  1. This context object is just a normal python dictionary and can take in and store any arbitrary values

This in turn generates the following error since Jason is cool does not exist in the text Jason is just a guy.

1 validation error for AnswerWithCitation
citation
Value error, Citation `Jason is cool` not found in text chunks [type=value_error, input_value='Jason is cool', input_type=str]
    For further information visit https://errors.pydantic.dev/2.4/v/value_error

Putting it all together with client = instructor.patch(OpenAI())

Because the client is patched with instructor.patch(OpenAI()), the client.chat.completions.create call also accepts a validation_context argument, which is passed through to Pydantic and becomes accessible via the info argument in the decorated validator functions.

from openai import OpenAI
import instructor

# Enables `response_model` and `max_retries` parameters
client = instructor.patch(OpenAI())

def answer_question(question:str, text_chunk: str) -> AnswerWithCitation:
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "user",
                "content": f"Answer the question: {question} with the text chunk: {text_chunk}",
            },
        ],
        response_model=AnswerWithCitation,
        validation_context={"text_chunk": text_chunk},
    )
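
A quick usage sketch with made-up inputs; because the text chunk is passed as validation_context, the citation_exists validator checks the citation against it:

resp = answer_question(
    "What is Jason like?",
    "Jason is just a guy",  # hypothetical source text
)
print(resp.answer, "|", resp.citation)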

Error Handling and Re-Asking

Validators can enforce certain properties of the outputs by throwing errors, and in an AI system we can surface those errors so the language model can self-correct. Running client = instructor.patch(OpenAI()) not only adds response_model and validation_context, it also lets you use the max_retries parameter to specify the number of times the model should try to self-correct.

This approach provides a layer of defense against two types of bad outputs:

  1. Pydantic Validation Errors (code or LLM-based)
  2. JSON Decoding Errors (when the model returns an incorrect response)

Define the Response Model with Validators

To keep things simple, let's assume we have a model that returns a UserModel object. We can define the response model using Pydantic and add a field validator to ensure that the name is in uppercase.

from pydantic import BaseModel, field_validator

class UserModel(BaseModel):
    name: str
    age: int

    @field_validator("name")
    @classmethod
    def validate_name(cls, v):
        if v.upper() != v:
            raise ValueError("Name must be in uppercase.")
        return v

This is where the max_retries parameter comes in. It allows the model to self-correct by retrying with the validation error message included, rather than with the original prompt alone.

model = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "Extract jason is 25 years old"},
    ],
    # Powered by client = instructor.patch(OpenAI())
    response_model=UserModel,
    max_retries=2,
)

assert model.name == "JASON"

In this example, even though there is no code explicitly transforming the name to uppercase, the model is able to correct the output.

Conclusion

From the simplicity of Pydantic and Instructor to the dynamic validation capabilities of LLMs, the landscape of validation is changing, but without needing to introduce new concepts. It's clear that the future of validation is not just about preventing bad data but about allowing LLMs to understand the data and correct it.

If you enjoy the content or want to try out Instructor please check out the github and give us a star!

Enhancing Python Functions with Instructor: A Guide to Fine-Tuning and Distillation

Introduction

Get ready to dive deep into the world of fine-tuning task-specific language models with Python functions. We'll explore how Instructor's Instructions helper streamlines this process, making the task you want to distil more efficient and powerful while preserving its original functionality and backwards compatibility.

If you want to see the full example, check out examples/distillation

Why use Instructor?

Imagine you're developing a backend service that uses a mix of old- and new-school ML practices; it may involve pipelines with multiple function calls, validations, and data processing. Sounds cumbersome, right? That's where Instructor comes in. It simplifies complex procedures by adding a decorator to your function that automatically generates a dataset for fine-tuning and helps you swap out the function's implementation.

Quick Start: How to Use Instructor's Distillation Feature

Before we dig into the nitty-gritty, let's look at how easy it is to use Instructor's distillation feature to capture function-calling data and export it to a JSONL file for fine-tuning.

import logging
import random
from pydantic import BaseModel
from instructor import Instructions # pip install instructor

# Logging setup
logging.basicConfig(level=logging.INFO)

instructions = Instructions(
    name="three_digit_multiply",
    finetune_format="messages",
    # log handler is used to save the data to a file
    # you can imagine saving it to a database or other storage
    # based on your needs! 
    log_handlers=[logging.FileHandler("math_finetunes.jsonl")]
)

class Multiply(BaseModel):
    a: int
    b: int
    result: int

# Define a function with distillation
# The decorator will automatically generate a dataset for fine-tuning
# They must return a pydantic model to leverage function calling
@instructions.distil
def fn(a: int, b: int) -> Multiply:
    resp = a * b
    return Multiply(a=a, b=b, result=resp)

# Generate some data
for _ in range(10):
    a = random.randint(100, 999)
    b = random.randint(100, 999)
    print(fn(a, b))

The Intricacies of Fine-tuning Language Models

Fine-tuning isn't just about writing a function like def f(a, b): return a * b. It requires detailed data preparation and logging. However, Instructor provides a built-in logging feature and structured outputs to simplify this.

Why Instructor and Distillation are Game Changers

The library offers two main benefits:

  1. Efficiency: Streamlines functions, distilling requirements into model weights and a few lines of code.
  2. Integration: Eases combining classical machine learning and language models by providing a simple interface that wraps existing functions.

Role of Instructor in Simplifying Fine-Tuning

The from instructor import Instructions feature is a time saver. It auto-generates a fine-tuning dataset, making it a breeze to imitate a function's behavior.

Logging Output and Running a Finetune

Here's how the logging output would look:

{
    "messages": [
        {"role": "system", "content": 'Predict the results of this function: ...'},
        {"role": "user", "content": 'Return fn(133, b=539)'},
        {"role": "assistant", 
            "function_call": 
                {
                    "name": "Multiply", 
                    "arguments": '{"a":133,"b":539,"result":89509}'
            }
        }
    ],
    "functions": [
        {"name": "Multiply", "description": "Correctly extracted `Multiply`..."}
    ]
}

Run a finetune like this:

Don't forget to set your OpenAI Key as an environment variable

All of the instructor jobs commands assume you've set an environment variable of OPENAI_API_KEY in your shell. You can set this by running the command export OPENAI_API_KEY=<Insert API Key Here> in your shell

instructor jobs create-from-file math_finetunes.jsonl

Next Steps and Future Plans

Here's a sneak peek of what I'm planning:

from instructor import Instructions, patch

patch() #(1)!

class Multiply(BaseModel):
    a: int
    b: int
    result: int

instructions = Instructions(
    name="three_digit_multiply",
)

@instructions.distil(model='gpt-3.5-turbo:finetuned-123', mode="dispatch") # (2)!
def fn(a: int, b: int) -> Multiply:
    resp = a * b
    return Multiply(a=a, b=b, result=resp)
  1. Don't forget to run the patch() command that we provide with the Instructor package. This helps automatically serialize the content back into the Pydantic model that we're looking for.

  2. Don't forget to replace this with your new model id. OpenAI identifies fine tuned models with an id of ft:gpt-3.5-turbo-0613:personal::<id> under their Fine-tuning tab on their dashboard

With this, you can swap the function implementation while keeping it backward compatible. You can even imagine using different models for different tasks, or validating and running evals by using the original function and comparing it to the distilled model.

Conclusion

We've seen how Instructor can make your life easier, from fine-tuning to distillation. Now if you're thinking, wow, I'd love a backend service to do this continuously, you're in luck! Please check out the survey at useinstructor.com and let us know who you are.

If you enjoy the content or want to try out instructor please check out the github and give us a star!

With the advent of large language models (LLMs), retrieval-augmented generation (RAG) has become a hot topic. However, throughout the past year of helping startups integrate LLMs into their stack, I've noticed that the pattern of taking user queries, embedding them, and directly searching a vector store is effectively demoware.

What is RAG?

Retrieval-augmented generation (RAG) is a technique that uses an LLM to generate responses, but uses a search backend to augment the generation. In the past year, using text embeddings with a vector database has been the most popular approach I've seen socialized.

Figure: Simple RAG that embeds the user query and makes a single search.

So let's kick things off by examining what I like to call the 'Dumb' RAG Model—a basic setup that's more common than you'd think.

The 'Dumb' RAG Model

When you ask a question like "what is the capital of France?", the 'dumb' RAG model embeds the query and searches some unopinionated search endpoint, limited to a single-method API like search(query: str) -> List[str]. This is fine for simple queries, since you'd expect a phrase like 'Paris is the capital of France' to be in the top results of, say, your Wikipedia embeddings.
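
As a concrete sketch of that flow, here is a toy version with an in-memory 'vector store' and brute-force cosine similarity (the documents and embedding model name are just placeholders):

from openai import OpenAI

client = OpenAI()

def embed(text: str) -> list[float]:
    # Embed a single string with the OpenAI embeddings API
    return client.embeddings.create(model="text-embedding-ada-002", input=text).data[0].embedding

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = sum(x * x for x in a) ** 0.5 * sum(y * y for y in b) ** 0.5
    return dot / norm

documents = ["Paris is the capital of France", "The Eiffel Tower is in Paris"]
doc_embeddings = [embed(doc) for doc in documents]

def search(query: str, k: int = 1) -> list[str]:
    # Embed the raw user query and return the k nearest documents
    q = embed(query)
    ranked = sorted(zip(doc_embeddings, documents), key=lambda pair: cosine(q, pair[0]), reverse=True)
    return [doc for _, doc in ranked[:k]]

print(search("what is the capital of France?"))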

Why is this a problem?

  • Query-Document Mismatch: This model assumes that the query embedding and the content embedding are similar in the embedding space, which is not always true for the text you're trying to search over. Only handling queries that are semantically similar to the content is a huge limitation!

  • Monolithic Search Backend: Assumes a single search backend, which is not always the case. You may have multiple search backends, each with its own API, and you may want to route the query to vector stores, search clients, SQL databases, and more.

  • Limitation of text search: Restricts complex queries to a single string ({query: str}), sacrificing the expressiveness of keywords, filters, and other advanced features. For example, asking what problems did we fix last week cannot be answered by a simple text search, since documents containing the words problem and last week will be present every week.

  • Limited ability to plan: Assumes that the query is the only input to the search backend, but you may want to use other information to improve the search, like the user's location or the time of day, using that context to rewrite the query. For example, if you give the language model more context, it's able to plan a suite of queries to execute that return the best results.

Now let's dive into how we can make it smarter with query understanding. This is where things get interesting.

Improving the RAG Model with Query Understanding

Shoutouts

Much of this work has been inspired by and done in collaboration with a few of my clients at new.computer, Metaphor Systems, and Naro. Go check them out!

Ultimately what you want to deploy is a system that understands how to take the query and rewrite it to improve precision and recall.

Figure: A query understanding system routes the query to multiple search backends.

Not convinced? Let's move from theory to practice with a real-world example. First up, Metaphor Systems.

What's Instructor?

Instructor uses Pydantic to simplify the interaction between the programmer and language models via the function calling API.

  • Widespread Adoption: Pydantic is a popular tool among Python developers.
  • Simplicity: Pydantic allows model definition in Python.
  • Framework Compatibility: Many Python frameworks already use Pydantic.

Case Study 1: Metaphor Systems

Take Metaphor Systems, which turns natural language queries into their custom search-optimized query. If you take a look at their web UI, you'll notice an auto-prompt option, which uses function calls to further optimize your query using a language model and turn it into a fully specified Metaphor Systems query.

Figure: The Metaphor Systems UI.

If we peek under the hood, we can see that the query is actually a complex object, with a date range and a list of domains to search in. It's actually more complex than this, but this is a good start. We can model this structured output in Pydantic using the instructor library.

import datetime
from typing import List

from pydantic import BaseModel

class DateRange(BaseModel):
    start: datetime.date
    end: datetime.date

class MetaphorQuery(BaseModel):
    rewritten_query: str
    published_daterange: DateRange
    domains_allow_list: List[str]

    async def execute(self):
        # `metaphor` stands for the Metaphor Systems search client
        return await metaphor.search(...)

Note how we model a rewritten query, range of published dates, and a list of domains to search in. This powerful pattern allows the user query to be restructured for better performance without the user having to know the details of how the search backend works.

import instructor
from openai import OpenAI

# Enables response_model in the openai client
client = instructor.patch(OpenAI())

query = client.chat.completions.create(
    model="gpt-4",
    response_model=MetaphorQuery,
    messages=[
        {
            "role": "system",
            "content": "You're a query understanding system for the Metafor Systems search engine. Here are some tips: ..."
        },
        {
            "role": "user",
            "content": "What are some recent developments in AI?"
        }
    ],
)

Example Output

{
  "rewritten_query": "novel developments advancements ai artificial intelligence machine learning",
  "published_daterange": {
    "start": "2023-09-17",
    "end": "2021-06-17"
  },
  "domains_allow_list": ["arxiv.org"]
}

This isn't just about adding some date ranges. It's about nuanced, tailored searches, that are deeply integrated with the backend. Metaphor Systems has a whole suite of other filters and options that you can use to build a powerful search query. They can even use some chain of thought prompting to improve how they use some of these advanced features.

class DateRange(BaseModel):
    start: datetime.date
    end: datetime.date
    chain_of_thought: str = Field(
        None,
        description="Think step by step to plan what is the best time range to search in"
    )

Now, let's see how this approach can help model an agent, like a personal assistant.

Case Study 2: Personal Assistant

Another great example of this multiple dispatch pattern is a personal assistant. You might ask, "What do I have today?"; from that vague query you might want events, emails, reminders, etc. That data will likely live in multiple backends, but what you want is one unified summary of results. Here you can't assume that the text of those documents is embedded in a single search backend. There might be a calendar client and an email client, across personal and professional accounts.

import asyncio
import datetime
import enum
from typing import List

from pydantic import BaseModel

class ClientSource(enum.Enum):
    GMAIL = "gmail"
    CALENDAR = "calendar"

class SearchClient(BaseModel):
    query: str
    keywords: List[str]
    email: str
    source: ClientSource
    start_date: datetime.date
    end_date: datetime.date

    async def execute(self) -> str:
        if self.source == ClientSource.GMAIL:
            ...
        elif self.source == ClientSource.CALENDAR:
            ...

class Retrival(BaseModel):
    queries: List[SearchClient]

    async def execute(self) -> List[str]:
        return await asyncio.gather(*[query.execute() for query in self.queries])

Now we can call this with a simple query like "What do I have today?" and it will try to async dispatch to the correct backend. It's still important to prompt the language model well, but we'll leave that for another day.

import instructor
from openai import OpenAI

# Enables response_model in the openai client
client = instructor.patch(OpenAI())

retrival = client.chat.completions.create(
    model="gpt-4",
    response_model=Retrival,
    messages=[
        {"role": "system", "content": "You are Jason's personal assistant."},
        {"role": "user", "content": "What do I have today?"}
    ],
)

Example Output

{
    "queries": [
        {
            "query": None,
            "keywords": None,
            "email": "jason@example.com",
            "source": "gmail",
            "start_date": "2023-09-17",
            "end_date": None
        },
        {
            "query": None,
            "keywords": ["meeting", "call", "zoom"]]],
            "email": "jason@example.com",
            "source": "calendar",
            "start_date": "2023-09-17",
            "end_date": None

        }
    ]
}

Notice that we have a list of queries that route to different search backends (email and calendar). We can even dispatch them asynchronously to be as performant as possible. Not only do we dispatch to different backends (that we have no control over), but you're likely going to render the results to the user differently as well. Perhaps you want to summarize the emails as text, but render the calendar events as a list the user can scroll through in a mobile app.

Can I use framework X?

I get this question a lot, but it's just code. Within these dispatches you can do whatever you want. You can use input() to ask the user for more information, make a POST request, or call a LangChain agent or LlamaIndex query engine to get more information. The sky is the limit.

Both of these examples showcase how both search providers and consumers can use Instructor to model their systems. This is a powerful pattern that allows you to build a system that can be used by anyone, and to build an LLM layer, from scratch, in front of any arbitrary backend.

Conclusion

This isn't about fancy embedding tricks; it's just plain old information retrieval and query understanding. The beauty of instructor is that it simplifies modeling the complex and lets you define the output of the language model, the prompts, and the payload we send to the backend in a single place.

What's Next?

Here I want to show that instructor isn't just about data extraction. It's a powerful framework for building a data model and integrating it with your LLM. Structured output is just the beginning; the untapped goldmine is skilled use of tools and APIs.

If you enjoy the content or want to try out instructor please check out the github and give us a star!

Bridging Language Models with Python using Instructor, Pydantic, and OpenAI's Function Calls

Language models have seen significant growth. Using them effectively often requires complex frameworks. This post discusses how Instructor simplifies this process using Pydantic.

The Problem with Existing LLM Frameworks

Current frameworks for Large Language Models (LLMs) have complex setups. Developers find it hard to control interactions with language models. Some frameworks require complex JSON Schema setups.

The OpenAI Function Calling Game-Changer

OpenAI's Function Calling feature provides a constrained interaction model. However, it has its own complexities, mostly around JSON Schema.

Why Pydantic?

Instructor uses Pydantic to simplify the interaction between the programmer and the language model.

  • Widespread Adoption: Pydantic is a popular tool among Python developers.
  • Simplicity: Pydantic allows model definition in Python.
  • Framework Compatibility: Many Python frameworks already use Pydantic.

import pydantic
import instructor
from openai import OpenAI

# Enables the response_model
client = instructor.patch(OpenAI())

class UserDetail(pydantic.BaseModel):
    name: str
    age: int

    def introduce(self):
        return f"Hello I'm {self.name} and I'm {self.age} years old"

user: UserDetail = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=UserDetail,
    messages=[
        {"role": "user", "content": "Extract Jason is 25 years old"},
    ]
)

Simplifying Validation Flow with Pydantic

Pydantic validators simplify features like re-asking or self-critique. This makes these tasks less complex compared to other frameworks.

from typing_extensions import Annotated
from pydantic import BaseModel, BeforeValidator
from instructor import llm_validator, patch

from openai import OpenAI

class QuestionAnswerNoEvil(BaseModel):
    question: str
    answer: Annotated[
        str,
        BeforeValidator(
            llm_validator("don't say objectionable things")
        ),
    ]
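
For instance, wired into a patched client, the validator runs on every response and failed validations trigger a re-ask (a sketch; the question here is arbitrary):

client = patch(OpenAI())

qa = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=QuestionAnswerNoEvil,
    max_retries=2,
    messages=[
        {"role": "user", "content": "What is the meaning of life?"},
    ],
)
print(qa.answer)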

The Modular Approach

Pydantic allows for modular output schemas. This leads to more organized code.

Composition of Schemas

class UserDetails(BaseModel):
    name: str
    age: int

class UserWithAddress(UserDetails):
    address: str

Defining Relationships

class UserDetail(BaseModel):
    id: int
    age: int
    name: str
    friends: List[int]

class UserRelationships(BaseModel):
    users: List[UserDetail]

Using Enums

from enum import Enum, auto

class Role(Enum):
    PRINCIPAL = auto()
    TEACHER = auto()
    STUDENT = auto()
    OTHER = auto()

class UserDetail(BaseModel):
    age: int
    name: str
    role: Role

Flexible Schemas

from typing import List

class Property(BaseModel):
    key: str
    value: str

class UserDetail(BaseModel):
    age: int
    name: str
    properties: List[Property]

Chain of Thought

class TimeRange(BaseModel):
    chain_of_thought: str
    start_time: int
    end_time: int

class UserDetail(BaseModel):
    id: int
    age: int
    name: str
    work_time: TimeRange
    leisure_time: TimeRange

Language Models as Microservices

The architecture resembles FastAPI. Most code can be written as Python functions that use Pydantic objects. This eliminates the need for prompt chains.

FastAPI Stub

from fastapi import FastAPI

app = FastAPI()

@app.get("/user/{user_id}", response_model=UserDetails)
async def get_user(user_id: int) -> UserDetails:
    return UserDetails(...)

Using Instructor as a Function

def extract_user(query: str) -> UserDetails:
    return client.chat.completions.create(
        response_model=UserDetails,
        messages=[...],
    )

Response Modeling

class MaybeUser(BaseModel):
    result: Optional[UserDetail]
    error: bool
    message: Optional[str]
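
Used as a response_model, this lets the model report failure explicitly instead of hallucinating a user (a sketch, reusing the patched client from above):

maybe = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=MaybeUser,
    messages=[
        {"role": "user", "content": "Extract: the weather is nice today"},
    ],
)
if maybe.error:
    print(maybe.message)  # e.g. an explanation that no user details were found
else:
    print(maybe.result)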

Conclusion

Instructor, with Pydantic, simplifies interaction with language models. It is usable for both experienced and new developers.

If you enjoy the content or want to try out instructor please check out the github and give us a star!