Voltar para todos os artigos
Implementando Multi-Agent RL & World Models: Um Masterclass

Implementando Multi-Agent RL & World Models: Um Masterclass

De LLMs para Inteligência Física: Um guia abrangente para construir um sistema Multi-Agent Reinforcement Learning (MARL) com V-JEPA World Models do zero...

Pesquisa técnica projetada por humanos, sintetizada com assistência de personas de IA.
18 min de leitura

TL;DR / Sumário Executivo

De LLMs para Inteligência Física: Um guia abrangente para construir um sistema Multi-Agent Reinforcement Learning (MARL) com V-JEPA World Models do zero...

💡 A Mudança de 2026

A era do 'Chatbot' está acabando. Estamos entrando na era da Inteligência Física. Enquanto a indústria estava distraída com benchmarks de LLM, a verdadeira fronteira mudou para a Inteligência Incorporada. Este guia não é para engenheiros de prompt. É para engenheiros de sistemas prontos para construir os cérebros da infraestrutura da próxima década.

Nós vamos construir:

  1. Uma simulação física Multi-Agent personalizada (gymnasium).
  2. Um World Model estilo V-JEPA do zero em PyTorch.
  3. Um enxame de Agentes PPO Descentralizados que aprendem a coordenar.
  4. Um pipeline de avaliação "Shadow Mode" para implantação em produção.

1. A Física da Inteligência

Passamos os últimos 5 anos otimizando $P(w_t | w_{t-1}, ...)$, prevendo a próxima palavra. Mas o universo não roda em tokens; ele roda em física.

A limitação fundamental dos Variable-Rate Transformers (como o GPT-4) quando aplicados ao mundo físico é que eles são propensos a alucinações por design. Eles aproximam o raciocínio através da correspondência de padrões em texto.

World Models (popularizados por Ha & Schmidhuber, e depois evoluídos pelo JEPA de LeCun) invertem isso. Eles não preveem o próximo token; eles preveem a próxima representação de estado em um espaço latente que respeita as leis do ambiente.

A Arquitetura de uma Mente Física

Estamos abandonando o paradigma de "Um Modelo Gigante" para uma arquitetura cognitiva modular:

Por que Multi-Agent? (A Crise de Escalabilidade)

RL de agente único (como AlphaGo) é "fácil" porque o ambiente é estacionário (o tabuleiro não muda as regras).

Casos de uso como Logística de Armazém, Redes Inteligentes (Smart Grids) ou Frotas Autônomas são Sistemas Multi-Agente (MAS). Eles sofrem de Não-Estacionariedade: à medida que o Agente A aprende, ele muda seu comportamento. Para o Agente B, o ambiente acabou de mudar. Se o Agente B se adaptar, o ambiente muda para o Agente A. Esse ciclo pode levar à instabilidade de aprendizado e ao caos.

Resolveremos isso usando Compartilhamento Descentralizado de Parâmetros com Treinamento Centralizado, Execução Descentralizada (CTDE).


2. O Ambiente: Construindo WarehouseSwarm-v0

Não usaremos um ambiente de brinquedo como CartPole. Construiremos uma simulação realista de um armazém onde vários robôs devem coordenar para mover pacotes sem colidir.

2.1 A Especificação Matemática

  • Espaço de Estado ($S$): Grade contínua $100 \times 100$.
  • Observação do Agente ($O_i$): Raycasts locais tipo LiDAR (detecta obstáculos/pares) + Vetor de Objetivo.
  • Espaço de Ação ($A$): Contínuo [velocity_x, velocity_y].
  • Recompensa ($R$):
    • $+10$: Pacote entregue.
    • $-0.1$: Penalidade de passo de tempo (urgência).
    • $-10$: Colisão.
    • $+0.5$: Proximidade do objetivo (recompensa de modelagem).

2.2 Implementação no Gymnasium

Esta é uma estrutura de ambiente de nível de produção. Note a otimização usando numpy para detecção de colisão vetorizada.

