torch.tensor
Creating Tensors
torch.tensor([[ 1.5 , 2 , 3 ], [ 4 , 5 , 6 ]])
torch.zeros([ 2 , 2 ]) # 2x2 constant tensor filled with 0
torch.ones([ 2 , 2 ]) # 2x2 constant tensor filled with 1
torch.full([ 2 , 2 ], 7 ) # 2x2 constant tensor filled with 7
torch.arange( 10 , 25 , 5 ) # tensor version of range (start, end, step); end is exclusive
torch.linspace( 0 , 2 , 9 ) # 1d Array of evenly spaced values (start, end, num)
torch.eye( 2 ) # 2x2 identity tensor
torch.rand( 2 , 2 ) # 2x2 uniform random tensor
torch.randn( 2 , 2 ) # 2x2 normal random tensor
torch.zeros_like(arr) # arr-like tensor filled with 0
torch.ones_like(arr) # arr-like tensor filled with 1
torch.full_like(arr, 3 ) # arr-like tensor filled with 3
Tensor Info
A.shape
A.dtype # default: torch.float32
A.device # what device is the tensor stored on (cpu, cuda)
tensor.type(torch.float16)
A.type(torch.int8)
dtypes
Data type | Value/Range
torch.bool | True, False
torch.int8 | -128 ~ 127
torch.uint8 | 0 ~ 255
torch.int16, torch.short | -32768 ~ 32767
torch.int32, torch.int | -2147483648 ~ 2147483647
torch.int64, torch.long | -9223372036854775808 ~ 9223372036854775807
torch.float16, torch.half | sign bit, 5 bits exponent, 10 bits mantissa
torch.float32, torch.float | sign bit, 8 bits exponent, 23 bits mantissa
torch.float64, torch.double | sign bit, 11 bits exponent, 52 bits mantissa
torch.complex64, torch.cfloat | two 32-bit floats (real and imaginary components)
torch.complex128, torch.cdouble | two 64-bit floats (real and imaginary components)
Arithmetic Operations
A + B
torch.add(A, B)
A - B
torch.subtract(A, B)
A / B
torch.divide(A, B)
A * B
torch.multiply(A, B)
A @ B
torch.matmul(A, B)
torch.abs(A)
torch.sqrt(A)
torch.exp(A)
torch.log(A)
torch.log2(A)
torch.log10(A)
torch.round(A, decimals=2)  # round to 2 decimal places (the keyword is `decimals`, not `decimal`)
torch.floor(A)
torch.ceil(A)
torch.sin(A)
torch.cos(A)
Comparison
A == B # Element-wise comparison (The shape of the tensor is preserved)
A < 2
torch.equal(A, B) # tensor-wise comparison
Aggregate Functions
A.sum()
A.min() # return the min value
A.max()
A.argmax() # return the index of the max value
A.argmin()
A.mean()
A.median()
A.cumsum(dim=0)  # cumulative sum along `dim` (the dim argument is required)
>>> torch.tensor([[ 1 , 2 , 3 ], [ 4 , 5 , 6 ]]).cumsum( dim = 0 )
tensor([[ 1 , 2 , 3 ], [ 5 , 7 , 9 ]])
>>> torch.tensor([[ 1 , 2 , 3 ], [ 4 , 5 , 6 ]]).cumsum( dim = 1 )
tensor([[ 1 , 3 , 6 ], [ 4 , 9 , 15 ]])
A.cumprod(dim=0)  # cumulative product along `dim` (the dim argument is required)
A.cummin(dim=0)  # returns (values, indices) of the running minimum along `dim`
A.cummax(dim=0)  # returns (values, indices) of the running maximum along `dim`
Other Functions
A.sort( dim = 1 )
# Activation Functions
A.sigmoid()
A.softmax(dim=-1)  # `dim` is required; -1 normalizes over the last dimension
A.to( 'cpu' ) # Move the tensor to CPU
A.to( 'cuda' ) # Move the tensor to GPU
A.cpu() # Equivalent to A.to('cpu'); returns A itself (no copy) if it is already on the CPU
Copying Arrays
B = A.data # Share data, not the structure
B = A.clone() # Deep copy, preserves computational graph
B = A.clone().detach() # Deep copy, doesn't preserve computational graph (independent)
Slicing and Indexing
# Slicing
A[ 0 : 2 , 1 ] # Select items at rows 0 and 1 in column 1
A[ 1 , ... ] # Same as [1, :, :]
A.flip(0)  # Reversed tensor of A along dim 0 (torch does not support negative-step slicing such as A[::-1])
# Indexing
A[A < 2 ]
# List indexing
A[d0_list, d1_list, d2_list] # Use zip(d0_list, d1_list, d2_list) as the indices; the result is a 1d tensor.
Array Manipulation
torch.transpose(A, 0 , 1 )
A.transpose( 0 , 1 )
# For 2d arrays
A.T
A.t()
A.permute( 1 , 2 , 0 )
A.reshape([ 2 , 6 ])
A.flatten()
torch.concat([A, B], axis = 0 )
torch.vstack([A, B])
torch.hstack([A, B])
A.squeeze()
A.unsqueeze( dim = 0 ) # 0: [3, 4] -> [1, 3, 4], 1: [3, 4] -> [3, 1, 4], 2: [3, 4] -> [3, 4, 1]
torch-numpy Converting
# A is np.ndarray
B = torch.tensor(A)
B = torch.from_numpy(A)
# B is torch.tensor
A = B.numpy()
A = np.array(B)
Gradient
# Single variable
x = torch.tensor([ 1 .], requires_grad = True )
a = x ** 2 # x^2
b = a + 1 # x^2 + 1 = [2.]
c = b ** 2 # (x^2 + 1)^2 = [4.]
c.backward() # differentiation
print (x.grad) # dc/dx = 2(x^2 + 1) * 2x = [8.]
# Multi variables
x = torch.tensor([ 1 .], requires_grad = True )
y = torch.tensor([ 1 .], requires_grad = True )
z = 2 * x ** 2 + y ** 2 # 2x^2 + y^2 = [3.]
z.backward() # partial derivation
print (x.grad) # dz/dx = 4x = [4.]
print (y.grad) # dz/dy = 2y = [2.]
# detach
x = torch.tensor([ 1 .], requires_grad = True )
x.requires_grad = False # It doesn't detach the tensor from the computational graph
x = x.detach() # It detaches the tensor from the computational graph.
with torch.no_grad(): # Do not calculate the gradient within here.
# x.requires_grad == True
y = x ** 2 # [1.]
# x.requires_grad == True
with torch.inference_mode(): # Enhanced version of torch.no_grad()
# x.requires_grad == True
y = x ** 2 # [1.]
# x.requires_grad == True
Torch
Random Seed
torch.manual_seed( 0 )
torch.cuda.manual_seed( 0 )
Save / Load
# Full model
torch.save(model, model_path)
# A fully pickled model must be loaded with weights_only=False
# (recent PyTorch releases default to weights_only=True, which rejects pickled modules).
# SECURITY: unpickling can execute arbitrary code -- only load files from a trusted source.
saved_model = torch.load(model_path, map_location=DEVICE, weights_only=False)
# state_dict() only
torch.save(model.state_dict(), model_path)
loaded_model = Model()
loaded_model.load_state_dict(torch.load(model_path, map_location = DEVICE , weights_only = True ))
Clear Memory
torch.cuda.empty_cache()
Precision/Performance Settings
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.benchmark = True
Automatic Mixed Precision
# TODO : verify it!
import torch
from torch.cuda.amp import GradScaler  # newer PyTorch also offers torch.amp.GradScaler('cuda')

