Scrap Public Trade
Follow Binance Best Traders page
A not-so-useful script to follow the trades shown on that same page:
#!/usr/bin/env python
# coding:utf-8
# Ids of the traders to follow; each one is sent to Binance as the `encryptedUid` of a request.
TraderList = ['B6EF34B1C875FF4097AF51FF73868E70']
# Discord webhook URL handed to DiscoLogger. NOTE(review): the value looks like a placeholder — supply the real URL.
discordUrl = "https://discord.com/api/webhooks/secretUrl/secretUrl"
# Pause between two polling rounds, in seconds ("rafraichissement" is French for "refresh").
rafraichissement_en_seconde = 1
from time import sleep
from requests import post as requestPost
from json import loads as jsonLoads, dumps as jsonDumps
from discord_webhook import DiscordWebhook, AsyncDiscordWebhook
def getOrDefault(value, default):
    """Return ``value`` unless it is ``None``, in which case return ``default``.

    Only ``None`` triggers the fallback: other falsy values such as ``0``,
    ``""`` or an empty container are returned unchanged.
    """
    return default if value is None else value
class DiscoLogger:
    """Post plain-text messages to a Discord webhook.

    Delivery errors never propagate to the caller: they are printed together
    with the ``user`` and ``room`` labels supplied at construction time.
    """

    # Prefix used in the diagnostics printed on failure.
    Name = "DiscoLogger"

    def __init__(self, user, room, url):
        """Remember the two labels and the webhook URL to post to."""
        self.user, self.room, self.url = user, room, url

    def send_msg(self, msg):
        """Send ``msg`` synchronously; print a diagnostic if sending fails."""
        try:
            DiscordWebhook(url=self.url, content=msg).execute()
        except Exception as err:
            print("{0} : user '{1}', room '{2}', error while trying to send message '{3}' : {4}".format(self.Name, self.user, self.room, msg, err))

    async def async_send_msg(self, msg):
        """Coroutine variant of ``send_msg`` built on ``AsyncDiscordWebhook``."""
        try:
            await AsyncDiscordWebhook(url=self.url, content=msg).execute()
        except Exception as err:
            print("{0} : user '{1}', room '{2}', error while trying to send async message '{3}' : {4}".format(self.Name, self.user, self.room, msg, err))
# Shared notifier that BinanceBestTrader uses to announce position changes;
# `user` and `room` are only echoed in DiscoLogger's error output.
Log_Chat = DiscoLogger(user="name", room="ossef?", url=discordUrl)
def getKeyFromPos(pos):
    """Build the identity key of a position dict.

    The key is the position's symbol, entry price, amount, leverage and
    update timestamp, in that order, separated by ``|``. A missing field
    raises ``KeyError``.
    """
    fields = ("symbol", "entryPrice", "amount", "leverage", "updateTimeStamp")
    return "|".join("{0}".format(pos[name]) for name in fields)