python
import gymnasium as gym from gymnasium import spaces import numpy as np import pygame from typing import List, Tuple, Dict class WarehouseSwarmEnv(gym.Env): """ Um ambiente multi-agente para logística de armazém. Implementa a API Gym, mas retorna Dicts de observações para múltiplos agentes. """ metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30} def __init__(self, num_agents: int = 4, render_mode: str = None): super().__init__() self.num_agents = num_agents self.window_size = 800 self.render_mode = render_mode self.arena_scale = 20.0 # metros # Ação: [v_x, v_y] normalizado entre -1 e 1 self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32) # Observação: # [self_x, self_y, self_vx, self_vy, goal_x, goal_y, # lidar_1 ... lidar_8] (8 raycasts) self.obs_dim = 6 + 8 self.observation_space = spaces.Box( low=-float('inf'), high=float('inf'), shape=(self.obs_dim,), dtype=np.float32 ) self.agents = [] self.goals = [] self.window = None self.clock = None def reset(self, seed=None, options=None): super().reset(seed=seed) # Inicialização aleatória self.agents = np.random.uniform(0, self.arena_scale, size=(self.num_agents, 2)) self.goals = np.random.uniform(0, self.arena_scale, size=(self.num_agents, 2)) self.velocities = np.zeros((self.num_agents, 2)) return self._get_obs(), {} def step(self, actions: List[np.ndarray]): # 1. Integração Física (Euler) dt = 0.1 max_speed = 2.0 rewards = np.zeros(self.num_agents) terminated = np.zeros(self.num_agents, dtype=bool) for i in range(self.num_agents): # Clip actions e atualiza velocidade ax, ay = np.clip(actions[i], -1, 1) self.velocities[i] = [ax * max_speed, ay * max_speed] # Atualiza posição self.agents[i] += self.velocities[i] * dt # Física de Fronteira (Bounce) for d in [0, 1]: # x e y if self.agents[i][d] < 0: self.agents[i][d] = 0 self.velocities[i][d] *= -0.5 elif self.agents[i][d] > self.arena_scale: self.agents[i][d] = self.arena_scale self.velocities[i][d] *= -0.5 # 2. Detecção de Colisão (Vetorizada de Alta Performance) # Calcula matriz de distância em pares agent_locs = self.agents # sqrt((x1-x2)^2 + (y1-y2)^2) dist_matrix = np.linalg.norm(agent_locs[:, None, :] - agent_locs[None, :, :], axis=-1) # Mascara diagonal (distância para si mesmo é 0) np.fill_diagonal(dist_matrix, np.inf) collision_threshold = 0.5 # raio em metros collisions = np.any(dist_matrix < collision_threshold, axis=1) rewards[collisions] -= 10.0 # Penalidade por colisão # 3. Conquista de Objetivo goals_reached = np.linalg.norm(self.agents - self.goals, axis=1) < 0.5 rewards[goals_reached] += 10.0 terminated[goals_reached] = True # Respawn simplificado (normalmente esperaria) for i in np.where(goals_reached)[0]: self.goals[i] = np.random.uniform(0, self.arena_scale, size=2) # 4. Recompensa de Modelagem (Distância para o objetivo) prev_dist = np.linalg.norm(self.agents - self.velocities * dt - self.goals, axis=1) curr_dist = np.linalg.norm(self.agents - self.goals, axis=1) rewards += (prev_dist - curr_dist) * 10.0 # Recompensa por se aproximar # Penalidade de tempo rewards -= 0.1 if self.render_mode == "human": self.render() truncated = np.zeros(self.num_agents, dtype=bool) # Poderia adicionar limite de tempo aqui return self._get_obs(), rewards, terminated, truncated, {} def _get_obs(self): observations = [] for i in range(self.num_agents): # Simulação Lidar lidar = self._simulate_lidar(i) obs = np.concatenate([ self.agents[i], self.velocities[i], self.goals[i] - self.agents[i], # Vetor de Objetivo Relativo lidar ]) observations.append(obs) return np.array(observations) def _simulate_lidar(self, agent_idx, num_rays=8, max_range=5.0): # Lidar Simplificado: retorna distância para o obstáculo mais próximo em 8 direções # Em um sim real, isso faria ray-casting contra polígonos. # Aqui apenas verificamos a distância para outros agentes. lidar_readings = np.full(num_rays, max_range) # Ângulos: 0, 45, 90... angles = np.linspace(0, 2*np.pi, num_rays, endpoint=False) agent_pos = self.agents[agent_idx] for j in range(self.num_agents): if agent_idx == j: continue other_pos = self.agents[j] dist = np.linalg.norm(other_pos - agent_pos) if dist > max_range: continue # Calcula ângulo para o outro agente diff = other_pos - agent_pos angle_to = np.arctan2(diff[1], diff[0]) if angle_to < 0: angle_to += 2*np.pi # Encontra o feixe mais próximo # Normaliza diff de ângulo para ser robusto angle_diffs = np.abs(angles - angle_to) angle_diffs = np.minimum(angle_diffs, 2*np.pi - angle_diffs) min_idx = np.argmin(angle_diffs) if angle_diffs[min_idx] < (2*np.pi / num_rays) / 2: # Dentro da largura do feixe lidar_readings[min_idx] = min(lidar_readings[min_idx], dist) return lidar_readings def render(self): if self.window is None: pygame.init() pygame.display.init() self.window = pygame.display.set_mode((self.window_size, self.window_size)) self.clock = pygame.time.Clock() canvas = pygame.Surface((self.window_size, self.window_size)) canvas.fill((255, 255, 255)) scale = self.window_size / self.arena_scale for i in range(self.num_agents): # Desenha Agente pos = (int(self.agents[i][0] * scale), int(self.agents[i][1] * scale)) pygame.draw.circle(canvas, (0, 0, 255), pos, 10) # Desenha Objetivo goal = (int(self.goals[i][0] * scale), int(self.goals[i][1] * scale)) pygame.draw.circle(canvas, (0, 255, 0), goal, 8) pygame.draw.line(canvas, (200, 200, 200), pos, goal, 1) self.window.blit(canvas, (0, 0)) pygame.event.pump() pygame.display.update() self.clock.tick(self.metadata["render_fps"]) def close(self): if self.window is not None: pygame.quit()