# nn.Module has no .device() method; .to() moves the parameters to the GPU.
model = Model().to('cuda')
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()

for epoch in range(num_epochs):
    for x_batch, y_batch in dataloader:
        x_batch = x_batch.to('cuda')
        y_batch = y_batch.to('cuda')
        optimizer.zero_grad()
        # Runs the forward pass with autocasting.
        # torch.autocast takes `device_type`; torch.cuda.amp.autocast does not accept that argument.
        with torch.autocast(device_type='cuda', dtype=torch.float16):
            y_preds = model(x_batch)
            loss = loss_function(y_preds, y_batch)
        # Scales loss and calls backward() to create scaled gradients
        scaler.scale(loss).backward()
        # Unscales the gradients and skips the step if they contain inf/NaN
        scaler.step(optimizer)
        # Adjusts the scale factor for the next iteration
        scaler.update()

# Inference: no gradient tracking, same autocast settings as in training
model.eval()
with torch.no_grad(), torch.autocast(device_type='cuda', dtype=torch.float16):
    y_preds = model(x_batch)
Neural Net
Embedding
# Input of nn.Embedding is a tensor of word indices from 0 to num_embeddings-1
nn.Embedding( num_embeddings = 10000 , embedding_dim = 512 ) # [32, 100] -> [32, 100, 512]
Loss Functions
nn.L1Loss() # L1Loss, MAE, mean absolute error
nn.MSELoss() # MSELoss, mean squared error
nn.BCEWithLogitsLoss() # Binary cross entropy (for binary classification problems)
nn.CrossEntropyLoss() # Cross entropy (for multi-class classification problems)
Transfer Learning
from torchvision import models
weights = models.EfficientNet_B0_Weights. DEFAULT
transform = weights.transforms()
model = models.efficientnet_b0( weights = weights).to(device)
>>> print (weights.meta)
[ 'categories' , 'min_size' , 'recipe' , 'num_params' , '_metrics' , '_ops' , '_file_size' , '_docs' ]
Custom Dataset
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
class CustomDataset(Dataset):
    """Map-style dataset pairing each sample in ``X`` with its target in ``Y``.

    Args:
        X: Indexable, sized collection of samples (array, tensor, list, ...).
        Y: Indexable collection of targets, aligned with ``X`` by index.
        transform: Optional callable applied to the sample (never to the
            target) each time an item is fetched.
    """

    def __init__(self, X, Y, transform=None):
        self.X = X
        self.Y = Y
        self.transform = transform

    def __len__(self):
        # len() works for arrays, tensors and plain sequences alike,
        # whereas ``.shape[0]`` only works for array-like objects.
        return len(self.X)

    def __getitem__(self, index):
        """Return the ``(sample, target)`` pair stored at ``index``."""
        x = self.X[index]
        # Compare against None explicitly: a callable object that defines
        # __len__ or __bool__ may be falsy and would otherwise be skipped.
        if self.transform is not None:
            x = self.transform(x)
        y = self.Y[index]
        return x, y
