diff --git a/src/utils/auto_mapper.py b/src/utils/auto_mapper.py index 50b041c..d89f058 100644 --- a/src/utils/auto_mapper.py +++ b/src/utils/auto_mapper.py @@ -1,32 +1,57 @@ import json import os +import re import tempfile - -import platformdirs +from datetime import datetime from ftplib import FTP +import numpy as np import pygame +from scipy.signal import convolve2d -from src.utils.minecraft.chunk import Chunk +from src.utils.minecraft.chunk import Chunk, MalformedChunk, OldChunk from src.utils.minecraft.region import Region +from src.utils.paths import CONFIG_DIR, CACHE_DIR, get_project_path + +CHUNK_SIZE = 16 +CHUNKS_IN_REGION = 32 +REGION_SIZE = CHUNK_SIZE * CHUNKS_IN_REGION +GRADIENT_MAT = np.array([ + [0, 0, 0], + [0, 1, 0.5], + [0, -0.5, -1] +]) +GRADIENT_RANGE = 2 +GAUSSIAN_MAT = np.array([ + [1, 4, 7, 4, 1], + [4, 16, 26, 16, 4], + [7, 26, 41, 26, 7], + [4, 16, 26, 16, 4], + [1, 4, 7, 4, 1], +]) +GAUSSIAN_FACT = GAUSSIAN_MAT.sum() class AutoMapper: - APP_NAME: str = "lycacraft-paths" - APP_AUTHOR: str = "Lycacraft" - CONFIG_DIR = platformdirs.user_config_dir(appname=APP_NAME, appauthor=APP_AUTHOR) CONFIG_PATH = os.path.join(CONFIG_DIR, "mapper.json") - CACHE_DIR = platformdirs.user_cache_dir(appname=APP_NAME, appauthor=APP_AUTHOR) - CACHE_PATH = os.path.join(CACHE_DIR, "regions.json") + CACHE_PATH = os.path.join(CACHE_DIR, "regions.txt") COLORS_PATH = os.path.join(CACHE_DIR, "colors.json") + MAPS_DIR = os.path.join(CACHE_DIR, "maps") + MAP_SIZE = 1024 def __init__(self): self.config: FTPConfig = FTPConfig(self.CONFIG_PATH) self.ftp: FTP = FTP(self.config.HOST) self.regions: list[tuple[int, int]] = [] - self.temp_dir: tempfile.TemporaryDirectory[str] = tempfile.TemporaryDirectory(prefix="regions") - self.colors: dict[str, tuple[float, float, float, float]] = {} + self.temp_dir: tempfile.TemporaryDirectory[str] = tempfile.TemporaryDirectory() + self.colors: dict[str, tuple[float, float, float]] = {} + self.cache: dict[tuple[int, int], int] = {} + self.colormaps: dict[str, dict[str, tuple[float, float, float]]] = {} + self.available_regions: list[tuple[int, int]] = [] self.load_colors() + self.load_cache() + self.load_colormaps() + self.no_color: set[str] = set() def __enter__(self): self.ftp.login(self.config.USERNAME, self.config.PASSWORD) @@ -40,12 +65,64 @@ class AutoMapper: with open(self.COLORS_PATH, "r") as f: self.colors = json.load(f) - def list_available_regions(self) -> list[tuple[int, int]]: - files = self.ftp.nlst() - regions = [] - for f in files: - _, x, z, _ = f.split(".") - regions.append((int(x), int(z))) + def load_cache(self) -> None: + if os.path.exists(self.CACHE_PATH): + with open(self.CACHE_PATH, "r") as f: + lines = filter(lambda l: len(l.strip()) != 0, f.read().splitlines()) + self.cache = {} + for line in lines: + rx, rz, ts = map(int, line.split(" ")) + self.cache[(rx, rz)] = ts + + def load_colormaps(self) -> None: + biome_ids = {} + with open(get_project_path("res", "biome_ids.txt")) as f: + lines = f.read().splitlines() + for line in lines: + if len(line.strip()) == 0: + continue + biome_name, biome_id = line.split(" ") + biome_ids[f"minecraft:{biome_name}"] = int(biome_id) + + self.colormaps = {} + maps_dir = get_project_path("res", "colormaps") + for filename in os.listdir(maps_dir): + if not filename.endswith(".png"): + continue + img = pygame.image.load(os.path.join(maps_dir, filename)) + block = filename.split(".")[0] + colormap = {} + for biome_name, biome_id in biome_ids.items(): + colormap[biome_name] = img.get_at((biome_id, 0))[:3] + self.colormaps[f"minecraft:{block}"] = colormap + + self.colormaps["minecraft:grass"] = self.colormaps["minecraft:grass_block"] + self.colormaps["minecraft:tall_grass"] = self.colormaps["minecraft:grass_block"] + self.colormaps["minecraft:vine"] = self.colormaps["minecraft:oak_leaves"] + self.colormaps["minecraft:large_fern"] = self.colormaps["minecraft:grass_block"] + self.colormaps["minecraft:fern"] = self.colormaps["minecraft:grass_block"] + self.colormaps["minecraft:melon_stem"] = self.colormaps["minecraft:foliage"] + self.colormaps["minecraft:attached_melon_stem"] = self.colormaps["minecraft:foliage"] + self.colormaps["minecraft:mangrove_leaves"] = self.colormaps["minecraft:foliage"] + self.colormaps["minecraft:bubble_column"] = self.colormaps["minecraft:water"] + + def save_cache(self) -> None: + with open(self.CACHE_PATH, "w") as f: + for (rx, rz), ts in self.cache.items(): + f.write(f"{rx} {rz} {ts}\n") + + def get_available_regions(self) -> dict[tuple[int, int], int]: + files = self.ftp.mlsd(facts=["modify"]) + regions = {} + self.available_regions = [] + for filename, facts in files: + m = re.match(r"r\.(-?\d+)\.(-?\d+)\.mca", filename) + if m: + rx = int(m.group(1)) + rz = int(m.group(2)) + t = datetime.strptime(facts["modify"], "%Y%m%d%H%M%S") + regions[(rx, rz)] = int(t.timestamp()) + self.available_regions.append((rx, rz)) return regions def fetch_region(self, rx: int, rz: int) -> str: @@ -56,40 +133,114 @@ class AutoMapper: return outpath - def map_region(self, rx: int, rz: int) -> pygame.Surface: - print(f"[Fetching region ({rx},{rz})]") + def map_region(self, rx: int, rz: int) -> tuple[pygame.Surface, np.array]: + print(f" [Fetching region ({rx},{rz})]") path = self.fetch_region(rx, rz) region = Region(path) - surf = pygame.Surface([512, 512], pygame.SRCALPHA) + print(" [Rendering]") + surf = pygame.Surface([REGION_SIZE, REGION_SIZE], pygame.SRCALPHA) + surf.fill((0, 0, 0, 0)) + heightmap = np.zeros((REGION_SIZE, REGION_SIZE), dtype="uint16") - for cz in range(32): - for cx in range(32): - chunk = region.get_chunk(rx * 32 + cx, rz * 32 + cz) - if chunk is not None: - self.render_chunk(chunk, surf, cx * 16, cz * 16) + for cz in range(CHUNKS_IN_REGION): + for cx in range(CHUNKS_IN_REGION): + ox, oy = cx * CHUNK_SIZE, cz * CHUNK_SIZE + chunk = region.get_chunk(rx * CHUNKS_IN_REGION + cx, rz * CHUNKS_IN_REGION + cz) + if isinstance(chunk, Chunk): + hm = self.render_chunk(chunk, surf, ox, oy) + heightmap[oy:oy+CHUNK_SIZE, ox:ox+CHUNK_SIZE] = hm + elif isinstance(chunk, MalformedChunk): + pygame.draw.rect(surf, (92, 47, 32, 200), [ox, oy, CHUNK_SIZE, CHUNK_SIZE]) + elif isinstance(chunk, OldChunk): + pygame.draw.rect(surf, (32, 61, 92, 200), [ox, oy, CHUNK_SIZE, CHUNK_SIZE]) - return surf + print() + os.remove(path) + + return surf, heightmap def map_region_group(self, x: int, z: int) -> pygame.Surface: - surf = pygame.Surface([1024, 1024], pygame.SRCALPHA) - for dz in range(2): - for dx in range(2): - region = self.map_region(x * 2 + dx, z * 2 + dz) - surf.blit(region, [dx * 512, dz * 512]) + surf = pygame.Surface([self.MAP_SIZE, self.MAP_SIZE], pygame.SRCALPHA) + surf.fill((0, 0, 0, 0)) + n_regions = self.MAP_SIZE // REGION_SIZE + heightmap = np.zeros((self.MAP_SIZE, self.MAP_SIZE), dtype="uint16") + for dz in range(n_regions): + for dx in range(n_regions): + rx, rz = x * n_regions + dx, z * n_regions + dz + if (rx, rz) in self.available_regions: + region, hm = self.map_region(rx, rz) + ox, oy = dx * REGION_SIZE, dz * REGION_SIZE + surf.blit(region, [ox, oy]) + heightmap[oy:oy+REGION_SIZE, ox:ox+REGION_SIZE] = hm + self.cache[(rx, rz)] = int(datetime.utcnow().timestamp()) + + gradient = convolve2d(heightmap, GRADIENT_MAT, boundary="symm")[1:self.MAP_SIZE+1, 1:self.MAP_SIZE+1] + gradient = gradient.clip(-GRADIENT_RANGE, GRADIENT_RANGE) + gradient = 1 + gradient / 2 / GRADIENT_RANGE + gradient = np.array([gradient, gradient, gradient, np.ones(gradient.shape)]) + gradient = np.swapaxes(gradient, 0, 2) + surf_array = pygame.surfarray.array3d(surf) + alpha_array = pygame.surfarray.array_alpha(surf).reshape((*surf_array.shape[0:2], 1)) + surf_array = np.concatenate((surf_array, alpha_array), 2) + res = surf_array * gradient + res = res.clip(0, 255) + buf = res.transpose((1, 0, 2)).astype("uint8").tobytes(order="C") + surf = pygame.image.frombuffer(buf, surf.get_size(), "RGBA") + return surf - def render_chunk(self, chunk: Chunk, surf: pygame.Surface, ox: int, oy: int): - #blocks, hmap_surf = chunk.get_top_blocks() - blocks = chunk.get_top_blocks() - # surf.blit(hmap_surf, [ox, oy]) - # return - for z in range(16): - for x in range(16): - color = self.get_color(blocks[z][x]) - surf.set_at((ox + x, oy + z), color) + def render_chunk(self, chunk: Chunk, surf: pygame.Surface, ox: int, oy: int) -> np.array: + blocks, heightmap, biomes, is_empty = chunk.get_top_blocks() + if is_empty: + pygame.draw.rect(surf, (0, 0, 0, 0), [ox, oy, CHUNK_SIZE, CHUNK_SIZE]) - def get_color(self, block: str) -> tuple[float, float, float]: - return self.colors.get(block, (0, 0, 0)) + else: + for z in range(CHUNK_SIZE): + for x in range(CHUNK_SIZE): + color = self.get_color(blocks[z][x], biomes[z][x]) + surf.set_at((ox + x, oy + z), color) + return heightmap + + def get_color(self, block: str, biome: str) -> tuple[float, float, float, float]: + if block in self.colormaps: + r, g, b = self.colormaps[block][biome] + r2, g2, b2 = self.colors.get(block, (0, 0, 0, 0)) + return min(255.0, r*r2/255), min(255.0, g*g2/255), min(255.0, b*b2/255), 255 + if block not in self.colors: + if block not in self.no_color: + print(f" no color for {block}") + self.no_color.add(block) + return self.colors.get(block, (0, 0, 0, 0)) + + def map_world(self) -> None: + if not os.path.exists(self.MAPS_DIR): + os.mkdir(self.MAPS_DIR) + + regions = self.get_available_regions() + groups_to_map = set() + n_regions = self.MAP_SIZE // REGION_SIZE + for pos, modified_at in regions.items(): + if pos in self.cache and modified_at <= self.cache[pos]: + continue + + gx = pos[0] // n_regions + gz = pos[1] // n_regions + groups_to_map.add((gx, gz)) + + n_groups = len(groups_to_map) + print(f"Groups to map: {n_groups}") + proceed = input("Proceed ? y/[N] ") + if proceed.strip().lower() == "y": + groups_to_map = sorted(groups_to_map, key=lambda g: g[0]*g[0] + g[1]*g[1]) + for i, (gx, gz) in enumerate(groups_to_map): + print(f"[Mapping group ({gx}, {gz}) ({i+1}/{n_groups})]") + try: + surf = self.map_region_group(gx, gz) + + pygame.image.save(surf, os.path.join(self.MAPS_DIR, f"map_{gx}_{gz}.png")) + self.save_cache() + except Exception as e: + raise e class FTPConfig: @@ -124,11 +275,11 @@ class FTPConfig: if __name__ == '__main__': - from math import floor pygame.init() - x = -1 - z = 0 + # x = -1 + # z = 0 with AutoMapper() as mapper: - # print(mapper.list_available_regions()) - surf = mapper.map_region_group(x, z) - pygame.image.save(surf, f"/tmp/map_{x}_{z}.png") + # mapper.get_available_regions() + # surf = mapper.map_region_group(x, z) + # pygame.image.save(surf, f"/tmp/map_{x}_{z}.png") + mapper.map_world() diff --git a/src/utils/minecraft/chunk.py b/src/utils/minecraft/chunk.py index a131df1..39d674e 100644 --- a/src/utils/minecraft/chunk.py +++ b/src/utils/minecraft/chunk.py @@ -1,7 +1,7 @@ from typing import Optional import numpy as np -from nbt.nbt import NBTFile, TAG_Compound +from nbt.nbt import NBTFile, TAG_Compound, TAG_String class PositionOutOfBounds(ValueError): @@ -12,11 +12,16 @@ class BlockNotFound(Exception): pass -class Chunk: - def __init__(self, x: int, z: int, nbt: NBTFile): +class ChunkBase: + def __init__(self, x: int, z: int): self.x: int = x self.z: int = z + +class Chunk(ChunkBase): + def __init__(self, x: int, z: int, nbt: NBTFile): + super().__init__(x, z) + self.ox: int = x * 16 self.oy: int = nbt.get("yPos").value * 16 self.oz: int = z * 16 @@ -25,7 +30,7 @@ class Chunk: self.palettes: list[list[TAG_Compound]] = [] self.blockstates: list[list[int]] = [] - self.biome_palettes: list[list[str]] = [] + self.biome_palettes: list[list[TAG_String]] = [] self.biomes: list[list[int]] = [] self.get_sections() @@ -46,62 +51,88 @@ class Chunk: biomes = biomes_tag.get("data") self.biomes.append([] if biomes is None else list(biomes)) - def get_block(self, x: int, y: int, z: int) -> Optional[str]: + def get_block(self, x: int, y: int, z: int) -> Optional[tuple[str, str]]: if 0 <= x < 16 and 0 <= z < 16: section_i = y // 16 + 4 oy = y // 16 * 16 palette = self.palettes[section_i - 1] blockstates = self.blockstates[section_i - 1] - if blockstates is None: - return palette[0].get("Name").value - - bits = max((len(palette) - 1).bit_length(), 4) - ids_per_long = 64 // bits + biome_palette = self.biome_palettes[section_i - 1] + biomes = self.biomes[section_i - 1] + if blockstates is None or len(blockstates) == 0: + return palette[0].get("Name").value, biome_palette[0].value rel_y = y - oy - block_i = rel_y * 256 + z * 16 + x - long_i = block_i // ids_per_long - long_val = blockstates[long_i] - - if long_val < 0: - long_val += 2**64 - - bit_i = (block_i % ids_per_long) * bits - state = (long_val >> bit_i) & ((1 << bits) - 1) + block_bits = max((len(palette) - 1).bit_length(), 4) + state = self.get_from_long_array(blockstates, block_bits, x, rel_y, z) block = palette[state] - return block.get("Name").value + if len(biome_palette) == 1: + biome_state = 0 + else: + biome_bits = (len(biome_palette) - 1).bit_length() + biome_state = self.get_from_long_array(biomes, biome_bits, x // 4, rel_y // 4, z // 4, 4) + biome = biome_palette[biome_state] + return block.get("Name").value, biome.value else: raise PositionOutOfBounds(f"Coordinates x and z should be in range [0:16[") - def get_top_blocks(self) -> list[list[str]]: + def get_from_long_array(self, long_array, bits, x, y, z, size=16) -> int: + ids_per_long = 64 // bits + block_i = y * size * size + z * size + x + long_i = block_i // ids_per_long + long_val = long_array[long_i] + if long_val < 0: + long_val += 2**64 + + bit_i = (block_i % ids_per_long) * bits + value = (long_val >> bit_i) & ((1 << bits) - 1) + return value + + def get_top_blocks(self) -> tuple[list[list[str]], np.array, np.array, bool]: blocks = [[] for _ in range(16)] - heightmap_longs = self.nbt.get("Heightmaps").get("MOTION_BLOCKING") + heightmap_shadow = np.zeros((16, 16), dtype="uint16") + heightmap = np.zeros((16, 16), dtype="uint16") + biomes = [["minecraft:plains"]*16 for _ in range(16)] + heightmap_shadow_longs = self.nbt.get("Heightmaps").get("MOTION_BLOCKING") + heightmap_longs = self.nbt.get("Heightmaps").get("WORLD_SURFACE") if heightmap_longs is None: - return [["minecraft:air"]*16 for _ in range(16)] + return [["minecraft:air"]*16 for _ in range(16)], heightmap_shadow, biomes, True + heightmap_shadow_longs = heightmap_shadow_longs.value heightmap_longs = heightmap_longs.value - # heightmap = [[0]*16 for _ in range(16)] - # hmap_surf = pygame.Surface((16, 16)) i = 0 for z in range(16): for x in range(16): # i = z * 16 + x long_i = i // 7 - long_val = heightmap_longs[long_i] + long1_val = heightmap_shadow_longs[long_i] + long2_val = heightmap_longs[long_i] bit_i = (i % 7) * 9 - height = (long_val >> bit_i) & 0b1_1111_1111 + height_shadow = (long1_val >> bit_i) & 0b1_1111_1111 + height = (long2_val >> bit_i) & 0b1_1111_1111 - # heightmap[z][x] = height - # col = 255 * height / 384 - # hmap_surf.set_at((x, z), (col, col, col)) + heightmap_shadow[z, x] = height_shadow + heightmap[z, x] = height y = self.oy + height - 1 - if y < 0: + if height == 0: block = "minecraft:air" + biome = "minecraft:plains" else: - block = self.get_block(x, y, z) + block, biome = self.get_block(x, y, z) blocks[z].append(block) + biomes[z][x] = biome i += 1 - return blocks + return blocks, heightmap_shadow, biomes, False + + +class OldChunk(ChunkBase): + def __init__(self, x: int, z: int, data_version: int): + super().__init__(x, z) + self.data_version: int = data_version + + +class MalformedChunk(ChunkBase): + pass diff --git a/src/utils/minecraft/region.py b/src/utils/minecraft/region.py index 68ccbaa..41d38f4 100644 --- a/src/utils/minecraft/region.py +++ b/src/utils/minecraft/region.py @@ -3,8 +3,7 @@ from typing import Optional import nbt -from src.utils.minecraft.block import Block -from src.utils.minecraft.chunk import Chunk +from src.utils.minecraft.chunk import Chunk, ChunkBase, MalformedChunk, OldChunk def to_int(bytes_): @@ -12,6 +11,8 @@ def to_int(bytes_): class Region: + DATA_VERSION = 3465 + def __init__(self, filepath): self.file = open(filepath, "rb") @@ -31,7 +32,7 @@ class Region: return locations - def get_chunk(self, x, z) -> Optional[Chunk]: + def get_chunk(self, x, z) -> ChunkBase: if (x, z) in self.chunks.keys(): return self.chunks[(x, z)] @@ -43,15 +44,22 @@ class Region: data = self.file.read(length-1) if compression == 2: - data = zlib.decompress(data) + try: + data = zlib.decompress(data) + except zlib.error: + return MalformedChunk(x, z) else: - print(f"{compression} is not a valid compression type") - return + # print(f"{compression} is not a valid compression type") + return MalformedChunk(x, z) chunk = nbt.nbt.NBTFile(buffer=nbt.chunk.BytesIO(data)) - chunk = Chunk(x, z, chunk) + data_version = chunk.get("DataVersion").value + if data_version != self.DATA_VERSION: + print(f"\r Invalid data version {data_version} for chunk ({x},{z})", end="") + return OldChunk(x, z, data_version) + chunk = Chunk(x, z, chunk) self.chunks[(x, z)] = chunk return chunk @@ -59,9 +67,11 @@ class Region: def get_timestamps(self) -> list[int]: return [] - def get_block(self, x: int, y: int, z: int) -> Block: + def get_block(self, x: int, y: int, z: int) -> Optional[str]: chunk_x, chunk_z = x//16, z//16 x_rel, z_rel = x % 16, z % 16 chunk = self.get_chunk(chunk_x, chunk_z) + if not isinstance(chunk, Chunk): + return None return chunk.get_block(x_rel, y, z_rel)