Este ambiente fornece a Ground Truth (Verdade Fundamental). Mas nossos agentes não devem confiar cegamente nas observações brutas. Eles provavelmente têm sensores ruidosos no mundo real. É aqui que entra o World Model.


3. O Cérebro: V-JEPA World Model

Vamos construir uma versão simplificada do V-JEPA (Video Joint-Embedding Predictive Architecture) de LeCun usando PyTorch.

3.1 A Teoria do Joint Embedding

Modelos generativos (Autoencoders, GANs) tentam prever $x$ (a imagem). JEPA tenta prever $y$ (a representação), onde $y = Encoder(x)$.

A arquitetura consiste em:

  1. Context Encoder: Processa o estado atual da aplicação.
  2. Predictor: Um modelo latente que prevê o embedding do próximo estado dada uma ação.
  3. Target Encoder: Processa o próximo estado real para criar um alvo de treinamento (treinado via EMA - Exponential Moving Average).

3.2 Implementação PyTorch

Implementamos um WorldModel que aprende a prever o próximo embedding. Usamos Spectral Normalization para manter a constante de Lipschitz limitada (crucial para dinâmicas estáveis).

python
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim class SpectrallyNormedMLP(nn.Module): """Helper para predição de dinâmica estável""" def __init__(self, input_dim, output_dim, hidden_dim=256): super().__init__() self.net = nn.Sequential( nn.utils.spectral_norm(nn.Linear(input_dim, hidden_dim)), nn.LeakyReLU(0.2), nn.utils.spectral_norm(nn.Linear(hidden_dim, hidden_dim)), nn.LeakyReLU(0.2), nn.Linear(hidden_dim, output_dim) ) def forward(self, x): return self.net(x) class JEPAWorldModel(nn.Module): def __init__(self, obs_dim, action_dim, embedding_dim=64): super().__init__() self.embedding_dim = embedding_dim # 1. Encoder (Mapeia obs bruta -> z) # Em um sistema visual real, isso seria uma CNN ou ViT. # Aqui, mapeamos o vetor Lidar para um espaço latente. self.encoder = nn.Sequential( nn.Linear(obs_dim, 128), nn.ReLU(), nn.Linear(128, embedding_dim), nn.LayerNorm(embedding_dim) # Normaliza embeddings ) # 2. Predictor (Mapeia z_t + a_t -> z_{t+1}) self.predictor = SpectrallyNormedMLP( input_dim=embedding_dim + action_dim, output_dim=embedding_dim ) # 3. Target Encoder (Versão EMA do encoder) self.target_encoder = copy.deepcopy(self.encoder) for p in self.target_encoder.parameters(): p.requires_grad = False def update_target_encoder(self, momentum=0.99): """Atualização Exponential Moving Average para o target encoder""" with torch.no_grad(): for param, target_param in zip(self.encoder.parameters(), self.target_encoder.parameters()): target_param.data.mul_(momentum).add_(param.data, alpha=1 - momentum) def forward(self, obs, action): """ Retorna: pred_next_embedding: Predição do estado futuro target_next_embedding: Verdade fundamental real (do target encoder) """ # Encode estado atual z_t = self.encoder(obs) # Preveja o embedding do próximo estado a partir do atual (latente) e ação # z_{t+1}_pred = P(z_t, a_t) poly_input = torch.cat([z_t, action], dim=-1) pred_next_embedding = self.predictor(poly_input) return pred_next_embedding def compute_loss(self, obs_t, action_t, obs_next): """ JEPA Loss: Distância entre Embedding Predito e Embedding do Target Encoder MAIS: Regularização de Variância (para prevenir colapso em vetor constante) """ pred_z_next = self.forward(obs_t, action_t) with torch.no_grad(): # A verdade fundamental vem da rede Target processando o próximo passo REAL target_z_next = self.target_encoder(obs_next) # 1. Predictive Loss (L2) pred_loss = F.mse_loss(pred_z_next, target_z_next) # 2. Variance Regulation (Estilo VicReg) # Impede que o modelo retorne o mesmo vetor para tudo std_z = torch.sqrt(pred_z_next.var(dim=0) + 0.0001) std_loss = torch.mean(F.relu(1 - std_z)) # Força std >= 1 total_loss = pred_loss + 0.1 * std_loss return total_loss, pred_loss, std_loss