class BinanceBestTrader:
    """Track one trader's open positions and announce every change.

    The first payload that parses successfully is stored silently as the
    baseline. Each later payload is compared with the stored positions, and
    every position that appeared or disappeared is announced through the
    module-level ``Log_Chat`` notifier.
    """

    # Prefix used in the diagnostics printed on failure.
    Name = "BinanceBestTrader"

    def __init__(self, TraderId):
        """Create a tracker for ``TraderId`` with no known positions yet."""
        self.TraderId = TraderId
        # Maps getKeyFromPos(position) -> position dict of the last applied snapshot.
        self.Positions = {}
        # Explicit flag: an empty baseline is still a baseline, so "Positions
        # is empty" must not be used to mean "never initialised".
        self._initialized = False

    def __call__(self, PositionsStr):
        """Feed the raw JSON text of one poll into the tracker.

        Args:
            PositionsStr: JSON text holding the list of positions under
                ``["data"]["otherPositionRetList"]``.

        Returns:
            Always ``None``. Changes are reported through ``Log_Chat``; a
            payload that cannot be parsed is printed and otherwise ignored,
            leaving the stored positions untouched.
        """
        try:
            snapshot = self._parse_positions(PositionsStr)
        except Exception as e:
            print("{0} : error while trying to refresh Trader Id '{1}' position(s) : {2}".format(self.Name, self.TraderId, e))
            return None

        if not self._initialized:
            # Baseline: remember what is already open without announcing it.
            self.Positions.update(snapshot)
            self._initialized = True
            return None

        # Key views compare by content, unlike hash() of a values view,
        # which is identity-based.
        if self.Positions.keys() != snapshot.keys():
            return self.return_new_position(snapshot)
        return None

    def _parse_positions(self, PositionsStr):
        """Decode a response body into a ``{position key: position dict}`` map."""
        positions = jsonLoads(PositionsStr)["data"]["otherPositionRetList"]
        return {getKeyFromPos(pos): pos for pos in positions}

    def _format_event(self, event, details):
        """Render the notification text for one ``closed``/``open`` position."""
        return "Position {0} by {1} : ticker: '{2}' - amount: '{3}' - leverage: '{4}'".format(
            event, self.TraderId, details["symbol"], details["amount"], details["leverage"])

    def return_new_position(self, New_Positions):
        """Announce the differences with ``New_Positions`` and adopt them.

        Positions whose key is absent from ``New_Positions`` are announced as
        closed and forgotten; keys not yet stored are announced as open and
        stored. Despite its name the method returns ``None``. Any error is
        printed instead of being raised.
        """
        try:
            # Materialise the list first: the dict is mutated inside the loop.
            closed_keys = [key for key in self.Positions if key not in New_Positions]
            for key in closed_keys:
                Log_Chat.send_msg(self._format_event("closed", self.Positions[key]))
                del self.Positions[key]

            for key, details in New_Positions.items():
                if key not in self.Positions:
                    Log_Chat.send_msg(self._format_event("open", details))
                    self.Positions[key] = details
        except Exception as e:
            print("{0} : error while trying to return Trader Id '{1}' new position(s) : {2}".format(self.Name, self.TraderId, e))
# Registry of position trackers: one BinanceBestTrader per id in TraderList,
# keyed by that id.
BestTraders = {}
for TraderId in TraderList:
    BestTraders[TraderId] = BinanceBestTrader(TraderId)
def watch_trades(BestTraders, timeout=10):
    """Poll Binance once for every tracked trader and feed each tracker.

    Args:
        BestTraders: mapping whose values are callables exposing a
            ``TraderId`` attribute (the BinanceBestTrader instances); each is
            called with the response body text.
        timeout: seconds to wait for each HTTP request before giving up, so
            a stalled connection cannot block the polling loop forever.

    Returns:
        Always ``None``. A failure for one trader is printed and the
        remaining traders are still polled.
    """
    for Trader in BestTraders.values():
        # The try lives inside the loop so one bad request does not end the round.
        try:
            payload = {"encryptedUid": Trader.TraderId, "tradeType": "PERPETUAL"}
            headers = {
                'content-type': 'application/json',
                'Accept': '*/*',
                'Host': 'www.binance.com',
                'Origin': 'https://www.binance.com',
                'Connection': 'keep-alive',
                'Referer': 'https://www.binance.com/fr/futures-activity/leaderboard/user/um?encryptedUid={0}'.format(Trader.TraderId),
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
            }
            response = requestPost(
                url='https://www.binance.com/bapi/futures/v1/public/future/leaderboard/getOtherPosition',
                data=jsonDumps(payload),
                headers=headers,
                timeout=timeout,
            )
            # No early return here: returning would skip every trader after the first.
            Trader(response.text)
        except Exception as e:
            print("watch_trades : error while trying to watch positions : {0}".format(e))
    return None
# Poll forever at the configured interval. Guarded so that importing this
# module (for reuse or testing) does not start the endless loop.
if __name__ == "__main__":
    while True:
        watch_trades(BestTraders)
        sleep(rafraichissement_en_seconde)