# Time Series Fundamentals in TDAG
TDAG's core revolves around the concept of time series, encapsulated in the powerful `TimeSerie` class. Each `TimeSerie` object manages the process of updating data, reflecting the most current available information. It interacts seamlessly with databases and maintains a robust internal state, supporting efficient data pipelines.
## Understanding the Update Process
In TDAG, updating involves:

- Updating DataRepositories: stores the data generated by the update process.
- Updating ORM: manages the internal state of the data and the pipeline.
The following diagram illustrates these interactions:
```mermaid
flowchart TD
    subgraph TDAG_System[TDAG Framework]
        TimeSerieConstructor["TimeSerie.__init__(*args, **kwargs)"] -->|Defines| TimeSerie["TimeSerie.update(latest_value)"]
    end
    subgraph DataRepositories["Data Repositories"]
        DB["TimeScaleDB"]
        DataLake["DataLake"]
    end
    subgraph ORM["TDAG ORM"]
        LocalTimeSerie["LocalTimeSerie (local_hash_id)"] -->|View of Table| DynamicTable["DynamicTable (hash_id)"]
    end
    TimeSerie -->|Updates| DataRepositories
    TimeSerie -->|Updates| ORM
    ORM -->|Manages state of| DataRepositories
    ORM -->|Updates| DynamicTable
    ORM -->|Updates| LocalTimeSerie
```
## Initializing a TimeSerie
The constructor (`__init__`) defines the initial state and configuration:

```python
def __init__(self, init_meta=None,
             build_meta_data: Union[dict, None] = None,
             local_kwargs_to_ignore: Union[List[str], None] = None,
             data_configuration_path: Union[str, None] = None,
             *args, **kwargs):
    ...
```
### Hashing Mechanism
The constructor arguments create two essential hashes that facilitate efficient management of data and updates:

- `hash_id`: used to uniquely identify the data repositories linked to a specific `TimeSerie`. This ensures different configurations or datasets are appropriately separated or merged based on their content rather than their names alone.
- `local_hash_id`: used to uniquely identify a specific update process. It enables TDAG to recognize distinct update routines and manage their internal state independently, which is crucial for parallel updates or workflows that reuse identical data structures with different update logic.
### Special Constructor Arguments
Some arguments are explicitly excluded from hashing:

- `init_meta`: arbitrary metadata used during initialization for convenience and clarity.
- `build_meta_data`: metadata recoverable at any time and editable from interfaces; useful for dynamic or interactive data handling.
- `local_kwargs_to_ignore`: arguments excluded from the `hash_id` calculation but included in `local_hash_id`, allowing flexibility in differentiating between datasets and update processes.
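The interplay between the two hashes can be pictured with a small sketch. This is purely illustrative: the `config_hash` helper and the example kwargs are hypothetical, and TDAG's actual hashing is internal to the framework.

```python
import hashlib
import json

def config_hash(kwargs: dict) -> str:
    """Hash a configuration dict deterministically (illustrative only)."""
    payload = json.dumps(kwargs, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()[:12]

# Hypothetical configuration for a TimeSerie
kwargs = {"asset_symbols": ["BTC", "ETH"], "update_frequency": "1h"}
local_kwargs_to_ignore = ["update_frequency"]

# hash_id identifies the data repository -> ignored kwargs are excluded
shared = {k: v for k, v in kwargs.items() if k not in local_kwargs_to_ignore}
hash_id = config_hash(shared)

# local_hash_id identifies the update process -> every kwarg is included
local_hash_id = config_hash(kwargs)

# Two configs that differ only in an ignored kwarg share the same hash_id
other = {**kwargs, "update_frequency": "5m"}
other_shared = {k: v for k, v in other.items() if k not in local_kwargs_to_ignore}
assert config_hash(other_shared) == hash_id
assert config_hash(other) != local_hash_id
```

In this sketch the two configurations would write to the same data repository while running as independent update processes, which is exactly the separation the two hashes are meant to provide.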
### Decorator Usage
Always decorate the constructor to ensure proper integration with TDAG:

```python
from mainsequence.tdag import TimeSerie

class NewTimeSeries(TimeSerie):
    @TimeSerie._post_init_routines
    def __init__(self):
        ...
```
## Managing Dependencies with Introspection
TDAG simplifies dependency management by automatically detecting dependencies through introspection. Rather than manually managing complex dependency trees, developers only need to explicitly declare direct dependencies as class attributes. TDAG then builds the full dependency graph internally.
Example:

```python
class NewTimeSeries(TimeSerie):
    @TimeSerie._post_init_routines
    def __init__(self, asset_symbols: List[str], *args, **kwargs):
        # Explicitly declare a direct dependency
        self.prices_time_serie = PricesTimeSerie(asset_symbols=asset_symbols)
```
TDAG automatically understands that `NewTimeSeries` depends on `PricesTimeSerie` and manages updates accordingly.
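Attribute-based dependency discovery can be sketched as follows. The classes below are simplified stand-ins, not TDAG's actual implementation, which performs richer introspection:

```python
class TimeSerie:
    """Minimal stand-in for sketching dependency discovery."""

    def dependencies(self):
        # Any instance attribute that is itself a TimeSerie
        # counts as a direct dependency
        return {name: value for name, value in vars(self).items()
                if isinstance(value, TimeSerie)}

class PricesTimeSerie(TimeSerie):
    def __init__(self, asset_symbols):
        self.asset_symbols = asset_symbols

class NewTimeSeries(TimeSerie):
    def __init__(self, asset_symbols):
        # Declaring the dependency as an attribute is all that is needed
        self.prices_time_serie = PricesTimeSerie(asset_symbols=asset_symbols)

ts = NewTimeSeries(asset_symbols=["BTC"])
deps = ts.dependencies()
assert list(deps) == ["prices_time_serie"]
```

Walking `dependencies()` recursively over each discovered node would yield the full dependency graph, which is the kind of construction TDAG performs internally.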
## State Persistence with Pickles

TDAG pickles each `TimeSerie` after its first initialization, significantly reducing load times in future updates. The pickled state is automatically refreshed when the underlying code changes, ensuring consistency and efficiency.
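One way to picture a pickle cache with code-change invalidation is to key the cached file on a hash of the code itself. This is a hypothetical sketch, not TDAG's actual mechanism; `DemoSerie` and `cache_path` are invented for illustration:

```python
import hashlib
import os
import pickle
import tempfile

class DemoSerie:
    """Stand-in series whose state we want to persist between runs."""
    def __init__(self):
        self.latest_value = None

def cache_path(cls, directory):
    # Key the pickle on a hash of the constructor's bytecode:
    # when the code changes, the key changes and the cache is rebuilt
    digest = hashlib.sha256(cls.__init__.__code__.co_code).hexdigest()[:12]
    return os.path.join(directory, f"{cls.__name__}_{digest}.pkl")

with tempfile.TemporaryDirectory() as tmp:
    path = cache_path(DemoSerie, tmp)

    obj = DemoSerie()
    obj.latest_value = "2024-01-01T00:00:00Z"
    with open(path, "wb") as f:
        pickle.dump(obj.__dict__, f)      # first initialization: persist state

    restored = DemoSerie()
    with open(path, "rb") as f:
        restored.__dict__.update(pickle.load(f))  # later runs: fast reload

assert restored.latest_value == "2024-01-01T00:00:00Z"
```

Hashing only the bytecode keeps the sketch simple; a real scheme would also account for dependencies and configuration.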
## Updating a TimeSerie

The `update` method performs all the logic needed to fetch, calculate, and store new data points in the series. It receives a parameter called `latest_value`, the most recent timestamp from previous updates. If `latest_value` is `None`, the series has never been updated successfully before; otherwise, the update continues from that point.
Example:

```python
def update(self, latest_value, *args, **kwargs) -> pd.DataFrame:
    # Continue from the most recent timestamp stored in previous updates
    new_data = self.fetch_new_data_since(latest_value)
    processed_data = self.calculate_metrics(new_data)
    return processed_data
```
Returned DataFrame requirements:

- Unidimensional index: a `DatetimeIndex` in `pytz.utc`.
- Multidimensional index: three levels: `time_index`, `asset_symbol`, and `execution_venue_symbol`.
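A conforming multidimensional result might be built as follows. The symbol and venue values are illustrative; only the index-level names and the UTC timezone follow from the requirements above:

```python
import pandas as pd
import pytz

# Two hourly timestamps, localized to UTC as required
timestamps = pd.to_datetime(
    ["2024-01-01 00:00", "2024-01-01 01:00"]
).tz_localize(pytz.utc)

# Three-level index: time_index, asset_symbol, execution_venue_symbol
index = pd.MultiIndex.from_product(
    [timestamps, ["BTCUSDT"], ["binance"]],
    names=["time_index", "asset_symbol", "execution_venue_symbol"],
)
df = pd.DataFrame({"close": [42000.0, 42100.0]}, index=index)

assert list(df.index.names) == ["time_index", "asset_symbol",
                                "execution_venue_symbol"]
assert str(df.index.get_level_values("time_index").tz) == "UTC"
```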
## Essential Helper Methods
### Retrieving Data Between Dates
The `get_df_between_dates` method fetches data within a specified date range:
```python
def get_df_between_dates(self, start_date: Union[datetime.datetime, None] = None,
                         end_date: Union[datetime.datetime, None] = None,
                         unique_identifier_list: Union[None, list] = None,
                         great_or_equal=True, less_or_equal=True,
                         unique_identifier_range_map: Optional[UniqueIdentifierRangeMap] = None
                         ) -> pd.DataFrame:
    ...
```
This method efficiently retrieves data within a specific range, accommodating various filtering scenarios by asset or time intervals.
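The boundary flags behave like inclusive/exclusive comparisons on the index. The pandas sketch below illustrates the semantics only; it is not TDAG's implementation, and `between_dates` is a hypothetical helper:

```python
import pandas as pd

idx = pd.date_range("2024-01-01", periods=5, freq="D", tz="UTC")
df = pd.DataFrame({"value": range(5)}, index=idx)

def between_dates(df, start=None, end=None,
                  great_or_equal=True, less_or_equal=True):
    """Filter rows by date range with configurable boundary inclusion."""
    mask = pd.Series(True, index=df.index)
    if start is not None:
        mask &= (df.index >= start) if great_or_equal else (df.index > start)
    if end is not None:
        mask &= (df.index <= end) if less_or_equal else (df.index < end)
    return df[mask]

start, end = idx[1], idx[3]
assert len(between_dates(df, start, end)) == 3   # both boundaries included
assert len(between_dates(df, start, end, great_or_equal=False)) == 2
```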
### Getting Last Observation
```python
def get_last_observation(self, asset_symbols: Union[None, list] = None):
    ...
```
Returns the most recent observation in the series.
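For a multi-asset frame, "most recent observation" can be sketched as a group-wise tail. This is illustrative only (the real method queries the backend), and it assumes rows are time-sorted within each asset:

```python
import pandas as pd

idx = pd.MultiIndex.from_tuples(
    [(pd.Timestamp("2024-01-01", tz="UTC"), "BTC"),
     (pd.Timestamp("2024-01-02", tz="UTC"), "BTC"),
     (pd.Timestamp("2024-01-01", tz="UTC"), "ETH")],
    names=["time_index", "asset_symbol"],
)
df = pd.DataFrame({"close": [100.0, 110.0, 50.0]}, index=idx)

# Last (time-sorted) row per asset_symbol group
last = df.groupby(level="asset_symbol").tail(1)
assert len(last) == 2
assert last.loc[(pd.Timestamp("2024-01-02", tz="UTC"), "BTC"), "close"] == 110.0
```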
## WrapperTimeSerie Class

`WrapperTimeSerie` allows collective management of multiple `TimeSerie` instances, facilitating scalable and efficient updates:
```python
class WrapperTimeSerie(TimeSerie):
    @TimeSerie._post_init_routines()
    def __init__(self, time_series_dict: Dict[str, TimeSerie], *args, **kwargs):
        ...
```
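Conceptually, the wrapper fans a single update call out to every wrapped series. The stand-in classes below sketch that idea only; TDAG's wrapper does considerably more (state tracking, scheduling):

```python
class TimeSerie:
    """Stand-in base class for the sketch."""
    def update(self):
        return "updated"

class WrapperTimeSerie(TimeSerie):
    def __init__(self, time_series_dict):
        self.time_series_dict = time_series_dict

    def update(self):
        # Fan the update out to every wrapped series, keyed by name
        return {name: ts.update()
                for name, ts in self.time_series_dict.items()}

wrapper = WrapperTimeSerie({"prices": TimeSerie(), "volumes": TimeSerie()})
assert wrapper.update() == {"prices": "updated", "volumes": "updated"}
```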
TDAG's `TimeSerie` class simplifies managing complex data dependencies, ensuring efficiency, consistency, and maintainability in your data pipelines.