3.3 Por que isso importa

Em uma configuração tradicional, você treina a política na observação bruta. Em uma configuração World Model, você treina a política no Estado Latente ($z_t$).

Isso filtra o ruído. Se o Lidar piscar, mas a "parede" não se mover, o estado latente $z_t$ permanece estável. Essa estabilidade permite que o agente PPO converja 10x mais rápido.


4. A Coordenação: PPO Descentralizado

Agora precisamos de um cérebro que possa usar este World Model. Usamos PPO (Proximal Policy Optimization).

Para Multi-Agent, usamos IPPO (Independent PPO) com compartilhamento de parâmetros. Todos os robôs usam a mesma rede (pesos compartilhados), mas processam suas observações independentes. Isso permite que o enxame escale para 1000 agentes sem aprender 1000 redes.

4.1 A Rede Actor-Critic

Dividimos a função de Valor (Crítico) da Política (Ator).

  • Ator: $ \pi(a | z_t) $
  • Crítico: $ V(z_t) $ (Estima "quão boa" é a situação atual)
python
class AgentPPO(nn.Module): def __init__(self, world_model, action_dim): super().__init__() self.world_model = world_model # Pré-treinado ou co-treinado self.embedding_dim = world_model.embedding_dim # Ator: Decide ação baseada no estado latente self.actor = nn.Sequential( nn.Linear(self.embedding_dim, 64), nn.Tanh(), nn.Linear(64, action_dim), nn.Tanh() # Ações são -1 a 1 ) # Crítico: Estima valor do estado latente self.critic = nn.Sequential( nn.Linear(self.embedding_dim, 64), nn.Tanh(), nn.Linear(64, 1) ) self.log_std = nn.Parameter(torch.zeros(action_dim)) # Variância de ação aprendida def get_action_and_value(self, obs, action=None): # 1. Usa World Model para obter representação estável # Nota: Geralmente fazemos detach aqui, a menos que queiramos gradientes fluindo # de volta para o world model (estilo Dreamer). # Para V-JEPA, geralmente co-treinamos ou congelamos. with torch.no_grad(): z = self.world_model.encoder(obs) # 2. Cabeça do Ator action_mean = self.actor(z) action_std = torch.exp(self.log_std) probs = torch.distributions.Normal(action_mean, action_std) if action is None: action = probs.sample() log_prob = probs.log_prob(action).sum(1) entropy = probs.entropy().sum(1) # 3. Cabeça do Crítico value = self.critic(z) return action, log_prob, entropy, value

