import subprocess, sys
def _pip_install(*pkgs):
    """Best-effort ``pip install`` of *pkgs* using the running interpreter.

    Runs ``<sys.executable> -m pip install -q <pkgs...>`` and waits for it to
    finish. Any failure (pip unavailable, non-zero exit status, ...) is reported
    as a one-line message on stdout instead of being raised, so callers can
    treat the install as optional.
    """
    command = [sys.executable, "-m", "pip", "install", "-q"]
    command.extend(pkgs)
    try:
        # check=True turns a non-zero pip exit status into an exception.
        subprocess.run(command, check=True)
    except Exception as err:
        print(f"(pip install skipped/failed for {pkgs}: {err})")
# Optional dependency: the `openai` client package.
# _HAVE_OPENAI ends up True only if `import openai` succeeds, either directly
# or after one best-effort install attempt.
_HAVE_OPENAI = False
try:
    import openai
except Exception:
    # Not importable yet: try installing it once, then retry the import.
    _pip_install("openai>=1.0.0")
    try:
        import openai
    except Exception:
        _HAVE_OPENAI = False
    else:
        _HAVE_OPENAI = True
else:
    _HAVE_OPENAI = True
# Best-effort setup of nest_asyncio. Per that package's documentation,
# apply() patches asyncio so an already-running event loop can be re-entered.
try:
   import nest_asyncio
   nest_asyncio.apply()
except Exception:
   # Either the import or apply() failed: attempt an install and repeat both
   # steps once.
   try:
       _pip_install("nest_asyncio")
       import nest_asyncio
       nest_asyncio.apply()
   except Exception:
       # Still not usable; carry on without it instead of aborting the script.
       pass
import os
import re
import json
import time
import math
import asyncio
import inspect
import textwrap
import contextlib
import io
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, Awaitable, get_type_hints
def banner(title: str) -> None:
    """Print *title* framed above and below by a 78-character horizontal rule.

    The output begins with a blank line and the title is indented two spaces.
    """
    rule = "═" * 78
    # Leading empty element produces the blank line before the top rule.
    framed = "\n".join(["", rule, f"  {title}", rule])
    print(framed)
@dataclass
class ToolCall:
   """A normalized request from the model to run one tool.

   Providers build one instance per tool invocation the model asks for.
   """
   # Identifier of this particular call, as supplied by the provider.
   id: str
   # Name of the tool the model wants to run.
   name: str
   # Arguments for the tool. The OpenAI-compatible provider in this file stores
   # {"_raw": <text>} here when the model's argument text is not valid JSON.
   arguments: dict
@dataclass
class Usage:
    """Token counts reported for a single model response."""

    # Tokens attributed to the request (prompt) side.
    prompt_tokens: int = 0
    # Tokens attributed to the generated (completion) side.
    completion_tokens: int = 0

    @property
    def total(self) -> int:
        """Combined prompt and completion token count."""
        prompt, completion = self.prompt_tokens, self.completion_tokens
        return prompt + completion
@dataclass
class LLMResponse:
   """The single shape every provider must return."""
   # Text of the reply, or None. The mock provider in this file uses None when
   # it answers with a tool call instead of text.
   content: Optional[str]
   # Tool invocations requested by the model; empty when there are none.
   tool_calls: list[ToolCall] = field(default_factory=list)
   # Why generation ended. Values produced in this file are "stop" and
   # "tool_calls"; the OpenAI-compatible provider forwards the API's value.
   finish_reason: str = "stop"
   # Token accounting for this response (all zeros by default).
   usage: Usage = field(default_factory=Usage)
class Provider:
   """Base class. A provider turns (messages, tools) into an LLMResponse."""
   # Short identifier for the provider; the subclasses in this file override it.
   name = "base"
   async def complete(self, messages: list[dict], tools: list[dict]) -> LLMResponse:
       """Produce one model response for the given conversation.

       Args:
           messages: Conversation so far, as dicts; the providers in this file
               read "role" and "content" keys from them.
           tools: Tool descriptions offered to the model; the mock provider in
               this file reads each entry's ["function"]["name"].

       Returns:
           The model's reply as an LLMResponse.

       Raises:
           NotImplementedError: Always, in this base class; subclasses must
               override the method.
       """
       raise NotImplementedError
