#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Base and utility classes for pandas-on-Spark objects.
"""
from abc import ABCMeta, abstractmethod
from functools import wraps, partial
from itertools import chain
from typing import Any, Callable, Optional, Sequence, Tuple, Union, cast, TYPE_CHECKING
import numpy as np
import pandas as pd # noqa: F401
from pandas.api.types import is_list_like, CategoricalDtype
from pyspark.sql import functions as F, Column, Window
from pyspark.sql.types import LongType, BooleanType
from pyspark import pandas as ps # For running doctests and reference resolution in PyCharm.
from pyspark.pandas._typing import Axis, Dtype, IndexOpsLike, Label, SeriesOrIndex
from pyspark.pandas.config import get_option, option_context
from pyspark.pandas.internal import (
InternalField,
InternalFrame,
NATURAL_ORDER_COLUMN_NAME,
SPARK_DEFAULT_INDEX_NAME,
)
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.spark.accessors import SparkIndexOpsMethods
from pyspark.pandas.typedef import extension_dtypes
from pyspark.pandas.utils import (
combine_frames,
same_anchor,
scol_for,
validate_axis,
ERROR_MESSAGE_CANNOT_COMBINE,
)
from pyspark.pandas.frame import DataFrame
if TYPE_CHECKING:
from pyspark.sql._typing import ColumnOrName # noqa: F401 (SPARK-34943)
from pyspark.pandas.data_type_ops.base import DataTypeOps # noqa: F401 (SPARK-34943)
from pyspark.pandas.series import Series # noqa: F401 (SPARK-34943)
def should_alignment_for_column_op(self: SeriesOrIndex, other: SeriesOrIndex) -> bool:
from pyspark.pandas.series import Series
if isinstance(self, Series) and isinstance(other, Series):
return not same_anchor(self, other)
else:
return self._internal.spark_frame is not other._internal.spark_frame
def align_diff_index_ops(
func: Callable[..., Column], this_index_ops: SeriesOrIndex, *args: Any
) -> SeriesOrIndex:
"""
Align the `IndexOpsMixin` objects and apply the function.
Parameters
----------
func : The function to apply
this_index_ops : IndexOpsMixin
A base `IndexOpsMixin` object
args : list of other arguments including other `IndexOpsMixin` objects
Returns
-------
`Index` if all `this_index_ops` and arguments are `Index`; otherwise `Series`
"""
from pyspark.pandas.indexes import Index
from pyspark.pandas.series import Series, first_series
cols = [arg for arg in args if isinstance(arg, IndexOpsMixin)]
if isinstance(this_index_ops, Series) and all(isinstance(col, Series) for col in cols):
combined = combine_frames(
this_index_ops.to_frame(),
*[cast(Series, col).rename(i) for i, col in enumerate(cols)],
how="full"
)
return column_op(func)(
combined["this"]._psser_for(combined["this"]._internal.column_labels[0]),
*[
combined["that"]._psser_for(label)
for label in combined["that"]._internal.column_labels
]
).rename(this_index_ops.name)
else:
# This could cause as many counts, reset_index calls, joins for combining
# as the number of `Index`s in `args`. So far it's fine since we can assume the ops
# only work between at most two `Index`s. We might need to fix it in the future.
self_len = len(this_index_ops)
if any(len(col) != self_len for col in args if isinstance(col, IndexOpsMixin)):
raise ValueError("operands could not be broadcast together with shapes")
with option_context("compute.default_index_type", "distributed-sequence"):
if isinstance(this_index_ops, Index) and all(isinstance(col, Index) for col in cols):
return Index(
column_op(func)(
this_index_ops.to_series().reset_index(drop=True),
*[
arg.to_series().reset_index(drop=True)
if isinstance(arg, Index)
else arg
for arg in args
]
).sort_index(),
name=this_index_ops.name,
)
elif isinstance(this_index_ops, Series):
this = cast(DataFrame, this_index_ops.reset_index())
that = [
cast(Series, col.to_series() if isinstance(col, Index) else col)
.rename(i)
.reset_index(drop=True)
for i, col in enumerate(cols)
]
combined = combine_frames(this, *that, how="full").sort_index()
combined = combined.set_index(
combined._internal.column_labels[: this_index_ops._internal.index_level]
)
combined.index.names = this_index_ops._internal.index_names
return column_op(func)(
first_series(combined["this"]),
*[
combined["that"]._psser_for(label)
for label in combined["that"]._internal.column_labels
]
).rename(this_index_ops.name)
else:
this = cast(Index, this_index_ops).to_frame().reset_index(drop=True)
that_series = next(col for col in cols if isinstance(col, Series))
that_frame = that_series._psdf[
[
cast(Series, col.to_series() if isinstance(col, Index) else col).rename(i)
for i, col in enumerate(cols)
]
]
combined = combine_frames(this, that_frame.reset_index()).sort_index()
self_index = (
combined["this"].set_index(combined["this"]._internal.column_labels).index
)
other = combined["that"].set_index(
combined["that"]._internal.column_labels[: that_series._internal.index_level]
)
other.index.names = that_series._internal.index_names
return column_op(func)(
self_index,
*[
other._psser_for(label)
for label, col in zip(other._internal.column_labels, cols)
]
).rename(that_series.name)
def booleanize_null(scol: Column, f: Callable[..., Column]) -> Column:
"""
Booleanize Null in Spark Column
"""
comp_ops = [
getattr(Column, "__{}__".format(comp_op))
for comp_op in ["eq", "ne", "lt", "le", "ge", "gt"]
]
if f in comp_ops:
# if `f` is "!=", fill null with True otherwise False
filler = f == Column.__ne__
scol = F.when(scol.isNull(), filler).otherwise(scol)
return scol
def column_op(f: Callable[..., Column]) -> Callable[..., SeriesOrIndex]:
"""
A decorator that wraps APIs taking/returning Spark Column so that pandas-on-Spark Series can be
supported too. If this decorator is used for the `f` function that takes Spark Column and
returns Spark Column, decorated `f` takes pandas-on-Spark Series as well and returns
pandas-on-Spark Series.
:param f: a function that takes Spark Column and returns Spark Column.
:param self: pandas-on-Spark Series
:param args: arguments that the function `f` takes.
"""
@wraps(f)
def wrapper(self: SeriesOrIndex, *args: Any) -> SeriesOrIndex:
from pyspark.pandas.indexes.base import Index
from pyspark.pandas.series import Series
# It is possible for the function `f` takes other arguments than Spark Column.
# To cover this case, explicitly check if the argument is pandas-on-Spark Series and
# extract Spark Column. For other arguments, they are used as are.
cols = [arg for arg in args if isinstance(arg, (Series, Index))]
if all(not should_alignment_for_column_op(self, col) for col in cols):
# Same DataFrame anchors
scol = f(
self.spark.column,
*[arg.spark.column if isinstance(arg, IndexOpsMixin) else arg for arg in args]
)
field = InternalField.from_struct_field(
self._internal.spark_frame.select(scol).schema[0],
use_extension_dtypes=any(
isinstance(col.dtype, extension_dtypes) for col in [self] + cols
),
)
if not field.is_extension_dtype:
scol = booleanize_null(scol, f).alias(field.name)
if isinstance(self, Series) or not any(isinstance(col, Series) for col in cols):
index_ops = self._with_new_scol(scol, field=field)
else:
psser = next(col for col in cols if isinstance(col, Series))
index_ops = psser._with_new_scol(scol, field=field)
elif get_option("compute.ops_on_diff_frames"):
index_ops = align_diff_index_ops(f, self, *args)
else:
raise ValueError(ERROR_MESSAGE_CANNOT_COMBINE)
if not all(self.name == col.name for col in cols):
index_ops = index_ops.rename(None)
return index_ops
return wrapper
def numpy_column_op(f: Callable[..., Column]) -> Callable[..., SeriesOrIndex]:
@wraps(f)
def wrapper(self: SeriesOrIndex, *args: Any) -> SeriesOrIndex:
# PySpark does not support NumPy type out of the box. For now, we convert NumPy types
# into some primitive types understandable in PySpark.
new_args = []
for arg in args:
# TODO: This is a quick hack to support NumPy type. We should revisit this.
if isinstance(self.spark.data_type, LongType) and isinstance(arg, np.timedelta64):
new_args.append(float(arg / np.timedelta64(1, "s")))
else:
new_args.append(arg)
return column_op(f)(self, *new_args)
return wrapper
class IndexOpsMixin(object, metaclass=ABCMeta):
"""common ops mixin to support a unified interface / docs for Series / Index
Assuming there are following attributes or properties and function.
"""
@property
@abstractmethod
def _internal(self) -> InternalFrame:
pass
@property
@abstractmethod
def _psdf(self) -> DataFrame:
pass
@abstractmethod
def _with_new_scol(
self: IndexOpsLike, scol: Column, *, field: Optional[InternalField] = None
) -> IndexOpsLike:
pass
@property
@abstractmethod
def _column_label(self) -> Optional[Label]:
pass
@property
@abstractmethod
def spark(self: IndexOpsLike) -> SparkIndexOpsMethods[IndexOpsLike]:
pass
@property
def _dtype_op(self) -> "DataTypeOps":
from pyspark.pandas.data_type_ops.base import DataTypeOps
return DataTypeOps(self.dtype, self.spark.data_type)
@abstractmethod
def copy(self: IndexOpsLike) -> IndexOpsLike:
pass
# arithmetic operators
def __neg__(self: IndexOpsLike) -> IndexOpsLike:
return self._dtype_op.neg(self)
def __add__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.add(self, other)
def __sub__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.sub(self, other)
def __mul__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.mul(self, other)
def __truediv__(self, other: Any) -> SeriesOrIndex:
"""
__truediv__ has different behaviour between pandas and PySpark for several cases.
1. When divide np.inf by zero, PySpark returns null whereas pandas returns np.inf
2. When divide positive number by zero, PySpark returns null whereas pandas returns np.inf
3. When divide -np.inf by zero, PySpark returns null whereas pandas returns -np.inf
4. When divide negative number by zero, PySpark returns null whereas pandas returns -np.inf
+-------------------------------------------+
| dividend (divisor: 0) | PySpark | pandas |
|-----------------------|---------|---------|
| np.inf | null | np.inf |
| -np.inf | null | -np.inf |
| 10 | null | np.inf |
| -10 | null | -np.inf |
+-----------------------|---------|---------+
"""
return self._dtype_op.truediv(self, other)
def __mod__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.mod(self, other)
def __radd__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.radd(self, other)
def __rsub__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.rsub(self, other)
def __rmul__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.rmul(self, other)
def __rtruediv__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.rtruediv(self, other)
def __floordiv__(self, other: Any) -> SeriesOrIndex:
"""
__floordiv__ has different behaviour between pandas and PySpark for several cases.
1. When divide np.inf by zero, PySpark returns null whereas pandas returns np.inf
2. When divide positive number by zero, PySpark returns null whereas pandas returns np.inf
3. When divide -np.inf by zero, PySpark returns null whereas pandas returns -np.inf
4. When divide negative number by zero, PySpark returns null whereas pandas returns -np.inf
+-------------------------------------------+
| dividend (divisor: 0) | PySpark | pandas |
|-----------------------|---------|---------|
| np.inf | null | np.inf |
| -np.inf | null | -np.inf |
| 10 | null | np.inf |
| -10 | null | -np.inf |
+-----------------------|---------|---------+
"""
return self._dtype_op.floordiv(self, other)
def __rfloordiv__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.rfloordiv(self, other)
def __rmod__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.rmod(self, other)
def __pow__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.pow(self, other)
def __rpow__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.rpow(self, other)
def __abs__(self: IndexOpsLike) -> IndexOpsLike:
return self._dtype_op.abs(self)
# comparison operators
def __eq__(self, other: Any) -> SeriesOrIndex: # type: ignore[override]
return self._dtype_op.eq(self, other)
def __ne__(self, other: Any) -> SeriesOrIndex: # type: ignore[override]
return self._dtype_op.ne(self, other)
def __lt__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.lt(self, other)
def __le__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.le(self, other)
def __ge__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.ge(self, other)
def __gt__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.gt(self, other)
def __invert__(self: IndexOpsLike) -> IndexOpsLike:
return self._dtype_op.invert(self)
# `and`, `or`, `not` cannot be overloaded in Python,
# so use bitwise operators as boolean operators
def __and__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.__and__(self, other)
def __or__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.__or__(self, other)
def __rand__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.rand(self, other)
def __ror__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.ror(self, other)
def __len__(self) -> int:
return len(self._psdf)
# NDArray Compat
def __array_ufunc__(
self, ufunc: Callable, method: str, *inputs: Any, **kwargs: Any
) -> SeriesOrIndex:
from pyspark.pandas import numpy_compat
# Try dunder methods first.
result = numpy_compat.maybe_dispatch_ufunc_to_dunder_op(
self, ufunc, method, *inputs, **kwargs
)
# After that, we try with PySpark APIs.
if result is NotImplemented:
result = numpy_compat.maybe_dispatch_ufunc_to_spark_func(
self, ufunc, method, *inputs, **kwargs
)
if result is not NotImplemented:
return cast(SeriesOrIndex, result)
else:
# TODO: support more APIs?
raise NotImplementedError(
"pandas-on-Spark objects currently do not support %s." % ufunc
)
@property
def dtype(self) -> Dtype:
"""Return the dtype object of the underlying data.
Examples
--------
>>> s = ps.Series([1, 2, 3])
>>> s.dtype
dtype('int64')
>>> s = ps.Series(list('abc'))
>>> s.dtype
dtype('O')
>>> s = ps.Series(pd.date_range('20130101', periods=3))
>>> s.dtype
dtype('<M8[ns]')
>>> s.rename("a").to_frame().set_index("a").index.dtype
dtype('<M8[ns]')
"""
return self._internal.data_fields[0].dtype
@property
def empty(self) -> bool:
"""
Returns true if the current object is empty. Otherwise, returns false.
>>> ps.range(10).id.empty
False
>>> ps.range(0).id.empty
True
>>> ps.DataFrame({}, index=list('abc')).index.empty
False
"""
return self._internal.resolved_copy.spark_frame.rdd.isEmpty()
@property
def hasnans(self) -> bool:
"""
Return True if it has any missing values. Otherwise, it returns False.
>>> ps.DataFrame({}, index=list('abc')).index.hasnans
False
>>> ps.Series(['a', None]).hasnans
True
>>> ps.Series([1.0, 2.0, np.nan]).hasnans
True
>>> ps.Series([1, 2, 3]).hasnans
False
>>> (ps.Series([1.0, 2.0, np.nan]) + 1).hasnans
True
>>> ps.Series([1, 2, 3]).rename("a").to_frame().set_index("a").index.hasnans
False
"""
return self.isnull().any()
@property
def is_monotonic(self) -> bool:
"""
Return boolean if values in the object are monotonically increasing.
.. note:: the current implementation of is_monotonic requires to shuffle
and aggregate multiple times to check the order locally and globally,
which is potentially expensive. In case of multi-index, all data are
transferred to single node which can easily cause out-of-memory error currently.
.. note:: Disable the Spark config `spark.sql.optimizer.nestedSchemaPruning.enabled`
for multi-index if you're using pandas-on-Spark < 1.7.0 with PySpark 3.1.1.
Returns
-------
is_monotonic : bool
Examples
--------
>>> ser = ps.Series(['1/1/2018', '3/1/2018', '4/1/2018'])
>>> ser.is_monotonic
True
>>> df = ps.DataFrame({'dates': [None, '1/1/2018', '2/1/2018', '3/1/2018']})
>>> df.dates.is_monotonic
False
>>> df.index.is_monotonic
True
>>> ser = ps.Series([1])
>>> ser.is_monotonic
True
>>> ser = ps.Series([])
>>> ser.is_monotonic
True
>>> ser.rename("a").to_frame().set_index("a").index.is_monotonic
True
>>> ser = ps.Series([5, 4, 3, 2, 1], index=[1, 2, 3, 4, 5])
>>> ser.is_monotonic
False
>>> ser.index.is_monotonic
True
Support for MultiIndex
>>> midx = ps.MultiIndex.from_tuples(
... [('x', 'a'), ('x', 'b'), ('y', 'c'), ('y', 'd'), ('z', 'e')])
>>> midx # doctest: +SKIP
MultiIndex([('x', 'a'),
('x', 'b'),
('y', 'c'),
('y', 'd'),
('z', 'e')],
)
>>> midx.is_monotonic
True
>>> midx = ps.MultiIndex.from_tuples(
... [('z', 'a'), ('z', 'b'), ('y', 'c'), ('y', 'd'), ('x', 'e')])
>>> midx # doctest: +SKIP
MultiIndex([('z', 'a'),
('z', 'b'),
('y', 'c'),
('y', 'd'),
('x', 'e')],
)
>>> midx.is_monotonic
False
"""
return self._is_monotonic("increasing")
is_monotonic_increasing = is_monotonic
@property
def is_monotonic_decreasing(self) -> bool:
"""
Return boolean if values in the object are monotonically decreasing.
.. note:: the current implementation of is_monotonic_decreasing requires to shuffle
and aggregate multiple times to check the order locally and globally,
which is potentially expensive. In case of multi-index, all data are transferred
to single node which can easily cause out-of-memory error currently.
.. note:: Disable the Spark config `spark.sql.optimizer.nestedSchemaPruning.enabled`
for multi-index if you're using pandas-on-Spark < 1.7.0 with PySpark 3.1.1.
Returns
-------
is_monotonic : bool
Examples
--------
>>> ser = ps.Series(['4/1/2018', '3/1/2018', '1/1/2018'])
>>> ser.is_monotonic_decreasing
True
>>> df = ps.DataFrame({'dates': [None, '3/1/2018', '2/1/2018', '1/1/2018']})
>>> df.dates.is_monotonic_decreasing
False
>>> df.index.is_monotonic_decreasing
False
>>> ser = ps.Series([1])
>>> ser.is_monotonic_decreasing
True
>>> ser = ps.Series([])
>>> ser.is_monotonic_decreasing
True
>>> ser.rename("a").to_frame().set_index("a").index.is_monotonic_decreasing
True
>>> ser = ps.Series([5, 4, 3, 2, 1], index=[1, 2, 3, 4, 5])
>>> ser.is_monotonic_decreasing
True
>>> ser.index.is_monotonic_decreasing
False
Support for MultiIndex
>>> midx = ps.MultiIndex.from_tuples(
... [('x', 'a'), ('x', 'b'), ('y', 'c'), ('y', 'd'), ('z', 'e')])
>>> midx # doctest: +SKIP
MultiIndex([('x', 'a'),
('x', 'b'),
('y', 'c'),
('y', 'd'),
('z', 'e')],
)
>>> midx.is_monotonic_decreasing
False
>>> midx = ps.MultiIndex.from_tuples(
... [('z', 'e'), ('z', 'd'), ('y', 'c'), ('y', 'b'), ('x', 'a')])
>>> midx # doctest: +SKIP
MultiIndex([('z', 'a'),
('z', 'b'),
('y', 'c'),
('y', 'd'),
('x', 'e')],
)
>>> midx.is_monotonic_decreasing
True
"""
return self._is_monotonic("decreasing")
def _is_locally_monotonic_spark_column(self, order: str) -> Column:
window = (
Window.partitionBy(F.col("__partition_id"))
.orderBy(NATURAL_ORDER_COLUMN_NAME)
.rowsBetween(-1, -1)
)
if order == "increasing":
return (F.col("__origin") >= F.lag(F.col("__origin"), 1).over(window)) & F.col(
"__origin"
).isNotNull()
else:
return (F.col("__origin") <= F.lag(F.col("__origin"), 1).over(window)) & F.col(
"__origin"
).isNotNull()
def _is_monotonic(self, order: str) -> bool:
assert order in ("increasing", "decreasing")
sdf = self._internal.spark_frame
sdf = (
sdf.select(
F.spark_partition_id().alias(
"__partition_id"
), # Make sure we use the same partition id in the whole job.
F.col(NATURAL_ORDER_COLUMN_NAME),
self.spark.column.alias("__origin"),
)
.select(
F.col("__partition_id"),
F.col("__origin"),
self._is_locally_monotonic_spark_column(order).alias(
"__comparison_within_partition"
),
)
.groupby(F.col("__partition_id"))
.agg(
F.min(F.col("__origin")).alias("__partition_min"),
F.max(F.col("__origin")).alias("__partition_max"),
F.min(F.coalesce(F.col("__comparison_within_partition"), SF.lit(True))).alias(
"__comparison_within_partition"
),
)
)
# Now we're windowing the aggregation results without partition specification.
# The number of rows here will be as the same of partitions, which is expected
# to be small.
window = Window.orderBy(F.col("__partition_id")).rowsBetween(-1, -1)
if order == "increasing":
comparison_col = F.col("__partition_min") >= F.lag(F.col("__partition_max"), 1).over(
window
)
else:
comparison_col = F.col("__partition_min") <= F.lag(F.col("__partition_max"), 1).over(
window
)
sdf = sdf.select(
comparison_col.alias("__comparison_between_partitions"),
F.col("__comparison_within_partition"),
)
ret = sdf.select(
F.min(F.coalesce(F.col("__comparison_between_partitions"), SF.lit(True)))
& F.min(F.coalesce(F.col("__comparison_within_partition"), SF.lit(True)))
).collect()[0][0]
if ret is None:
return True
else:
return ret
@property
def ndim(self) -> int:
"""
Return an int representing the number of array dimensions.
Return 1 for Series / Index / MultiIndex.
Examples
--------
For Series
>>> s = ps.Series([None, 1, 2, 3, 4], index=[4, 5, 2, 1, 8])
>>> s.ndim
1
For Index
>>> s.index.ndim
1
For MultiIndex
>>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'],
... ['speed', 'weight', 'length']],
... [[0, 0, 0, 1, 1, 1, 2, 2, 2],
... [1, 1, 1, 1, 1, 2, 1, 2, 2]])
>>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx)
>>> s.index.ndim
1
"""
return 1
def astype(self: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
"""
Cast a pandas-on-Spark object to a specified dtype ``dtype``.
Parameters
----------
dtype : data type
Use a numpy.dtype or Python type to cast entire pandas object to
the same type.
Returns
-------
casted : same type as caller
See Also
--------
to_datetime : Convert argument to datetime.
Examples
--------
>>> ser = ps.Series([1, 2], dtype='int32')
>>> ser
0 1
1 2
dtype: int32
>>> ser.astype('int64')
0 1
1 2
dtype: int64
>>> ser.rename("a").to_frame().set_index("a").index.astype('int64')
Int64Index([1, 2], dtype='int64', name='a')
"""
return self._dtype_op.astype(self, dtype)
def isin(self: IndexOpsLike, values: Sequence[Any]) -> IndexOpsLike:
"""
Check whether `values` are contained in Series or Index.
Return a boolean Series or Index showing whether each element in the Series
matches an element in the passed sequence of `values` exactly.
Parameters
----------
values : set or list-like
The sequence of values to test.
Returns
-------
isin : Series (bool dtype) or Index (bool dtype)
Examples
--------
>>> s = ps.Series(['lama', 'cow', 'lama', 'beetle', 'lama',
... 'hippo'], name='animal')
>>> s.isin(['cow', 'lama'])
0 True
1 True
2 True
3 False
4 True
5 False
Name: animal, dtype: bool
Passing a single string as ``s.isin('lama')`` will raise an error. Use
a list of one element instead:
>>> s.isin(['lama'])
0 True
1 False
2 True
3 False
4 True
5 False
Name: animal, dtype: bool
>>> s.rename("a").to_frame().set_index("a").index.isin(['lama'])
Index([True, False, True, False, True, False], dtype='object', name='a')
"""
if not is_list_like(values):
raise TypeError(
"only list-like objects are allowed to be passed"
" to isin(), you passed a [{values_type}]".format(values_type=type(values).__name__)
)
values = values.tolist() if isinstance(values, np.ndarray) else list(values)
other = [SF.lit(v) for v in values]
scol = self.spark.column.isin(other)
field = self._internal.data_fields[0].copy(
dtype=np.dtype("bool"), spark_type=BooleanType(), nullable=False
)
return self._with_new_scol(scol=F.coalesce(scol, F.lit(False)), field=field)
def isnull(self: IndexOpsLike) -> IndexOpsLike:
"""
Detect existing (non-missing) values.
Return a boolean same-sized object indicating if the values are NA.
NA values, such as None or numpy.NaN, gets mapped to True values.
Everything else gets mapped to False values. Characters such as empty strings '' or
numpy.inf are not considered NA values
(unless you set pandas.options.mode.use_inf_as_na = True).
Returns
-------
Series or Index : Mask of bool values for each element in Series
that indicates whether an element is not an NA value.
Examples
--------
>>> ser = ps.Series([5, 6, np.NaN])
>>> ser.isna() # doctest: +NORMALIZE_WHITESPACE
0 False
1 False
2 True
dtype: bool
>>> ser.rename("a").to_frame().set_index("a").index.isna()
Index([False, False, True], dtype='object', name='a')
"""
from pyspark.pandas.indexes import MultiIndex
if isinstance(self, MultiIndex):
raise NotImplementedError("isna is not defined for MultiIndex")
return self._dtype_op.isnull(self)
isna = isnull
def notnull(self: IndexOpsLike) -> IndexOpsLike:
"""
Detect existing (non-missing) values.
Return a boolean same-sized object indicating if the values are not NA.
Non-missing values get mapped to True.
Characters such as empty strings '' or numpy.inf are not considered NA values
(unless you set pandas.options.mode.use_inf_as_na = True).
NA values, such as None or numpy.NaN, get mapped to False values.
Returns
-------
Series or Index : Mask of bool values for each element in Series
that indicates whether an element is not an NA value.
Examples
--------
Show which entries in a Series are not NA.
>>> ser = ps.Series([5, 6, np.NaN])
>>> ser
0 5.0
1 6.0
2 NaN
dtype: float64
>>> ser.notna()
0 True
1 True
2 False
dtype: bool
>>> ser.rename("a").to_frame().set_index("a").index.notna()
Index([True, True, False], dtype='object', name='a')
"""
from pyspark.pandas.indexes import MultiIndex
if isinstance(self, MultiIndex):
raise NotImplementedError("notna is not defined for MultiIndex")
return (~self.isnull()).rename(self.name) # type: ignore
notna = notnull
# TODO: axis, skipna, and many arguments should be implemented.
def all(self, axis: Axis = 0) -> bool:
"""
Return whether all elements are True.
Returns True unless there at least one element within a series that is
False or equivalent (e.g. zero or empty)
Parameters
----------
axis : {0 or 'index'}, default 0
Indicate which axis or axes should be reduced.
* 0 / 'index' : reduce the index, return a Series whose index is the
original column labels.
Examples
--------
>>> ps.Series([True, True]).all()
True
>>> ps.Series([True, False]).all()
False
>>> ps.Series([0, 1]).all()
False
>>> ps.Series([1, 2, 3]).all()
True
>>> ps.Series([True, True, None]).all()
True
>>> ps.Series([True, False, None]).all()
False
>>> ps.Series([]).all()
True
>>> ps.Series([np.nan]).all()
True
>>> df = ps.Series([True, False, None]).rename("a").to_frame()
>>> df.set_index("a").index.all()
False
"""
axis = validate_axis(axis)
if axis != 0:
raise NotImplementedError('axis should be either 0 or "index" currently.')
sdf = self._internal.spark_frame.select(self.spark.column)
col = scol_for(sdf, sdf.columns[0])
# Note that we're ignoring `None`s here for now.
# any and every was added as of Spark 3.0
# ret = sdf.select(F.expr("every(CAST(`%s` AS BOOLEAN))" % sdf.columns[0])).collect()[0][0]
# Here we use min as its alternative:
ret = sdf.select(F.min(F.coalesce(col.cast("boolean"), SF.lit(True)))).collect()[0][0]
if ret is None:
return True
else:
return ret
# TODO: axis, skipna, and many arguments should be implemented.
def any(self, axis: Axis = 0) -> bool:
"""
Return whether any element is True.
Returns False unless there at least one element within a series that is
True or equivalent (e.g. non-zero or non-empty).
Parameters
----------
axis : {0 or 'index'}, default 0
Indicate which axis or axes should be reduced.
* 0 / 'index' : reduce the index, return a Series whose index is the
original column labels.
Examples
--------
>>> ps.Series([False, False]).any()
False
>>> ps.Series([True, False]).any()
True
>>> ps.Series([0, 0]).any()
False
>>> ps.Series([0, 1, 2]).any()
True
>>> ps.Series([False, False, None]).any()
False
>>> ps.Series([True, False, None]).any()
True
>>> ps.Series([]).any()
False
>>> ps.Series([np.nan]).any()
False
>>> df = ps.Series([True, False, None]).rename("a").to_frame()
>>> df.set_index("a").index.any()
True
"""
axis = validate_axis(axis)
if axis != 0:
raise NotImplementedError('axis should be either 0 or "index" currently.')
sdf = self._internal.spark_frame.select(self.spark.column)
col = scol_for(sdf, sdf.columns[0])
# Note that we're ignoring `None`s here for now.
# any and every was added as of Spark 3.0
# ret = sdf.select(F.expr("any(CAST(`%s` AS BOOLEAN))" % sdf.columns[0])).collect()[0][0]
# Here we use max as its alternative:
ret = sdf.select(F.max(F.coalesce(col.cast("boolean"), SF.lit(False)))).collect()[0][0]
if ret is None:
return False
else:
return ret
# TODO: add frep and axis parameter
def shift(
self: IndexOpsLike, periods: int = 1, fill_value: Optional[Any] = None
) -> IndexOpsLike:
"""
Shift Series/Index by desired number of periods.
.. note:: the current implementation of shift uses Spark's Window without
specifying partition specification. This leads to move all data into
single partition in single machine and could cause serious
performance degradation. Avoid this method against very large dataset.
Parameters
----------
periods : int
Number of periods to shift. Can be positive or negative.
fill_value : object, optional
The scalar value to use for newly introduced missing values.
The default depends on the dtype of self. For numeric data, np.nan is used.
Returns
-------
Copy of input Series/Index, shifted.
Examples
--------
>>> df = ps.DataFrame({'Col1': [10, 20, 15, 30, 45],
... 'Col2': [13, 23, 18, 33, 48],
... 'Col3': [17, 27, 22, 37, 52]},
... columns=['Col1', 'Col2', 'Col3'])
>>> df.Col1.shift(periods=3)
0 NaN
1 NaN
2 NaN
3 10.0
4 20.0
Name: Col1, dtype: float64
>>> df.Col2.shift(periods=3, fill_value=0)
0 0
1 0
2 0
3 13
4 23
Name: Col2, dtype: int64
>>> df.index.shift(periods=3, fill_value=0)
Int64Index([0, 0, 0, 0, 1], dtype='int64')
"""
return self._shift(periods, fill_value).spark.analyzed
def _shift(
self: IndexOpsLike,
periods: int,
fill_value: Any,
*,
part_cols: Sequence["ColumnOrName"] = ()
) -> IndexOpsLike:
if not isinstance(periods, int):
raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__)
col = self.spark.column
window = (
Window.partitionBy(*part_cols)
.orderBy(NATURAL_ORDER_COLUMN_NAME)
.rowsBetween(-periods, -periods)
)
lag_col = F.lag(col, periods).over(window)
col = F.when(lag_col.isNull() | F.isnan(lag_col), fill_value).otherwise(lag_col)
return self._with_new_scol(col, field=self._internal.data_fields[0].copy(nullable=True))
# TODO: Update Documentation for Bins Parameter when its supported
def value_counts(
self,
normalize: bool = False,
sort: bool = True,
ascending: bool = False,
bins: None = None,
dropna: bool = True,
) -> "Series":
"""
Return a Series containing counts of unique values.
The resulting object will be in descending order so that the
first element is the most frequently-occurring element.
Excludes NA values by default.
Parameters
----------
normalize : boolean, default False
If True then the object returned will contain the relative
frequencies of the unique values.
sort : boolean, default True
Sort by values.
ascending : boolean, default False
Sort in ascending order.
bins : Not Yet Supported
dropna : boolean, default True
Don't include counts of NaN.
Returns
-------
counts : Series
See Also
--------
Series.count: Number of non-NA elements in a Series.
Examples
--------
For Series
>>> df = ps.DataFrame({'x':[0, 0, 1, 1, 1, np.nan]})
>>> df.x.value_counts() # doctest: +NORMALIZE_WHITESPACE
1.0 3
0.0 2
Name: x, dtype: int64
With `normalize` set to `True`, returns the relative frequency by
dividing all values by the sum of values.
>>> df.x.value_counts(normalize=True) # doctest: +NORMALIZE_WHITESPACE
1.0 0.6
0.0 0.4
Name: x, dtype: float64
**dropna**
With `dropna` set to `False` we can also see NaN index values.
>>> df.x.value_counts(dropna=False) # doctest: +NORMALIZE_WHITESPACE
1.0 3
0.0 2
NaN 1
Name: x, dtype: int64
For Index
>>> idx = ps.Index([3, 1, 2, 3, 4, np.nan])
>>> idx
Float64Index([3.0, 1.0, 2.0, 3.0, 4.0, nan], dtype='float64')
>>> idx.value_counts().sort_index()
1.0 1
2.0 1
3.0 2
4.0 1
dtype: int64
**sort**
With `sort` set to `False`, the result wouldn't be sorted by number of count.
>>> idx.value_counts(sort=True).sort_index()
1.0 1
2.0 1
3.0 2
4.0 1
dtype: int64
**normalize**
With `normalize` set to `True`, returns the relative frequency by
dividing all values by the sum of values.
>>> idx.value_counts(normalize=True).sort_index()
1.0 0.2
2.0 0.2
3.0 0.4
4.0 0.2
dtype: float64
**dropna**
With `dropna` set to `False` we can also see NaN index values.
>>> idx.value_counts(dropna=False).sort_index() # doctest: +SKIP
1.0 1
2.0 1
3.0 2
4.0 1
NaN 1
dtype: int64
For MultiIndex.
>>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'],
... ['speed', 'weight', 'length']],
... [[0, 0, 0, 1, 1, 1, 2, 2, 2],
... [1, 1, 1, 1, 1, 2, 1, 2, 2]])
>>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx)
>>> s.index # doctest: +SKIP
MultiIndex([( 'lama', 'weight'),
( 'lama', 'weight'),
( 'lama', 'weight'),
( 'cow', 'weight'),
( 'cow', 'weight'),
( 'cow', 'length'),
('falcon', 'weight'),
('falcon', 'length'),
('falcon', 'length')],
)
>>> s.index.value_counts().sort_index()
(cow, length) 1
(cow, weight) 2
(falcon, length) 2
(falcon, weight) 1
(lama, weight) 3
dtype: int64
>>> s.index.value_counts(normalize=True).sort_index()
(cow, length) 0.111111
(cow, weight) 0.222222
(falcon, length) 0.222222
(falcon, weight) 0.111111
(lama, weight) 0.333333
dtype: float64
If Index has name, keep the name up.
>>> idx = ps.Index([0, 0, 0, 1, 1, 2, 3], name='pandas-on-Spark')
>>> idx.value_counts().sort_index()
0 3
1 2
2 1
3 1
Name: pandas-on-Spark, dtype: int64
"""
from pyspark.pandas.series import first_series
if bins is not None:
raise NotImplementedError("value_counts currently does not support bins")
if dropna:
sdf_dropna = self._internal.spark_frame.select(self.spark.column).dropna()
else:
sdf_dropna = self._internal.spark_frame.select(self.spark.column)
index_name = SPARK_DEFAULT_INDEX_NAME
column_name = self._internal.data_spark_column_names[0]
sdf = sdf_dropna.groupby(scol_for(sdf_dropna, column_name).alias(index_name)).count()
if sort:
if ascending:
sdf = sdf.orderBy(F.col("count"))
else:
sdf = sdf.orderBy(F.col("count").desc())
if normalize:
sum = sdf_dropna.count()
sdf = sdf.withColumn("count", F.col("count") / SF.lit(sum))
internal = InternalFrame(
spark_frame=sdf,
index_spark_columns=[scol_for(sdf, index_name)],
column_labels=self._internal.column_labels,
data_spark_columns=[scol_for(sdf, "count")],
column_label_names=self._internal.column_label_names,
)
return first_series(DataFrame(internal))
def nunique(self, dropna: bool = True, approx: bool = False, rsd: float = 0.05) -> int:
"""
Return number of unique elements in the object.
Excludes NA values by default.
Parameters
----------
dropna : bool, default True
Don’t include NaN in the count.
approx: bool, default False
If False, will use the exact algorithm and return the exact number of unique.
If True, it uses the HyperLogLog approximate algorithm, which is significantly faster
for large amount of data.
Note: This parameter is specific to pandas-on-Spark and is not found in pandas.
rsd: float, default 0.05
Maximum estimation error allowed in the HyperLogLog algorithm.
Note: Just like ``approx`` this parameter is specific to pandas-on-Spark.
Returns
-------
int
See Also
--------
DataFrame.nunique: Method nunique for DataFrame.
Series.count: Count non-NA/null observations in the Series.
Examples
--------
>>> ps.Series([1, 2, 3, np.nan]).nunique()
3
>>> ps.Series([1, 2, 3, np.nan]).nunique(dropna=False)
4
On big data, we recommend using the approximate algorithm to speed up this function.
The result will be very close to the exact unique count.
>>> ps.Series([1, 2, 3, np.nan]).nunique(approx=True)
3
>>> idx = ps.Index([1, 1, 2, None])
>>> idx
Float64Index([1.0, 1.0, 2.0, nan], dtype='float64')
>>> idx.nunique()
2
>>> idx.nunique(dropna=False)
3
"""
res = self._internal.spark_frame.select([self._nunique(dropna, approx, rsd)])
return res.collect()[0][0]
def _nunique(self, dropna: bool = True, approx: bool = False, rsd: float = 0.05) -> Column:
colname = self._internal.data_spark_column_names[0]
count_fn = cast(
Callable[[Column], Column],
partial(F.approx_count_distinct, rsd=rsd) if approx else F.countDistinct,
)
if dropna:
return count_fn(self.spark.column).alias(colname)
else:
return (
count_fn(self.spark.column)
+ F.when(
F.count(F.when(self.spark.column.isNull(), 1).otherwise(None)) >= 1, 1
).otherwise(0)
).alias(colname)
def take(self: IndexOpsLike, indices: Sequence[int]) -> IndexOpsLike:
"""
Return the elements in the given *positional* indices along an axis.
This means that we are not indexing according to actual values in
the index attribute of the object. We are indexing according to the
actual position of the element in the object.
Parameters
----------
indices : array-like
An array of ints indicating which positions to take.
Returns
-------
taken : same type as caller
An array-like containing the elements taken from the object.
See Also
--------
DataFrame.loc : Select a subset of a DataFrame by labels.
DataFrame.iloc : Select a subset of a DataFrame by positions.
numpy.take : Take elements from an array along an axis.
Examples
--------
Series
>>> psser = ps.Series([100, 200, 300, 400, 500])
>>> psser
0 100
1 200
2 300
3 400
4 500
dtype: int64
>>> psser.take([0, 2, 4]).sort_index()
0 100
2 300
4 500
dtype: int64
Index
>>> psidx = ps.Index([100, 200, 300, 400, 500])
>>> psidx
Int64Index([100, 200, 300, 400, 500], dtype='int64')
>>> psidx.take([0, 2, 4]).sort_values()
Int64Index([100, 300, 500], dtype='int64')
MultiIndex
>>> psmidx = ps.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("x", "c")])
>>> psmidx # doctest: +SKIP
MultiIndex([('x', 'a'),
('x', 'b'),
('x', 'c')],
)
>>> psmidx.take([0, 2]) # doctest: +SKIP
MultiIndex([('x', 'a'),
('x', 'c')],
)
"""
if not is_list_like(indices) or isinstance(indices, (dict, set)):
raise TypeError("`indices` must be a list-like except dict or set")
if isinstance(self, ps.Series):
return cast(IndexOpsLike, self.iloc[indices])
else:
return cast(IndexOpsLike, self._psdf.iloc[indices].index)
def factorize(
self: IndexOpsLike, sort: bool = True, na_sentinel: Optional[int] = -1
) -> Tuple[IndexOpsLike, pd.Index]:
"""
Encode the object as an enumerated type or categorical variable.
This method is useful for obtaining a numeric representation of an
array when all that matters is identifying distinct values.
Parameters
----------
sort : bool, default True
na_sentinel : int or None, default -1
Value to mark "not found". If None, will not drop the NaN
from the uniques of the values.
Returns
-------
codes : Series or Index
A Series or Index that's an indexer into `uniques`.
``uniques.take(codes)`` will have the same values as `values`.
uniques : pd.Index
The unique valid values.
.. note ::
Even if there's a missing value in `values`, `uniques` will
*not* contain an entry for it.
Examples
--------
>>> psser = ps.Series(['b', None, 'a', 'c', 'b'])
>>> codes, uniques = psser.factorize()
>>> codes
0 1
1 -1
2 0
3 2
4 1
dtype: int32
>>> uniques
Index(['a', 'b', 'c'], dtype='object')
>>> codes, uniques = psser.factorize(na_sentinel=None)
>>> codes
0 1
1 3
2 0
3 2
4 1
dtype: int32
>>> uniques
Index(['a', 'b', 'c', None], dtype='object')
>>> codes, uniques = psser.factorize(na_sentinel=-2)
>>> codes
0 1
1 -2
2 0
3 2
4 1
dtype: int32
>>> uniques
Index(['a', 'b', 'c'], dtype='object')
For Index:
>>> psidx = ps.Index(['b', None, 'a', 'c', 'b'])
>>> codes, uniques = psidx.factorize()
>>> codes
Int64Index([1, -1, 0, 2, 1], dtype='int64')
>>> uniques
Index(['a', 'b', 'c'], dtype='object')
"""
from pyspark.pandas.series import first_series
assert (na_sentinel is None) or isinstance(na_sentinel, int)
assert sort is True
if isinstance(self.dtype, CategoricalDtype):
categories = self.dtype.categories
if len(categories) == 0:
scol = SF.lit(None)
else:
kvs = list(
chain(
*[
(SF.lit(code), SF.lit(category))
for code, category in enumerate(categories)
]
)
)
map_scol = F.create_map(*kvs)
scol = map_scol[self.spark.column]
codes, uniques = self._with_new_scol(
scol.alias(self._internal.data_spark_column_names[0])
).factorize(na_sentinel=na_sentinel)
return codes, uniques.astype(self.dtype)
uniq_sdf = self._internal.spark_frame.select(self.spark.column).distinct()
# Check number of uniques and constructs sorted `uniques_list`
max_compute_count = get_option("compute.max_rows")
if max_compute_count is not None:
uniq_pdf = uniq_sdf.limit(max_compute_count + 1).toPandas()
if len(uniq_pdf) > max_compute_count:
raise ValueError(
"Current Series has more then {0} unique values. "
"Please set 'compute.max_rows' by using 'pyspark.pandas.config.set_option' "
"to more than {0} rows. Note that, before changing the "
"'compute.max_rows', this operation is considerably expensive.".format(
max_compute_count
)
)
else:
uniq_pdf = uniq_sdf.toPandas()
# pandas takes both NaN and null in Spark to np.nan, so de-duplication is required
uniq_series = first_series(uniq_pdf).drop_duplicates()
uniques_list = uniq_series.tolist()
uniques_list = sorted(uniques_list, key=lambda x: (pd.isna(x), x))
# Constructs `unique_to_code` mapping non-na unique to code
unique_to_code = {}
if na_sentinel is not None:
na_sentinel_code = na_sentinel
code = 0
for unique in uniques_list:
if pd.isna(unique):
if na_sentinel is None:
na_sentinel_code = code
else:
unique_to_code[unique] = code
code += 1
kvs = list(
chain(*([(SF.lit(unique), SF.lit(code)) for unique, code in unique_to_code.items()]))
)
if len(kvs) == 0: # uniques are all missing values
new_scol = SF.lit(na_sentinel_code)
else:
map_scol = F.create_map(*kvs)
null_scol = F.when(self.isnull().spark.column, SF.lit(na_sentinel_code))
new_scol = null_scol.otherwise(map_scol[self.spark.column])
codes = self._with_new_scol(new_scol.alias(self._internal.data_spark_column_names[0]))
if na_sentinel is not None:
# Drops the NaN from the uniques of the values
uniques_list = [x for x in uniques_list if not pd.isna(x)]
uniques = pd.Index(uniques_list)
return codes, uniques
def _test() -> None:
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.base
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.pandas.base.__dict__.copy()
globs["ps"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]").appName("pyspark.pandas.base tests").getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.base,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)
if __name__ == "__main__":
_test()