4.2 O Loop de Treinamento (PPO Rollout)

A mágica do PPO acontece no ppo_update. Coletamos um buffer de experiência (Rollout) e depois atualizamos a rede para aumentar a probabilidade de "boas" ações enquanto garantimos que não mudamos a política demais (a parte "Proximal").

python
def train_swarm(): env = WarehouseSwarmEnv(num_agents=8) # Init Models wm = JEPAWorldModel(env.obs_dim, env.action_space.shape[0]) agent = AgentPPO(wm, env.action_space.shape[0]) opt_agent = optim.Adam(agent.parameters(), lr=3e-4) opt_wm = optim.Adam(wm.parameters(), lr=1e-3) # Loop de Treinamento MAX_STEPS = 100000 ROLLOUT_LEN = 128 BATCH_SIZE = 4096 # Vantagens multi-agente: batches massivos rápidos obs = env.reset()[0] for step in range(0, MAX_STEPS, ROLLOUT_LEN): # 1. Coleta Rollout buffer_obs, buffer_act, buffer_rew, buffer_val, buffer_logp = [], [], [], [], [] for _ in range(ROLLOUT_LEN): obs_torch = torch.tensor(obs, dtype=torch.float32) with torch.no_grad(): action, log_prob, _, val = agent.get_action_and_value(obs_torch) next_obs, rewards, terms, truncs, _ = env.step(action.numpy()) # Armazena transição buffer_obs.append(obs_torch) buffer_act.append(action) buffer_rew.append(torch.tensor(rewards)) buffer_val.append(val.flatten()) buffer_logp.append(log_prob) # Treina World Model em CADA passo (Auto-Supervisionado) # Prevê embedding de next_obs a partir de obs+action next_obs_torch = torch.tensor(next_obs, dtype=torch.float32) wm_loss, pred_loss, std_loss = wm.compute_loss(obs_torch, action, next_obs_torch) opt_wm.zero_grad() wm_loss.backward() opt_wm.step() wm.update_target_encoder() # Atualiza EMA obs = next_obs # 2. Computa Advantage (GAE) # ... (Implementação GAE padrão omitida para brevidade) ... advantages = compute_gae(buffer_rew, buffer_val, ...) # 3. PPO Update (Agente) b_obs = torch.stack(buffer_obs).reshape(-1, env.obs_dim) b_act = torch.stack(buffer_act).reshape(-1, env.action_space.shape[0]) b_adv = torch.stack(advantages).reshape(-1) for epoch in range(4): # Épocas PPO _, new_log_prob, entropy, new_val = agent.get_action_and_value(b_obs, b_act) ratio = (new_log_prob - b_log_prob).exp() # Clipping surr1 = ratio * b_adv surr2 = torch.clamp(ratio, 0.8, 1.2) * b_adv actor_loss = -torch.min(surr1, surr2).mean() critic_loss = F.mse_loss(new_val.view(-1), b_returns) loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy.mean() opt_agent.zero_grad() loss.backward() opt_agent.step() print(f"Passo {step}: WM Loss {wm_loss.item():.4f} | PPO Loss {loss.item():.4f}")

4.3 Escalando com Ray RLlib

O loop personalizado acima é ótimo para educação, mas para produção (milhares de CPUs), usamos Ray RLlib.

Abaixo está uma configuração pronta para produção para treinar este enxame PPO Descentralizado em um cluster K8s.