class OpenAICompatibleProvider(Provider):
    """
    Works with OpenAI and every OpenAI-compatible gateway (OpenRouter, DeepSeek,
    Together, vLLM, LM Studio, Ollama's /v1, ...). This mirrors how nanobot speaks
    to most providers under the hood.
    """

    name = "openai-compatible"

    def __init__(self, api_key: str, model: str, base_url: Optional[str] = None):
        """Create the async client.

        Args:
            api_key: Credential passed to ``AsyncOpenAI``.
            model: Model name sent with every completion request.
            base_url: Endpoint override passed to ``AsyncOpenAI``; ``None``
                leaves the choice of endpoint to the client library.
        """
        # Imported here rather than at module level so that defining this class
        # does not require the `openai` package to be installed.
        from openai import AsyncOpenAI

        self.model = model
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)

    @staticmethod
    def _parse_arguments(raw: Any) -> dict:
        """Decode one tool call's argument payload into a dict.

        The payload is model-generated text and is not guaranteed to be a valid
        JSON object, so every outcome is normalized to a dict:

        * a dict is returned unchanged;
        * empty / ``None`` payloads and JSON ``null`` mean "no arguments" -> ``{}``;
        * a JSON object is returned decoded;
        * anything else (invalid JSON, a JSON list/number/string, a payload
          type ``json.loads`` rejects) is preserved as ``{"_raw": raw}``.
        """
        if isinstance(raw, dict):
            return raw
        try:
            parsed = json.loads(raw or "{}")
        except (json.JSONDecodeError, TypeError):
            return {"_raw": raw}
        if parsed is None:
            return {}
        if isinstance(parsed, dict):
            return parsed
        # Valid JSON but not an object: keep the original text for inspection
        # instead of handing a non-dict to ToolCall.arguments.
        return {"_raw": raw}

    async def complete(self, messages: list[dict], tools: list[dict]) -> LLMResponse:
        """Send the conversation to the chat-completions endpoint.

        Args:
            messages: Conversation messages, forwarded as-is.
            tools: Tool descriptions; when non-empty they are forwarded with
                ``tool_choice="auto"``, otherwise both keys are omitted.

        Returns:
            An ``LLMResponse`` built from the first choice of the API response.

        Raises:
            Whatever the client raises for the request, and ``IndexError`` if
            the response contains no choices.
        """
        kwargs: dict[str, Any] = {"model": self.model, "messages": messages}
        if tools:
            kwargs["tools"] = tools
            kwargs["tool_choice"] = "auto"
        resp = await self.client.chat.completions.create(**kwargs)
        choice = resp.choices[0]
        msg = choice.message
        calls: list[ToolCall] = [
            ToolCall(
                id=tc.id,
                name=tc.function.name,
                arguments=self._parse_arguments(tc.function.arguments),
            )
            for tc in (msg.tool_calls or [])
        ]
        # getattr(..., 0) or 0 tolerates a missing usage object as well as
        # counters that are absent or None.
        usage = Usage(
            prompt_tokens=getattr(resp.usage, "prompt_tokens", 0) or 0,
            completion_tokens=getattr(resp.usage, "completion_tokens", 0) or 0,
        )
        return LLMResponse(
            content=msg.content,
            tool_calls=calls,
            finish_reason=choice.finish_reason or "stop",
            usage=usage,
        )
