common module¶
The common module contains common functions and classes used by the other modules.
classify(data, column, cmap=None, colors=None, labels=None, scheme='Quantiles', k=5, legend_kwds=None, classification_kwds=None)
¶
Classify a dataframe column using a variety of classification schemes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
str | pd.DataFrame | gpd.GeoDataFrame |
The data to classify. It can be a filepath to a vector dataset, a pandas dataframe, or a geopandas geodataframe. |
required |
column |
str |
The column to classify. |
required |
cmap |
str |
The name of a colormap recognized by matplotlib. Defaults to None. |
None |
colors |
list |
A list of colors to use for the classification. Defaults to None. |
None |
labels |
list |
A list of labels to use for the legend. Defaults to None. |
None |
scheme |
str |
Name of a choropleth classification scheme (requires mapclassify). Name of a choropleth classification scheme (requires mapclassify). A mapclassify.MapClassifier object will be used under the hood. Supported are all schemes provided by mapclassify (e.g. 'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled', 'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced', 'JenksCaspallSampled', 'MaxP', 'MaximumBreaks', 'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean', 'UserDefined'). Arguments can be passed in classification_kwds. |
'Quantiles' |
k |
int |
Number of classes (ignored if scheme is None or if column is categorical). Default to 5. |
5 |
legend_kwds |
dict |
Keyword arguments to pass to :func: |
None |
classification_kwds |
dict |
Keyword arguments to pass to mapclassify. Defaults to None. |
None |
Returns:
| Type | Description |
|---|---|
pd.DataFrame, dict |
A pandas dataframe with the classification applied and a legend dictionary. |
Source code in geopypi/common.py
def classify(
data,
column,
cmap=None,
colors=None,
labels=None,
scheme="Quantiles",
k=5,
legend_kwds=None,
classification_kwds=None,
):
"""Classify a dataframe column using a variety of classification schemes.
Args:
data (str | pd.DataFrame | gpd.GeoDataFrame): The data to classify. It can be a filepath to a vector dataset, a pandas dataframe, or a geopandas geodataframe.
column (str): The column to classify.
cmap (str, optional): The name of a colormap recognized by matplotlib. Defaults to None.
colors (list, optional): A list of colors to use for the classification. Defaults to None.
labels (list, optional): A list of labels to use for the legend. Defaults to None.
scheme (str, optional): Name of a choropleth classification scheme (requires mapclassify).
Name of a choropleth classification scheme (requires mapclassify).
A mapclassify.MapClassifier object will be used
under the hood. Supported are all schemes provided by mapclassify (e.g.
'BoxPlot', 'EqualInterval', 'FisherJenks', 'FisherJenksSampled',
'HeadTailBreaks', 'JenksCaspall', 'JenksCaspallForced',
'JenksCaspallSampled', 'MaxP', 'MaximumBreaks',
'NaturalBreaks', 'Quantiles', 'Percentiles', 'StdMean',
'UserDefined'). Arguments can be passed in classification_kwds.
k (int, optional): Number of classes (ignored if scheme is None or if column is categorical). Default to 5.
legend_kwds (dict, optional): Keyword arguments to pass to :func:`matplotlib.pyplot.legend` or `matplotlib.pyplot.colorbar`. Defaults to None.
Keyword arguments to pass to :func:`matplotlib.pyplot.legend` or
Additional accepted keywords when `scheme` is specified:
fmt : string
A formatting specification for the bin edges of the classes in the
legend. For example, to have no decimals: ``{"fmt": "{:.0f}"}``.
labels : list-like
A list of legend labels to override the auto-generated labblels.
Needs to have the same number of elements as the number of
classes (`k`).
interval : boolean (default False)
An option to control brackets from mapclassify legend.
If True, open/closed interval brackets are shown in the legend.
classification_kwds (dict, optional): Keyword arguments to pass to mapclassify. Defaults to None.
Returns:
pd.DataFrame, dict: A pandas dataframe with the classification applied and a legend dictionary.
"""
import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib as mpl
import matplotlib.pyplot as plt
try:
import mapclassify
except ImportError:
raise ImportError(
"mapclassify is required for this function. Install with `pip install mapclassify`."
)
if (
isinstance(data, gpd.GeoDataFrame)
or isinstance(data, pd.DataFrame)
or isinstance(data, pd.Series)
):
df = data
else:
try:
df = gpd.read_file(data)
except Exception:
raise TypeError(
"Data must be a GeoDataFrame or a path to a file that can be read by geopandas.read_file()."
)
if df.empty:
warnings.warn(
"The GeoDataFrame you are attempting to plot is "
"empty. Nothing has been displayed.",
UserWarning,
)
return
columns = df.columns.values.tolist()
if column not in columns:
raise ValueError(
f"{column} is not a column in the GeoDataFrame. It must be one of {columns}."
)
# Convert categorical data to numeric
init_column = None
value_list = None
if np.issubdtype(df[column].dtype, np.object0):
value_list = df[column].unique().tolist()
value_list.sort()
df["category"] = df[column].replace(value_list, range(0, len(value_list)))
init_column = column
column = "category"
k = len(value_list)
if legend_kwds is not None:
legend_kwds = legend_kwds.copy()
# To accept pd.Series and np.arrays as column
if isinstance(column, (np.ndarray, pd.Series)):
if column.shape[0] != df.shape[0]:
raise ValueError(
"The dataframe and given column have different number of rows."
)
else:
values = column
# Make sure index of a Series matches index of df
if isinstance(values, pd.Series):
values = values.reindex(df.index)
else:
values = df[column]
values = df[column]
nan_idx = np.asarray(pd.isna(values), dtype="bool")
if cmap is None:
cmap = "Blues"
cmap = plt.cm.get_cmap(cmap, k)
if colors is None:
colors = [mpl.colors.rgb2hex(cmap(i))[1:] for i in range(cmap.N)]
colors = ["#" + i for i in colors]
elif isinstance(colors, list):
colors = [check_color(i) for i in colors]
elif isinstance(colors, str):
colors = [check_color(colors)] * k
allowed_schemes = [
"BoxPlot",
"EqualInterval",
"FisherJenks",
"FisherJenksSampled",
"HeadTailBreaks",
"JenksCaspall",
"JenksCaspallForced",
"JenksCaspallSampled",
"MaxP",
"MaximumBreaks",
"NaturalBreaks",
"Quantiles",
"Percentiles",
"StdMean",
"UserDefined",
]
if scheme.lower() not in [s.lower() for s in allowed_schemes]:
raise ValueError(
f"{scheme} is not a valid scheme. It must be one of {allowed_schemes}."
)
if classification_kwds is None:
classification_kwds = {}
if "k" not in classification_kwds:
classification_kwds["k"] = k
binning = mapclassify.classify(
np.asarray(values[~nan_idx]), scheme, **classification_kwds
)
df["category"] = binning.yb
df["color"] = [colors[i] for i in df["category"]]
if legend_kwds is None:
legend_kwds = {}
if "interval" not in legend_kwds:
legend_kwds["interval"] = True
if "fmt" not in legend_kwds:
if np.issubdtype(df[column].dtype, np.floating):
legend_kwds["fmt"] = "{:.2f}"
else:
legend_kwds["fmt"] = "{:.0f}"
if labels is None:
# set categorical to True for creating the legend
if legend_kwds is not None and "labels" in legend_kwds:
if len(legend_kwds["labels"]) != binning.k:
raise ValueError(
"Number of labels must match number of bins, "
"received {} labels for {} bins".format(
len(legend_kwds["labels"]), binning.k
)
)
else:
labels = list(legend_kwds.pop("labels"))
else:
# fmt = "{:.2f}"
if legend_kwds is not None and "fmt" in legend_kwds:
fmt = legend_kwds.pop("fmt")
labels = binning.get_legend_classes(fmt)
if legend_kwds is not None:
show_interval = legend_kwds.pop("interval", False)
else:
show_interval = False
if not show_interval:
labels = [c[1:-1] for c in labels]
if init_column is not None:
labels = value_list
elif isinstance(labels, list):
if len(labels) != len(colors):
raise ValueError("The number of labels must match the number of colors.")
else:
raise ValueError("labels must be a list or None.")
legend_dict = dict(zip(labels, colors))
df["category"] = df["category"] + 1
return df, legend_dict
df_to_gdf(df, geometry='geometry', src_crs='EPSG:4326', dst_crs=None, **kwargs)
¶
Converts a pandas DataFrame to a GeoPandas GeoDataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
df |
pandas.DataFrame |
The pandas DataFrame to convert. |
required |
geometry |
str |
The name of the geometry column in the DataFrame. |
'geometry' |
src_crs |
str |
The coordinate reference system (CRS) of the GeoDataFrame. Default is "EPSG:4326". |
'EPSG:4326' |
dst_crs |
str |
The target CRS of the GeoDataFrame. Default is None |
None |
Returns:
| Type | Description |
|---|---|
geopandas.GeoDataFrame |
The converted GeoPandas GeoDataFrame. |
Source code in geopypi/common.py
def df_to_gdf(df, geometry="geometry", src_crs="EPSG:4326", dst_crs=None, **kwargs):
"""
Converts a pandas DataFrame to a GeoPandas GeoDataFrame.
Args:
df (pandas.DataFrame): The pandas DataFrame to convert.
geometry (str): The name of the geometry column in the DataFrame.
src_crs (str): The coordinate reference system (CRS) of the GeoDataFrame. Default is "EPSG:4326".
dst_crs (str): The target CRS of the GeoDataFrame. Default is None
Returns:
geopandas.GeoDataFrame: The converted GeoPandas GeoDataFrame.
"""
import geopandas as gpd
from shapely import wkt
# Convert the geometry column to Shapely geometry objects
df[geometry] = df[geometry].apply(lambda x: wkt.loads(x))
# Convert the pandas DataFrame to a GeoPandas GeoDataFrame
gdf = gpd.GeoDataFrame(df, geometry=geometry, crs=src_crs, **kwargs)
if dst_crs is not None and dst_crs != src_crs:
gdf = gdf.to_crs(dst_crs)
return gdf
download_file(url=None, output=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False)
¶
Download a file from URL, including Google Drive shared URL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url |
str |
Google Drive URL is also supported. Defaults to None. |
None |
output |
str |
Output filename. Default is basename of URL. |
None |
quiet |
bool |
Suppress terminal output. Default is False. |
False |
proxy |
str |
Proxy. Defaults to None. |
None |
speed |
float |
Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. |
None |
use_cookies |
bool |
Flag to use cookies. Defaults to True. |
True |
verify |
bool | str |
Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Default is True.. Defaults to True. |
True |
id |
str |
Google Drive's file ID. Defaults to None. |
None |
fuzzy |
bool |
Fuzzy extraction of Google Drive's file Id. Defaults to False. |
False |
resume |
bool |
Resume the download from existing tmp file if possible. Defaults to False. |
False |
unzip |
bool |
Unzip the file. Defaults to True. |
True |
overwrite |
bool |
Overwrite the file if it already exists. Defaults to False. |
False |
subfolder |
bool |
Create a subfolder with the same name as the file. Defaults to False. |
False |
Returns:
| Type | Description |
|---|---|
str |
The output file path. |
Source code in geopypi/common.py
def download_file(
url=None,
output=None,
quiet=False,
proxy=None,
speed=None,
use_cookies=True,
verify=True,
id=None,
fuzzy=False,
resume=False,
unzip=True,
overwrite=False,
subfolder=False,
):
"""Download a file from URL, including Google Drive shared URL.
Args:
url (str, optional): Google Drive URL is also supported. Defaults to None.
output (str, optional): Output filename. Default is basename of URL.
quiet (bool, optional): Suppress terminal output. Default is False.
proxy (str, optional): Proxy. Defaults to None.
speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
use_cookies (bool, optional): Flag to use cookies. Defaults to True.
verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string,
in which case it must be a path to a CA bundle to use. Default is True.. Defaults to True.
id (str, optional): Google Drive's file ID. Defaults to None.
fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
unzip (bool, optional): Unzip the file. Defaults to True.
overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.
Returns:
str: The output file path.
"""
try:
import gdown
except ImportError:
print(
"The gdown package is required for this function. Use `pip install gdown` to install it."
)
return
if output is None:
if isinstance(url, str) and url.startswith("http"):
output = os.path.basename(url)
out_dir = os.path.abspath(os.path.dirname(output))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
if isinstance(url, str):
if os.path.exists(os.path.abspath(output)) and (not overwrite):
print(
f"{output} already exists. Skip downloading. Set overwrite=True to overwrite."
)
return os.path.abspath(output)
else:
url = github_raw_url(url)
if "https://drive.google.com/file/d/" in url:
fuzzy = True
output = gdown.download(
url, output, quiet, proxy, speed, use_cookies, verify, id, fuzzy, resume
)
if unzip:
if output.endswith(".zip"):
with zipfile.ZipFile(output, "r") as zip_ref:
if not quiet:
print("Extracting files...")
if subfolder:
basename = os.path.splitext(os.path.basename(output))[0]
output = os.path.join(out_dir, basename)
if not os.path.exists(output):
os.makedirs(output)
zip_ref.extractall(output)
else:
zip_ref.extractall(os.path.dirname(output))
elif output.endswith(".tar.gz") or output.endswith(".tar"):
if output.endswith(".tar.gz"):
mode = "r:gz"
else:
mode = "r"
with tarfile.open(output, mode) as tar_ref:
if not quiet:
print("Extracting files...")
if subfolder:
basename = os.path.splitext(os.path.basename(output))[0]
output = os.path.join(out_dir, basename)
if not os.path.exists(output):
os.makedirs(output)
tar_ref.extractall(output)
else:
tar_ref.extractall(os.path.dirname(output))
return os.path.abspath(output)
download_files(urls, out_dir=None, filenames=None, quiet=False, proxy=None, speed=None, use_cookies=True, verify=True, id=None, fuzzy=False, resume=False, unzip=True, overwrite=False, subfolder=False, multi_part=False)
¶
Download files from URLs, including Google Drive shared URL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
urls |
list |
The list of urls to download. Google Drive URL is also supported. |
required |
out_dir |
str |
The output directory. Defaults to None. |
None |
filenames |
list |
Output filename. Default is basename of URL. |
None |
quiet |
bool |
Suppress terminal output. Default is False. |
False |
proxy |
str |
Proxy. Defaults to None. |
None |
speed |
float |
Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None. |
None |
use_cookies |
bool |
Flag to use cookies. Defaults to True. |
True |
verify |
bool | str |
Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Default is True.. Defaults to True. |
True |
id |
str |
Google Drive's file ID. Defaults to None. |
None |
fuzzy |
bool |
Fuzzy extraction of Google Drive's file Id. Defaults to False. |
False |
resume |
bool |
Resume the download from existing tmp file if possible. Defaults to False. |
False |
unzip |
bool |
Unzip the file. Defaults to True. |
True |
overwrite |
bool |
Overwrite the file if it already exists. Defaults to False. |
False |
subfolder |
bool |
Create a subfolder with the same name as the file. Defaults to False. |
False |
multi_part |
bool |
If the file is a multi-part file. Defaults to False. |
False |
Examples:
files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"] base_url = "https://github.com/opengeos/datasets/releases/download/models/" urls = [base_url + f for f in files] leafmap.download_files(urls, out_dir="models", multi_part=True)
Source code in geopypi/common.py
def download_files(
urls,
out_dir=None,
filenames=None,
quiet=False,
proxy=None,
speed=None,
use_cookies=True,
verify=True,
id=None,
fuzzy=False,
resume=False,
unzip=True,
overwrite=False,
subfolder=False,
multi_part=False,
):
"""Download files from URLs, including Google Drive shared URL.
Args:
urls (list): The list of urls to download. Google Drive URL is also supported.
out_dir (str, optional): The output directory. Defaults to None.
filenames (list, optional): Output filename. Default is basename of URL.
quiet (bool, optional): Suppress terminal output. Default is False.
proxy (str, optional): Proxy. Defaults to None.
speed (float, optional): Download byte size per second (e.g., 256KB/s = 256 * 1024). Defaults to None.
use_cookies (bool, optional): Flag to use cookies. Defaults to True.
verify (bool | str, optional): Either a bool, in which case it controls whether the server's TLS certificate is verified, or a string, in which case it must be a path to a CA bundle to use. Default is True.. Defaults to True.
id (str, optional): Google Drive's file ID. Defaults to None.
fuzzy (bool, optional): Fuzzy extraction of Google Drive's file Id. Defaults to False.
resume (bool, optional): Resume the download from existing tmp file if possible. Defaults to False.
unzip (bool, optional): Unzip the file. Defaults to True.
overwrite (bool, optional): Overwrite the file if it already exists. Defaults to False.
subfolder (bool, optional): Create a subfolder with the same name as the file. Defaults to False.
multi_part (bool, optional): If the file is a multi-part file. Defaults to False.
Examples:
files = ["sam_hq_vit_tiny.zip", "sam_hq_vit_tiny.z01", "sam_hq_vit_tiny.z02", "sam_hq_vit_tiny.z03"]
base_url = "https://github.com/opengeos/datasets/releases/download/models/"
urls = [base_url + f for f in files]
leafmap.download_files(urls, out_dir="models", multi_part=True)
"""
if out_dir is None:
out_dir = os.getcwd()
if filenames is None:
filenames = [None] * len(urls)
filepaths = []
for url, output in zip(urls, filenames):
if output is None:
filename = os.path.join(out_dir, os.path.basename(url))
else:
filename = os.path.join(out_dir, output)
filepaths.append(filename)
if multi_part:
unzip = False
download_file(
url,
filename,
quiet,
proxy,
speed,
use_cookies,
verify,
id,
fuzzy,
resume,
unzip,
overwrite,
subfolder,
)
if multi_part:
archive = os.path.splitext(filename)[0] + ".zip"
out_dir = os.path.dirname(filename)
extract_archive(archive, out_dir)
for file in filepaths:
os.remove(file)
gdf_to_geojson(gdf, out_geojson=None, epsg=None, tuple_to_list=False, encoding='utf-8')
¶
Converts a GeoDataFame to GeoJSON.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
gdf |
GeoDataFrame |
A GeoPandas GeoDataFrame. |
required |
out_geojson |
str |
File path to he output GeoJSON. Defaults to None. |
None |
epsg |
str |
An EPSG string, e.g., "4326". Defaults to None. |
None |
tuple_to_list |
bool |
Whether to convert tuples to lists. Defaults to False. |
False |
encoding |
str |
The encoding to use for the GeoJSON. Defaults to "utf-8". |
'utf-8' |
Exceptions:
| Type | Description |
|---|---|
TypeError |
When the output file extension is incorrect. |
Exception |
When the conversion fails. |
Returns:
| Type | Description |
|---|---|
dict |
When the out_json is None returns a dict. |
Source code in geopypi/common.py
def gdf_to_geojson(
gdf, out_geojson=None, epsg=None, tuple_to_list=False, encoding="utf-8"
):
"""Converts a GeoDataFame to GeoJSON.
Args:
gdf (GeoDataFrame): A GeoPandas GeoDataFrame.
out_geojson (str, optional): File path to he output GeoJSON. Defaults to None.
epsg (str, optional): An EPSG string, e.g., "4326". Defaults to None.
tuple_to_list (bool, optional): Whether to convert tuples to lists. Defaults to False.
encoding (str, optional): The encoding to use for the GeoJSON. Defaults to "utf-8".
Raises:
TypeError: When the output file extension is incorrect.
Exception: When the conversion fails.
Returns:
dict: When the out_json is None returns a dict.
"""
check_package(name="geopandas", URL="https://geopandas.org")
def listit(t):
return list(map(listit, t)) if isinstance(t, (list, tuple)) else t
try:
if epsg is not None:
if gdf.crs is not None and gdf.crs.to_epsg() != epsg:
gdf = gdf.to_crs(epsg=epsg)
geojson = gdf.__geo_interface__
if tuple_to_list:
for feature in geojson["features"]:
feature["geometry"]["coordinates"] = listit(
feature["geometry"]["coordinates"]
)
if out_geojson is None:
return geojson
else:
ext = os.path.splitext(out_geojson)[1]
if ext.lower() not in [".json", ".geojson"]:
raise TypeError(
"The output file extension must be either .json or .geojson"
)
out_dir = os.path.dirname(out_geojson)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
gdf.to_file(out_geojson, driver="GeoJSON", encoding=encoding)
except Exception as e:
raise Exception(e)
get_geometry_type(in_geojson)
¶
Get the geometry type of a GeoJSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
in_geojson |
str | dict |
The path to the GeoJSON file or a GeoJSON dictionary. |
required |
Returns:
| Type | Description |
|---|---|
str |
The geometry type. Can be one of "Point", "LineString", "Polygon", "MultiPoint", "MultiLineString", "MultiPolygon", "GeometryCollection", or "Unknown". |
Source code in geopypi/common.py
def get_geometry_type(in_geojson: Union[str, Dict]) -> str:
"""Get the geometry type of a GeoJSON file.
Args:
in_geojson (str | dict): The path to the GeoJSON file or a GeoJSON dictionary.
Returns:
str: The geometry type. Can be one of "Point", "LineString", "Polygon", "MultiPoint",
"MultiLineString", "MultiPolygon", "GeometryCollection", or "Unknown".
"""
import geojson
try:
if isinstance(in_geojson, str): # If input is a file path
with open(in_geojson, "r") as geojson_file:
geojson_data = geojson.load(geojson_file)
elif isinstance(in_geojson, dict): # If input is a GeoJSON dictionary
geojson_data = in_geojson
else:
return "Invalid input type. Expected file path or dictionary."
if "type" in geojson_data:
if geojson_data["type"] == "FeatureCollection":
features = geojson_data.get("features", [])
if features:
first_feature = features[0]
geometry = first_feature.get("geometry")
if geometry and "type" in geometry:
return geometry["type"]
else:
return "No geometry type found in the first feature."
else:
return "No features found in the FeatureCollection."
elif geojson_data["type"] == "Feature":
geometry = geojson_data.get("geometry")
if geometry and "type" in geometry:
return geometry["type"]
else:
return "No geometry type found in the Feature."
else:
return "Unsupported GeoJSON type."
else:
return "No 'type' field found in the GeoJSON data."
except Exception as e:
raise e
github_raw_url(url)
¶
Get the raw URL for a GitHub file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url |
str |
The GitHub URL. |
required |
Returns:
| Type | Description |
|---|---|
str |
The raw URL. |
Source code in geopypi/common.py
def github_raw_url(url):
"""Get the raw URL for a GitHub file.
Args:
url (str): The GitHub URL.
Returns:
str: The raw URL.
"""
if isinstance(url, str) and url.startswith("https://github.com/") and "blob" in url:
url = url.replace("github.com", "raw.githubusercontent.com").replace(
"blob/", ""
)
return url
hello_world()
¶
Prints "Hello World!" to the console.
Source code in geopypi/common.py
def hello_world():
"""Prints "Hello World!" to the console.
"""
print("Hello World!")
open_image_from_url(url)
¶
Loads an image from the specified URL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url |
str |
URL of the image. |
required |
Returns:
| Type | Description |
|---|---|
object |
Image object. |
Source code in geopypi/common.py
def open_image_from_url(url: str):
"""Loads an image from the specified URL.
Args:
url (str): URL of the image.
Returns:
object: Image object.
"""
from PIL import Image
from io import BytesIO
# from urllib.parse import urlparse
try:
response = requests.get(url)
img = Image.open(BytesIO(response.content))
return img
except Exception as e:
print(e)
def pillow_to_base64(image: Image.Image) -> str:
"""
Convert a PIL image to a base64-encoded string.
Parameters
----------
image: PIL.Image.Image
The image to be converted.
Returns
-------
str
The base64-encoded string.
"""
in_mem_file = io.BytesIO()
image.save(in_mem_file, format="JPEG", subsampling=0, quality=100)
img_bytes = in_mem_file.getvalue() # bytes
image_str = base64.b64encode(img_bytes).decode("utf-8")
base64_src = f"data:image/jpg;base64,{image_str}"
return base64_src
def read_image_as_pil(
image: Union[Image.Image, str, np.ndarray], exif_fix: bool = False
):
"""
Loads an image as PIL.Image.Image.
Args:
image : Can be image path or url (str), numpy image (np.ndarray) or PIL.Image
"""
# https://stackoverflow.com/questions/56174099/how-to-load-images-larger-than-max-image-pixels-with-pil
Image.MAX_IMAGE_PIXELS = None
if isinstance(image, Image.Image):
image_pil = image.convert("RGB")
elif isinstance(image, str):
# read image if str image path is provided
try:
image_pil = Image.open(
requests.get(image, stream=True).raw
if str(image).startswith("http")
else image
).convert("RGB")
if exif_fix:
image_pil = exif_transpose(image_pil)
except: # handle large/tiff image reading
try:
import skimage.io
except ImportError:
raise ImportError(
"Please run 'pip install -U scikit-image imagecodecs' for large image handling."
)
image_sk = skimage.io.imread(image).astype(np.uint8)
if len(image_sk.shape) == 2: # b&w
image_pil = Image.fromarray(image_sk, mode="1").convert("RGB")
elif image_sk.shape[2] == 4: # rgba
image_pil = Image.fromarray(image_sk, mode="RGBA").convert("RGB")
elif image_sk.shape[2] == 3: # rgb
image_pil = Image.fromarray(image_sk, mode="RGB")
else:
raise TypeError(
f"image with shape: {image_sk.shape[3]} is not supported."
)
elif isinstance(image, np.ndarray):
if image.shape[0] < 5: # image in CHW
image = image[:, :, ::-1]
image_pil = Image.fromarray(image).convert("RGB")
else:
raise TypeError("read image with 'pillow' using 'Image.open()'")
return image_pil
random_number()
¶
Returns a random number between 1 and 100.
Source code in geopypi/common.py
def random_number():
"""Returns a random number between 1 and 100.
"""
import random
return random.randint(1, 100)
read_parquet(source, geometry=None, columns=None, exclude=None, db=None, table_name=None, sql=None, limit=None, src_crs=None, dst_crs=None, return_type='gdf', **kwargs)
¶
Read Parquet data from a source and return a GeoDataFrame or DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source |
str |
The path to the Parquet file or directory containing Parquet files. |
required |
geometry |
str |
The name of the geometry column. Defaults to None. |
None |
columns |
str or list |
The columns to select. Defaults to None (select all columns). |
None |
exclude |
str or list |
The columns to exclude from the selection. Defaults to None. |
None |
db |
str |
The DuckDB database path or alias. Defaults to None. |
None |
table_name |
str |
The name of the table in the DuckDB database. Defaults to None. |
None |
sql |
str |
The SQL query to execute. Defaults to None. |
None |
limit |
int |
The maximum number of rows to return. Defaults to None (return all rows). |
None |
src_crs |
str |
The source CRS (Coordinate Reference System) of the geometries. Defaults to None. |
None |
dst_crs |
str |
The target CRS to reproject the geometries. Defaults to None. |
None |
return_type |
str |
The type of object to return: - 'gdf': GeoDataFrame (default) - 'df': DataFrame - 'numpy': NumPy array - 'arrow': Arrow Table - 'polars': Polars DataFrame |
'gdf' |
**kwargs |
Additional keyword arguments that are passed to the DuckDB connection. |
{} |
Returns:
| Type | Description |
|---|---|
Union[gpd.GeoDataFrame, pd.DataFrame, np.ndarray] |
The loaded data. |
Exceptions:
| Type | Description |
|---|---|
ValueError |
If the columns or exclude arguments are not of the correct type. |
Source code in geopypi/common.py
def read_parquet(
source: str,
geometry: Optional[str] = None,
columns: Optional[Union[str, list]] = None,
exclude: Optional[Union[str, list]] = None,
db: Optional[str] = None,
table_name: Optional[str] = None,
sql: Optional[str] = None,
limit: Optional[int] = None,
src_crs: Optional[str] = None,
dst_crs: Optional[str] = None,
return_type: str = "gdf",
**kwargs,
):
"""
Read Parquet data from a source and return a GeoDataFrame or DataFrame.
Args:
source (str): The path to the Parquet file or directory containing Parquet files.
geometry (str, optional): The name of the geometry column. Defaults to None.
columns (str or list, optional): The columns to select. Defaults to None (select all columns).
exclude (str or list, optional): The columns to exclude from the selection. Defaults to None.
db (str, optional): The DuckDB database path or alias. Defaults to None.
table_name (str, optional): The name of the table in the DuckDB database. Defaults to None.
sql (str, optional): The SQL query to execute. Defaults to None.
limit (int, optional): The maximum number of rows to return. Defaults to None (return all rows).
src_crs (str, optional): The source CRS (Coordinate Reference System) of the geometries. Defaults to None.
dst_crs (str, optional): The target CRS to reproject the geometries. Defaults to None.
return_type (str, optional): The type of object to return:
- 'gdf': GeoDataFrame (default)
- 'df': DataFrame
- 'numpy': NumPy array
- 'arrow': Arrow Table
- 'polars': Polars DataFrame
**kwargs: Additional keyword arguments that are passed to the DuckDB connection.
Returns:
Union[gpd.GeoDataFrame, pd.DataFrame, np.ndarray]: The loaded data.
Raises:
ValueError: If the columns or exclude arguments are not of the correct type.
"""
import duckdb
if isinstance(db, str):
con = duckdb.connect(db)
else:
con = duckdb.connect()
con.install_extension("httpfs")
con.load_extension("httpfs")
con.install_extension("spatial")
con.load_extension("spatial")
if columns is None:
columns = "*"
elif isinstance(columns, list):
columns = ", ".join(columns)
elif not isinstance(columns, str):
raise ValueError("columns must be a list or a string.")
if exclude is not None:
if isinstance(exclude, list):
exclude = ", ".join(exclude)
elif not isinstance(exclude, str):
raise ValueError("exclude_columns must be a list or a string.")
columns = f"{columns} EXCLUDE {exclude}"
if return_type in ["df", "numpy", "arrow", "polars"]:
if sql is None:
sql = f"SELECT {columns} FROM '{source}'"
if limit is not None:
sql += f" LIMIT {limit}"
if return_type == "df":
result = con.sql(sql, **kwargs).df()
elif return_type == "numpy":
result = con.sql(sql, **kwargs).fetchnumpy()
elif return_type == "arrow":
result = con.sql(sql, **kwargs).arrow()
elif return_type == "polars":
result = con.sql(sql, **kwargs).pl()
if table_name is not None:
con.sql(f"CREATE OR REPLACE TABLE {table_name} AS FROM result", **kwargs)
elif return_type == "gdf":
if geometry is None:
geometry = "geometry"
if sql is None:
# if src_crs is not None and dst_crs is not None:
# geom_sql = f"ST_AsText(ST_Transform(ST_GeomFromWKB({geometry}), '{src_crs}', '{dst_crs}', true)) AS {geometry}"
# else:
geom_sql = f"ST_AsText(ST_GeomFromWKB({geometry})) AS {geometry}"
sql = f"SELECT {columns} EXCLUDE {geometry}, {geom_sql} FROM '{source}'"
if limit is not None:
sql += f" LIMIT {limit}"
df = con.sql(sql, **kwargs).df()
if table_name is not None:
con.sql(f"CREATE OR REPLACE TABLE {table_name} AS FROM df", **kwargs)
result = df_to_gdf(df, geometry=geometry, src_crs=src_crs, dst_crs=dst_crs)
con.close()
return result