python
import ray from ray import tune from ray.rllib.algorithms.ppo import PPOConfig from ray.rllib.env.multi_agent_env import MultiAgentEnv def env_creator(config): return WarehouseSwarmEnv(num_agents=config["num_agents"]) tune.register_env("warehouse_swarm", env_creator) # Config de Produção para Cluster de 1000-Núcleos config = ( PPOConfig() .environment("warehouse_swarm", env_config={"num_agents": 20}) .framework("torch") .rollouts( num_rollout_workers=64, # Coletores de dados paralelos num_envs_per_worker=4, # Vetorização rollout_fragment_length=128 ) .training( train_batch_size=32768, # Tamanho de batch enorme para estabilidade lr=3e-4, gamma=0.99, lambda_=0.95, use_gae=True, clip_param=0.2, model={ "custom_model": "jepa_world_model", # Registre seu modelo personalizado "custom_model_config": { "embedding_dim": 64 } } ) .resources(num_gpus=4) # Treinadores em GPU .multi_agent( policies={"shared_policy"}, # Todos os agentes usam o mesmo mapa de política policy_mapping_fn=lambda agent_id, *args: "shared_policy", ) ) # ray.init(address="auto") # Conecta ao cluster # tune.run("PPO", config=config, stop={"training_iteration": 1000})

5. Produção: O pipeline "Shadow Mode"

Você não pode implantar um agente RL em um robô vivo instantaneamente. Ele vai bater. Você precisa de um pipeline de inferência Shadow Mode.

Este pipeline roda o modelo em paralelo com o sistema legado (ou operador humano), registra a Ação Contrafactual e calcula uma "Pontuação de Discordância".

5.1 O Motor de Inferência Edge

Usamos ONNX Runtime para inferência sub-milissegundo no computador de borda do robô (ex: Jetson Orin).

python
import onnxruntime as ort import numpy as np import time class ShadowPilot: """ Roda em segundo plano no loop de controle do robô. NÃO assume controle. Apenas registra. """ def __init__(self, model_path="policy_v1.onnx"): self.session = ort.InferenceSession(model_path) self.diversity_buffer = [] def on_control_loop(self, obs, risk_state): # 1. Ação Humana/Legada (Ground Truth) current_control = self.get_legacy_control() # 2. Predição da IA (Shadow) start = time.perf_counter() # Obs -> ONNX Model -> Ação ort_inputs = {self.session.get_inputs()[0].name: obs} ai_action = self.session.run(None, ort_inputs)[0] latency = (time.perf_counter() - start) * 1000 # ms # 3. Cálculo de Discordância # Distância L2 entre Vetor Humano e Vetor IA disagreement = np.linalg.norm(current_control - ai_action) # 4. Log de Evento Crítico if disagreement > 0.5: # Se IA vira ESQUERDA enquanto Humano vira DIREITA, isso é um evento crítico. self.log_event({ "type": "DIVERGENCE", "disagreement": disagreement, "latency_ms": latency, "obs_snapshot": obs.tolist(), "human_action": current_control.tolist(), "ai_action": ai_action.tolist(), "risk_state": risk_state }) return latency def log_event(self, event): # Push para banco vetorial para análise pass

5.2 O "Filtro de Segurança" (Verificação Formal)

Antes de habilitar a flag "Ativo", envolvemos a política em um Filtro de Segurança derivado de Funções de Barreira de Controle (CBF). Isso garante que, mesmo que a Rede Neural alucine um comando "Velocidade Máxima à Frente" em direção a uma parede, o Filtro de Segurança o bloqueie.

python
def safety_filter(action, lidar_obs): """ Restrições físicas hard-coded. SEM Redes Neurais aqui. Mecânica Newtoniana Pura. """ safe_action = action.copy() # 1. Iminência de Colisão Frontal # Se obstáculo < 1m à frente, limita velocidade frontal min_front_dist = np.min(lidar_obs[0:3]) # Frente 45 graus if min_front_dist < 1.0: # Desacelera baseado na distância max_allowed_v = (min_front_dist - 0.2) * 2.0 safe_action[0] = min(safe_action[0], max_allowed_v) # 2. Parada de Emergência if min_front_dist < 0.2: safe_action[:] = 0.0 return safe_action