class MockProvider(Provider):
    """
    A deterministic, rule-based "LLM" so this entire tutorial runs with NO API key
    and NO network — letting you watch the agent loop, tool calls, and memory work.
    It imitates the ONE thing that matters for the loop: deciding to emit a tool call
    (in the exact normalized shape a real model would) and then, once tool results
    come back, producing a final natural-language answer. The agent loop cannot tell
    it apart from OpenAI — that's the whole point of the provider contract.

    Tool bookkeeping (which tools were already requested, which tool results get
    summarized) is scoped to the current turn, i.e. the messages from the most
    recent user message onward. Earlier turns kept in the history therefore
    neither suppress new tool calls nor leak stale results into a later answer.
    """

    name = "mock"

    def __init__(self, model: str = "mock-1"):
        """Store the (purely cosmetic) model name."""
        self.model = model

    @staticmethod
    def _current_turn(messages: list[dict]) -> list[dict]:
        """Return the messages from the most recent user message onward.

        When the list holds no user message, the whole list is returned.
        """
        for idx in range(len(messages) - 1, -1, -1):
            if messages[idx].get("role") == "user":
                return messages[idx:]
        return messages

    @staticmethod
    def _mentions(text: str, words: list[str]) -> bool:
        """Return True if any of *words* occurs in *text* as a whole word/phrase.

        Plain substring tests misfire on short keywords ("hi" inside "this",
        "now" inside "know"), so the match is anchored on word boundaries.
        """
        alternatives = "|".join(re.escape(w) for w in words)
        return re.search(rf"\b(?:{alternatives})\b", text) is not None

    @staticmethod
    def _last_user_text(messages: list[dict]) -> str:
        """Return the content of the latest user message ("" if there is none).

        Non-string content is serialized with ``json.dumps``.
        """
        for m in reversed(messages):
            if m.get("role") == "user":
                c = m.get("content")
                return c if isinstance(c, str) else json.dumps(c)
        return ""

    @staticmethod
    def _already_called(messages: list[dict], tool_name: str) -> bool:
        """Return True if *tool_name* was already requested in the current turn.

        Only assistant messages after the latest user message are inspected;
        this is what stops the mock from requesting the same tool forever
        while still allowing the tool to be used again in a later turn.
        """
        for m in MockProvider._current_turn(messages):
            if m.get("role") == "assistant" and m.get("tool_calls"):
                if any(tc["function"]["name"] == tool_name for tc in m["tool_calls"]):
                    return True
        return False

    @staticmethod
    def _extract_math(text: str) -> str:
        """Pull the first math-looking chunk out of a sentence (mock-only helper).

        "square root of N" is rewritten to "sqrt(N)" and "^" to "**" first. If
        nothing math-like is found, the rewritten text is returned stripped.
        """
        t = re.sub(r"square roots? of (\d+(?:\.\d+)?)", r"sqrt(\1)", text)
        t = t.replace("^", "**")
        pattern = (r"(?:sqrt\(\d+(?:\.\d+)?\)|\d+(?:\.\d+)?)"
                   r"(?:\s*(?:\*\*|[\+\-\*\/])\s*(?:sqrt\(\d+(?:\.\d+)?\)|\d+(?:\.\d+)?))*")
        m = re.search(pattern, t)
        return m.group(0).strip() if m else t.strip()

    @staticmethod
    def _scan_memory(messages: list[dict]) -> tuple[Optional[str], Optional[str]]:
        """Read back simple facts from prior USER turns — proves session memory is
        actually being fed to the model (mock-only convenience).

        Returns ``(name, love)``; each is title-cased, or None when not found.
        Later messages override earlier ones.
        """
        name = love = None
        for m in messages:
            if m.get("role") == "user" and isinstance(m.get("content"), str):
                tx = m["content"].lower()
                nm = re.search(r"my name is (\w+)", tx)
                if nm:
                    name = nm.group(1).title()
                lv = re.search(r"i (?:love|like) (\w+)", tx)
                if lv:
                    love = lv.group(1).title()
        return name, love

    async def complete(self, messages: list[dict], tools: list[dict]) -> LLMResponse:
        """Decide, by keyword rules, between one tool call and a final answer.

        The rules are tried in a fixed order (calculator, time, remember, recall,
        run_python, web_search); the first tool that is offered in *tools*, is
        triggered by the latest user message and has not yet been requested in
        the current turn is returned as a single tool call. Otherwise a text
        answer is produced: a memory read-back, a summary of this turn's tool
        results, a greeting, or a generic fallback.

        Args:
            messages: Conversation so far (dicts with "role"/"content").
            tools: Tool descriptions; only ``["function"]["name"]`` is read.
                A falsy value means no tools are offered.

        Returns:
            An ``LLMResponse`` with either exactly one tool call
            (``finish_reason="tool_calls"``) or text (``finish_reason="stop"``).
        """
        await asyncio.sleep(0)
        user = self._last_user_text(messages).lower()
        tool_names = {t["function"]["name"] for t in (tools or [])}
        # Rough token estimate: about four characters per token.
        usage = Usage(prompt_tokens=sum(len(str(m)) for m in messages) // 4, completion_tokens=12)

        def call(name, args):
            # Wrap one tool request in the normalized response shape.
            return LLMResponse(
                content=None,
                tool_calls=[ToolCall(id=f"call_{name}_{int(time.time()*1000)%100000}",
                                     name=name, arguments=args)],
                finish_reason="tool_calls",
                usage=usage,
            )

        def available(tool: str) -> bool:
            # Offered by the caller and not yet requested in this turn.
            return tool in tool_names and not self._already_called(messages, tool)

        # --- calculator -------------------------------------------------
        has_digit = bool(re.search(r"\d", user))
        wants_math = has_digit and (
            bool(re.search(r"[\+\-\*\/\^]", user)) or "sqrt" in user
            or "square root" in user
            or any(w in user for w in ["calculate", "compute", "evaluate", "what is", "what's"]))
        if wants_math and available("calculator"):
            return call("calculator", {"expression": self._extract_math(user)})

        # --- clock --------------------------------------------------------
        if available("get_current_time"):
            if self._mentions(user, ["time", "date", "today", "now", "o'clock"]):
                tz = "UTC"
                # \b keeps "in" from matching the tail of words such as "again".
                m = re.search(r"\bin ([a-zA-Z_\/ ]+)", user)
                if m:
                    cand = m.group(1).strip().title().replace(" ", "_")
                    if cand:
                        # "Utc"/"Gmt" undo the title-casing of those abbreviations.
                        tz = {"Tokyo": "Asia/Tokyo", "Delhi": "Asia/Kolkata",
                              "New_York": "America/New_York", "London": "Europe/London",
                              "Utc": "UTC", "Gmt": "GMT"}.get(cand, cand)
                return call("get_current_time", {"timezone": tz})

        # --- long-term memory tools ---------------------------------------
        if available("remember_fact"):
            m = re.search(r"my favorite (?:programming )?language is (\w+)", user)
            if m:
                return call("remember_fact", {"key": "favorite_language", "value": m.group(1)})
        if available("recall_fact"):
            if any(w in user for w in ["my favorite", "do you remember", "recall", "what did i tell"]):
                key = "favorite_language" if "language" in user else "note"
                return call("recall_fact", {"key": key})

        # --- python runner ------------------------------------------------
        if available("run_python"):
            py_kw = any(w in user for w in ["fibonacci", "prime", "factorial", "simulate"])
            py_action = "python" in user and any(
                w in user for w in ["run", "write", "code", "print", "execute", "snippet"])
            if py_kw or py_action:
                if "fibonacci" in user:
                    code = ("def fib(n):\n a,b=0,1\n out=[]\n"
                            " for _ in range(n):\n  out.append(a); a,b=b,a+b\n return out\n"
                            "print(fib(12))")
                elif "prime" in user:
                    code = ("primes=[n for n in range(2,50) "
                            "if all(n%d for d in range(2,int(n**0.5)+1))]\nprint(primes)")
                elif "factorial" in user:
                    code = "import math; print(math.factorial(10))"
                else:
                    code = "print(sum(range(1,101)))"
                return call("run_python", {"code": code})

        # --- web search ---------------------------------------------------
        if available("web_search"):
            if any(w in user for w in ["search", "look up", "latest", "news about", "find information"]):
                return call("web_search", {"query": self._last_user_text(messages)})

        # --- no tool needed: answer from conversation memory --------------
        if any(p in user for p in ["my name", "who am i", "what do i love", "what i love"]):
            name, love = self._scan_memory(messages)
            bits = []
            if name:
                bits.append(f"your name is {name}")
            if love:
                bits.append(f"you love {love}")
            if bits:
                return LLMResponse(content="From our conversation, " + " and ".join(bits) + ".",
                                   tool_calls=[], finish_reason="stop", usage=usage)

        # --- final answer -------------------------------------------------
        # Summarize only this turn's tool results; non-string content is
        # stringified so the join cannot fail.
        tool_outputs = []
        for m in self._current_turn(messages):
            if m.get("role") == "tool":
                content = m.get("content")
                tool_outputs.append(content if isinstance(content, str) else str(content))
        if tool_outputs:
            joined = " ".join(tool_outputs)
            answer = f"Based on the tool results, here's what I found: {joined}"
        elif self._mentions(user, ["hello", "hi", "hey"]):
            answer = "Hello! I'm a mock nanobot agent. Ask me to calculate, tell time, run Python, or remember things."
        else:
            answer = ("[mock LLM] I would normally reason about this with a real model. "
                      "Set NANOBOT_API_KEY to use a live LLM. For now, try prompts with math, "
                      "time, Python, or memory so you can see the tool loop fire.")
        return LLMResponse(content=answer, tool_calls=[], finish_reason="stop", usage=usage)

اكتشاف المزيد من كحيل | أخبار التقنية

اشترك للحصول على أحدث التدوينات المرسلة إلى بريدك الإلكتروني.

شاركها.
اترك تعليقاً

اكتشاف المزيد من كحيل | أخبار التقنية

اشترك الآن للاستمرار في القراءة والحصول على حق الوصول إلى الأرشيف الكامل.

Continue reading