BATCH_SIZE = 8

# Toy regression data: y = x^2 for x in [-10, 10)
X_data = np.arange(-10, 10).reshape(-1, 1)
Y_data = X_data ** 2
custom_ds = CustomDataset(X_data, Y_data, lambda x: x + 1)

# Fractional lengths: 80% train, 10% validation, 10% test
train_ds, val_ds, test_ds = random_split(custom_ds, (0.8, 0.1, 0.1))

train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)
# Evaluation loaders keep a fixed order: shuffling does not change the metrics
# and only makes per-batch results harder to reproduce.
val_dl = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True)
test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True)
Baseline
from copy import deepcopy
import matplotlib.pyplot as plt
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
from tqdm import tqdm
class CNN(nn.Module):
    """Small three-stage convolutional classifier.

    Every stage is ``Conv2d(3x3, padding='same') -> BatchNorm2d -> ReLU``
    followed by a 2x2 max pooling, so the spatial size is halved three
    times before the final linear layer.

    Args:
        in_channels: Number of channels of the input images.
        num_classes: Number of output logits.
        input_size: Spatial size of the input images, either a single int
            (square images) or a ``(height, width)`` pair.

    Raises:
        ValueError: If the input is too small to survive three 2x2 poolings.

    The defaults reproduce the original fixed architecture
    (3-channel 32x32 inputs, 10 classes).
    """

    def __init__(self, in_channels=3, num_classes=10, input_size=32):
        super().__init__()
        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size
        # Three MaxPool2d(2) layers floor-divide each spatial dim by 8 overall.
        out_h, out_w = height // 8, width // 8
        if out_h < 1 or out_w < 1:
            raise ValueError(
                f'input_size={input_size!r} is too small: each spatial '
                'dimension must be at least 8'
            )

        self.conv1 = self._conv_block(in_channels, 8)
        self.maxpool1 = nn.MaxPool2d(2)
        self.conv2 = self._conv_block(8, 16)
        self.maxpool2 = nn.MaxPool2d(2)
        self.conv3 = self._conv_block(16, 32)
        self.maxpool3 = nn.MaxPool2d(2)
        self.fc = nn.Linear(32 * out_h * out_w, num_classes)

    @staticmethod
    def _conv_block(n_in, n_out):
        """Build one Conv2d -> BatchNorm2d -> ReLU stage."""
        return nn.Sequential(
            nn.Conv2d(n_in, n_out, 3, padding='same'),
            nn.BatchNorm2d(n_out),
            nn.ReLU(),
        )

    def forward(self, x):
        """Map a batch of images ``(N, C, H, W)`` to logits ``(N, num_classes)``."""
        x = self.maxpool1(self.conv1(x))
        x = self.maxpool2(self.conv2(x))
        x = self.maxpool3(self.conv3(x))
        # Keep the batch dimension, flatten channels and spatial dims.
        x = x.flatten(start_dim=1)
        return self.fc(x)