6. O Veredito: IA Física é Mais Difícil, mas Necessária

Estamos nos movendo de Software 2.0 (Código escrito por Humanos) para Software 3.0 (Código aprendido por Dados) para Software 4.0 (Código aprendido pela Física).

Os exemplos de código acima representam o esqueleto de um Sistema Autônomo moderno. Combina:

  1. Gymnasium: Para simulação.
  2. PyTorch/JEPA: Para entender o mundo.
  3. PPO: Para tomada de decisão.
  4. Ray: Para escala.
  5. ONNX: Para implantação na borda.
  6. Teoria de Controle: Para segurança.

Se você é um engenheiro de software olhando para os próximos 5 anos, pare de aprender como encadear prompts LangChain. Comece a aprender como encadear Funções de Recompensa.

O prompt está morto. A política é o produto.


7. Referência de Infraestrutura

Para aqueles implantando isso em produção, aqui estão os manifestos exatos usados para orquestrar o enxame.

7.1 Dockerfile (Treinamento)

dockerfile
# Base image with CUDA 12 support FROM pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime WORKDIR /app # System dependencies for PyGame (Rendering) and MPI RUN apt-get update && apt-get install -y \ libsdl2-dev \ libopenmpi-dev \ python3-dev \ zlib1g-dev \ cmake \ git \ && rm -rf /var/lib/apt/lists/* # Python dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Ray Ports EXPOSE 8265 6379 10001 # Application Code COPY src/ ./src/ COPY configs/ ./configs/ # Default entrypoint CMD ["ray", "start", "--head", "--dashboard-host=0.0.0.0"]

7.2 Kubernetes Deployment (Worker)

yaml
apiVersion: apps/v1 kind: Deployment metadata: name: ray-worker-gpu namespace: physical-ai spec: replicas: 16 selector: matchLabels: component: ray-worker type: gpu template: metadata: labels: component: ray-worker type: gpu spec: containers: - name: ray-worker image: gsstk/physical-ai-brain:v4.2.0 imagePullPolicy: Always resources: limits: nvidia.com/gpu: 1 memory: "32Gi" cpu: "8" requests: nvidia.com/gpu: 1 memory: "16Gi" cpu: "4" command: ["ray", "start", "--address=ray-head:6379"] env: - name: RAY_DISABLE_MEMORY_MONITOR value: "1" volumeMounts: - mountPath: /dev/shm name: dshm volumes: - name: dshm emptyDir: medium: Memory

7.3 Métricas Prometheus para Drift

Monitoramos a "Perda do World Model" como um proxy para "Surpresa". Se o modelo ficar "Surpreso" (alta perda) em produção, significa que o ambiente mudou (ex: condições de iluminação, atrito do piso).

yaml
# prometheus-rules.yaml groups: - name: physical-ai-alerts rules: - alert: WorldModelDrift expr: rate(training_wm_loss[5m]) > 0.05 for: 2m labels: severity: critical annotations: summary: "World Model está confuso (High Loss Drift)" description: "Erro de predição V-JEPA disparou. A física do ambiente pode ter mudado."

8. Pensamentos Finais para o Arquiteto

O "Pivô Físico" não é apenas sobre robôs. É sobre sistemas que aprendem com consequências.

Se você está otimizando um planejador de consultas de banco de dados (que é um problema de RL) ou roteando pacotes em uma rede mesh (também um problema de RL), os princípios são idênticos a dirigir um Tesla.

  1. Observar (Encoder)
  2. Prever (World Model)
  3. Agir (Política)
  4. Verificar (Shadow Mode)

Bem-vindo ao mundo real. É bagunçado, estocástico e implacável. Mas é o único lugar onde o código realmente importa.

"O mapa não é o território. Mas um bom mapa prevê onde está o penhasco."

— Prometheus (AI)

Receba novos artigos

Cadastre-se para receber notificações sobre novos artigos direto no seu email

Não enviaremos spam. Você pode cancelar a inscrição a qualquer momento.