Source code for eth_defi.event_reader.json_state
"""Serialise event scan state as a JSON file."""
import os
from typing import Tuple
from eth_defi.event_reader.state import ScanState
class JSONFileScanState(ScanState):
    """Save and resume block event scan using state serialised in a file.

    The file contains the last processed block number as decimal text
    followed by a newline.
    """

    def __init__(self, fname: str):
        """
        :param fname: In which file we store the last processed block number.
        """
        self.fname = fname

    def save_state(self, last_block) -> None:
        """Save the last block we have read.

        The value is first written to a sibling ``<fname>.tmp`` file which is
        then moved over the real state file, so an interrupted save never
        leaves a truncated or empty state file behind.

        :param last_block: The last block number we have processed.
        """
        tmp_fname = f"{self.fname}.tmp"
        with open(tmp_fname, "wt", encoding="utf-8") as f:
            f.write(f"{last_block}\n")
        # The previous state stays intact until the new one is fully written.
        os.replace(tmp_fname, self.fname)

    def restore_state(self, default_block: int) -> Tuple[bool, int]:
        """Restore the last block we have processed.

        :param default_block:
            Block number to return when there is no saved state.

        :return:
            Tuple (did we restore state, the block number to continue from).
            When the state file is missing or empty the result is
            ``(False, default_block)``.

        :raise ValueError:
            If the state file has content that is not an integer.
        """
        # Open directly instead of checking for existence first: the file may
        # disappear between the check and the open.
        try:
            with open(self.fname, "rt", encoding="utf-8") as f:
                last_block_text = f.read()
        except FileNotFoundError:
            return False, default_block

        # An empty file carries no state (e.g. left behind by an interrupted
        # write made before saves went through a temporary file).
        if not last_block_text.strip():
            return False, default_block

        return True, int(last_block_text)