torch.tensor

Creating Tensors

torch.tensor([[1.5, 2, 3], [4, 5, 6]])
 
torch.zeros([2, 2]) # 2x2 constant tensor filled with 0
torch.ones([2, 2]) # 2x2 constant tensor filled with 1
torch.full([2, 2], 7) # 2x2 constant tensor filled with 7
 
torch.arange(10, 25, 5) # 1-d tensor version of range (start, end, step)
torch.linspace(0, 2, 9) # 1-d tensor of evenly spaced values (start, end, num)
torch.eye(2) # 2x2 identity tensor
torch.rand(2, 2) # 2x2 uniform random tensor
torch.randn(2, 2) # 2x2 normal random tensor
 
torch.zeros_like(arr) # arr-like tensor filled with 0
torch.ones_like(arr) # arr-like tensor filled with 1
torch.full_like(arr, 3) # arr-like tensor filled with 3

Tensor Info

A.shape
A.dtype # default floating-point dtype: torch.float32
A.device # what device is the tensor stored on (cpu, cuda)
 
A.type(torch.float16) # cast to another dtype
A.to(torch.int8) # equivalent; .to() is the more common form

dtypes

Data type                        Value/Range
torch.bool                       True, False
torch.int8                       -128 ~ 127
torch.uint8                      0 ~ 255
torch.int16, torch.short         -32768 ~ 32767
torch.int32, torch.int           -2147483648 ~ 2147483647
torch.int64, torch.long          -9223372036854775808 ~ 9223372036854775807
torch.float16, torch.half        1 sign bit, 5-bit exponent, 10-bit mantissa
torch.float32, torch.float       1 sign bit, 8-bit exponent, 23-bit mantissa
torch.float64, torch.double      1 sign bit, 11-bit exponent, 52-bit mantissa
torch.complex64, torch.cfloat    two 32-bit floats (real and imaginary components)
torch.complex128, torch.cdouble  two 64-bit floats (real and imaginary components)
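 
The integer ranges and float layouts above can be checked programmatically; a quick sketch using torch.iinfo / torch.finfo:
torch.iinfo(torch.int8) # iinfo(min=-128, max=127, dtype=int8)
torch.finfo(torch.float16) # finfo(..., min=-65504, max=65504, ..., dtype=float16)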

Arithmetic Operations

A + B
torch.add(A, B)
 
A - B
torch.subtract(A, B)
 
A / B
torch.divide(A, B)
 
A * B
torch.multiply(A, B)
 
A @ B
torch.matmul(A, B)
 
torch.abs(A)
torch.sqrt(A)
torch.exp(A)
torch.log(A)
torch.log2(A)
torch.log10(A)
torch.round(A, decimals=2)
torch.floor(A)
torch.ceil(A)
torch.sin(A)
torch.cos(A)

Comparison

A == B # Element-wise comparison (The shape of the tensor is preserved)
A < 2
 
torch.equal(A, B) # tensor-wise comparison

Aggregate Functions

A.sum()
A.min() # return the min value
A.max()
A.argmax() # return the index of the max value
A.argmin()
A.mean()
A.median()
 
A.cumsum(dim=0)
>>> torch.tensor([[1, 2, 3], [4, 5, 6]]).cumsum(dim=0)
tensor([[1, 2, 3], [5, 7, 9]])
>>> torch.tensor([[1, 2, 3], [4, 5, 6]]).cumsum(dim=1)
tensor([[1, 3, 6], [4, 9, 15]])
A.cumprod(dim=0)
A.cummin(dim=0) # returns (values, indices)
A.cummax(dim=0) # returns (values, indices)

Other Functions

A.sort(dim=1) # returns (values, indices)
 
# Activation Functions
A.sigmoid()
A.softmax(dim=-1) # dim is required
 
A.to('cpu') # Move the tensor to CPU
A.to('cuda') # Move the tensor to GPU
A.cpu() # Equivalent to A.to('cpu')

Copying Arrays

B = A.data # Shares the underlying data; not tracked by autograd
B = A.clone() # Deep copy, preserves computational graph
B = A.clone().detach() # Deep copy, doesn't preserve computational graph (independent)
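 
A quick check of the sharing behavior (values are just for illustration):
A = torch.ones(3, requires_grad=True)
B = A.data
B[0] = 5. # also changes A[0]; the underlying storage is shared
C = A.clone().detach()
C[1] = 7. # A is unaffected; C has its own storage and no graph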

Slicing and Indexing

# Slicing
A[0:2, 1] # Select items at rows 0 and 1 in column 1
A[1, ...] # Same as [1, :, :]
torch.flip(A, dims=[0]) # Reverse A along dim 0 (negative-step slicing like A[::-1] is not supported)
 
# Indexing
A[A<2]
# List indexing
A[d0_list, d1_list, d2_list] # Indexes with zip(d0_list, d1_list, d2_list); the result is a 1-d tensor
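 
A minimal sketch of list indexing (the tensor and index values are just for illustration):
A = torch.arange(24).reshape(2, 3, 4)
A[[0, 1], [1, 2], [3, 0]] # tensor([7, 20]), i.e. A[0, 1, 3] and A[1, 2, 0]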

Array Manipulation

torch.transpose(A, 0, 1)
A.transpose(0, 1)
# For 2d arrays
A.T
A.t()
 
A.permute(1, 2, 0)
 
A.reshape([2, 6])
A.flatten()
 
torch.cat([A, B], dim=0)
torch.vstack([A, B])
torch.hstack([A, B])
 
A.squeeze()
A.unsqueeze(dim=0) # 0: [3, 4] -> [1, 3, 4], 1: [3, 4] -> [3, 1, 4], 2: [3, 4] -> [3, 4, 1]

Torch-NumPy Conversion

# A is np.ndarray
B = torch.tensor(A) # copies the data
B = torch.from_numpy(A) # shares memory with A
 
# B is torch.Tensor
A = B.numpy() # shares memory with B (CPU tensors only)
A = np.array(B) # copies the data

Gradient

# Single variable
x = torch.tensor([1.], requires_grad=True)  
a = x ** 2 # x^2  
b = a + 1 # x^2 + 1 = [2.]
c = b ** 2 # (x^2 + 1)^2 = [4.]
c.backward() # differentiation
print(x.grad) # dc/dx = 2(x^2 + 1) * 2x = [8.]
 
 
# Multi variables
x = torch.tensor([1.], requires_grad=True)  
y = torch.tensor([1.], requires_grad=True)  
z = 2 * x ** 2 + y ** 2 # 2x^2 + y^2 = [3.]
z.backward()  # partial derivation  
print(x.grad)  # dz/dx = 4x = [4.]
print(y.grad)  # dz/dy = 2y = [2.]
 
 
# detach
x = torch.tensor([1.], requires_grad=True)
x.requires_grad = False # Disables gradient tracking in place; it does not detach the tensor from the computational graph
x = x.detach() # Returns a new tensor detached from the computational graph
 
x = torch.tensor([1.], requires_grad=True)
with torch.no_grad():  # Gradients are not tracked inside this block
    y = x ** 2 # [1.]; y.requires_grad == False
# x.requires_grad is still True outside the block
 
with torch.inference_mode(): # Enhanced version of torch.no_grad() (faster; tensors created inside cannot be used in autograd later)
    y = x ** 2 # [1.]; y.requires_grad == False
# x.requires_grad is still True

Torch

Random Seed

torch.manual_seed(0)
torch.cuda.manual_seed(0)

Save / Load

# Full model
torch.save(model, model_path) 
saved_model = torch.load(model_path, map_location=DEVICE)                          
 
 
# state_dict() only
torch.save(model.state_dict(), model_path) 
 
loaded_model = Model()
loaded_model.load_state_dict(torch.load(model_path, map_location=DEVICE, weights_only=True))

Clear Memory

torch.cuda.empty_cache()
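 
empty_cache() only releases cached blocks that are no longer referenced, so drop the Python references first; a minimal sketch (A is a placeholder for an unused GPU tensor):
import gc
del A # drop references to tensors you no longer need
gc.collect() # collect unreachable Python objects
torch.cuda.empty_cache() # return cached, unreferenced GPU memory to the driver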

Precision/Performance Settings

torch.backends.cuda.matmul.allow_tf32 = True # Allow TF32 matmuls on Ampere+ GPUs (faster, slightly lower precision)
torch.backends.cudnn.benchmark = True # Let cuDNN autotune conv algorithms for fixed input shapes

Automatic Mixed Precision

import torch
from torch.amp import autocast, GradScaler
 
 
model = Model().to('cuda')
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler('cuda')
 
for epoch in range(num_epochs):
    for x_batch, y_batch in dataloader:
        x_batch = x_batch.to('cuda')
        y_batch = y_batch.to('cuda')
        optimizer.zero_grad()
        
        # Runs the forward pass with autocasting
        with autocast(device_type='cuda', dtype=torch.float16):
            y_preds = model(x_batch)
            loss = loss_function(y_preds, y_batch)
        
        # Scales loss and calls backward() to create scaled gradients
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
 
model.eval()
with torch.no_grad(), autocast(device_type='cuda'):
    y_preds = model(x_batch)

Neural Net

Embedding

# Input of nn.Embedding is a tensor of word indices from 0 to num_embeddings-1
nn.Embedding(num_embeddings=10000, embedding_dim=512) # [32, 100] -> [32, 100, 512]
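 
A minimal sketch of the shape behavior (a batch of 32 sequences of 100 token indices; assumes torch and nn are imported as elsewhere in these notes):
emb = nn.Embedding(num_embeddings=10000, embedding_dim=512)
tokens = torch.randint(0, 10000, (32, 100)) # word indices in [0, num_embeddings)
out = emb(tokens) # out.shape == torch.Size([32, 100, 512])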

Loss Functions

nn.L1Loss() # L1Loss, MAE, mean absolute error
nn.MSELoss() # MSELoss, mean squared error
nn.BCEWithLogitsLoss() # Binary cross entropy (for binary classification problems)
nn.CrossEntropyLoss() # Cross entropy (for multi-class classification problems)
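 
Both classification losses expect raw logits, not probabilities; a minimal usage sketch (shapes chosen for illustration, torch/nn imports assumed):
logits = torch.randn(8, 5) # [batch, num_classes], raw model outputs
targets = torch.randint(0, 5, (8,)) # class indices, not one-hot
loss = nn.CrossEntropyLoss()(logits, targets)
 
bin_logits = torch.randn(8, 1)
bin_targets = torch.randint(0, 2, (8, 1)).float() # BCEWithLogitsLoss needs float targets
bin_loss = nn.BCEWithLogitsLoss()(bin_logits, bin_targets)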

Transfer Learning

from torchvision import models
 
weights = models.EfficientNet_B0_Weights.DEFAULT
transform = weights.transforms()
model = models.efficientnet_b0(weights=weights).to(device)
 
>>> print(list(weights.meta.keys()))
['categories', 'min_size', 'recipe', 'num_params', '_metrics', '_ops', '_file_size', '_docs']
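 
A common next step is to freeze the pretrained features and replace the classifier head; a minimal sketch for efficientnet_b0 (the nn import and the 10-class output size are assumptions for illustration):
from torch import nn
 
for param in model.features.parameters():
    param.requires_grad = False # freeze the pretrained feature extractor
 
model.classifier = nn.Sequential(
    nn.Dropout(p=0.2),
    nn.Linear(in_features=1280, out_features=10), # 1280 = efficientnet_b0 feature size; 10 = example class count
).to(device)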

Custom Dataset

import numpy as np  
from torch.utils.data import Dataset, DataLoader, random_split  
  
  
class CustomDataset(Dataset):  
    def __init__(self, X, Y, transform=None):  
        self.X = X  
        self.Y = Y  
        self.transform = transform  
  
    def __len__(self):  
        return self.X.shape[0]  
  
    def __getitem__(self, index):  
        x = self.X[index]  
        if self.transform:  
            x = self.transform(x)  
        y = self.Y[index]  
        return x, y  
  
  
BATCH_SIZE = 8  
  
X_data = np.arange(-10, 10).reshape(-1, 1)  
Y_data = X_data ** 2  
custom_ds = CustomDataset(X_data, Y_data, lambda x: x + 1)  
train_ds, val_ds, test_ds = random_split(custom_ds, (0.8, 0.1, 0.1))  
  
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)  
val_dl = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)  
test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)

Baseline

from copy import deepcopy  
  
import matplotlib.pyplot as plt  
import torch  
from torch import nn, optim  
from torch.utils.data import DataLoader, random_split  
from torchvision import datasets, transforms  
from tqdm import tqdm  
  
  
class CNN(nn.Module):  
    def __init__(self):  
        super().__init__()  
        self.conv1 = nn.Sequential(  
            nn.Conv2d(3, 8, 3, padding='same'),  
            nn.BatchNorm2d(8),  
            nn.ReLU(),  
        )  
        self.maxpool1 = nn.MaxPool2d(2)  
        self.conv2 = nn.Sequential(  
            nn.Conv2d(8, 16, 3, padding='same'),  
            nn.BatchNorm2d(16),  
            nn.ReLU(),  
        )  
        self.maxpool2 = nn.MaxPool2d(2)  
        self.conv3 = nn.Sequential(  
            nn.Conv2d(16, 32, 3, padding='same'),  
            nn.BatchNorm2d(32),  
            nn.ReLU(),  
        )  
        self.maxpool3 = nn.MaxPool2d(2)  
        self.fc = nn.Linear(32 * 4 * 4, 10)  
      
    def forward(self, x):  
        x = self.conv1(x)  
        x = self.maxpool1(x)  
        x = self.conv2(x)  
        x = self.maxpool2(x)  
        x = self.conv3(x)  
        x = self.maxpool3(x)  
        x = x.flatten(start_dim=1)  
        x = self.fc(x)  
        return x  
  
  
def batch_epoch(model, dl, criterion, device, optimizer=None):  
    running_loss = 0  
    running_correct = 0  
    for x_batch, y_batch in dl:  
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)  
          
        y_logits = model(x_batch)  
        y_preds = y_logits.softmax(dim=1).argmax(dim=1)  
        loss = criterion(y_logits, y_batch)  
        correct = torch.eq(y_preds, y_batch).sum().item()  
          
        if optimizer:  
            optimizer.zero_grad()  
            loss.backward()  
            optimizer.step()  
        running_loss += loss.item() * x_batch.shape[0]  
        running_correct += correct  
    n_data = len(dl.dataset)  
    loss = running_loss / n_data  
    acc = running_correct / n_data  
      
    return loss, acc  
  
  
@torch.inference_mode()  
def evaluation(model, test_dl, criterion, device):  
    model.eval()  
    test_loss, test_acc = batch_epoch(model, test_dl, criterion, device)  
    return test_loss, test_acc  
  
  
def train(model, train_dl, val_dl, criterion, optimizer, epochs, device):  
    best_model = None  
    train_history = dict(train_loss=[], val_loss=[], train_acc=[], val_acc=[])  
    with tqdm(range(1, epochs + 1)) as pbar:  
        for epoch in pbar:  
            model.train()  
            train_loss, train_acc = batch_epoch(model, train_dl, criterion, device, optimizer)  
            val_loss, val_acc = evaluation(model, val_dl, criterion, device)  
              
            best_model = deepcopy(model) if epoch == 1 or min(train_history['val_loss']) > val_loss else best_model  
              
            train_history['train_loss'].append(train_loss)  
            train_history['val_loss'].append(val_loss)  
            train_history['train_acc'].append(train_acc)  
            train_history['val_acc'].append(val_acc)  
              
            pbar.set_postfix(dict(train_loss=f'{train_loss:.3f}', train_acc=f'{train_acc:.2%}',  
                                  val_loss=f'{val_loss:.3f}', val_acc=f'{val_acc:.2%}'))  
    return train_history, best_model  
  
  
def plot_train_history(history):  
    _, axs = plt.subplots(1, 2, figsize=(8, 3))  
    epoch_range = range(1, len(history['train_loss']) + 1)  
      
    axs[0].plot(epoch_range, history['train_loss'], label='train')  
    axs[0].plot(epoch_range, history['val_loss'], label='val')  
    axs[0].set(xlabel='Epoch', ylabel='loss', title='Training Loss')  
    axs[0].legend()  
      
    axs[1].plot(epoch_range, history['train_acc'], label='train')  
    axs[1].plot(epoch_range, history['val_acc'], label='val')  
    axs[1].set(xlabel='Epoch', ylabel='accuracy', title='Training Accuracy')  
    axs[1].legend()  
      
    plt.tight_layout()  
    plt.show()  
  
  
if __name__ == '__main__':  
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  
    BATCH_SIZE = 256  
    LR = 1e-3  
    EPOCH = 5  
    TRAIN_RATIO = 0.8  
    criterion = nn.CrossEntropyLoss()  
      
    transform_train = transforms.Compose([  
        transforms.ToTensor(),  
    ])  
    transform_test = transforms.Compose([  
        transforms.ToTensor(),  
    ])  
      
    data_dir = '../data/'  
    train_ds = datasets.CIFAR10(root=data_dir, download=True, transform=transform_train, train=True)  
    train_ds, val_ds = random_split(train_ds, (TRAIN_RATIO, 1 - TRAIN_RATIO))  
    val_ds.transform = transform_test  # Note: val_ds is a Subset, so this does not override the transform of the underlying dataset  
    test_ds = datasets.CIFAR10(root=data_dir, download=True, transform=transform_test, train=False)  
      
    train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)  
    val_dl = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=True)  
    test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)  
      
    # Instantiate the model
    model = CNN().to(DEVICE)
    
    # Training
    optimizer = optim.Adam(model.parameters(), lr=LR)
    history, best_model = train(model, train_dl, val_dl, criterion, optimizer, EPOCH, DEVICE)
    
    # Visualize the training result
    plot_train_history(history)
    
    # Testing
    test_loss, test_acc = evaluation(model, test_dl, criterion, DEVICE)
    print(f'{test_loss=:.3f}, {test_acc=:.2%}')

Experiment Tracking

wandb

def train():
    run.watch(model) # Track model info (such as gradients)
    for epoch in range(epochs):
        wandb.log({'train/loss': train_loss, 'val/loss': val_loss}, step=epoch)
    ...
 
config = dict(epochs=epoch, model_name=model_name, batch_size=BATCH_SIZE, lr=LR)
with wandb.init(project="project", group='group', name=name, config=config) as run:
    train()

wandb Hyperparameter Searching

def train():
    run.watch(model) # Track model info (such as gradients)
    for epoch in range(epochs):
        wandb.log({'train/loss': train_loss, 'val/loss': val_loss}, step=epoch)
    ...
 
def sweep_train():
    with wandb.init() as run:  
        w_config = run.config  # hyperparameters selected by the sweep for this run  
        train()
    ...
 
 
# Define the sweep parameters
sweep_params = dict(batch_size=dict(values=[16, 32, 64, 128, 256]),  
                    lr=dict(distribution='log_uniform_values', min=1e-5, max=1e-1),  
                    epochs=dict(distribution='q_uniform', min=5, max=30, q=5))  
sweep_config = dict(method='bayes',  
                    metric=dict(goal='minimize', name='val/loss'),  
                    parameters=sweep_params)  
 
# Initialize a sweep
sweep_id = wandb.sweep(sweep=sweep_config, project='project')  
 
# Start the sweep agent
wandb.agent(sweep_id, function=sweep_train, count=10)

Load The Best Model from Sweep

api = wandb.Api()  
sweep = api.sweep('user/project/sweeps/sweep_id')  
  
best_run: wandb.apis.public.runs.Run = sweep.best_run(order='+val/loss') # +: asc, -: desc  
best_parameters = best_run.config  
print(best_parameters)