Source code for cpy_amm.swap

from math import sqrt
from typing import Tuple

import numpy as np
from loguru import logger

from .market import MarketPair, TradeOrder


[docs]class MidPrice: def __init__(self, trading_pair: str, x: float, y: float): """The price between what users can buy and sell tokens at a given moment. In Uniswap this is the ratio of the two ERC20 token reserves. Args: trading_pair (str) : The trading pair ticker eg. ETH/USD x (float) : Reserve of token A before swap y (float) : Reserve of token B before swap """ assert x > 0 assert y > 0 assert trading_pair.count("/") == 1 xy_ticker = trading_pair.split("/") self.x_ticker = xy_ticker[0] self.y_ticker = xy_ticker[1] self.x = x self.y = y self.mid_price = x / y
[docs]class PriceImpactRange: def __init__(self, start: MidPrice, mid: MidPrice, end: MidPrice): """Price impact range of a swap. Starts at mid price before the swap, ends at mid price after the swap. Also contains the point where mid price is equal to the swap execution price. Args: start (MidPrice) : point corresponding to the mid price of the pool before swap mid (MidPrice) : point corresponding to the execution price of the swap eg. (x,y,x/y) where mid price == execution price end (MidPrice) : point corresponding to the next mid price of the pool eg. next price after swap """ self.start = start self.mid = mid self.end = end
[docs]def assert_cp_invariant(x: float, y: float, k: float, precision: float | None = None): """Asserts that the constant product is invariant. Args: x (float) : Reserve of tokens A y (float) : Reserve of tokens B k (float) : Constant product invariant precision (float) : Precision at which the constant product invariant is evaluated Returns: None """ precision = precision or 1e-14 try: assert k > 0 assert abs(k - (x * y)) / k <= precision except Exception as e: logger.error("Constant product invariant not satisfied") logger.error(f"diff={abs((x*y) - k)}") logger.error(f"precision={precision}") logger.error(f"x={x}") logger.error(f"y={y}") logger.error(f"x*y={x*y}") logger.error(f"k={k}") raise e
[docs]def constant_product_swap( mkt: MarketPair, order: TradeOrder, precision: float | None = None, ) -> Tuple[float, float]: """Swap tokens A for tokens B from pool with a XY constant product. Args: mkt (MarketPair) : The market pair to trade against order (TradeOrder) : The trade order to execute precision (float) : Precision at which constant product invariant is evaluated Returns: Tuple[float, float] : (Amount of tokens B out, Swap execution price) """ assert order.order_size != 0 # the reserves depending on the swap direction x, y = mkt.pool_1.balance, mkt.pool_2.balance # the order size if order.direction == "buy": dx = order.net_order_size # calculate dy amount of tokens B to be taken out from the AMM dy = (y * dx) / (x + dx) # add dx amount of tokens A to the AMM mkt.pool_1.reserves.append(x + dx) # take dy amount of tokens B out from the AMM mkt.pool_2.reserves.append(y - dy) mkt.volume_base -= dy mkt.volume_quote += dx / (1 - mkt.swap_fee) mkt.total_fees_quote += mkt.swap_fee * dx / (1 - mkt.swap_fee) # assert k is still invariant assert_cp_invariant( mkt.pool_1.balance, mkt.pool_2.balance, mkt.cp_invariant, precision ) return dy, dx / dy elif order.direction == "sell": dy = order.net_order_size # calculate dx amount of tokens A to be taken out from the AMM dx = (x * dy) / (y + dy) # add dx amount of tokens A to the AMM mkt.pool_1.reserves.append(x - dx) # take dy amount of tokens B out from the AMM mkt.pool_2.reserves.append(y + dy) mkt.volume_base += dy mkt.volume_quote -= dx / (1 - mkt.swap_fee) mkt.total_fees_quote += mkt.swap_fee * dx / (1 - mkt.swap_fee) # assert k is still invariant assert_cp_invariant( mkt.pool_1.balance, mkt.pool_2.balance, mkt.cp_invariant, precision ) return dx, dx / dy else: raise Exception( f"Trade order direction must be buy or sell. Got {order.direction}" )
[docs]def mock_constant_product_swap( mkt: MarketPair, order: TradeOrder, precision: float | None = None, ) -> Tuple[float, float]: """Swap tokens A for tokens B from pool with a XY constant product. Args: mkt (MarketPair) : The market pair to trade against order (TradeOrder) : The trade order to execute precision (float) : Precision at which constant product invariant is evaluated Returns: Tuple[float, float] : (Amount of tokens B out, Swap execution price) """ assert order.order_size != 0 # the reserves depending on the swap direction x, y = mkt.pool_1.balance, mkt.pool_2.balance # the order size if order.direction == "buy": dx = order.order_size # calculate dy amount of tokens B to be taken out from the AMM dy = dx / mkt.mkt_price # add dx amount of tokens A to the AMM mkt.pool_1.reserves.append(x + dx) # take dy amount of tokens B out from the AMM mkt.pool_2.reserves.append(y - dy) mkt.volume_base -= dy mkt.volume_quote += dx mkt.total_fees_quote += mkt.swap_fee * dx # assert k is still invariant assert_cp_invariant( mkt.pool_1.balance, mkt.pool_2.balance, mkt.cp_invariant, precision ) return dy, dx / dy elif order.direction == "sell": dy = order.order_size # calculate dx amount of tokens A to be taken out from the AMM dx = dy * mkt.mkt_price # add dx amount of tokens A to the AMM mkt.pool_1.reserves.append(x - dx) # take dy amount of tokens B out from the AMM mkt.pool_2.reserves.append(y + dy) mkt.volume_base += dy mkt.volume_quote -= dx mkt.total_fees_quote += mkt.swap_fee * dx # assert k is still invariant assert_cp_invariant( mkt.pool_1.balance, mkt.pool_2.balance, mkt.cp_invariant, precision ) return dx, dx / dy else: raise Exception( f"Trade order direction must be buy or sell. Got {order.direction}" )
[docs]def swap_price(x, y, dx) -> float: """Computes the swap execution price for an order size given two pools with reserves x and y. Args: x (float) : Reserve of tokens A y (float) : Reserve of tokens B dx (float) : Order size Returns: float : Swap execution price """ return (x + dx) / y
[docs]def calc_arb_trade( mkt: MarketPair, tx_mkt_fee=0.002, tx_network_fee=0 ) -> Tuple[float, float]: """Computes the trade size and P&L for an arb trade which will align the mid price of the market pair with the market price. Args: mkt (MarketPair) : The market pair to trade against tx_mkt_fee (float) : The market fee to pay for the mkt leg of the arb tx_network_fee (float) : The network fee to pay expressed as a percentage of the trade size Returns: Tuple[float, float] : (Trade order size, Trade order P&L) """ dx, dy = mkt.get_delta_reserves() if dy == 0 or dx == 0: return 0, 0 exec_price = dx / dy tx_fees = tx_mkt_fee + mkt.swap_fee + tx_network_fee if dx > 0: # long pool / short mkt qty = dx spread = (mkt.mkt_price - exec_price) / exec_price else: # short pool / long mkt qty = dy spread = (exec_price - mkt.mkt_price) / exec_price return qty, (spread - tx_fees) * abs(dx)
[docs]def constant_product_curve( mkt: MarketPair, x_min: float | None = None, x_max: float | None = None, num: int | None = None, ) -> Tuple[list[float], list[float]]: """Computes the AMM curve Y = K/X for a constant product AMM K = XY Args: mkt (MarketPair) : The market pair to trade against x_min (float) : minimum value of X x_max (float) : maximum value of X num (float) : number of points to be computed Returns: Tuple[list[float],list[float]] : (Amount of tokens B out, Swap execution price) """ num = num or 1000 x_min = x_min or 0.1 * mkt.pool_1.balance x_max = x_max or 5.0 * mkt.pool_1.balance x = np.linspace(x_min, x_max, num=num) y = mkt.cp_invariant / x return x, y
[docs]def price_impact_range( mkt: MarketPair, order: TradeOrder | None = None, precision: float | None = None, ) -> PriceImpactRange: """Price impact of a trade order against a market. Args: mkt (MarketPair) : The market pair to trade against order (TradeOrder) : The trade order to execute precision (float) : precision at which the invariant is evaluated Returns: PriceImpactRange : Price impact range for given pools and order size """ # create default trade order order = order or TradeOrder.create_default(mkt) # trade size provided or defaulted to 10% of x dx = order.order_size # constant product invariant k = mkt.cp_invariant # start: (x,y) x_start, y_start = mkt.get_reserves(order.ticker) # end: (x+dx, y-dy) x_end = x_start + dx y_end = y_start * (1.0 - dx / (x_start + dx)) # assert k is invariant at start and end assert_cp_invariant(x_start, y_start, k, precision) assert_cp_invariant(x_end, y_end, k, precision) # swap execution price at start for dx amount of tokens A exec_price = swap_price(x_start, y_start, dx) # (x, y) of the mid price equal to the execution price x_mid = sqrt(k * exec_price) y_mid = k / sqrt(k * exec_price) return PriceImpactRange( MidPrice(mkt.ticker, x_start, y_start), MidPrice(mkt.ticker, x_mid, y_mid), MidPrice(mkt.ticker, x_end, y_end), )
[docs]def order_book( mkt: MarketPair, x_min: float | None = None, x_max: float | None = None, num: int | None = None, ): """Computes the cumulative quantity at any mid price according to the formula from the paper "Order Book Depth and Liquidity Provision in Automated Market Makers". Args: mkt (MarketPair) : The market pair to trade against x_min (float) : minimum value of X x_max (float) : maximum value of X num (float) : number of points to be computed Returns: Tuple[list[float],list[float]] : (reserves of token A, reserves of token B) """ q = [] x_0 = float(mkt.pool_1.initial_deposit) p_0 = float(mkt.pool_1.initial_deposit / mkt.pool_2.initial_deposit) x, y = constant_product_curve(mkt, x_min, x_max, num) p = [x_i / y_i for x_i, y_i in zip(x, y)] for p_i in p: q_i = float(0) if p_i < p_0: q_i = x_0 * (sqrt(p_0 / p_i) - 1) if p_i > p_0: q_i = x_0 * (1 - sqrt(p_0 / p_i)) q.append(q_i) return x, p, q