def batch_epoch(model, dl, criterion, device, optimizer=None):
    """Run one pass over ``dl`` and return the mean loss and the accuracy.

    Args:
        model: Classifier returning logits of shape ``(batch, n_classes)``.
        dl: Iterable yielding ``(x_batch, y_batch)`` tensor pairs.
        criterion: Loss called as ``criterion(logits, targets)``; the loss is
            weighted by the batch size, which assumes a mean-reduced loss.
        device: Device the batches are moved to.
        optimizer: If given, a backward pass and an optimizer step are done
            for every batch; if ``None`` the model is only evaluated.

    Returns:
        ``(loss, acc)`` averaged over the samples actually processed.

    Raises:
        ValueError: If ``dl`` yields no samples.
    """
    running_loss = 0.0
    running_correct = 0
    n_seen = 0
    for x_batch, y_batch in dl:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        batch_size = x_batch.shape[0]

        y_logits = model(x_batch)
        loss = criterion(y_logits, y_batch)

        if optimizer is not None:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # softmax is monotonic, so argmax over the raw logits gives the same classes.
        y_preds = y_logits.argmax(dim=1)
        running_correct += torch.eq(y_preds, y_batch).sum().item()
        running_loss += loss.item() * batch_size
        n_seen += batch_size

    # Normalize by the samples actually seen: len(dl.dataset) is wrong when the
    # loader drops the last batch or uses a sampler that visits only a subset.
    if n_seen == 0:
        raise ValueError('batch_epoch received a dataloader that yields no samples')
    return running_loss / n_seen, running_correct / n_seen
def evaluation(model, test_dl, criterion, device):
    """Evaluate ``model`` on ``test_dl`` without tracking gradients.

    The model is switched to eval mode and a single optimizer-free pass of
    ``batch_epoch`` is run under ``torch.inference_mode``.

    Returns:
        The ``(loss, accuracy)`` pair reported by ``batch_epoch``.
    """
    with torch.inference_mode():
        model.eval()
        loss, acc = batch_epoch(model, test_dl, criterion, device)
        return loss, acc
