| from logging import getLogger |
| from typing import Tuple |
|
|
| import torch |
| from torch import nn |
| from torch.nn import functional as F |
|
|
| from timm.models import register_model |
| from timm.models import vision_transformer as tvit |
| from timm.models import convnext as tconv |
|
|
| from einops import rearrange |
|
|
| from . import extra_timm_models as et |
|
|
|
|
| class Fuser(nn.Module): |
| def __init__(self, src_dim: int, tgt_dim: int, gated: bool = True): |
| super().__init__() |
| self.gated = gated |
|
|
| mid_dim = max(src_dim, tgt_dim) * 2 |
|
|
| self.fwd = nn.Sequential( |
| nn.Conv2d(src_dim, mid_dim, kernel_size=3, stride=1, padding=1), |
| nn.GELU(), |
| nn.Conv2d(mid_dim, tgt_dim * (2 if gated else 1), kernel_size=3, stride=1, padding=1), |
| ) |
|
|
| def forward(self, src: torch.Tensor, tgt: torch.Tensor) -> torch.Tensor: |
| if src.ndim == 3: |
| shape = tgt.shape[-2:] |
| else: |
| shape = src.shape[-2:] |
|
|
| nd = shape[0] * shape[1] |
|
|
| if src.ndim == 3: |
| src = src[:, -nd:].reshape(src.shape[0], src.shape[2], *shape) |
|
|
| if tgt.ndim == 3: |
| tgt_pre = tgt[:, :-nd] |
| tgt = tgt[:, -nd:].reshape(tgt.shape[0], tgt.shape[2], *shape) |
| else: |
| tgt_pre = None |
|
|
| pred = self.fwd(src) |
|
|
| if self.gated: |
| g, pred = torch.chunk(pred, 2, dim=1) |
|
|
| g = F.sigmoid(g) |
|
|
| pred = g * pred |
|
|
| tgt = tgt + pred |
|
|
| if tgt_pre is not None: |
| tgt = rearrange(tgt, 'b c h w -> b (h w) c') |
| tgt = torch.cat([tgt_pre, tgt], dim=1) |
|
|
| return tgt |
|
|
|
|
| class AttnDownsample(nn.Module): |
| def __init__(self, dim: int, window_size: int, num_heads: int = 16): |
| super().__init__() |
| self.q = nn.Parameter(torch.randn(1, num_heads, 1, dim // num_heads) * 0.01) |
| self.kv = nn.Linear(dim, dim * 2) |
| self.proj = nn.Linear(dim, dim) |
| self.window_size = window_size |
| self.num_heads = num_heads |
| self.head_dim = dim // num_heads |
| self.scale = self.head_dim ** -0.5 |
|
|
| def forward(self, x: torch.Tensor, twod_shape: Tuple[int, int]) -> torch.Tensor: |
| ntok = twod_shape[0] * twod_shape[1] |
| x_pre = x[:, :-ntok] |
|
|
| B = x.shape[0] |
| ds_hw = tuple(s // self.window_size for s in twod_shape) |
|
|
| x_spat = rearrange( |
| x[:, -ntok:], |
| 'b (h d1 w d2) c -> (b h w) (d1 d2) c', |
| h=ds_hw[0], w=ds_hw[1], |
| d1=self.window_size, d2=self.window_size, |
| ) |
|
|
| B, N, C = x_spat.shape |
|
|
| k, v = self.kv(x_spat).reshape(B, N, 2, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4) |
|
|
| q = (self.q * self.scale).expand(B, -1, -1, -1) |
| attn = q @ k.transpose(-2, -1) |
| attn = F.softmax(attn, dim=-1) |
| x = attn @ v |
|
|
| x = x.transpose(1, 2).reshape(B, C) |
| x = self.proj(x) |
|
|
| x = rearrange(x, '(b h w) c -> b (h w) c', b=x_pre.shape[0], h=ds_hw[0], w=ds_hw[1]) |
|
|
| x = torch.cat([x_pre, x], dim=1) |
| return x |
|
|
|
|
| class HybridModel(nn.Module): |
| def __init__(self, vit: tvit.VisionTransformer, conv: tconv.ConvNeXt, pretrained: bool = False, |
| concatenate: bool = False, **kwargs): |
| super().__init__() |
| self.conv = conv |
| self.vit = vit |
| self.concatenate = concatenate |
|
|
| conv.stages = nn.ModuleList(conv.stages) |
| vit.blocks = nn.ModuleList(vit.blocks) |
|
|
| self._half_vit_idx = len(vit.blocks) // 2 + 1 |
|
|
| self._half_conv_idx = None |
| x = torch.empty(1, 3, 256, 256) |
| x = self.conv.stem(x) |
| for i in range(len(conv.stages)): |
| x = conv.stages[i](x) |
| if self._half_conv_idx is None and x.shape[-2:] == (16, 16): |
| self._half_conv_idx = i + 1 |
| half_conv_dim = x.shape[1] |
| final_conv_dim = x.shape[1] |
|
|
| self.vit_to_conv_fusion = Fuser(vit.embed_dim, half_conv_dim) |
| self.conv_to_vit_fusion = Fuser(half_conv_dim, vit.embed_dim) |
| self.vit_ds = AttnDownsample(vit.embed_dim, window_size=2) |
|
|
| embed_dim = vit.embed_dim + (final_conv_dim if concatenate else 0) |
| if not concatenate: |
| self.final_fuse = Fuser(final_conv_dim, vit.embed_dim, gated=False) |
| self.final_block = tvit.Block(embed_dim, num_heads=16) |
|
|
| self.embed_dim = embed_dim |
|
|
| @property |
| def patch_size(self): |
| return 32 |
|
|
| @property |
| def no_fsdp_wrap_types(self): |
| return {tvit.VisionTransformer, tconv.ConvNeXt} |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| return self.forward_features(x) |
|
|
| def forward_features(self, x: torch.Tensor) -> torch.Tensor: |
| y_vit = self.vit.patch_generator(x) |
|
|
| for i in range(self._half_vit_idx): |
| y_vit = self.vit.blocks[i](y_vit) |
|
|
| y_conv = self.conv.stem(x) |
| for i in range(self._half_conv_idx): |
| y_conv = self.conv.stages[i](y_conv) |
|
|
| y_vit, y_conv = self.conv_to_vit_fusion(y_conv, y_vit), self.vit_to_conv_fusion(y_vit, y_conv) |
|
|
| y_vit = self.vit_ds(y_vit, y_conv.shape[-2:]) |
|
|
| for i in range(self._half_vit_idx, len(self.vit.blocks)): |
| y_vit = self.vit.blocks[i](y_vit) |
|
|
| for i in range(self._half_conv_idx, len(self.conv.stages)): |
| y_conv = self.conv.stages[i](y_conv) |
|
|
| if self.concatenate: |
| y_conv = rearrange(y_conv, 'b c h w -> b (h w) c') |
| |
| conv_summary = y_conv.mean(dim=1, keepdim=True).expand(-1, self.vit.patch_generator.num_cls_patches, -1) |
| y_conv = torch.cat([conv_summary, y_conv], dim=1) |
| y = torch.cat([y_vit, y_conv], dim=2) |
| else: |
| y = self.final_fuse(y_conv, y_vit) |
| y = self.final_block(y) |
|
|
| summary = y[:, :self.vit.patch_generator.num_cls_tokens] |
| features = y[:, self.vit.patch_generator.num_cls_patches:] |
|
|
| return summary, features |
|
|
|
|
| @register_model |
| def hybrid_base(pretrained=False, concatenate: bool = False, weight_init: str = 'skip', **kwargs): |
| cfg = dict(num_classes=0, **kwargs) |
| conv = tconv.convnextv2_base(pretrained=pretrained, **cfg) |
| vit = tvit.vit_base_patch16_224(pretrained=pretrained, weight_init=weight_init, **cfg) |
|
|
| return HybridModel(vit, conv, pretrained, concatenate=concatenate) |
|
|
|
|
| @register_model |
| def hybrid_large(pretrained=False, concatenate: bool = False, weight_init: str = 'skip', **kwargs): |
| cfg = dict(num_classes=0, **kwargs) |
| conv = tconv.convnextv2_large(pretrained=pretrained, **cfg) |
| vit = tvit.vit_large_patch16_224(pretrained=pretrained, weight_init=weight_init, **cfg) |
|
|
| return HybridModel(vit, conv, pretrained, concatenate=concatenate) |
|
|
|
|
| @register_model |
| def hybrid_huge(pretrained=False, concatenate: bool = False, weight_init: str = 'skip', **kwargs): |
| cfg = dict(num_classes=0, **kwargs) |
| conv = tconv.convnextv2_huge(pretrained=pretrained, **cfg) |
| vit = et.vit_huge_patch16_224(pretrained=pretrained, weight_init=weight_init, **cfg) |
|
|
| return HybridModel(vit, conv, pretrained, concatenate=concatenate) |
|
|