> ## Documentation Index
> Fetch the complete documentation index at: https://honcho.dev/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Gmail

> Load Gmail threads into Honcho to give your AI agents memory of email conversations.

In this tutorial, we'll walk through how to ingest your Gmail emails into Honcho. By the end, each email thread will be a Honcho session and each participant will be a peer — giving your agents memory of who said what across your email history.

This guide includes a ready-to-run Python script that handles everything: Gmail OAuth, thread fetching, participant extraction, and Honcho ingestion. You can run it as-is or use the full tutorial below to understand each piece as you go.

<Note>
  The full script is available on [GitHub](https://github.com/plastic-labs/honcho/tree/main/examples/gmail). This is a developer-focused tutorial — it requires creating a Google Cloud project and OAuth credentials.
</Note>

## TL;DR

If you just want to get your emails into Honcho, here's everything you need.

### 1. Set Up Google Cloud Credentials

Follow Google's official [Gmail API Python Quickstart](https://developers.google.com/gmail/api/quickstart/python) to:

1. Create a Google Cloud project and enable the Gmail API
2. Configure the OAuth consent screen
3. Create OAuth credentials (select **Desktop app** as the application type)
4. Download the credentials JSON into the same directory as the script

The script auto-detects Google's default `client_secret_*.json` filename, so no renaming needed. The script only needs the `gmail.readonly` scope.

### 2. Install Dependencies

<CodeGroup>
  ```bash uv theme={null}
  uv pip install google-api-python-client google-auth-oauthlib honcho-ai
  ```

  ```bash pip theme={null}
  pip install google-api-python-client google-auth-oauthlib honcho-ai
  ```
</CodeGroup>

### 3. Preview with a Dry Run

<CodeGroup>
  ```bash uv theme={null}
  uv run honcho_gmail.py --dry-run --max-threads 5
  ```

  ```bash python theme={null}
  python honcho_gmail.py --dry-run --max-threads 5
  ```
</CodeGroup>

On first run, a browser window opens for OAuth consent. After authorizing, a `token.json` file is created — future runs skip this step.

### 4. Load into Honcho

<CodeGroup>
  ```bash uv theme={null}
  export HONCHO_API_KEY=your_api_key
  uv run honcho_gmail.py --workspace gmail-inbox --max-threads 20
  ```

  ```bash python theme={null}
  export HONCHO_API_KEY=your_api_key
  python honcho_gmail.py --workspace gmail-inbox --max-threads 20
  ```
</CodeGroup>

You can filter threads with Gmail search syntax:

<CodeGroup>
  ```bash uv theme={null}
  uv run honcho_gmail.py --query "from:alice@example.com"
  uv run honcho_gmail.py --label INBOX
  uv run honcho_gmail.py --query "after:2024/01/01 has:attachment" --max-threads 50
  ```

  ```bash python theme={null}
  python honcho_gmail.py --query "from:alice@example.com"
  python honcho_gmail.py --label INBOX
  python honcho_gmail.py --query "after:2024/01/01 has:attachment" --max-threads 50
  ```
</CodeGroup>

That's it — your emails are now queryable in Honcho. Read on if you want to understand how the script works and the design decisions behind it.

***

## Full Tutorial

### How Gmail Maps to Honcho

The core idea is straightforward: each Gmail thread becomes a Honcho session, and each email participant becomes a peer. Here's the full mapping:

| Gmail Concept      | Honcho Concept                | Details                                           |
| ------------------ | ----------------------------- | ------------------------------------------------- |
| Your Gmail account | Workspace (`gmail`)           | One workspace for all email data                  |
| Email participant  | Peer                          | Email address as ID for deduplication             |
| Email thread       | Session (`gmail-thread-{id}`) | One session per thread, all participants attached |
| Individual email   | Message                       | Attributed to the sender with original timestamp  |

### Email as Peer ID

The script normalizes email addresses into URL-safe peer IDs — `alice@example.com` becomes `alice-example-com`. This means the same person is automatically deduplicated across threads. If Alice emails you in 10 different threads, all of those conversations accumulate under a single peer.

```python theme={null}
def peer_id_from_email(email: str) -> str:
    """Convert email to a valid Honcho peer ID."""
    return email.replace("@", "-").replace(".", "-")
```

This also means peers are consistent across data sources. If you import Granola meetings and Gmail threads for the same person, they merge under the same peer ID.

### Extracting Participants

Every email has a sender, recipients, and optionally CC/BCC addresses. The script extracts all of these to build a complete picture of who's involved in each thread:

```python theme={null}
for m in msgs:
    register_peer(m["from"])
    for addr in parse_address_list(m["to"]):
        register_peer(addr)
    for addr in parse_address_list(m["cc"]):
        register_peer(addr)
    for addr in parse_address_list(m["bcc"]):
        register_peer(addr)
```

Display names are extracted when available (e.g., `Alice Smith <alice@example.com>` → name: "Alice Smith"). When only an email is present, the script generates a name from the local part.

### Message Attribution and Timestamps

Each email becomes a message attributed to its sender via `peer.message()`. The original email timestamp is preserved using `created_at`, so Honcho sees the conversation in chronological order — not the order you imported it.

```python theme={null}
honcho_msgs.append(peer.message(
    content,
    metadata={
        "gmail_id": m["id"],
        "subject": m["subject"],
        "from": m["from"],
        "to": m["to"],
        "labels": m["labels"],
    },
    created_at=m["timestamp"],
))
```

### Multi-Peer Sessions

Each thread's session is linked to all participants using `session.add_peers()`. This means when you query Honcho about a peer, it has context not just from their messages but from the full conversations they participated in.

```python theme={null}
session = honcho.session(session_id, metadata={
    "gmail_thread_id": tid,
    "subject": subject,
    "source": "gmail",
    "message_count": len(msgs),
})
session.add_peers(thread_peers)
```

### Stripping Quoted Replies

Email threads are full of quoted replies — each message repeats everything above it. The script strips these out so only the new content is stored per message, avoiding duplication in Honcho's memory:

```python theme={null}
def strip_quoted_replies(text: str) -> str:
    """Strip quoted reply text, keeping only the new content."""
    lines = text.split("\n")
    clean_lines = []
    for line in lines:
        stripped = line.strip()
        if re.match(r"^On .+wrote:\s*$", stripped):
            break
        if stripped.startswith(">"):
            break
        # ... other reply markers
        clean_lines.append(line)
    return "\n".join(clean_lines).rstrip()
```

### Querying After Import

Once your emails are in Honcho, you can query any peer:

```python theme={null}
import os
from honcho import Honcho

honcho = Honcho(workspace_id="gmail-inbox", api_key=os.environ["HONCHO_API_KEY"])

alice = honcho.peer("alice-example-com")
print(alice.chat("What has Alice been discussing with me?"))
print(alice.chat("What action items has Alice mentioned?"))
```

***

## CLI Reference

```
usage: honcho_gmail.py [-h] [--workspace WORKSPACE] [--query QUERY]
                          [--label LABEL] [--max-threads N] [--dry-run]
                          [--credentials PATH] [--token PATH]

options:
  --workspace, -w    Honcho workspace ID (default: gmail)
  --query, -q        Gmail search query (e.g., 'from:alice@example.com')
  --label, -l        Gmail label to filter by (e.g., INBOX)
  --max-threads, -n  Max threads to fetch (default: 10)
  --dry-run          Preview without writing to Honcho
  --credentials, -c  Path to OAuth credentials JSON (auto-detects client_secret*.json)
  --token, -t        Path to store access token (default: token.json)
```

## Troubleshooting

### "No client\_secret\*.json file found"

Download OAuth credentials from Google Cloud Console and place the `client_secret_*.json` file in the same directory as the script.

### "Access blocked: This app's request is invalid"

Your OAuth consent screen may not be configured correctly. Ensure you've added the `gmail.readonly` scope.

### "Token has been expired or revoked"

Delete `token.json` and run the script again to re-authenticate.

### Rate Limits

The script includes a small delay when creating peers to avoid hitting Honcho's rate limits. For large imports (100+ threads), consider running in batches.

### Unique Messages

Use an AI assistant in your inbox? Want to parse out its messages differently? Feel free to modify and improve the structure of this script to fit your bespoke email setup. This script was written for agents and as such is easy to update with your coding assistant.

## Full Script

<Accordion title="honcho_gmail.py">
  ```python theme={null}
  #!/usr/bin/env python3
  """Load Gmail messages into Honcho.

  Uses the Gmail API directly (with OAuth) to fetch emails and the Honcho Python SDK to store them.
  Each Gmail thread becomes a Honcho session, each sender becomes a peer.

  Prerequisites:
  1. Create a Google Cloud project and enable the Gmail API
  2. Create OAuth 2.0 credentials (Desktop app type)
  3. Download the credentials JSON (client_secret_*.json) into this directory
  4. Install dependencies:
     pip install google-api-python-client google-auth-oauthlib honcho-ai

  On first run, a browser window will open for OAuth consent. After authorizing,
  a 'token.json' file will be created to store your credentials for future runs.
  """

  import argparse
  import base64
  import glob
  import os
  import re
  import time
  from datetime import datetime, timezone
  from email.header import decode_header, make_header
  from email.utils import getaddresses, parseaddr

  from google.auth.transport.requests import Request
  from google.oauth2.credentials import Credentials
  from google_auth_oauthlib.flow import InstalledAppFlow
  from googleapiclient.discovery import build
  from googleapiclient.errors import HttpError

  SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
  PEER_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$")


  def find_credentials() -> str:
      """Find a Google OAuth credentials file in the current directory."""
      matches = glob.glob("client_secret*.json")
      if matches:
          return matches[0]
      raise FileNotFoundError(
          "No client_secret*.json file found.\n"
          "Download OAuth credentials from Google Cloud Console:\n"
          "1. Go to console.cloud.google.com\n"
          "2. Create/select a project and enable Gmail API\n"
          "3. Create OAuth 2.0 credentials (Desktop app)\n"
          "4. Download the JSON into this directory"
      )


  def get_gmail_service(credentials_file: str | None = None, token_file: str = "token.json"):
      """Authenticate and return a Gmail API service instance."""
      creds = None

      if os.path.exists(token_file):
          creds = Credentials.from_authorized_user_file(token_file, SCOPES)

      if not creds or not creds.valid:
          if creds and creds.expired and creds.refresh_token:
              print("Refreshing expired credentials...")
              creds.refresh(Request())
          else:
              if credentials_file is None:
                  credentials_file = find_credentials()
              print(f"Using credentials: {credentials_file}")
              print("Opening browser for OAuth consent...")
              flow = InstalledAppFlow.from_client_secrets_file(credentials_file, SCOPES)
              creds = flow.run_local_server(port=0)

          with open(token_file, "w") as token:
              token.write(creds.to_json())
          print(f"Credentials saved to {token_file}")

      return build("gmail", "v1", credentials=creds)


  def list_threads(service, query: str = None, label_ids: list = None, max_results: int = 10) -> list[dict]:
      """List Gmail threads with pagination support."""
      all_threads = []
      page_token = None

      while len(all_threads) < max_results:
          try:
              params = {
                  "userId": "me",
                  "maxResults": min(100, max_results - len(all_threads)),
              }
              if query:
                  params["q"] = query
              if label_ids:
                  params["labelIds"] = label_ids
              if page_token:
                  params["pageToken"] = page_token

              response = service.users().threads().list(**params).execute()
              threads = response.get("threads", [])
              all_threads.extend(threads)

              page_token = response.get("nextPageToken")
              if not page_token:
                  break

          except HttpError as e:
              print(f"Error listing threads: {e}")
              break

      return all_threads[:max_results]


  def get_thread(service, thread_id: str) -> dict:
      """Fetch a complete Gmail thread with all messages."""
      try:
          return service.users().threads().get(
              userId="me",
              id=thread_id,
              format="full"
          ).execute()
      except HttpError as e:
          print(f"Error fetching thread {thread_id}: {e}")
          return {}


  def _decode_header_str(header: str) -> str:
      """Decode an RFC 2047 encoded header string to plain Unicode."""
      return str(make_header(decode_header(header)))


  def extract_email(from_header: str) -> str:
      """Extract bare email from an RFC 5322 header value."""
      _, addr = parseaddr(_decode_header_str(from_header))
      return addr.lower().strip()


  def extract_name(from_header: str) -> str:
      """Extract display name from an RFC 5322 header value."""
      name, _ = parseaddr(_decode_header_str(from_header))
      return name.strip() or from_header.strip()


  def decode_body(payload: dict) -> str:
      """Recursively extract plain text from a Gmail message payload."""
      if payload.get("mimeType") == "text/plain":
          data = payload.get("body", {}).get("data", "")
          if data:
              return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")

      parts = payload.get("parts", [])
      for part in parts:
          text = decode_body(part)
          if text:
              return text
      return ""


  def strip_quoted_replies(text: str) -> str:
      """Strip quoted reply text from an email body, keeping only the new content."""
      lines = text.split("\n")
      clean_lines = []
      for line in lines:
          stripped = line.strip()
          if re.match(r"^On .+wrote:\s*$", stripped):
              break
          if stripped.startswith("---------- Forwarded message"):
              break
          if stripped.startswith(">"):
              break
          if re.match(r"^[-_]{10,}$", stripped):
              break
          clean_lines.append(line)
      return "\n".join(clean_lines).rstrip()


  def parse_address_list(header: str) -> list[str]:
      """Parse a comma-separated email header into individual addresses."""
      if not header.strip():
          return []
      decoded = _decode_header_str(header)
      return [
          f"{name} <{addr}>" if name else addr
          for name, addr in getaddresses([decoded])
          if addr
      ]


  def peer_id_from_email(email: str) -> str:
      """Convert email to a valid Honcho peer ID."""
      peer_id = re.sub(r"[^A-Za-z0-9_-]+", "-", email).strip("-").lower()
      peer_id = re.sub(r"-{2,}", "-", peer_id)
      if not peer_id:
          peer_id = "unknown-peer"

      if not PEER_ID_PATTERN.fullmatch(peer_id):
          raise ValueError(f"Generated peer ID is invalid: {peer_id!r}")
      return peer_id


  def fetch_thread_messages(service, thread_id: str) -> list[dict]:
      """Fetch all messages in a Gmail thread with full content."""
      data = get_thread(service, thread_id)
      messages = []

      for msg in data.get("messages", []):
          headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
          body = strip_quoted_replies(decode_body(msg.get("payload", {})))
          ts = int(msg.get("internalDate", "0")) / 1000

          messages.append({
              "id": msg["id"],
              "thread_id": msg["threadId"],
              "from": headers.get("From", ""),
              "to": headers.get("To", ""),
              "cc": headers.get("Cc", ""),
              "bcc": headers.get("Bcc", ""),
              "subject": headers.get("Subject", ""),
              "date": headers.get("Date", ""),
              "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc),
              "body": body.strip(),
              "labels": msg.get("labelIds", []),
              "snippet": msg.get("snippet", ""),
          })

      return messages


  def main():
      parser = argparse.ArgumentParser(description="Load Gmail messages into Honcho")
      parser.add_argument("--workspace", "-w", default="gmail", help="Honcho workspace ID (default: gmail)")
      parser.add_argument("--query", "-q", default=None, help="Gmail search query (e.g. 'from:alice@example.com')")
      parser.add_argument("--label", "-l", default=None, help="Gmail label to filter by (e.g. INBOX)")
      parser.add_argument("--max-threads", "-n", type=int, default=10, help="Max threads to fetch (default: 10)")
      parser.add_argument("--dry-run", action="store_true", help="Print what would be loaded without writing to Honcho")
      parser.add_argument("--credentials", "-c", default=None, help="Path to OAuth credentials JSON (auto-detects client_secret*.json)")
      parser.add_argument("--token", "-t", default="token.json", help="Path to store/load access token")
      args = parser.parse_args()

      # Authenticate
      print("Authenticating with Gmail API...")
      service = get_gmail_service(args.credentials, args.token)
      print("  Authenticated successfully!")

      label_ids = [args.label] if args.label else None

      # List threads
      print(f"\nFetching up to {args.max_threads} threads from Gmail...")
      threads = list_threads(service, query=args.query, label_ids=label_ids, max_results=args.max_threads)
      print(f"  Found {len(threads)} threads")

      if not threads:
          print("No threads found. Try adjusting --query or --label.")
          return

      # Fetch full messages for each thread
      all_thread_messages = {}
      seen_peers = {}

      def register_peer(addr: str):
          email = extract_email(addr)
          if email and email not in seen_peers:
              name = extract_name(addr)
              if name.lower().strip() == email or "@" in name:
                  name = email.split("@")[0].replace(".", " ").title()
              seen_peers[email] = {
                  "name": name,
                  "peer_id": peer_id_from_email(email),
                  "email": email,
              }

      for i, t in enumerate(threads):
          tid = t["id"]
          print(f"  Fetching thread {i+1}/{len(threads)}: {tid}")
          msgs = fetch_thread_messages(service, tid)
          all_thread_messages[tid] = msgs
          for m in msgs:
              register_peer(m["from"])
              for addr in parse_address_list(m["to"]):
                  register_peer(addr)
              for addr in parse_address_list(m["cc"]):
                  register_peer(addr)
              for addr in parse_address_list(m["bcc"]):
                  register_peer(addr)

      # Summary
      total_msgs = sum(len(v) for v in all_thread_messages.values())
      print("\nSummary:")
      print(f"  Threads: {len(all_thread_messages)}")
      print(f"  Messages: {total_msgs}")
      print(f"  Unique participants: {len(seen_peers)}")
      for email, info in seen_peers.items():
          print(f"    {info['peer_id']} ({info['name']} <{email}>)")

      if args.dry_run:
          print("\n[DRY RUN] Would create the above in Honcho. Showing first message per thread:")
          for tid, msgs in all_thread_messages.items():
              m = msgs[0]
              body_preview = m["body"][:120].replace("\n", " ") if m["body"] else m["snippet"][:120]
              print(f"  Thread {tid}: {m['subject']}")
              print(f"    {m['from']} @ {m['date']}")
              print(f"    {body_preview}...")
          return

      # Load into Honcho
      from honcho import Honcho

      print(f"\nLoading into Honcho workspace '{args.workspace}'...")
      honcho = Honcho(workspace_id=args.workspace)

      # Create peers
      peers = {}
      for i, (email, info) in enumerate(seen_peers.items()):
          if i > 0 and i % 4 == 0:
              time.sleep(1)
          peers[email] = honcho.peer(info["peer_id"], metadata={
              "email": email,
              "name": info["name"],
              "source": "gmail",
          })
          print(f"  Peer: {info['peer_id']}")

      # Create sessions and messages per thread
      for tid, msgs in all_thread_messages.items():
          subject = msgs[0]["subject"] if msgs else "No subject"
          session_id = f"gmail-thread-{tid}"

          thread_peer_emails = set()
          for m in msgs:
              thread_peer_emails.add(extract_email(m["from"]))
              for addr in parse_address_list(m["to"]):
                  thread_peer_emails.add(extract_email(addr))
              for addr in parse_address_list(m["cc"]):
                  thread_peer_emails.add(extract_email(addr))
              for addr in parse_address_list(m["bcc"]):
                  thread_peer_emails.add(extract_email(addr))
          thread_peers = [peers[e] for e in thread_peer_emails if e in peers]

          session = honcho.session(session_id, metadata={
              "gmail_thread_id": tid,
              "subject": subject,
              "source": "gmail",
              "message_count": len(msgs),
          })
          session.add_peers(thread_peers)

          honcho_msgs = []
          for m in msgs:
              email = extract_email(m["from"])
              peer = peers.get(email)
              if not peer:
                  continue
              content = m["body"] if m["body"] else m["snippet"]
              if not content:
                  continue
              honcho_msgs.append(peer.message(
                  content,
                  metadata={
                      "gmail_id": m["id"],
                      "subject": m["subject"],
                      "from": m["from"],
                      "to": m["to"],
                      "labels": m["labels"],
                  },
                  created_at=m["timestamp"],
              ))

          if honcho_msgs:
              session.add_messages(honcho_msgs)
              print(f"  Session {session_id}: {len(honcho_msgs)} messages — {subject[:60]}")

      print(f"\nDone! Loaded {total_msgs} messages into workspace '{args.workspace}'.")


  if __name__ == "__main__":
      main()

  ```
</Accordion>

## Next Steps

<CardGroup cols={2}>
  <Card title="Design Patterns" icon="cubes" href="/v3/documentation/core-concepts/design-patterns">
    See how the Granola integration maps to common Honcho patterns.
  </Card>

  <Card title="GitHub Repository" icon="github" href="https://github.com/plastic-labs/honcho/tree/main/examples/gmail">
    Source code and example script.
  </Card>
</CardGroup>