def train(model, train_dl, val_dl, criterion, optimizer, epochs, device):
    """Train ``model`` for ``epochs`` epochs and keep the best checkpoint.

    Each epoch runs one training pass over ``train_dl`` followed by one
    evaluation pass over ``val_dl``; the progress bar shows the latest metrics.

    Args:
        model: Model to optimize (updated in place).
        train_dl: Loader for the training pass.
        val_dl: Loader for the validation pass.
        criterion: Loss function forwarded to ``batch_epoch``.
        optimizer: Optimizer used for the training pass.
        epochs: Number of epochs to run.
        device: Device the batches are moved to.

    Returns:
        ``(train_history, best_model)`` where ``train_history`` maps
        ``train_loss`` / ``val_loss`` / ``train_acc`` / ``val_acc`` to per-epoch
        lists and ``best_model`` is a deep copy of the model at the epoch with
        the lowest validation loss (``None`` only if no epoch produced a
        comparable validation loss, e.g. ``epochs < 1``).
    """
    best_model = None
    # Start from +inf so the first epoch always produces a checkpoint; the
    # previous `epoch > 1` guard returned None whenever epoch 1 was the best.
    best_val_loss = float('inf')
    train_history = dict(train_loss=[], val_loss=[], train_acc=[], val_acc=[])
    with tqdm(range(1, epochs + 1)) as pbar:
        for _ in pbar:
            model.train()
            train_loss, train_acc = batch_epoch(model, train_dl, criterion, device, optimizer)
            val_loss, val_acc = evaluation(model, val_dl, criterion, device)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model = deepcopy(model)

            train_history['train_loss'].append(train_loss)
            train_history['val_loss'].append(val_loss)
            train_history['train_acc'].append(train_acc)
            train_history['val_acc'].append(val_acc)
            pbar.set_postfix(dict(train_loss=f'{train_loss:.3f}', train_acc=f'{train_acc:.2%}',
                                  val_loss=f'{val_loss:.3f}', val_acc=f'{val_acc:.2%}'))
    return train_history, best_model
def plot_train_history(history):
    """Plot the train/validation loss and accuracy curves side by side.

    Args:
        history: Mapping with per-epoch lists under the keys ``train_loss``,
            ``val_loss``, ``train_acc`` and ``val_acc``.
    """
    _, axs = plt.subplots(1, 2, figsize=(8, 3))
    epoch_range = range(1, len(history['train_loss']) + 1)
    # One entry per panel: (train key, val key, y-axis label, title)
    panels = (
        ('train_loss', 'val_loss', 'loss', 'Training Loss'),
        ('train_acc', 'val_acc', 'accuracy', 'Training Accuracy'),
    )
    for ax, (train_key, val_key, y_label, panel_title) in zip(axs, panels):
        ax.plot(epoch_range, history[train_key], label='train')
        ax.plot(epoch_range, history[val_key], label='val')
        ax.set(xlabel='Epoch', ylabel=y_label, title=panel_title)
        ax.legend()
    plt.tight_layout()
    plt.show()
