Bitflyerの自動売買BOTのバックテストで平均保有期間を追加したコード

前回の記事「Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する」の最後の練習問題の答え合わせです。

解答


import requests
from datetime import datetime
import time
import json
import ccxt
import numpy as np


# ----バックテスト用の初期設定値
chart_sec = 300            # 5分足
lot = 1                    # 1トレードの枚数
slippage = 0.0005          # 手数料やスリッページ(0.05%初期値)
close_condition = 0        # エントリー後、n足が経過するまでは手仕舞わない(初期値0)


# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,'r',encoding='utf-8')
	price = json.load(file)
	return price



# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag



# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	try:
		realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
		increase_rate = data["close_price"] / data["open_price"] - 1
	except ZeroDivisionError as e:
		return False
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		#elif increase_rate <  0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		#elif increase_rate > -0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["buy_signal"] = 3
		
		# ここにBitflyerへの買い注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["sell_signal"] = 3
		
		# ここにBitflyerへの売り注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1
	
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag


# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])
	
# ここからメインの実行処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")



last_data = price[0] # 初期値となる価格データをセット


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"slippage":[],
		"log":[]
	}
}

i = 1
while i < len(price):
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag)
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

解説

まずflag変数に新しく以下の2つの変数を追加しました。

"sell-holding-periods":[]
"buy-holding-periods":[]

ポジションを保有している期間は、もともと手仕舞いのための関数 def close_position() の先頭部分でカウントしています。

def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1

そのため、ポジションを手仕舞うたびに、ここで記録している[count]の情報を、上記の holding-periods:[] に append していけばOKです。そのため、関数 def records() に以下の部分を追記します。

追記部分

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	(中略)
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		# ここに追記
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
	if flag["position"]["side"] == "SELL":
		# ここに追記
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )

また最後にバックテスト集計用の関数のところで、平均保有期間を計算してprintします。計算には、numpy の np.average() を使い、それを round() で小数点1桁に丸めています。

以下の部分です。

追記部分

# バックテストの集計用の関数
def backtest(flag):
	(中略)
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))

「Bitflyerの自動売買BOTのバックテストで平均保有期間を追加したコード」への2件のフィードバック

  1. いつも初心者にも分かりやすい記事ありがとうございます。
    建玉の平均価格の例えば1万円下に注文するといったやり方を知りたいです。
    よろしくお願いします!

    1. コメントありがとうございます!
      なるほど、それは知りたい方も多いかもですね。
      約束はできないですが、時間があれば記事にしてみますね^^

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です