Module liquer.ext.dataframe_batches
Source code
from liquer import *
import pandas as pd
from liquer.store import get_store
from liquer.state_types import (
data_characteristics,
state_types_registry,
StateType,
register_state_type,
)
from liquer.constants import mimetype_from_extension
import json
@command
def concat_batches(idf, max_batches=None, context=None):
"""Concatenates an iterator of dataframes (batches) into a single dataframe
A maximum limit of batches to be processed can be specified with max_batches.
By default all the batches will be concatenated.
"""
context = get_context(context)
context.info(f"Concatenate batches")
if isinstance(idf, pd.DataFrame):
context.info(f"Single dataframe received")
idf = [idf]
batch_number = 0
if max_batches in ("0", "", None):
max_batches = 0
else:
max_batches = int(max_batches)
buffer = pd.DataFrame()
for df in context.progress_iter(idf):
if not len(df):
continue
context.info(
f"Receiving dataframe with {len(df)} rows and {len(df.columns)} columns"
)
if len(buffer) > 0:
if len(df.columns) != len(buffer.columns):
context.warning(
f"Number of columns in the batches differs - before:{len(buffer.columns)}, now:{len(df.columns)}"
)
batch_number += 1
if max_batches:
context.info(f"Concatenate batch {batch_number}/{max_batches}")
else:
context.info(f"Concatenate batch {batch_number}")
        buffer = pd.concat([buffer, df], ignore_index=True)
if max_batches and batch_number >= max_batches:
context.info(f"Maximum number of batches reached")
break
return buffer
@command(volatile=True, cache=False)
def repackage_batches(idf, batch_size=1024, max_batches=0, context=None):
"""Repackages an iterator of dataframes (batches) into fixed-length batches.
Results in an iterator of dataframes where each dataframe (with the exception of the last one)
    has batch_size rows.
Note: Since the result is an iterator, it is volatile and can't be cached.
A maximum limit of batches to be processed can be specified with max_batches.
By default all the batches will be repackaged.
"""
context = get_context(context)
context.info(f"Repackage batches")
if isinstance(idf, pd.DataFrame):
context.info(f"Single dataframe received")
idf = [idf]
buffer = pd.DataFrame()
if max_batches in ("0", "", None):
max_batches = 0
else:
max_batches = int(max_batches)
batch_number = 0
for df in context.progress_iter(idf):
if not len(df):
continue
context.info(
f"Receiving dataframe with {len(df)} rows and {len(df.columns)} columns"
)
if len(buffer) > 0:
if len(df.columns) != len(buffer.columns):
context.warning(
f"Number of columns in the batches differs - before:{len(buffer.columns)}, now:{len(df.columns)}"
)
        buffer = pd.concat([buffer, df], ignore_index=True)
while len(buffer) >= batch_size:
batch_number += 1
if max_batches:
context.info(f"Yield batch {batch_number}/{max_batches}")
else:
context.info(f"Yield batch {batch_number}")
result = buffer.iloc[:batch_size, :]
buffer = buffer.iloc[batch_size:, :]
yield result
if max_batches and batch_number >= max_batches:
context.info(f"Maximum number of batches reached")
return
while len(buffer) >= batch_size:
batch_number += 1
if max_batches:
context.info(f"Yield batch {batch_number}/{max_batches} (finishing)")
else:
context.info(f"Yield batch {batch_number} (finishing)")
result = buffer.iloc[:batch_size, :]
buffer = buffer.iloc[batch_size:, :]
yield result
if max_batches and batch_number >= max_batches:
context.info(f"Maximum number of batches reached")
return
if len(buffer):
batch_number += 1
if max_batches:
context.info(f"Yield batch {batch_number}/{max_batches} (last)")
else:
context.info(f"Yield batch {batch_number} (last)")
yield buffer
class StoredDataframeIterator(object):
def __init__(
self,
key,
item_keys=None,
extension="parquet",
number_format="%04d",
batch_number=0,
store=None,
):
self.key = key
self.item_keys = item_keys or []
self.extension = extension
self.number_format = number_format
self.batch_number = batch_number
self.state_type = state_types_registry().get("dataframe")
if store is None:
store = get_store()
self.store = store
def to_dict(self, with_batch_number=False):
d = dict(
key=self.key,
item_keys=self.item_keys[:],
extension=self.extension,
number_format=self.number_format,
)
if with_batch_number:
d["batch_number"] = self.batch_number
return d
@classmethod
def from_dict(cls, d):
return cls(
d["key"],
item_keys=d.get("item_keys", []),
extension=d.get("extension"),
number_format=d.get("number_format"),
batch_number=d.get("batch_number", 0),
)
def copy(self):
return StoredDataframeIterator(
self.key,
item_keys=self.item_keys,
extension=self.extension,
number_format=self.number_format,
batch_number=self.batch_number,
store=self.store,
)
def _key_to_value(self, key):
if not self.store.contains(key):
raise Exception(f"Batch {self.batch_number} failure: '{key}' not in store")
if self.store.is_dir(key):
raise Exception(
f"Batch {self.batch_number} failure: '{key}' is a directory"
)
metadata = self.store.get_metadata(key)
if metadata is None:
print(f"WARNING: Batch {self.batch_number}, key '{key}' missing metadata")
b = self.store.get_bytes(key)
if b is None:
raise Exception(f"Batch {self.batch_number} failure: '{key}' has no data")
assert type(b) == bytes
v = key.split(".")
extension = self.extension if len(v) <= 1 else v[-1]
df = self.state_type.from_bytes(b, extension=extension)
return df
def new_key(self):
batch_number = len(self.item_keys) + 1
if self.number_format is None:
name = str(batch_number)
else:
name = self.number_format % batch_number
if self.extension in (None, ""):
key = f"{self.key}/{name}"
else:
key = f"{self.key}/{name}.{self.extension}"
if key in self.item_keys:
raise Exception(f"New key '{key}' already exists")
return key
def append(self, df):
key = self.new_key()
store = self.store
b, mimetype = self.state_type.as_bytes(df, self.extension)
dc = data_characteristics(df)
assert dc["type_identifier"] == "dataframe"
metadata = dict(
type_identifier=dc.get("type_identifier"),
data_characteristics=data_characteristics(df),
)
store.store(key, b, metadata)
self.item_keys.append(key)
def rewind(self):
self.batch_number = 0
return self
def __len__(self):
return len(self.item_keys)
def __iter__(self):
return self.copy().rewind()
def __next__(self):
if len(self.item_keys) > self.batch_number:
key = self.item_keys[self.batch_number]
self.batch_number += 1
return self._key_to_value(key)
raise StopIteration
def __str__(self):
return f"Dataframe iterator stored in {self.key}"
def __repr__(self):
return f"StoredDataframeIterator('{self.key}')"
class StoredDataframeIteratorStateType(StateType):
def identifier(self):
return "dataframe_iterator"
def default_extension(self):
return "idf"
def is_type_of(self, data):
return isinstance(data, StoredDataframeIterator)
def as_bytes(self, data, extension=None):
if extension is None:
extension = self.default_extension()
assert self.is_type_of(data)
if extension in ["idf", "json"]:
mimetype = mimetype_from_extension("json")
d = data.to_dict()
return json.dumps(d).encode("utf-8"), mimetype
else:
raise Exception(
f"Serialization: file extension {extension} is not supported by stored dataframe iterator type."
)
def from_bytes(self, b: bytes, extension=None):
if extension is None:
extension = self.default_extension()
if extension in ["idf", "json"]:
return StoredDataframeIterator.from_dict(json.loads(b.decode("utf-8")))
raise Exception(
f"Deserialization: file extension {extension} is not supported by dataframe type."
)
def copy(self, data):
return data.copy()
def data_characteristics(self, data):
return dict(
description=f"Dataframe iterator with {len(data.item_keys)} batches."
)
STORED_DATAFRAME_ITERATOR_STATE_TYPE = StoredDataframeIteratorStateType()
register_state_type(StoredDataframeIterator, STORED_DATAFRAME_ITERATOR_STATE_TYPE)
def _store_batches(idf, key, max_batches=None, context=None):
"""Store iterator of dataframes (batches) in a store.
The key specifies a directory in the store where the items will be stored.
Helper function yielding StoredDataframeIterator object and dataframes.
"""
context = get_context(context)
context.info(f"Store iterator")
batch_number = 0
if max_batches in ("0", "", None):
max_batches = 0
else:
max_batches = int(max_batches)
store = context.store()
if store.contains(key):
if store.is_dir(key):
context.info(f"Cleaning {key}")
for x in store.listdir_keys(key):
context.info(f"Remove {x}")
store.remove(x)
else:
raise Exception(
f"Can't store the iterator in '{key}'. The key exists and it is not a directory."
)
sdfi_key = store.join_key(key, "dataframe_iterator.json")
sdfi = StoredDataframeIterator(key)
for df in context.progress_iter(idf):
if not len(df):
continue
batch_number += 1
if max_batches:
context.info(f"Storing batch {batch_number}/{max_batches}")
else:
context.info(f"Storing batch {batch_number}")
sdfi.append(df)
context.store_data(sdfi_key, sdfi)
yield sdfi, df
# sdfi_bytes, mimetype = STORED_DATAFRAME_ITERATOR_STATE_TYPE.as_bytes(sdfi)
# dc = data_characteristics(sdfi)
# sdfi_metadata = context.metadata()
# sdfi_metadata.update(
# dict(type_identifier=dc["type_identifier"], data_characteristics=dc)
# )
# store.store(sdfi_key, sdfi_bytes, sdfi_metadata)
        if max_batches and batch_number >= max_batches:
context.info(f"Maximum number of batches reached")
break
@command
def store_batches(idf, key, max_batches=None, context=None):
"""Store iterator of dataframes (batches) in a store.
The key specifies a directory in the store where the items will be stored.
Results in a StoredDataframeIterator object after all batches have been processed.
    This object can be serialized or stored in the store.
After each iteration step, the StoredDataframeIterator is stored as well in key/dataframe_iterator.json as a side-effect.
Thus for long-running iterations, the partial data is stored even if the evaluation does not finish.
"""
context = get_context(context)
sdfi = StoredDataframeIterator(key)
for sdfi, df in _store_batches(idf, key, max_batches=max_batches, context=context):
pass
return sdfi
@command(cache=False, volatile=True)
def store_batches_pass_through(idf, key, max_batches=None, context=None):
"""Store iterator of dataframes (batches) in a store.
The key specifies a directory in the store where the items will be stored.
Unlike store_batches, this immediately yields the dataframes,
    thus the result is a volatile iterator which cannot be stored in the cache.
The StoredDataframeIterator is, however, stored in key/dataframe_iterator.json as a side-effect after each iteration step.
"""
context = get_context(context)
for sdfi, df in _store_batches(idf, key, max_batches=max_batches, context=context):
yield df
Functions
def concat_batches(idf, max_batches=None, context=None)
Concatenates an iterator of dataframes (batches) into a single dataframe. A maximum limit of batches to be processed can be specified with max_batches. By default all the batches will be concatenated.
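A minimal usage sketch (illustrative data; it assumes that the @command-decorated function can also be called directly as a plain Python function):

import pandas as pd
from liquer.ext.dataframe_batches import concat_batches

# two illustrative batches with identical columns
df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
df2 = pd.DataFrame({"a": [3], "b": ["z"]})

combined = concat_batches([df1, df2])        # concatenate all batches
first_only = concat_batches([df1, df2], 1)   # stop after the first batch
print(len(combined), len(first_only))        # 3 2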
def repackage_batches(idf, batch_size=1024, max_batches=0, context=None)
Repackages an iterator of dataframes (batches) into fixed-length batches. Results in an iterator of dataframes where each dataframe (with the exception of the last one) has batch_size rows. Note: Since the result is an iterator, it is volatile and can't be cached. A maximum limit of batches to be processed can be specified with max_batches. By default all the batches will be repackaged.
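A small sketch of repackaging a single large dataframe into fixed-size batches (illustrative data, same direct-call assumption as above):

import pandas as pd
from liquer.ext.dataframe_batches import repackage_batches

df = pd.DataFrame({"a": range(5)})           # one large input batch
for batch in repackage_batches(df, batch_size=2):
    print(len(batch))                        # prints 2, 2, 1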
def store_batches(idf, key, max_batches=None, context=None)
Store iterator of dataframes (batches) in a store. The key specifies a directory in the store where the items will be stored. Results in a StoredDataframeIterator object after all batches have been processed. This object can be serialized or stored in the store. After each iteration step, the StoredDataframeIterator is stored as well in key/dataframe_iterator.json as a side-effect. Thus for long-running iterations, the partial data is stored even if the evaluation does not finish.
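A sketch of storing batches in an in-memory store. It assumes that liquer.store provides MemoryStore and set_store, that the pandas dataframe state type is registered (e.g. by importing liquer.ext.lq_pandas) and that a parquet engine such as pyarrow is installed; treat the setup calls as assumptions rather than the definitive API:

import pandas as pd
import liquer.ext.lq_pandas                  # registers the "dataframe" state type (assumption)
from liquer.store import set_store, MemoryStore
from liquer.ext.dataframe_batches import store_batches

set_store(MemoryStore())                     # in-memory store for the sketch

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"a": [3, 4]})
sdfi = store_batches([df1, df2], "batches/example")
print(sdfi.item_keys)                        # e.g. ['batches/example/0001.parquet', 'batches/example/0002.parquet']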
def store_batches_pass_through(idf, key, max_batches=None, context=None)
Store iterator of dataframes (batches) in a store. The key specifies a directory in the store where the items will be stored. Unlike store_batches, this immediately yields the dataframes, thus the result is a volatile iterator which cannot be stored in the cache. The StoredDataframeIterator is, however, stored in key/dataframe_iterator.json as a side-effect after each iteration step.
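A sketch combining the pass-through variant with concat_batches, under the same setup assumptions as the store_batches sketch above:

import pandas as pd
import liquer.ext.lq_pandas                  # assumption, as above
from liquer.store import set_store, MemoryStore
from liquer.ext.dataframe_batches import store_batches_pass_through, concat_batches

set_store(MemoryStore())

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"a": [3]})

# batches are persisted under "batches/raw" while flowing on to further processing
combined = concat_batches(store_batches_pass_through([df1, df2], "batches/raw"))
print(len(combined))                         # 3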
Classes
class StoredDataframeIterator (key, item_keys=None, extension='parquet', number_format='%04d', batch_number=0, store=None)
Iterator over dataframe batches persisted in a store. Each appended dataframe is serialized (by default to parquet) under key/<number>.<extension>; iterating over the object loads the batches back lazily, one at a time. The object itself can be converted to and from a small dictionary via to_dict and from_dict.
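A sketch of using the class directly, under the same environment assumptions as the store_batches sketch (a configured store, the dataframe state type registered, a parquet engine available):

import pandas as pd
import liquer.ext.lq_pandas                  # assumption, as above
from liquer.store import set_store, MemoryStore
from liquer.ext.dataframe_batches import StoredDataframeIterator

set_store(MemoryStore())

sdfi = StoredDataframeIterator("batches/manual")
sdfi.append(pd.DataFrame({"a": [1, 2]}))     # stored as batches/manual/0001.parquet
sdfi.append(pd.DataFrame({"a": [3]}))        # stored as batches/manual/0002.parquet

print(len(sdfi))                             # 2 batches
for df in sdfi:                              # batches are loaded lazily from the store
    print(df.shape)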
Static methods
def from_dict(d)
Creates a StoredDataframeIterator from a dictionary as produced by to_dict.
Methods
def append(self, df)
Serializes the dataframe with the registered dataframe state type, stores it under a newly generated key (see new_key) and records the key in item_keys.
def copy(self)
Returns a copy of the iterator pointing to the same store and item keys, preserving the current batch position.
def new_key(self)
Builds the store key for the next batch from the number of already stored batches, number_format and extension (e.g. key/0001.parquet); raises an exception if the key already exists.
def rewind(self)
Resets the iteration position to the first batch and returns self.
def to_dict(self, with_batch_number=False)
Returns a dictionary representation of the iterator (key, item_keys, extension, number_format); the current batch_number is included only if with_batch_number is True.
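A to_dict/from_dict round-trip sketch (the constructor needs a store and the registered dataframe state type, as in the earlier sketches):

import liquer.ext.lq_pandas                  # assumption, as above
from liquer.store import set_store, MemoryStore
from liquer.ext.dataframe_batches import StoredDataframeIterator

set_store(MemoryStore())

sdfi = StoredDataframeIterator("batches/manual",
                               item_keys=["batches/manual/0001.parquet"])
d = sdfi.to_dict()
# {'key': 'batches/manual', 'item_keys': [...], 'extension': 'parquet', 'number_format': '%04d'}
restored = StoredDataframeIterator.from_dict(d)
assert restored.item_keys == sdfi.item_keys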
class StoredDataframeIteratorStateType
State type for StoredDataframeIterator objects. Serializes the iterator to and from JSON (default extension "idf"), allowing it to be stored and cached like other liquer data.
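A serialization round-trip sketch using the registered state type directly (same environment assumptions as above):

import json
import liquer.ext.lq_pandas                  # assumption, as above
from liquer.store import set_store, MemoryStore
from liquer.ext.dataframe_batches import (
    StoredDataframeIterator,
    STORED_DATAFRAME_ITERATOR_STATE_TYPE,
)

set_store(MemoryStore())

sdfi = StoredDataframeIterator("batches/manual")
b, mimetype = STORED_DATAFRAME_ITERATOR_STATE_TYPE.as_bytes(sdfi)
print(json.loads(b))                         # dict with key, item_keys, extension, number_format
restored = STORED_DATAFRAME_ITERATOR_STATE_TYPE.from_bytes(b)
print(restored)                              # Dataframe iterator stored in batches/manual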
Ancestors
liquer.state_types.StateType