if __name__ == '__main__':
    # Only this script section needs Subset, so it is imported locally.
    from torch.utils.data import Subset

    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
    BATCH_SIZE = 256
    LR = 1e-3
    EPOCH = 5
    TRAIN_RATIO = 0.8

    criterion = nn.CrossEntropyLoss()

    transform_train = transforms.Compose([
        transforms.ToTensor(),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
    ])

    data_dir = '../data/'
    # Assigning `.transform` on a Subset has no effect (the wrapped dataset applies
    # its own transform), so the training images are opened twice -- once per
    # transform -- and both views are split with the same shuffled indices.
    # This holds the training set in memory twice.
    train_view = datasets.CIFAR10(root=data_dir, download=True, transform=transform_train, train=True)
    val_view = datasets.CIFAR10(root=data_dir, download=True, transform=transform_test, train=True)
    n_total = len(train_view)
    n_train = int(n_total * TRAIN_RATIO)
    shuffled_indices = torch.randperm(n_total).tolist()
    train_ds = Subset(train_view, shuffled_indices[:n_train])
    val_ds = Subset(val_view, shuffled_indices[n_train:])
    test_ds = datasets.CIFAR10(root=data_dir, download=True, transform=transform_test, train=False)

    train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    # Evaluation loaders do not need shuffling.
    val_dl = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False)
    test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)

    # Instantiate the model
    model = CNN().to(DEVICE)

    # Training
    optimizer = optim.Adam(model.parameters(), lr=LR)
    history, best_model = train(model, train_dl, val_dl, criterion, optimizer, EPOCH, DEVICE)

    # Visualize the training result
    plot_train_history(history)

    # Testing: prefer the checkpoint with the lowest validation loss and fall
    # back to the final model when no checkpoint was returned.
    final_model = best_model if best_model is not None else model
    test_loss, test_acc = evaluation(final_model, test_dl, criterion, DEVICE)
    print(f'{test_loss=:.3f}, {test_acc=:.2%}')
Experiment Tracking
wandb
# Sketch of experiment tracking with wandb. This is pseudo-code: `run`, `model`,
# `epochs`, `train_loss`, `val_loss`, `model_name`, `name`, BATCH_SIZE and LR are
# not defined in this snippet and must come from the surrounding script.
def train ():
    run.watch(model) # Track the info (such as gradient) of the model
    for epoch in epochs:
        # Log the metrics of this epoch; `step` is set to the epoch value
        wandb.log({ 'train/loss' : train_loss, 'val/loss' : val_loss}, step = epoch)
        ...
# NOTE(review): `epoch` is not defined at this point -- presumably the total number of epochs; confirm
config = dict ( epochs = epoch, model_name = model_name, batch_size = BATCH_SIZE , lr = LR )
# The run object is bound to `run`, which train() reads as a global
with wandb.init( project = "project" , group = 'group' , name = name, config = config) as run:
    train()
wandb Hyperparameter Searching
# Sketch of a wandb hyperparameter sweep. This is pseudo-code: `run`, `model`,
# `epochs`, `train_loss` and `val_loss` are not defined in this snippet.
def train ():
    run.watch(model) # Track the info (such as gradient) of the model
    for epoch in epochs:
        wandb.log({ 'train/loss' : train_loss, 'val/loss' : val_loss}, step = epoch)
        ...
# Function handed to the sweep agent below (function = sweep_train)
def sweep_train ():
    with wandb.init() as run:
        # Configuration of this run
        # NOTE(review): `w_config` is not used in the lines shown; presumably train() should read it -- confirm
        w_config = run.config
        train()
        ...
# Define the sweep parameters
sweep_params = dict ( batch_size = dict ( values = [ 16 , 32 , 64 , 128 , 256 ]),
                      lr = dict ( distribution = 'log_uniform_values' , min = 1e-5 , max = 1e-1 ),
                      epochs = dict ( distribution = 'q_uniform' , min = 5 , max = 30 , q = 5 ))
# Search method 'bayes' with the goal of minimizing the logged 'val/loss' metric
sweep_config = dict ( method = 'bayes' ,
                      metric = dict ( goal = 'minimize' , name = 'val/loss' ),
                      parameters = sweep_params)
# Initialize a sweep
sweep_id = wandb.sweep( sweep = sweep_config, project = 'project' )
# Start the sweep agent
wandb.agent(sweep_id, function = sweep_train, count = 10 )
Load The Best Model from Sweep
# Look up a finished sweep through the wandb public API and print the
# configuration of its best run.
api = wandb.Api()
# Placeholder path: replace user, project and sweep_id with real values
sweep = api.sweep( 'user/project/sweeps/sweep_id' )
best_run: wandb.apis.public.runs.Run = sweep.best_run( order = '+val/loss' ) # +: asc, -: desc
# Config (hyperparameters) recorded for that run
best_parameters = best_run.config
print (best_parameters)