Pandasの練習問題ーBTCFXのドンチャンブレイクアウトBOTの月別成績を集計する

前回の記事「Pandasを使ってBTCFXの自動売買BOTの月別の成績を集計しよう!」の練習問題の回答コードです。

1時間足での30期間ドンチャン・ブレイクアウトBOTの月別の成績を集計してみましょう! なお、今回のコードではPandasの集計方法を使って以下のような指標も加えておきました。

・全トレードの回数と勝率
・全トレードの平均リターン
・最大の勝ちトレードでの利益
・最大の負けトレードでの損失
・最終的な利益合計
・最終的な損失合計

これらも自動売買BOTの成績を評価する際に参考にしてみてください。


import requests
from datetime import datetime
import time
import matplotlib.pyplot as plt
import pandas as pd

#-----設定項目

chart_sec = 3600    # 1時間足を使用
term = 30           # 過去n期間の設定
wait = 0            # ループの待機時間
lot = 1             # BTCの注文枚数
slippage = 0.001    # 手数料・スリッページ


# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":
		flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 手仕舞った日時と保有期間を記録
	flag["records"]["date"].append(data["close_time_dt"])
	flag["records"]["holding-periods"].append( flag["position"]["count"] )
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["side"].append( "BUY" )
		flag["records"]["profit"].append( buy_profit )
		flag["records"]["return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["side"].append( "SELL" )
		flag["records"]["profit"].append( sell_profit )
		flag["records"]["return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	# 成績を記録したpandas DataFrameを作成
	records = pd.DataFrame({
		"Date"     :  pd.to_datetime(flag["records"]["date"]),
		"Profit"   :  flag["records"]["profit"],
		"Side"     :  flag["records"]["side"],
		"Rate"     :  flag["records"]["return"],
		"Periods"  :  flag["records"]["holding-periods"],
		"Slippage" :  flag["records"]["slippage"]
	})
	
	# 総損益の列を追加する
	records["Gross"] = records.Profit.cumsum()
	
	# 最大ドローダウンの列を追加する
	records["Drawdown"] = records.Gross.cummax().subtract(records.Gross)
	records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1)
	
	# 買いエントリーと売りエントリーだけをそれぞれ抽出する
	buy_records = records[records.Side.isin(["BUY"])]
	sell_records = records[records.Side.isin(["SELL"])]
	
	# 月別のデータを集計する
	records["月別集計"] = pd.to_datetime( records.Date.apply(lambda x: x.strftime('%Y/%m')))
	grouped = records.groupby("月別集計")
	
	month_records = pd.DataFrame({
		"Number"   :  grouped.Profit.count(),
		"Gross"    :  grouped.Profit.sum(),
		"Rate"     :  round(grouped.Rate.mean(),2),
		"Drawdown" :  grouped.Drawdown.max(),
		"Periods"  :  grouped.Periods.mean()
		})
	
	print("バックテストの結果")
	print("-----------------------------------")
	print("買いエントリの成績")
	print("-----------------------------------")
	print("トレード回数       :  {}回".format( len(buy_records) ))
	print("勝率               :  {}%".format(round(len(buy_records[buy_records.Profit>0]) / len(buy_records) * 100,1)))
	print("平均リターン       :  {}%".format(round(buy_records.Rate.mean(),2)))
	print("総損益             :  {}円".format( buy_records.Profit.sum() ))
	print("平均保有期間       :  {}足分".format( round(buy_records.Periods.mean(),1) ))
	
	print("-----------------------------------")
	print("売りエントリの成績")
	print("-----------------------------------")
	print("トレード回数       :  {}回".format( len(sell_records) ))
	print("勝率               :  {}%".format(round(len(sell_records[sell_records.Profit>0]) / len(sell_records) * 100,1)))
	print("平均リターン       :  {}%".format(round(sell_records.Rate.mean(),2)))
	print("総損益             :  {}円".format( sell_records.Profit.sum() ))
	print("平均保有期間       :  {}足分".format( round(sell_records.Periods.mean(),1) ))
	
	print("-----------------------------------")
	print("総合の成績")
	print("-----------------------------------")
	print("全トレード数       :  {}回".format(len(records) ))
	print("勝率               :  {}%".format(round(len(records[records.Profit>0]) / len(records) * 100,1)))
	print("平均リターン       :  {}%".format(round(records.Rate.mean(),2)))
	print("平均保有期間       :  {}足分".format( round(records.Periods.mean(),1) ))
	print("")
	print("最大の勝ちトレード :  {}円".format(records.Profit.max()))
	print("最大の負けトレード :  {}円".format(records.Profit.min()))
	print("最大ドローダウン   :  {0}円 / {1}%".format(-1 * records.Drawdown.max(),  -1 * records.DrawdownRate.loc[records.Drawdown.idxmax()]  ))
	print("利益合計           :  {}円".format( records[records.Profit>0].Profit.sum() ))
	print("損失合計           :  {}円".format( records[records.Profit<0].Profit.sum() ))
	print("")
	print("最終損益           :  {}円".format( records.Profit.sum() ))
	print("手数料合計         :  {}円".format( -1 * records.Slippage.sum() ))
	
	print("-----------------------------------")
	print("月別の成績")
	
	for index , row in month_records.iterrows():
		print("-----------------------------------")
		print( "{0}年{1}月の成績".format( index.year, index.month ) )
		print("-----------------------------------")
		print("トレード数         :  {}回".format( row.Number.astype(int) ))
		print("月間損益           :  {}円".format( row.Gross.astype(int) ))
		print("平均リターン       :  {}%".format( row.Rate ))
		print("月間ドローダウン   :  {}円".format( -1 * row.Drawdown.astype(int) ))

	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

	# 損益曲線をプロット
	plt.plot( records.Date, records.Gross )
	plt.xlabel("Date")
	plt.ylabel("Balance")
	plt.xticks(rotation=50) # X軸の目盛りを50度回転
	
	plt.show()
	


# ここからメイン処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"date":[],
		"profit":[],
		"return":[],
		"side":[],
		"holding-periods":[],
		"slippage":[],
		"log":[]
	}
}


last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う過去30期間分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		flag = log_price(price[i],flag)
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	flag = log_price(data,flag)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データを30個に保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

実行結果

以下が1時間足でシンプルな30期間のドンチャンブレイクBOTを、2017年8月~2018年4月にかけて運用した場合の成績です。

損益グラフ

ドンチャン・ブレイクアウトBOTの成績


(base) C:\Pydoc>python test.py
--------------------------
テスト期間:
開始時点 : 2017/08/13 08:00
終了時点 : 2018/04/20 15:00
6000件のローソク足データで検証
--------------------------
バックテストの結果
-----------------------------------
買いエントリの成績
-----------------------------------
トレード回数       :  49回
勝率               :  51.0%
平均リターン       :  3.04%
総損益             :  1090811円
平均保有期間       :  62.2足分
-----------------------------------
売りエントリの成績
-----------------------------------
トレード回数       :  49回
勝率               :  42.9%
平均リターン       :  0.85%
総損益             :  670134円
平均保有期間       :  56.7足分
-----------------------------------
総合の成績
-----------------------------------
全トレード数       :  98回
勝率               :  46.9%
平均リターン       :  1.94%
平均保有期間       :  59.5足分

最大の勝ちトレード :  545284円
最大の負けトレード :  -296151円
最大ドローダウン   :  -483332円 / -24.4%
利益合計           :  4591094円
損失合計           :  -2830149円

最終損益           :  1760945円
手数料合計         :  -104961円
-----------------------------------
月別の成績
-----------------------------------
2017年8月の成績
-----------------------------------
トレード数         :  6回
月間損益           :  1552円
平均リターン       :  0.08%
月間ドローダウン   :  -20319円
-----------------------------------
2017年9月の成績
-----------------------------------
トレード数         :  10回
月間損益           :  139626円
平均リターン       :  3.02%
月間ドローダウン   :  -20785円
-----------------------------------
2017年10月の成績
-----------------------------------
トレード数         :  12回
月間損益           :  119928円
平均リターン       :  2.12%
月間ドローダウン   :  -50845円
-----------------------------------
2017年11月の成績
-----------------------------------
トレード数         :  11回
月間損益           :  405276円
平均リターン       :  4.89%
月間ドローダウン   :  -138695円
-----------------------------------
2017年12月の成績
-----------------------------------
トレード数         :  10回
月間損益           :  175018円
平均リターン       :  1.68%
月間ドローダウン   :  -364745円
-----------------------------------
2018年1月の成績
-----------------------------------
トレード数         :  14回
月間損益           :  425925円
平均リターン       :  1.08%
月間ドローダウン   :  -247323円
-----------------------------------
2018年2月の成績
-----------------------------------
トレード数         :  13回
月間損益           :  284263円
平均リターン       :  2.58%
月間ドローダウン   :  -483332円
-----------------------------------
2018年3月の成績
-----------------------------------
トレード数         :  11回
月間損益           :  155253円
平均リターン       :  1.07%
月間ドローダウン   :  -425155円
-----------------------------------
2018年4月の成績
-----------------------------------
トレード数         :  11回
月間損益           :  54104円
平均リターン       :  0.3%
月間ドローダウン   :  -216376円

月別の成績で見ても、すべての月でプラスの成績が出ています。
これはなかなか悪くない結果ですね。

こちらの月別の成績は、あくまで「ポジションを手仕舞った日時」を基準に区切っている点に注意してください。例えば、2月の成績はプラスになっていますが、これは1月から持ち越したポジションで大きな利益が出ているからです。もし2月からBOTの稼働を開始していたら、2月の損益はマイナスになります。

コードの解説

基本的には、前回の記事「Pandasを使って自動売買BOTの成績を月別に集計しよう!」で、例として解説したコードをそのまま使っています。特に難しいところは無かったのではないでしょうか。

以下のところだけ、前回の記事には登場していなかった書き方なので、追加で解説しておきます。

最大ドローダウン率

最大ドローダウン率とは、ある行の最大ドローダウンの金額を、その行までの最大資産額で割った数字です。特定の行までの最大値は、cummax()で取得できます。そのため、以下のような式になります。

records["DrawdownRate"] = round(records.Drawdown / records.Gross.cummax() * 100,1)

ただし実際に最大ドローダウン率を表示するときには注意が必要です。単にドローダウン率の中から最大値を選ぶだけだと、最大ドローダウンの金額と時期が一致するとは限らないからです。

例えば、「最大ドローダウン率でみると序盤の2月の30%が最大だけど、金額ベースでみると4月の300万円が最大ドローダウンだ」ということもあり得ます。

このとき私たちが知りたいのは、最大ドローダウン率ではなく、「最大ドローダウン金額が最大だったときのドローダウン率」であるはずです。そのため、以下のように記述します。


# 最大ドローダウンの行番号を取得
records.Drawdown.idxmax()

# 最大ドローダウンと同じ行のドローダウン率を取得
records.DrawdownRate.loc[records.Drawdown.idxmax()]

これで最大ドローダウンと同じ時期のドローダウン率を取得することができました。

pandasの表データのfor文処理

for index,row in month_records.iterrows():
		print("-----------------------------------")
		print( "{0}年{1}月の成績".format( index.year, index.month ) )
		print("-----------------------------------")
		print("トレード数         :  {}回".format( row.Number.astype(int) ))
		print("月間損益           :  {}円".format( row.Gross.astype(int) ))
		print("平均リターン       :  {}%".format( row.Rate ))
		print("月間ドローダウン   :  {}円".format( -1 * row.Drawdown.astype(int) ))

pandasの表データを1行ずつループ処理したい場合、以下のように書くことができます。

for index,row in 表データ変数.iterrows():
		# 1行ずつ処理したい内容

index (インデックス)というのは、表データから特定の「行」を検索するときの「索引」のことです。

行には通常、1行ごとに先頭から 0,1,2,3,4,5.....などの行番号が割り振られていますが、それだと検索するときに不便です。縦の列にカラム名(ここでは Gross/Rate/Drawdownなど)があるのと同じように、行にも名前を付けることができます。それが index です。

今回のコードでは、pandas の groupby()で「月別」にデータをグループ化しているため、index には「2017/08」「2017/09」「2017/10」などのDate型のデータが指定されています。そのため、for文の中で index.year / index.month を指定することで、これらの数字を print できます。

pythonで資産曲線を作って自動売買BOTの成績を視覚的に評価しよう!

自動売買BOTの世界では、長期的に使える強いトレードシステムのことを「堅牢性(ロバスト性)がある」と表現します。わかりやすい言葉でいうと「環境の変化に強く安定したシステム」のことです。

「堅牢性がある」とは、単に期待リターンが高いというだけではありません。以下のような特徴を備えた売買ロジックのことをいいます。

1.期待リターンに対してリスクが低い
2.毎月の期待リターンのバラツキ具合が小さい
3.最大ドローダウンが小さい
4.最適化された固定値のパラメーターが少ない

前回の記事では、自動売買BOTの勝率・期待リターンなどを計算する方法を勉強しましたが、それだけではシステムの堅牢性は評価できません。

この記事では、ドンチャン・ブレイクアウトBOTの堅牢性を評価するために資産推移のグラフを描写して、さらに最大ドローダウンを計算する方法を解説していきます。

1)グラフをpythonで描画する方法

先にpythonでグラフを描画する方法だけ解説しておきましょう。

大丈夫、めちゃくちゃ簡単です! まずは練習としてBTCの日々の終値の価格グラフを作成してみましょう。以下のように書くだけです。

import matplotlib.pyplot as plt
import pandas as pd

# 日付の配列データ(x軸)
date_list = [
	"2018/1/1",
	"2018/1/2",
	"2018/1/3",
	"2018/1/4",
	"2018/1/5",
	"2018/1/6",
	"2018/1/7",
	"2018/1/8",
	"2018/1/9",
	"2018/1/10",
	"2018/1/11",
	"2018/1/12",
	"2018/1/13",
	"2018/1/14"
]

# BTC価格(ドル)のデータ(y軸)
price_list = [
	13657,
	14982,
	15201,
	15599,
	17429,
	17527,
	16477,
	15170,
	14595,
	14973,
	13405,
	13980,
	14360,
	13772
]

# 日付データに変換
date_list = pd.to_datetime(date_list)

# plot(x,y)でx軸とy軸を指定
plt.plot( date_list,price_list )
plt.xlabel("Date")
plt.ylabel("Price")

# 描写を実行
plt.show()

グラフの描画には matplotlib というライブラリを使います。

これはAnacondaを使ってpythonをインストールした方であれば、最初から入っていますので、以下の1行を先頭に書くだけで使用できます。

import matplotlib.pyplot as plt

コードの解説

まずX軸とY軸にそれぞれ描画したいデータを配列として用意します。

それぞれの要素の数が一致していないといけないので、注意してください。今回は、日付データ(date_list)と価格データ(price_list)にそれぞれ14個ずつのデータを用意しました。

次に、日付データを「テキスト型」から「日付型」に変換します。なぜ日付型に変換するのかというと、テキスト型のままだと以下のようなデータがすべてX軸に等間隔で並んでしまうからです。

例)
date = ["1/1","1/2","3/2","3/4","4/2"]

テキスト型のままだとこの5個のデータは、同じ目盛り上に等しい間隔で並んでしまいます。1/1~1/2と、1/2~3/2が、同じ間隔で並んでしまうと、後ほど資産推移の曲線などを描画するときに困りますね。

これを避けるために、以下のコードで日付型に変換しておきます。

import pandas as pd
# 日付データに変換
date_list = pd.to_datetime(date_list)

ここでは変換にpandasというライブラリを使用しています。こちらもAnacondaでPythonをインストールした方であれば、最初からインストールされています。そうでない方はpipを使ってください。

実行結果

当サイトと同じ方法で(Anacondaで)Pythonをインストールしている方なら、すでにSpyderという実行環境がインストールされています。そのため、上記のコードを実行すると、自動的に以下のようなウィンドウが立ち上がり、グラフの描画が実行されます。

グラフの描画の説明はこれだけです!

2)資産曲線を描く

まずは資産カーブを描くために、ポジションを手仕舞うたびにその時点での損益(資産推移)を記録するコードを作っておきましょう。やり方は前回までの記事と全く同じです。

データを記録する準備

資産推移のグラフを描くために必要なデータは以下の2つです。

1)各ポジションを手仕舞った時点での資産額
2)各ポジションを手仕舞った時点の日付(時間)

まずは資産額と日付を記録するために、flag変数に以下を追加しておきましょう。

"records":{
	"buy-count": 0,
	"buy-winning" : 0,
	#(中略)
	
	"date":[], # 追加
	"gross-profit":[0],# 追加
	"slippage":[],
	"log":[]
	}

総損益を記録するための配列データには、初期値として0を入れておきます。

そして手仕舞いの関数が呼ばれるたびに、その時点での日付データと総損益を append で記録しておきます。 以下のように書けばいいですね。

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	# 手仕舞った日時の記録
	flag["records"]["date"].append(data["close_time_dt"])

	if flag["position"]["side"] == "BUY":
		# 追記部分
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + buy_profit )
	if flag["position"]["side"] == "SELL":
		# 追記部分
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + sell_profit )

各トレードの成績を記録する関数(def records)で、ポジションを手仕舞ったときの日付データを記録しておきます。これはローソク足のデータを記録する変数(data)にある日時情報(data[“close_time_dt”])を代入するだけです。

また、総損益を記録する変数(gross-profit)の前回の値([-1])を取り出し、そこに今回の最新の損益(buy_profit/sell_profit)を足した数字を、appendで配列の末尾に追加します。

これで準備は完了です。

グラフを描画する

最後にバックテスト集計用の関数(def backtest)の中で資産カーブを描画するためのコードを追加します。

# 損益曲線をプロット
del flag["records"]["gross-profit"][0] # X軸/Y軸のデータ数を揃えるため、先頭の0を削除
date_list = pd.to_datetime( flag["records"]["date"] ) # 日付型に変換

plt.plot( date_list, flag["records"]["gross-profit"] )
plt.xlabel("Date")
plt.ylabel("Balance")
plt.xticks(rotation=50) # X軸の目盛りを50度回転
	
plt.show()

最初の例と全く同じですね。

flag[records][gross-profit]には、初期値として0円が入っており、日付データと数が一致しないため、del [0]で先頭の0円を削除しています。また、pd.to_datetime()で、日付のテキストデータを日付型に変換しています。

では実行してみましょう!

実行結果

以下は、30期間のドンチャン・ブレイクアウト(損切なし・ドテン売買)を、1時間足を使って 2017/8/9 ~ 2018/4/16 までの期間、6000足分のローソク足データを使って検証した結果です。

▽ 最終結果

▽ 資産カーブ

右肩上がりといえば右肩上がりですが、何度か大きなドローダウン(利益が大きく削られる局面)を経験していることがわかりますね。では、この売買システムの最大ドローダウンはどのくらいの大きさなのでしょうか?

3)最大ドローダウンを計算する

まずpythonで最大ドローダウンの計算を実装するにあたって、ドローダウンとは何なのか?を明確に定義しておきましょう。

自動売買BOTを自作でプログラミングする最大のメリットは、あらゆるテクニカル指標・売買ロジック・バックテストの指標などをコーディングする過程で、正確にその意味を理解できるようになることです。

私もそうですが、文系で何となく投資本を読んでいるだけだと、ドローダウンの意味1つとっても、「何となくこういうものだろう」というイメージだけで理解してしまいがちです。しかし自分でコーディングするようになると、どんな指標・ロジックでも明確な定義を考える習慣がつきます。

定義を考える習慣がつくようになると、単に概念を本で学んで覚えているだけの人に比べて、その指標の活用方法や弱点に気づきやすくなります。

1.ドローダウンの定義

基本的にはドローダウンは、「ある時点での資産額と、それ以前の資産の最大額との落差のこと」と定義できます。

例えば、以下のように資産が推移したと過程しましょう。

1月 50万円
2月 100万円
3月 200万円 ← 直近の最大額
4月 180万円
5月 150万円 ← 最大ドローダウン
6月 170万円
7月 220万円 ← 直近の最大額を更新
8月 190万円
9月 200万円
10月 170万円 ← 最大ドローダウン
11月 210万円

この場合、大きなドローダウンは2つ存在します。

1つ目は、3月のピーク200万円から5月の150万円までのドローダウン(-50万円)です。もう1つは、7月のピーク(220万円)から10月の170万円までのドローダウン(-50万円)です。

この場合、最大ドローダウンは-50万円、ということになります。%表記にすると、50/220 = 0.2272… なので約22.7%のドローダウンです。

2.Pythonコードで実装する

ではこれを実装していきましょう。

手始めに、とりあえず、いつものように記録用の変数を追加します。以下のような変数を追加しておきましょう。

"records":{
	"buy-count": 0,
	"buy-winning" : 0,
	# 追記
	"drawdown": 0,

次にポジションを閉じるたびに呼ばれる「各トレードの成績を記録する関数」に、以下の3行を追加しましょう。

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	# (中略)
	# ドローダウンの計算
	drawdown =  max(flag["records"]["gross-profit"]) - flag["records"]["gross-profit"][-1] 
	if  drawdown  > flag["records"]["drawdown"]:
		flag["records"]["drawdown"] = drawdown

ポジションを閉じるたびに、その時点における過去の資産の最大額をmax(flag[records][gross-profit])で探します。そこから、現時点での資産額(flag[records][gross-profit][-1])を引いて現在のドローダウンを計算します。

そして現在のドローダウンが過去に記録したものよりも大きければ、flag[records][drawdown] に代入して値を更新します。これにより、最終的に「最大ドローダウン」だけが残ります。

そして最後に「バックテスト集計用の関数」に以下を追記します。

# バックテストの集計用の関数
def backtest(flag):
	# (中略)
	print("最大ドローダウン :  {0}円 / {1}%".format(-1 * flag["records"]["drawdown"], -1 * round(flag["records"]["drawdown"]/max(flag["records"]["gross-profit"])*100,1)  ))

ここで最大ドローダウンの金額を表示するとともに、そのパーセンテージも計算して表示しています。ではこれを実行してみましょう。

3.実行結果

以下のようになりました。

もし過去の半年間に1時間足で30期間のドンチャンブレイクアウトを採用していたら、この部分の最大ドローダウンが約48万円(-23%)だったということですね。

※ 2月中盤~後半にかけて、資産207万円 ⇒ 159万円のドローダウン

3)もし2月から自動売買BOTの運用を開始していた場合

では、もし2月からドンチャン・ブレイクアウトの自動売買BOTの運用を開始していた場合はどうなっていたのでしょうか? これを最後に検証して、今回の記事を終えておきましょう。

以下のコードの部分を変更するだけです。

・UNIX時間変換ツール

これを実行すると、以下のようになります。

▽ バックテストの結果

▽ 資産カーブ

1枚のBTCを売買したとすると、開始早々-20万円のドローダウンに見舞われることになります。その後、+20万円まで持ち直しますが、そこから-約50万円のドローダウンを食らいます。

長期的には期待値プラスの売買ロジックでも、このように開始早々、大きなマイナスを食らったり、数カ月連続してプラス転換しない場合もあります。このようなときに、長期的な期待値を信じて我慢できるかどうか、あるいは「相場が変わったからもうこのシステムは通用しないんだ…」と諦めるべきかは、判断が難しいところです。

パラメーターの過剰最適化がされていない、バックテストのデータ数が十分である、など、堅牢性の高いシステムであればあるほど、このような局面でも期待値を信じて我慢することが可能になります。この章のはじめで、サンプル数の重要性などを説明したのはそのためです。

今回の勉強で使ったコード

最後に今回の記事で作ったpythonコードを掲載しておきましょう。


import requests
from datetime import datetime
import time
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

#-----設定項目

chart_sec = 3600    # 1時間足を使用
term = 30           # 過去n足の設定
wait = 0            # ループの待機時間
lot = 1             # BTCの注文枚数
slippage = 0.001    # 手数料・スリッページ


# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":
		flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 手仕舞った日時の記録
	flag["records"]["date"].append(data["close_time_dt"])
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["gross-profit"].append( flag["records"]["gross-profit"][-1] + sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	# ドローダウンの計算
	drawdown =  max(flag["records"]["gross-profit"]) - flag["records"]["gross-profit"][-1] 
	if  drawdown  > flag["records"]["drawdown"]:
		flag["records"]["drawdown"] = drawdown
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数     :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率             :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン     :  {}%".format(round(np.average(flag["records"]["buy-return"]),2)))
	print("総損益           :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間     :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数     :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率             :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン     :  {}%".format(round(np.average(flag["records"]["sell-return"]),2)))
	print("総損益           :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間     :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("最大ドローダウン :  {0}円 / {1}%".format(-1 * flag["records"]["drawdown"], -1 * round(flag["records"]["drawdown"]/max(flag["records"]["gross-profit"])*100,1)  ))
	print("総損益           :  {}円".format( flag["records"]["gross-profit"][-1] ))
	print("手数料合計       :  {}円".format( -1 * np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

	# 損益曲線をプロット
	del flag["records"]["gross-profit"][0]
	date_list = pd.to_datetime( flag["records"]["date"] )
	
	plt.plot( date_list, flag["records"]["gross-profit"] )
	plt.xlabel("Date")
	plt.ylabel("Balance")
	plt.xticks(rotation=50) # X軸の目盛りを50度回転
	
	plt.show()
	


# ここからメイン処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"drawdown": 0,
		"date":[],
		"gross-profit":[0],
		"slippage":[],
		"log":[]
	}
}


last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う過去30足分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		flag = log_price(price[i],flag)
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	flag = log_price(data,flag)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データを30個に保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

BTCFXのドンチャンチャネルブレイクアウトの勝率をバックテストで検証する

前回までの記事で、過去チャートのバックテストで自動売買BOTの勝率や平均リターン、総損益を検証する基本的な方法がわかったと思います。

今回の記事では、「ドンチャン・ブレイクアウト」というより実践的な売買ロジックを使って、さらに高度な自動売買BOTの評価方法を解説していきたいと思います!

ドンチャンブレイクアウトとは

ブレイクアウト手法とは、狭い値幅での揉み合い(レンジ相場)をブレイクアウトした瞬間を捕まえてエントリーし、その後のトレンドに乗って利益を出す順張り(トレンドフォロー型)の手法のことをいいます。

ブレイクアウトを捉えるための手法はさまざまなものが開発されていますが、シストレなどの自動売買BOTの世界では、以下の2つが最も定番です。

1.ドンチャン・チャネル・ブレイクアウト

過去n期間の最高値・最安値を更新したときに、その方向にエントリーする手法です。

例えば、過去20期間の最高値を更新したときに「買い」でエントリーします。次に過去20日の最安値を更新したときに決済して手仕舞いますが、このときさらに反対方向にそのままエントリー(ドテン)する場合も多いです。

このシンプルな戦略の優位性は、トレーダーの必読本である「タートル流投資の魔術」や、ジョン・ヒルの「究極のトレーディングガイド」などの本に詳しい解説があります。

非常に単純な売買ロジックなので、プログラミングしやすく、シストレや自動売買BOTの勉強にも最適です。

2.オープニングレンジ・ブレイクアウト

オープニングレンジ・ブレイクアウトも同じく「どうやったらブレイクアウトの開始点を、シンプルな数字とロジックで発見できるか?」という視点で開発された手法です。

こちらは、過去n期間の値幅の平均値(ATR)を基準値として利用し、ある足が始値から(基準値の)X%以上動いたらその方向にエントリーします。例えば、過去3期間の安値–高値の平均値が10万円でXが70%とすると、次の足で始値から7万円上に動いた時点で買いでエントリーします。

前の足が陰線か陽線かで場合分けをして、買いと売りとで別々のX%のパラメーターを使用することが多いです。

この戦略については、ラリーウィリアムズの「短期売買法」や、先ほどのジョンヒルの「究極のトレーディングガイド」などの本に詳しい解説があります。

3.戦略の使い方

どちらも多くのトレーダーが知っている手法ですが、実際には、どの時間軸でどういうパラメーターを使うか、どういうトレンド判定の条件やフィルターと組み合わて「騙し」を除去するか、どういう手仕舞いの方法を選択するか、などによって全く違う成績になります。

これらの手法は、あくまで売買ロジックを考えるときにベースのアイデアとして使いやすいというだけで、それ自体が必勝法というわけではありません。詳しい戦略のカスタマイズ方法などは、今後、解説していく予定です。

今回はバックテストの方法の解説記事なので、まずは一番シンプルな「ドンチャン・ブレイクアウト」のロジックをpythonでコーディングするところから始めます。

ドンチャンブレイクアウトは非常に単純なロジックなので、今まで勉強した知識だけでも、pythonのプログラムコードを書くことができます。早速、やっていきましょう!

ドンチャン・ブレイクアウトのpythonコード

プログラミング初心者の方は、いきなり複雑なロジックを書こうとしてはいけません。まずは必要最小限のシンプルな骨組みの部分を作り、そこから徐々に条件などを足していくのがお勧めです。

例えば、買いと売りの両方を考えて混乱するならまずは買いエントリーだけ実装しましょう。エントリーと手仕舞いの両方を考えて混乱するなら、まずは「n足後に無条件で手仕舞う」といったシンプルなロジックで実装しましょう。

ではドンチャン・ブレイクアウトの最もシンプルなロジックとは何でしょうか? 今回は以下のように定義してみます。

1.今回実装するロジック

1)過去n期間の最高値を直近の足の高値が上回ったら(その足の終値の指値で)買いエントリーする。過去n期間の最安値を直近の足の安値が下回ったら売りエントリーする。
2)買いエントリーをした場合、過去n期間の最安値を更新したら手仕舞い、さらに売りエントリーする
3)売りエントリーをした場合、過去n期間の最高値を更新したら手仕舞い、さらに買いエントリーする

これだけです!

ではこれを「第4回 BOT作成編」で勉強した内容を前提としながらコーディングしてみましょう。

2.pythonコード

以下は、Cryptowatchから過去のローソク足を取得して、20期間のドンチャンブレイクアウトで売買シグナルや手仕舞いのタイミングを確認するためのコードです。



import requests
from datetime import datetime
import time


#-----設定項目

chart_sec = 3600  # 1時間足を使用
term = 20         # 過去n期間の設定
wait = 0          # ループの待機時間



# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) )


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		print("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました".format(term,signal["price"],data["high_price"]))
		print(str(data["close_price"]) + "円で買いの指値注文を出します")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"

	if signal["side"] == "SELL":
		print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
		print(str(data["close_price"]) + "円で売りの指値注文を出します")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
			print("成行注文を出してポジションを決済します")
			
			# 決済の成行注文コードを入れる
			
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			print("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			print("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました".format(term,signal["price"],data["high_price"]))
			print("成行注文を出してポジションを決済します")
			
			# 決済の成行注文コードを入れる
			
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			print("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			
	return flag


# ------------------------------
# ここからメイン処理
# ------------------------------

price = get_price(chart_sec)
last_data = []

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"count" : 0
	}
}

i = 0
while i < len(price):

	# ドンチャンの判定に使う過去n足分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		print_price(price[i])
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	print_price(data)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データをn個ピッタリに保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)

基本的な仕組みは、第4回で作成した練習用のBOTと同じです。

data という変数に最新のローソク足のデータを入れて、last_data という変数に過去のローソク足のデータを入れて、それを比べることでエントリーシグナルの条件を満たしたどうかを判定しています。

1.ドンチャンブレイクアウトのデータ準備

ただしドンチャンブレイクアウトでは、過去n足(以下、20期間として説明します)の最高値をブレイクしたかどうかを判定する必要があるため、過去20期間分のローソク足データを保持する変数が必要です。

そこでメイン処理のループ文の最初に以下のような処理を入れます。

while i < len(price):
	# ドンチャンの判定に使う過去n足分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		print_price(price[i])
		time.sleep(wait)
		i += 1
		continue

まず3行目の if len(last_data) < term: の部分で、last_dataに含まれているローソク足データの数が、20足分に達するまで上記の処理を実行するように指示しています。

continue は「これより下は実行せずにwhile文の先頭に戻る」という意味です。つまり、last_dataという変数に過去20足のローソク足データが保持されるまでは、この先の処理はおこなわず、この部分の処理だけをループで実行します。

いったん20足分のデータが溜まった後は、ループのたびに以下の処理を実行します。

# 過去データをn個ピッタリに保つために先頭を削除
del last_data[0]
last_data.append( data )

1行目のdel 変数名[0] で、last_data に含まれる先頭のデータを削除しています。さらに、2行目の last_data.append(data)で、最新のローソク足のデータを last_data の末尾に追加しています。

メイン処理のループのたびにこれを実行することで、last_data という変数の中身を、常に過去の新しい20期間分のローソク足データにぴったり揃えることができます。

2.ドンチャンブレイクアウトを判定する関数

# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}

この関数で、ドンチアン・ブレイクアウトが発生したかどうか、どちらの方向に発生したかを判定しています。

ロングの方向にブレイクアウトが発生した場合は「BUY」シグナルを返し、ショートの方向にブレイクアウトが発生した場合は「SELL」シグナルを返します。また一緒に、ブレイクアウトされた最高値・最安値の価格を返す(return)ようにしておきます。

過去20足の最高値を更新したかどうか、を判定するロジックが以下の部分です。

highest = max(i["high_price"] for i in last_data)
if data["high_price"] > highest:
	return {"side":"BUY","price":highest}

1行目で、last_data に含まれる変数を1個ずつチェックし、["high_price"]だけを取り出した配列を作り、max()でそこから最大値を取得しています。

これはリスト内包表記というfor文を1行で書くための書き方ですが、少し難しく感じる方は、今まで勉強した知識を使って以下のように書いても構いません。

# 「リスト内包表記」で1行で書いた場合
highest = max(i["high_price"] for i in last_data)

# わかりやすく書いた場合
highest = 0
for i in last_data:
	if highest < i["high_price"]:
		highest = i["high_price"]

また、この最高値(highest)を、現在の最新のローソク足の高値(data["high_price"])と比べ、もし現在の最新のローソク足の高値の方が高ければ、ブレイクアウトの条件を満たしたことになります。

ブレイクアウトされていた場合は、その方向と価格をセットで返します。

3.エントリー注文を出す関数

def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		print("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました".format(term,signal["price"],data["high_price"]))
		print(str(data["close_price"]) + "円で買いの指値注文を出します")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"

最初に1行目でさきほど作成した donchian() 関数を呼び、ドンチャンブレイクアウトが発生しているかどうかを確認します。返ってきたデータは signal という変数で受け取ります。

signal = donchian( data,last_data )

これで"BUY"が返ってくれば買いでエントリーし、"SELL"が返ってくれば売りでエントリーしています。

4.決済注文とドテン注文を出す関数

def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1 # ポジションを持って何足経過してるかを計測
	signal = donchian( data,last_data )

	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
			print("成行注文を出してポジションを決済します")
			
			# 決済の成行注文コードを入れる
			
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			print("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"

こちらも基本的な手順は同じです。最初の1行目に signal = donchian() を実行して、ドンチャンブレイクアウトのシグナルを確認します。

もし買いポジションを持っている場合は、売り方向にブレイクアウトが発生した場合のみ、ポジションを決済します。

if flag["position"]["side"] == "BUY":
	if signal["side"] == "SELL":
		print("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました".format(term,signal["price"],data["low_price"]))
		print("成行注文を出してポジションを決済します")
		
		# 決済の成行注文コードを入れる
		
		flag["position"]["exist"] = False
		flag["position"]["count"] = 0

ポジションを手仕舞ったので、flag変数の[exist]をFalseに更新しておきます。

さらに、そのまま売り方向でのエントリーを試みます。これがドテン注文です。

print("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします")

# ここに売り注文のコードを入れる

flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"

ドテンでエントリー注文を出したので、またflag変数の[order][exist]をTrueに更新しておきます。これで、次のwhile文では「注文が通ったかどうか確認する関数」が呼ばれることになります。

それでは最後に実行結果を見ておきましょう。

実行結果

練習問題

このドンチャンブレイクアウトのバックテスト(勝率の検証用)のコードを作り、以下の項目を計算してみましょう。コードの書き方は前回の記事で説明したのと全く同じです。

・トレード回数
・勝率
・平均リターン
・総損益
・平均保有期間

解答サンプルのコード

次回記事

上記の練習問題で、こちらのドンチャンブレイクアウトの勝率をバックテストで検証すると、以下のような成績になります。

これだけシンプルな売買ロジックでも、成績はそれほど悪くないように見えますね。

トレンドフォロー戦略は、一般論として勝率が低いです。トレードの相場の大半がレンジ相場なので、ブレイクアウトの方向に順張りするロジックは「騙し」に引っかかりやすく、勝率が低くなりがちです。その代わり、1度トレンドに乗れれば大きな利益を得られるため、勝率が50%を下回っても期待値は正(プラス)になる、というのが典型的な教科書の考え方です。

バックテストの注意点

しかしこのような戦略には、特有の弱点もあります。それはドローダウンが大きいことです。平均保有期間が長いため、途中で一時的に大きく資産が目減りする時期があるかもしれません。

また月別のリターンにかなり大きなバラつきがある可能性もあります。最終的な平均リターンが2%でも、例えば、その範囲がマイナス数十%までバラついている可能性もあります。

これらのリスクは、上記の成績表を見ているだけではわかりません。

そのため、次回の記事では、バックテストのコードをさらに改良して、資産カーブ(曲線グラフ)を描画したり、平均リターンのバラツキ具合をグラフにする方法を解説します!

【解答】ドンチャンチャネルブレイクアウトBOTの勝率や平均リターンを検証するコード

前回の記事「BTCFXでドンチャンブレイクアウトの勝率をバックテストで検証する」の練習問題の回答コードです。


import requests
from datetime import datetime
import time
import numpy as np


#-----設定項目

chart_sec = 3600    # 1時間足を使用
term = 30           # 過去n日の設定
wait = 0            # ループの待機時間
lot = 1             # BTCの注文枚数
slippage = 0.001    # 手数料・スリッページ


# CryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag


# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
	
	highest = max(i["high_price"] for i in last_data)
	if data["high_price"] > highest:
		return {"side":"BUY","price":highest}
	
	lowest = min(i["low_price"] for i in last_data)
	if data["low_price"] < lowest:
		return {"side":"SELL","price":lowest}
	
	return {"side" : None , "price":0}


# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
	signal = donchian( data,last_data )
	if signal["side"] == "BUY":
		flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")

		# ここに買い注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)

	if signal["side"] == "SELL":
		flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
		flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")

		# ここに売り注文のコードを入れる
		
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)

	return flag



# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
	
	flag["position"]["count"] += 1
	signal = donchian( data,last_data )
	
	if flag["position"]["side"] == "BUY":
		if signal["side"] == "SELL":
			flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
			
			# ここに売り注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "SELL"
			flag["order"]["price"] = round(data["close_price"] * lot)
			

	if flag["position"]["side"] == "SELL":
		if signal["side"] == "BUY":
			flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
			flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
			
			# 決済の成行注文コードを入れる
			
			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
			flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
			
			# ここに買い注文のコードを入れる
			
			flag["order"]["exist"] = True
			flag["order"]["side"] = "BUY"
			flag["order"]["price"] = round(data["close_price"] * lot)
			
	return flag


# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),2)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),2)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])



# ここからメイン処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

flag = {
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"slippage":[],
		"log":[]
	}
}


last_data = []
i = 0
while i < len(price):

	# ドンチャンの判定に使う過去30日分の安値・高値データを準備する
	if len(last_data) < term:
		last_data.append(price[i])
		flag = log_price(price[i],flag)
		time.sleep(wait)
		i += 1
		continue
	
	data = price[i]
	flag = log_price(data,flag)
	
	
	if flag["order"]["exist"]:
		flag = check_order( flag )
	elif flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )
	else:
		flag = entry_signal( data,last_data,flag )
	
	
	# 過去データを30個に保つために先頭を削除
	del last_data[0]
	last_data.append( data )
	i += 1
	time.sleep(wait)


print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

backtest(flag)

さくらのWindowsVPSでPython(Anaconda)の環境を構築する

WindowsServer2012でpythonを実行できるようにするまでの手順です。

WindowsVPSでは、自宅のWindowsパソコンと同じようにブラウザを使うことができます。そのため、WEBから普通にAnacondaのファイルをダウンロードしてセットアップすることも可能です。

ただし初期設定のInternetExplorer(IE)では、セキュリティの関係などからうまくダウンロードができないことがあります。また、そもそもAnacondaのセットアップファイルはかなり容量も大きいので、リモートデスクトップでダウンロードすると少しストレスを感じるかもしれません。

そこでローカルのPCの方であらかじめAnacondaのセットアップファイルを用意しておき、それをZIPファイルにして、リモートデスクトップ接続先にコピー&ペーストするのがおすすめです。

windowsVPSでのAnacondaの設定手順

▽ Anacondaのsetup.exeをコピー&ペースト

すると以下のようにコピーが始まります。

ZIPファイルのコピーが完了したら、展開してセットアップファイルを起動します。

あとは前にこちらの記事「Anacondaという開発環境をパソコンに入れてみよう」で説明したときの手順と全く同じです。忘れた方はこちらの記事を読んでください。

▽ セットアップ中

無事、Anacondaのインストールが完了しました。

PythonファイルをWindowsVPSにコピーする

さて、次にpythonの自動売買BOTが動くのかどうか実際にテストしておきましょう。

先ほどと同じように実行したいpythonファイルを「Ctrl + C」「Ctrl + V」でリモートデスクトップ接続先(WindowsVPS)にコピーします。

Anacondaプロンプトを起動する

左下のWindowsメニューアイコンを「右クリック」して「検索」を選択します。

「Anaconda」で検索してAnacondaプロンプトを探しましょう。右クリックして「ファイルの場所を開く」をクリックします。

まずは作業フォルダのパスをデスクトップに変更しておきましょう。
Anacondaプロンプトを右クリックして、「プロバティ」を選択します。

そして「作業フォルダ」をデスクトップに変更して「適用」をクリックしておきます。

これでpythonファイルを簡単に実行できるようになります。

デスクトップにショートカットを作る

作業パスを変更したら、Anacondaプロンプトのショートカットを作っておきましょう。

さきほどのフォルダからAnacondaプロンプトを右クリックして「送る」->「デスクトップ(ショートカットを作成)」を選択します。

デスクトップにpythonファイルとAnacondaプロンプトを準備することができました。これで、このブログで解説してきたPC環境と全く同じ環境をサーバーに構築することができましたね。

pythonファイルを実行する

ショートカットアイコンから「Anaconda Prompt」を起動します。

第4回 BOT作成編 で作成した自動売買BOTを稼働させる場合は、こちらの記事で説明したように、再度、「CCXT」をpipでインストールするのを忘れないようにしてください。

▽ pipでCCXTライブラリをインストール

さて、いよいよpythonファイルを実行しましょう。
「python test.py」で実行します。

▽ pythonファイル実行

無事、windowsVPS(サーバー)でBOTを稼働することができました!
これで自宅のパソコンを切っても安心してBOTを動かすことができます。

※ 実行を開始した後はリモート接続を切っても構いません。リモート接続を右上の「×」ボタン切っても、pythonプログラムは稼働し続けます。

BTCFXの自動売買BOTを24時間動かすためのWindowsVPSの環境作り

前回までの記事で、24時間365日、稼働し続けるFX自動売買BOTを作ることができました!

ところで、pythonのプログラミングコードはこれで完成ですが、実際にこのプログラムを24時間動作させるためには、24時間動き続けるPC(パソコン)環境が必要になります。当たり前ですね。

これを実現する方法は2つあります。

(1)自宅のパソコンをつけっぱなしにして24時間動かす
(2)WindowsVPSを契約してクラウドで動くwindowsPCを借りる

自宅のパソコンを付けっ放しにできる方は、(1)の方法でも問題ないでしょう。「スリープモード」の設定をオフに変更して、windowsのアップデートも手動更新にし、自動アップデートがかからないように設定を変更すれば大丈夫です。

サーバーでBOTを運用するメリット

ただしパソコンを24時間起動できない環境にいる方や、BOTを運用しながら同時に新しいBOTの開発やテスト検証をしたい方は、間違って止めたりしないためにも運用サーバーは自宅PCと別にした方が便利です。

またCryptowatchのようなAPIには、パソコンのIPごとにリクエスト頻度の上限(Cost)が定められていますが、BOT運用をサーバーにしておけば、自宅PCのIPが上限にかかっても実践BOTの稼働を邪魔する心配がありません。

このような理由から、今回の記事ではより本格的に自動売買BOTに取り組む方に向けて、月額1000円でクラウドにサーバー環境(windowsVPS環境)を整える方法を紹介します。

WindowsVPSとは?

簡単にいえば、リモートからアクセスできるwindowsのパソコンを1台借りるようなものです。

一応、部類としてはサーバーですが、WEBサイトを運営するときに借りるレンタルサーバーとは用途が全く違います。なので、「サーバーって何だか難しそう…」と身構える必要はありません。

一般的なレンタルサーバーは、LinuxなどのOSがインストールされており、マウスによるデスクトップ操作もできないため、コマンド操作などサーバー周りの知識が多少必要になります。

しかしWindowsVPSは、家にあるwindowsPCとほとんど同じような画面とマウスで操作することができます。そのため、FXの自動売買BOTや何らかの購入ツールを稼働させる、というニーズで借りる人も多いです。

▽ WindowsVPSの操作画面

早速、使ってみよう!

windowsVPSを提供している会社は、さくらインターネット、お名前.com、GMOなどいくつかあります。私が知っている範囲で昔から口コミの評判が良いのはさくらインターネットです。私もさくらのwindowsVPSを使っています。

さくらのVPS for Windows Server

自動売買BOTをいくつか動かすだけなら、月額1000円の「W768」プランで問題ありません。クレジットカード払いの登録をすれば、2週間無料でお試し版を利用することもできます。

利用手順

以下の順番で説明します。

1)無料お試し版の仮登録までの流れ
2)Windowsリモートデスクトップ接続の設定手順

すでに登録している方は2)から読んでください。

1.お試し版の申込方法

まずはこちらのページから、以下の「2週間お試し無料!お申込み」ボタンをクリックします。

すると、会員登録の画面に飛びます。

まず会員登録をする

さくらのVPS for Windows Serverを申し込むためには、まず会員登録をする必要があります。

会員登録には、携帯電話のショートメッセージによる認証が必要です。

▽ 電話番号認証

申込みのプランを選択する

プランは最初は月額1080円のW768で問題ないと思います。初期費用が2160円かかります。

プリインストールするソフトを選ぶ

エクセルなどをWindowsVPSでも使いたい方は、追加ソフトウェアをここでインストールすることができます。ただし有料です。自動売買BOTを動かすだけなら不要なので私は入れていません。

追加ライセンスが必要な場合はここで選択します。基本的には不要です。そのまま進むと、支払方法の確認画面になります。

2週間の無料お試し版を使いたい場合は、支払方法にクレジットカードを選択する必要があります。先にクレジットカードを登録しなければなりません。

▽ 会員情報画面からクレジットカードを登録

クレジットカードを登録して支払方法を選択すると、以下のような最終画面に進みます。最初の月は2カ月分と初期費用で4000円ほどかかりますので、金額を確認してください。

なお、クレジットカードは先に登録しなければなりませんが、お試し期間中はいつでもキャンセルが可能です。お試し期間中にキャンセルすれば、1円も課金されることはありません。

何もしないとそのまま継続扱いになりますが、お試し期間終了の数日前にもちゃんと確認のメールが届きます。

登録完了後

上のような登録完了画面になった後、2通のメールが届きます。

・お申込み受付完了のお知らせ
・[さくらのVPS]仮党登録完了のお知らせ

この2通目のメールに以下のようなアカウント情報が入っているので、これをメモしておいてください。

2.設定手順

メールに記載されている「さくらVPS」のログイン画面にアクセスします。以下の画面です。

先ほどのメールにあるIPアドレス/パスワードでも、さくらの会員IDでもどちらでもログインが可能です。ログインしたら、以下のボタンでwindowsサーバーを起動させます。

「稼働中」になったら準備OKです。

次に自身が使っているパソコンのメニュー画面から、「リモートデスクトップ接続」というアプリを起動します。

※ここではwindows10を例に説明していますが、windows8/7/Vistaでも同じように「リモートデスクトップ接続」というアプリがあります。

クリックすると以下のウィンドウが起動します。

この「コンピューター」のところに、先ほどのメール内にあった「サーバー基本情報」のIPアドレス(11.111.222.222 のような数字)を入力します。

すると以下のような画面になりますので、先ほどのメールにあった「管理用ユーザ」のユーザ名とパスワードを入力します。ユーザー名は「Administrator」なので注意してください。

入力したら「OK」をクリックします。

環境によっては以下のようなセキュリティ証明書の警告が出ますが、これは公式のサポートページにも説明があるので、続行してしまって問題ありません。

「はい」をクリックすると、しばらく黒い画面が出たままになりますが大丈夫です。しばらう待っていると、以下のようにリモートでwindowsが起動されます。

Windowsの起動完了!

あとはこのサイトの第1回で説明したのと同じように、Anacondaの環境を構築するだけです。

ただしAnacondaのセットアップファイル(exe)はWEBからダウンロードするよりも、ローカルのパソコンでダウンロードしてからコピー&ペーストした方が早いです。以下、一応手順を紹介しておきます。

続き:さくらのWindowsVPSでPython(Anaconda)の環境を構築する

Bitflyerの自動売買BOTのバックテストで平均保有期間を追加したコード

前回の記事「Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する」の最後の練習問題の答え合わせです。

解答


import requests
from datetime import datetime
import time
import json
import ccxt
import numpy as np


# ----バックテスト用の初期設定値
chart_sec = 300            # 5分足
lot = 1                    # 1トレードの枚数
slippage = 0.0005          # 手数料やスリッページ(0.05%初期値)
close_condition = 0        # エントリー後、n足が経過するまでは手仕舞わない(初期値0)


# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,'r',encoding='utf-8')
	price = json.load(file)
	return price



# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag



# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	try:
		realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
		increase_rate = data["close_price"] / data["open_price"] - 1
	except ZeroDivisionError as e:
		return False
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		#elif increase_rate <  0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		#elif increase_rate > -0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["buy_signal"] = 3
		
		# ここにBitflyerへの買い注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["sell_signal"] = 3
		
		# ここにBitflyerへの売り注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1
	
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag


# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])
	
# ここからメインの実行処理

# 価格チャートを取得
price = get_price(chart_sec,after=1483228800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")



last_data = price[0] # 初期値となる価格データをセット


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		"buy-holding-periods":[],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		"sell-holding-periods":[],
		
		"slippage":[],
		"log":[]
	}
}

i = 1
while i < len(price):
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag)
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

解説

まずflag変数に新しく以下の2つの変数を追加しました。

"sell-holding-periods":[]
"buy-holding-periods":[]

ポジションを保有している期間は、もともと手仕舞いのための関数 def close_position() の先頭部分でカウントしています。

def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1

そのため、ポジションを手仕舞うたびに、ここで記録している[count]の情報を、上記の holding-periods:[] に append していけばOKです。そのため、関数 def records() に以下の部分を追記します。

追記部分

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	(中略)
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		# ここに追記
		flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
	if flag["position"]["side"] == "SELL":
		# ここに追記
		flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )

また最後にバックテスト集計用の関数のところで、平均保有期間を計算してprintします。計算には、numpy の np.average() を使い、それを round() で小数点1桁に丸めています。

以下の部分です。

追記部分

# バックテストの集計用の関数
def backtest(flag):
	(中略)
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
	print("平均保有期間  :  {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))

Bitflyerの自動売買BOTの勝率・平均リターン・総利益をバックテストで計算する

さて、いよいよ前の章で作成した自動売買BOTの勝率を過去データを使って検証していきましょう。過去データには、前回の記事で取得した5分足の過去6000件のデータを使います。

pythonコード

まずは先にイメージが湧きやすいように、具体的なコードと実行結果を示します。各行のコードの意味は後ほど詳しく説明するので、いきなり長いコードを読むのがしんどい方は、以下のコードは読み飛ばしてください。


import requests
from datetime import datetime
import time
import json
import ccxt
import numpy as np


# ----バックテスト用の初期設定値
chart_sec = 300            # 5分足
lot = 1                    # 1トレードの枚数
slippage = 0.0005          # 手数料やスリッページ(0.05%初期値)
close_condition = 0        # エントリー後、n足が経過するまでは手仕舞わない(初期値0)


# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0:
				price.append({ "close_time" : i[0],
					"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
					"open_price" : i[1],
					"high_price" : i[2],
					"low_price" : i[3],
					"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None


# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,'r',encoding='utf-8')
	price = json.load(file)
	return price



# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag



# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	try:
		realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
		increase_rate = data["close_price"] / data["open_price"] - 1
	except ZeroDivisionError as e:
		return False
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		#elif increase_rate <  0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		#elif increase_rate > -0.0003 : return False
		#elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["buy_signal"] = 3
		
		# ここにBitflyerへの買い注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
		flag["order"]["price"] = round(data["close_price"] * lot)
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		log = "3本連続で陰線 なので" + str(data["close_price"]) + "円で売り指値を入れます\n"
		flag["records"]["log"].append(log)
		flag["sell_signal"] = 3
		
		# ここにBitflyerへの売り注文コードを入れる

		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		flag["order"]["price"] = round(data["close_price"] * lot)
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	flag["position"]["count"] += 1
	
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"] and flag["position"]["count"] > close_condition:
			log = "前回の終値を上回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
			flag["records"]["log"].append(log)
			
			# 決済の成行注文コードを入れる

			records( flag,data )
			flag["position"]["exist"] = False
			flag["position"]["count"] = 0
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	flag["position"]["price"] = flag["order"]["price"]
	
	return flag

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag


# バックテストの集計用の関数
def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])
	
# ここからメインの実行処理

# 価格チャートを取得
price = get_price(chart_sec,after=1514764800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")



last_data = price[0] # 初期値となる価格データをセット


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		
		"slippage":[],
		"log":[]
	}
}

i = 1
while i < len(price):
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag)
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

先に実行結果と、このバックテストの結果を「どう解釈するか?」から見ておきましょう!

実行結果

同時に以下のようなログファイルも出力されるため、どのようなタイミング(価格)でいつエントリーして、各トレードでどの程度の利益・損失が出たのかも確認できます。

▽ 自動で取引のログファイルを出力

▽ 各トレードの価格と利益・手数料を記録

※ ここではサンプル数の問題から、実体の長さ(increase_rate)、実体率(realbody_rate)の条件は無効にしています。有効にして試したい方は、def check_candle の中のコメントアウト(#)部分を外してください。

とりあえず、このテストで作った売買BOTのロジックのままでは、勝てなさそうなことがわかりましたね(笑)

売買ロジックの考察

テストBOTのコードなので深くは考察しませんが、いくつか結果からポイントを挙げておきます。pythonのソースコードの解説だけ読みたい方は、後半まで読み飛ばしてください。

まず手数料コストの23万円が痛いですが、手数料率を0%にして再計算しても、やはり最終損益はマイナスになります。つまり売買ロジックそのものの期待値がマイナスです。

1.「売り」と「買い」をひっくり返してみる

売買ロジックの期待値がマイナスということは、エントリー条件をひっくり返してみたらどうでしょうか?

つまり3本連続の陽線を順張りの「買いシグナル」ではなく逆張りの「売りシグナル」として考え、同じように3本連続の陰線を「売りシグナル」ではなく「買いシグナル」と考えてみます。これは以下の部分を2箇所、書き換えるだけで簡単にテストしてみることができます。

これを実行すると以下のようになりました。

▽ 売買の条件を反転させた場合

勝率は少し改善したものの、買い・売りの条件を正反対にひっくり返してもやはり期待値はマイナスの結果になりました。

不思議に思う方もいるかもしれませんが、手仕舞いの条件が正反対になっていないので、このようなことは当然ありえます。どちらの場合でも期待値がマイナスということは、手仕舞いの方法にも問題があるといえそうです。

2.手仕舞いを条件を変更してみる

では、手仕舞いの条件に「エントリーした次の足では手仕舞いしない」という条件を追加してみたらどうでしょうか?

もし仮に「3本連続の陽線が買いシグナル」だとしても、4本目の足では一時的に反発して戻す可能性も考えられます。ログファイルを見ていると、エントリーした次の足ですぐに損切にかかっているケースが非常に多いですね。このような仮説から「4本目の足は手仕舞いの条件判定から除外する」というロジックを追加することも考えられそうです。

上記のコードでは、初期設定値の close_condition を使って、そのような条件を試すことができるようにしておきました。興味がある方は試してみてください。

3.パラメーターの調整の問題点

しかしこのようなパラメーターの調整にはやりすぎると問題です。片っ端から色々なパラメーターを試していけば、簡単にプラスの期待値を持つ売買ロジックを作ることができてしまうからです。例えば、以下のように。

▽ 手数料0.05%でもプラスになった売買ロジック

1)売り/買いの反転
 3本連続の陽線を売りシグナルとする(逆張り)
 3本連続の陰線を買いシグナルとする(逆張り)
2)実体の長さ、実体率の条件を有効化
 increase_rate と realbody_rateの設定
3)5分足ではなく1時間足に変更
4)close_conditionのパラメータを2に設定
 エントリー後、最低2足経過するまで手仕舞わない

上記の条件でテストをすると、2017年7月~2018年4月までのテスト期間で売り・買いともに勝率は50%を上回り、手数料を差し引いてもプラスの利益を上げることができました! これは使えるストラテジーを見つけてしまったのではないでしょうか!

...いえ、残念ながらそうとは限りません。ここでプラスの成績が出ているのは、「プラスの成績が出るまで色々な組み合わせを試した」からに過ぎません。

まずトレード数が少ない(買い16回、売り33回)のも気になりますが、一般論としていえば、過去テストのパフォーマンスを見ながらどんどん後から条件を足していく行為は、「カーブフィッティング」(過剰最適化)と呼ばれ、システムトレードではあまりやってはいけない行為とされています。

以下、理由を簡単に説明します。ただしこれから自動売買BOTを始める方は、あまり最初から頭でっかちになっても良くありません。軽く読み流す程度でも構いません。

過剰最適化とは?

例えば、「エントリー後、〇本目の足で無条件に手仕舞う」という手仕舞いのロジックを考えたとします。このルールに基づいて、「〇本目(n)の足」の部分に1、2、3、4、5...と、順番に数字を入れていき、なぜかたまたま「9」を入れたときに、めちゃくちゃ良い成績が出たとします。

この「理由はわからないけど、9という数字を使うと異様にいい成績が出る」という固定値を無理やり探す行為は、やりすぎると過去のデータに対する過剰な最適化となります。

例えば、典型的なのが移動平均線のクロスを使ったエントリーです。短期移動平均線(5/6/7/8...)と長期移動平均線(20/21/22/23...)などの組み合わせを片っ端から試し、ある特定期間で明らかに「5期間と37期間」の組み合わせでパフォーマンスが良かったとしましょう。

しかし他の期間でもそれが当てはまるとは限りません。過去データにおいて、たまたま上手く当てはまった数字(パラメーター)が、将来にも当てはまるとは限りません。

変数が2つくらいならまだ大丈夫ですが、ここにさらに「火曜日はトレードの対象から外す」「手仕舞いには8期間の経過を待つ」「損切りラインをエントリー価格より 1.7ATR 下に入れる」など、理由のよくわからない恣意的な数字を使った条件を足していくと、いくらでも(過去データにおける)良い成績を作れてしまいます。

もし恣意的な数字を選んで使う場合は、そのパラメータは「少なければ少ないほどいい」と覚えておいてください。また、テスト期間を半分に分けて、前半で調整したパラメーターを後半の期間で再テストしてみるのも有効です。

バックテスト検証コードの作り方の解説

少し話が逸れてしまいました。
ここからバックテスト検証コードの解説部分に入ります。

1.成績を管理するための変数を用意する

前回の章で自動売買BOTを作成したときに、売買シグナルの有無やポジションの状況などの情報をプログラム上で管理させるために「flag」という変数を作りました。バックテストの検証でもこの「flag」変数を以下のように改良して使います。


flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"price" : 0,
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : "",
		"price": 0,
		"count":0
	},
	"records":{
		"buy-count": 0,
		"buy-winning" : 0,
		"buy-return":[],
		"buy-profit": [],
		
		"sell-count": 0,
		"sell-winning" : 0,
		"sell-return":[],
		"sell-profit":[],
		
		"slippage":[],
		"log":[]
	}
}

バックテストのために追加したのは以下の部分ですね。

"records":{
	"buy-count": 0, # 買いエントリのトレード数を記録
	"buy-winning" : 0, # 勝った数を記録
	"buy-return":[], # 各トレードでのリターン(利益率)を記録
	"buy-profit": [], # 各トレードでの利益・損失額を記録
	
	"sell-count": 0, # 売りエントリのトレード数を記録
	"sell-winning" : 0, # 勝った数を記録
	"sell-return":[], # 各トレードでのリターン(利益率)を記録
	"sell-profit":[], # 各トレードでの利益・損失額を記録
	
	"slippage":[], # 各トレードで生じた手数料を記録
	"log":[] # あとでテキストファイルに出力したい内容を記録
}

基本的な考え方は、「ポジションを手仕舞うたびに上記のフラッグを更新して、各トレードの成績をflag変数に記録していく」というだけです。

バックテストといっても、そのコード内容は基本的にはリアルタイム(実践用)のコードと同じです。最初にまとめて用意したローソク足を、while文を使って1足ずつループで処理していき、売買シグナルが点灯する ⇒ 買い/売りでエントリーする ⇒ 条件を満たしたら手仕舞う の流れを忠実に再現しています。

その仕組みがわかっていれば、今まで作成していた売買BOTの「手仕舞いのための関数 def close_position()」が呼ばれたときに、一緒に flag["records"]を以下のように更新するpythonコードを作ればいい、とわかりますね。

1.トレード回数(count)を1増やす
2.勝ったか負けたか(winning)をカウントする
3.利益額(profit)を記録する
4.リターン率(return)を記録する

買い/売りの記録を分ける理由

今回は、買いエントリーと売りエントリーの勝率を、それぞれ別々に検証しておきたいので、あらかじめ全ての変数を「buy-〇〇」と「sell-〇〇」に分類しておきます。

BitflyerのBTCFXのように短い期間でしか過去データを検証できない状況の場合、そのときの局面(下落トレンドか上昇トレンドか)の影響を受けやすくなります。買いエントリーで入るか、売りエントリーで入るかによって、バイアス(勝率の偏り)が生じている可能性があるため、一緒くたに勝率を計測してしまうと、売買ロジック自体の有効性が検証しにくくなります。

また利益額を計算するためには、エントリーしたときの価格(指値)を覚えておく必要がありますね。そのために上記のflagでは、flag[position]やflag[order] に新たに [price] という項目を追加しています。

2.全体のコードの流れを把握する

まず全体のコードの中で、追加しなければならない処理を大まかにまとめておきましょう。

1)エントリー注文する関数
・エントリーしたときの指値価格を記録しておく
・エントリーした日時と価格をログ変数に入れる

2)ポジションを手仕舞いする関数
・成績を記録するための関数を呼ぶ

3)成績を記録するための関数(NEW)
・トレード回数をカウントする
・手数料(スリッページ)を計算する
・利益額を計算する
・勝ったか負けたかを記録する
・リターン(率)を計算する
・利益/損失の日時と額をログ変数に入れる

4)最終的に集計する関数(NEW)
・総利益額を計算する
・勝率(勝ち数/トレード総数)を計算する
・平均リターンを計算する
・買い/売りごとの成績を print で表示する
・ログ内容をテキストファイルで出力する

では、具体的なコードを見ていきましょう!

1)買いシグナルでエントリーする関数

log = "3本連続で陽線 なので" + str(data["close_price"]) + "円で買い指値を入れます\n"
flag["records"]["log"].append(log) # テキスト内容をログに記録
flag["buy_signal"] = 3

# ここにBitflyerへの買い注文コードを入れる

flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = round(data["close_price"] * lot) # エントリー価格を計算

以下の部分を新たに変更しました。

1~2行目は、今までAnacondaプロンプトに表示していた「3本連続なので 1223023円 で買い指値を入れます」というテキスト内容を、かわりに[log]変数に記録しています。バックテストでは、プロンプトにいちいち全ての情報を出力するのは邪魔なので、最終成績以外の細かい情報はすべてログファイルの方に出力するようにします。

なお、ここで覚えておいて欲しいのが、.append()という関数です。

便利すぎる.appned()を使いこなそう!

.append()というのは、配列の要素に新しい要素を追加するときに使う関数です。例えば、以下のような配列があるとします。

crypto = ["BTC","ETH","XRP"]

ここに新しく"XEM"という要素を加えたい場合は以下のように書きます。

cyrpto.append("XEM")

これを実行すると、以下のようになります。

crypto = ["BTC","ETH","XRP","XEM"]

この.append()は、while文やfor文などのループ処理の中で、データを記録するときに、めちゃくちゃ使います。この後も何度も出てくるので、ここで覚えておきましょう!

2)手仕舞い(決済)する関数

flag["position"]["count"] += 1
if data["close_price"] < last_data["close_price"] and flag["position"]["count"] > close_condition:
	log = "前回の終値を下回ったので" + str(data["close_price"]) + "円あたりで成行で決済します\n"
	flag["records"]["log"].append(log)
	
	# 決済の成行注文コードを入れる

	records( flag,data )
	flag["position"]["exist"] = False
	flag["position"]["count"] = 0

最初の1行はバックテストの検証コードとは関係ない部分です。

今回の記事から「〇足経過するまでは手仕舞いをしない」という新しい手仕舞いのための条件を追加したため、手仕舞いの関数が呼ばれるたびに1ずつ増えるカウンタ(flag[position][count])を新たに用意しています。

このカウンタが、最初に設定した(close_condition)を上回らない限り、手仕舞いのための条件を満たさない、という仕組みです。

また手仕舞いの条件を満たした場合には、成行注文を出した後に、以下の「成績を記録するための関数」を呼んでいます。

3)各トレードの成績などを記録する関数

# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
	
	# 取引手数料等の計算
	entry_price = flag["position"]["price"]
	exit_price = round(data["close_price"] * lot)
	trade_cost = round( exit_price * slippage )
	
	log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
	flag["records"]["log"].append(log)
	flag["records"]["slippage"].append(trade_cost)
	
	# 値幅の計算
	buy_profit = exit_price - entry_price - trade_cost
	sell_profit = entry_price - exit_price - trade_cost
	
	# 利益が出てるかの計算
	if flag["position"]["side"] == "BUY":
		flag["records"]["buy-count"] += 1
		flag["records"]["buy-profit"].append( buy_profit )
		flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
		if buy_profit  > 0:
			flag["records"]["buy-winning"] += 1
			log = str(buy_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(buy_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	if flag["position"]["side"] == "SELL":
		flag["records"]["sell-count"] += 1
		flag["records"]["sell-profit"].append( sell_profit )
		flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
		if sell_profit > 0:
			flag["records"]["sell-winning"] += 1
			log = str(sell_profit) + "円の利益です\n"
			flag["records"]["log"].append(log)
		else:
			log = str(sell_profit) + "円の損失です\n"
			flag["records"]["log"].append(log)
	
	return flag

最初に、注文枚数(lot)に応じた手仕舞い価格を計算し、そこから手数料(スリッページコスト)を計算しています。次に、そのコストを加味した上で利益が出ているかどうかを計算しています。

基本的には1行ずつ読んでいただければ、何をやっているかは理解できると思います。

以下の項目は、利益が出ているか出ていないかに関わらず、共通で実行する項目です。

flag["records"]["buy-count"] += 1 #トレード回数の記録
flag["records"]["buy-profit"].append( buy_profit ) #利益、損失の記録
flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 )) #利益率(損失率)を計算

また勝った場合のみ、以下の処理を追加で実行して「勝った数」をカウントします。

買いエントリーであれば、手仕舞い価格がエントリー価格を上回っていれば「勝ち」です。売りエントリーであれば、エントリー価格が手仕舞い価格を上回っていれば勝ちです。この「勝ち」カウンタは最後に勝率を計算するときに使います。

flag["records"]["sell-winning"] += 1

また「手数料は〇円でした」「〇円の利益(損失)でした」というログ用のテキストを、flag[records][log] に追加しています。ここではログだけでなく、リターンや利益額など、数字の記録にもすべて append() を使っています。

4)メイン処理の最初部分

price = get_price(chart_sec,after=1514764800)

print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")

Cryptowatchから過去のローソク足のデータを取得する部分のコードです。

(after = UNIX時間) で指定された期間以降のBitflyerの価格データをCryptowatchから取得し、それがいつからいつのデータなのかをprintで表示しています。この部分のソースコードは、前回の記事で解説したものと全く同じなので、そちらを参考にしてください。

上記の get_price() の部分を、get_price_form_file() にすれば、自分で保存しておいたjsonファイルからローソク足データを読み込むこともできます。これも前回の記事で解説済です。

5)メイン処理のループ部分

i = 1
while i < len(price):
	if flag["order"]["exist"]: # 未約定の注文がないかチェック
		flag = check_order( flag )
	
	data = price[i]
	flag = log_price(data,flag) # 新しいローソク足の日時と始値・終値をログ
	
	if flag["position"]["exist"]: # ポジションを保有していれば、手仕舞いの条件をチェック
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag ) # 買いエントリの条件をチェック
		flag = sell_signal( data,last_data,flag ) # 売りエントリの条件をチェック
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1

backtest(flag)

全体のメイン処理のループ部分では、特に変わった箇所はありません。
変わったのは、7行目のlog_price()くらいですね。

今までは、すべてのローソク足の「日時: 始値: 終値:」をプロンプト(黒い画面)に表示していました。しかしバックテストで全てのローソク足を表示するのは邪魔なので、ログファイルに出力するよう変更しています。

今回のコードでは、以下のようなlog_price()関数を作り、それを呼んでいます。

# 時間と始値・終値をログに記録する関数
def log_price( data,flag ):
	log =  "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) + "\n"
	flag["records"]["log"].append(log)
	return flag

またループ処理が終わった後に、一番最後に、backtest() という「最後の集計用の関数」を実行しています。では最後にこの関数の中身を見てみましょう。

6)バックテストの集計用の関数

def backtest(flag):
	
	buy_gross_profit = np.sum(flag["records"]["buy-profit"])
	sell_gross_profit = np.sum(flag["records"]["sell-profit"])
	
	print("バックテストの結果")
	print("--------------------------")
	print("買いエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["buy-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["buy-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["buy-profit"]) ))
	
	print("--------------------------")
	print("売りエントリの成績")
	print("--------------------------")
	print("トレード回数  :  {}回".format(flag["records"]["sell-count"] ))
	print("勝率          :  {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
	print("平均リターン  :  {}%".format(round(np.average(flag["records"]["sell-return"]),4)))
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) ))
	
	print("--------------------------")
	print("総合の成績")
	print("--------------------------")
	print("総損益        :  {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
	print("手数料合計    :  {}円".format( np.sum(flag["records"]["slippage"]) ))
	
	# ログファイルの出力
	file =  open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
	file.writelines(flag["records"]["log"])

今回、新しく登場したのが「numpy」です。

numpyを使って平均や合計を計算しよう!

これは、平均値、合計値、標準偏差、中央値、などを簡単に計算してくれる便利な計算用のライブラリです。例えば、以下のような配列の合計値や平均値を1行で計算することができます。

例)

score = [112,342,124,34,56,234,99,124,553] #各スコア

sum_score = np.sum(score) #スコアの合計
average_score = np.average(score) #スコアの平均値

np.sum() や np.average() というのは、numpyライブラリのsum関数、average関数を使います、という意味です。Bitflyerの注文を出すときに、「bitflyer.create_order()」と書いたのと同じイメージですね。

また一番最初のimport文で以下のように記載しています。

import numpy as np

このように「as np」と書いておくことで、プログラムのコード中で numpy.sum() と書く代わりに、np.sum() と書くことができるようになります。

正直、初心者にとっては、numpy.sum()と書いた方がわかりやすい気がするのですが、なぜかあらゆるプログラムでは「np.sum()」と書くのが慣例になっています。そのためそういったコードに慣れるために、ここでもそうしておきます。

round()を使って四捨五入をしよう!

またround()は四捨五入するための関数です。round( 数字 , 小数点以下の桁数 )と書くことで、指定した数字を指定した桁数(小数点以下の桁数)に丸めることができます。

例:
x = 186.44444
round(x,2)
#x = 186.44 

上記のnumpyやround関数を使って、

・勝率
・平均リターン
・総損益
・手数料の合計

を順番に計算して出力しています。
これでバックテストのコードの解説は終わりです。

まとめ

今回、新しく作成した勝率検証(バックテスト)のためのコードは、基本的に「売買シグナルの判定」「手仕舞いの条件判定」の箇所とは独立した関数です。そのため、どんな売買ロジックのpythonコードにもそのまま応用することができます。

次回からは、「ドンチャン・ブレイクアウト」という有名なブレイクアウト手法を使って、同じように勝率を検証する方法を解説します。また、平均リターンのバラツキ(標準偏差)という概念や、最大ドローダウンの計算方法、そして損益グラフを視覚的に描画(プロット)する方法などを解説していきます!

練習問題

バックテストでの検証項目に「平均保有期間」を追加してみましょう!

平均保有期間とは「各トレードで平均してどのくらいの期間、ポジションを保有していたか?」を表す指標です。一般的には、ポジションは長く保有するほどリスクが大きくなります。またエントリー機会も減ってしまうため、同じ勝率・同じリターン(期待値)なら、平均保有期間の短い売買ロジックの方が優秀です。

ヒント

上記のコードでは、flag["position"]["count"]でポジションを持っている期間をカウントしていますね。これを成績を記録する関数(def records)の中で、appendを使って配列に記録しておけば良いだけです!(そのためには、flagに記録用の新しい変数を追加する必要があります)

そして、最後にバックテスト集計用の関数(backtest)の中でnumpyを使って平均値を計算します。今回の記事の総復習になりますので是非やってみてください!

解答のコードはこちら

バックテストに必要なBitflyerの過去のローソク足の価格データを集めよう!

Bitflyerで使う自動売買BOTの勝率を検証するために、まずは十分な量の過去の価格データを入手する必要があります。取得元はCryptowatchを使います。

CryptowatchのAPIを改めて理解する

まず最初にもう1度、CryptowatchのAPIの特徴を確認しておきましょう。CryptowatchからOHLC価格(ローソク足の始値・高値・安値・終値)を取得するAPIには、以下の3つのパラメーターが設定できます。

periods ・・・ 時間軸を秒で指定
after ・・・ 〇〇(UNIX日時)以降のデータを取得
before ・・・〇〇(UNIX日時)より前のデータを取得

・CryptowatchのAPI仕様書
・UNIX時間変換ツール

今までは1分足の価格データを取得するために「periods」のパラメーターだけを 60秒(1分)に指定していました。このように時間軸だけを指定した場合、デフォルトではCryptowatchは直近500件のデータを返します。

ではパラメーターを指定するとどのようなデータが返ってくるのでしょうか? テストするために、以下のような関数を作ってみましょう。

パラメーターをテストする関数

今回は様々なパラメーターでテストができるように、新しい関数を作ります。今までとは違う新しいpythonファイルを作って以下のコードを記載してください。


import requests
from datetime import datetime
import time


def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			price.append({ "close_time" : i[0],
				"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
				"open_price" : i[1],
				"high_price" : i[2],
				"low_price" : i[3],
				"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None

# ここからメイン
price = get_price(60)

if price is not None:
	print("先頭データ : " + price[0]["close_time_dt"] + "  UNIX時間 : " + str(price[0]["close_time"]))
	print("末尾データ : " + price[-1]["close_time_dt"] + "  UNIX時間 : " + str(price[-1]["close_time"]))
	print("合計 : " + str(len(price)) + "件のローソク足データを取得")
	print("--------------------------")
	print("--------------------------")

今回作った関数では、get_price()の中で、before/afterのパラメーターも設定できるように改良しました。これを get_price.py などの名前で保存しておきます。

以下のように書けば、パラメーター無しの5分足(300秒)のデータが取得できます。

get_price(300)

以下のように書けば、UNIX時間で1521849600(2018/3/24 09:00:00)より前の5分足の価格データを取得できます。

get_price(300,before=1521849600)

以下のように書けば、UNIX時間で1521849600(2018/3/24 09:00:00)以降の1時間足の価格データを取得できます。

get_price(3600,after=1521849600)

返ってくるデータ件数と期間

まずは何もパラメーターを設定しない場合をテストしてみましょう。
以下の3つをそれぞれ実行してみます。

get_price(60)
get_price(300)
get_price(3600)

1分足の場合

先頭データ : 2018/03/29 12:56
末尾データ : 2018/03/29 21:15
500件のローソク足データを取得

およそ8時間分のデータが返ってきました。

5分足の場合

先頭データ : 2018/03/28 03:35
末尾データ : 2018/03/29 21:20
500件のローソク足データを取得

およそ1~2日分のデータが返ってきました。

1時間足の場合

先頭データ : 2018/03/09 03:00
末尾データ : 2018/03/29 22:00
500件のローソク足データを取得

およそ20日分のデータが返ってきました。

結果画面

やはりどの時間軸でも、基本的にはデフォルトで500件のデータが返ってきています。

Beforeを指定してみると…?

それでは、この先頭の日付を《before》パラメータに設定することで、ここからさらに遡って500件を取得できるのでしょうか? 試しに以下を実行してみましょう。

get_price(60,before=1522295760)

先ほど返ってきた1分足のデータの先頭の日時(2018/03/28 03:35 = UNIX時間で1522295760)を基準に、その前のデータを要求してみます。すると、以下のように空データが返ってきてしまいました。

時間を少しズラシて色々と試してみても、《before》パラメータでは、直近の500件以上のデータは取得できないようです。

Afterを指定してみると…?

それでは、afterで指定してみてはどうでしょうか? 試しにダメ元で2018年1月1日午前9時(UNIX時間で1514764800)以降の1分足データをすべて要求してみます。

get_price(60,after=1514764800)

すると、以下のように6000件のデータが返ってきます。先頭日時は3月25日です。期間にすると、直近4日分くらいの1分足データですね。

先頭データ : 2018/03/25 16:45
末尾データ : 2018/03/29 21:28
6000件のローソク足データを取得

いろいろな日時にパラメーターを変更して試してみても、《after》でパラメーターを指定する方法だと、取得できるデータ件数の上限は直近6000件までのようです。他の時間軸で試しても、以下のような結果になります。

5分足の場合

先頭データ : 2018/03/08 23:45
末尾データ : 2018/03/29 21:35
合計 : 6000件のローソク足データを取得

5分足だと6000件で約2週間分以上のデータが得られることになります。

1時間足の場合

先頭データ : 2017/07/22 15:00
末尾データ : 2018/03/29 22:00
合計 : 6000件のローソク足データを取得

1時間足だと6000件で約7~8カ月分程度のデータが得られることになります。

どの時間軸でバックテストをするにしても、大体、ローソク足6000本くらいのデータで検証すれば、とりあえずテストとしては十分だと思います。もちろんデータは多ければ多いほどいいのですが、無いものは仕方ありません。

自分でデータを集める方法

もっと広範にテストをしたい人は、自前で1分足の価格データを定期的に保存しておくといいでしょう。バックテストの検証に限らず、あらゆる統計的なテストで最も難しいのは「十分な量のデータを集めること」です。自前で十分な量の価格データを持っていれば、それだけ精度の高い検証が可能です。

取得したデータをjsonファイルに保存する

ひとまずjson形式のファイルで保存するだけなら、print()の後に以下のコードを書き足せばOKです。

import json #これだけ先頭に追加で記述

#ファイルに書き込む
file = open("./{0}-{1}-price.json".format(price[0]["close_time"],price[-1]["close_time"]),"w",encoding="utf-8")
json.dump(price,file,indent=4)

これを実行すると、以下のような形式でファイルが自動的に作成され、その中に価格データが保存されます。

・先頭ローソク足の日時-末尾ローソク足の日時-price.json
※ 日時はUNIXタイムスタンプ

中身を開くと以下のようなJSON形式で、価格データが保存されているのがわかります。

保存したjsonファイルを読み込む

逆に読み込むときは、以下のような関数を作っておけばOKです。

import json #これだけ先頭に追加で記述

#jsonファイルを読み込む関数
def get_price_from_file(path):
	file = open(path,"r",encoding="utf-8")
	price = json.load(file)
	return price

# ここからメイン
price = get_price_from_file("./1521978660-1522341240-price.json")

これで上のコードを実行すると、APIを叩いたときと全く同じ結果が得られます。つまり、WEBで取得したデータと同じように扱うことができます。

▽ 上がCryptowatchのAPIで価格データを取得しjsonで保存した場面
  下がそのjsonファイルを読み込んで価格データを取得した場面

なお、もし自分でデータを保存するのであれば、1分足のデータだけあれば十分です。1分足のデータさえあれば、そこから加工して5分足、15分足、1時間足のデータを作るのは簡単(最高値と最安値以外を切り捨てるだけ)です。

また今回の章ではいったん6000件のデータがあれば問題ありません。自前で価格データを収集して蓄積することに興味がある方は、以下の記事を参考にしてください。

・Cryptowatchで過去6000件以上のローソク足を保存して自前で価格データを蓄積する

まとめ

最後に今回使ったpythonのコードをまとめておきます。


import requests
from datetime import datetime
import time
import json

# パラーメータを指定してCryptowatchのAPIを使用する関数
def get_price(min, before=0, after=0):
	price = []
	params = {"periods" : min }
	if before != 0:
		params["before"] = before
	if after != 0:
		params["after"] = after

	response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
	data = response.json()
	
	if data["result"][str(min)] is not None:
		for i in data["result"][str(min)]:
			price.append({ "close_time" : i[0],
				"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
				"open_price" : i[1],
				"high_price" : i[2],
				"low_price" : i[3],
				"close_price": i[4] })
		return price
		
	else:
		print("データが存在しません")
		return None

# json形式のファイルから価格データを読み込む関数
def get_price_from_file(path):
	file = open(path,"r",encoding="utf-8")
	price = json.load(file)
	return price


# ここからメイン
price = get_price(60,after=1514764800)

#ファイルから読みこむ場合
#price = get_price_from_file("./ファイル名.json")


if price is not None:
	print("先頭データ : " + price[0]["close_time_dt"] + "  UNIX時間 : " + str(price[0]["close_time"]))
	print("末尾データ : " + price[-1]["close_time_dt"] + "  UNIX時間 : " + str(price[-1]["close_time"]))
	print("合計 : " + str(len(price)) + "件のローソク足データを取得")
	print("--------------------------")
	print("--------------------------")
	
	#ファイルに書き込む場合
	#file = open("./{0}-{1}-price.json".format(price[0]["close_time"],price[-1]["close_time"]),"w",encoding="utf-8")
	#json.dump(price,file,indent=4)

「#」が先頭に付いているコードは、コメントとして扱われるため実行されません。上記のファイル書き込み、読み込みを使う場合は先頭の「#」を外してください。

BTCFXで作成した自動売買BOTの勝率をバックテストで検証してみよう

さて、前回の章までをお読みいただいた方であれば、ひとまず、Bitflyerで自動的に稼働し続ける売買BOTを作ることができたと思います。

当然、次にあなたが疑問に思うのは、「….で、実際にこのBOTはどのくらい勝てるの?」という点でしょう(笑)。今回の章では、過去の価格データから自作した売買ロジックの勝率を検証するための方法を解説していきます。

勝率についての考え方

「すごいぞ! 勝率80%で1回のトレードあたりの平均リターンが20%の売買ロジックができた!」とはしゃいでみても、その売買シグナルが年に1回した点灯しないようだと、実用上は意味がありません。

まずは、バックテストで勝率を検証する際に、考慮すべき指標を5つだけ簡単に説明しておきます。

1.トレードの回数

過去のデータから勝率を検証する場合、統計的には、最低でも50回以上(できれば100回以上)のトレード数が必要です。

トレード回数が少なすぎると、その勝率や平均リターン率が「たまたま偶然良かっただけなのか?」、それとも「継続的に勝てる再現性のあるロジックなのか?」判別できません。

一般論としていえば、トレードの売買ロジックは、エントリー条件を厳しくすればするほど期待できる勝率やリターンは高くなります。しかしその分、エントリーできる機会が少なくなります。

またBitflyerFXの過去の価格データは無限に手に入るわけではありません。例えば、Cryptowatchから価格データを取得する場合、私の知る限り、1分足のデータは最大6000件までしか入手できません。あまりエントリー条件を厳しくすると、十分な回数のテストができなくなります。

2.勝率

勝率というのは、1回のトレードで利益が出る確率です。例えば、100回トレードをしてそのうち36回利益が出た場合、その売買ロジックの勝率は36%となります。

勝率は1つの重要な指標です。しかし勝率だけでは売買ロジックが優秀かどうかを判定することはできません。勝率36%でも、勝ったときの平均利益が9%、負けたときの平均損失が2%なら、その売買ロジックの期待リターンは+(プラス)です。

(例)1万円を賭けたときの期待値
勝ち 1万円 × 9% × 36% = 324円
負け 1万円 × -2% × 64% = -128円
期待値 196円

ただし同じ期待リターンなら、勝率が高いほうが安心感は上です。勝率が高いと(負けが連続する確率が低くなるので)損益収支グラフを見たときに綺麗に右肩上がりで利益が積み上がりやすくなります。

3.平均リターン

平均リターンというのは、1回のトレードで張った金額に対して平均して何%のリターンがあるか、の指標です。言うまでもなく最も重要な指標の1つです。

ここまで紹介した3つの指標「トレード数」「勝率」「平均リターン」はどれも同じくらい重要です。同じ勝率でも、平均リターン10%の売買ロジックで月に5回トレードするよりも、平均リターン3%で月に100回トレードする方が、利益の絶対額は大きくなります。

儲かった利益をそのまま次回のトレードに投入するスタイル(複利運用)の場合には、同じ期待利回りならトレード回数が多いほど利益も大きくなります。また前述のように、トレード回数が多いほうが期待したリターンに収束しやすくなります。

4.総利益額

テスト期間の最初と最後を比較して、結局、この自動売買BOTによって「軍資金は何倍になったのか?」という指標です。

この総利益額という数字には、前述の「トレード数」「勝率」「平均リターン」がすべて考慮されています。結局のところ、私たちが最終的に最大化したい数字はこの「総利益額」なので、この中では最も大事な指標になります。

5.最大ドローダウン

最大ドローダウンというのは、テスト期間中に最大で「何%まで軍資金を減らしたか?」、つまり「どれだけ苦しい時期を乗り越えたか?」という指標です。

過去のテスト検証の結果、最終的な平均リターンや総利益額がプラスだとしても、途中で70%も軍資金が目減りするような場面があったとしたら、その売買ロジックを実用で試すのは非常に怖いはずです。

最大ドローダウンが大きいということは、テストする期間を変えて検証したら最終リターンがマイナスに終わった可能性もあった、ということです。

実際の世界の運用では(テスト検証と違って)どこで終わり、という期間はありません。リアルタイムでBOTを運用していて、軍資金が70%も目減りしてしまったら、平常心でBOTの稼働を続けられる人はほとんどいないでしょう。そのため、最大ドローダウンは30%以内におさえるのが理想とされています。

自動売買BOTに特有の注意点

次に、Bitflyerで自動売買BOTを動かすときに特有の注意点を上げておきます。これらはバックテストでの検証結果と、実際の世界でのBOT稼働の成績が一致しなくなる原因になるため、テストの時点で考慮しておく必要があります。

注文の遅延と成行注文のスリップ

Bitflyerでは頻繁にAPI注文の遅延がおこります。注文の遅延というのは、サーバーに注文を送った後、それが認識される(板に並ぶ)までの間に数十秒~1分以上のラグ(遅延)が生じることをいいます。

また成行注文のスリップとは、リアルタイムの意図した価格で注文が約定しないことをいいます。例えば、以下の試験運用では、終値のシグナルで82万7666円のときに手仕舞いの成行注文を出していますが、実際には82万7426円で約定しています。

▽ 本当はこの価格で手仕舞いたい

▽ 実際のスリッページの例

ここにBitflyerのサーバーエラーや遅延が重なると、どんどん理想の手仕舞い価格と実際の約定価格が乖離していき、テスト通りのパフォーマンスを上げることが難しくなります。

バックテストでは、シグナルが点灯したローソク足の終値(または次の足の始値)で約定したもの、と仮定してテストしますが、実際にはその価格通りに約定することはほとんどなく、不利な方向にズレて約定することが多いです。

対策

対策はいくつかあります。

1)そもそもテスト段階でスリップによる損失を考慮する
2)実際の運用で1分足は使わない。15分足や1時間足で動かす
3)エントリー注文は指値にして、刺さらなければ諦める

解説

例えば、テスト検証のときに、全てのトレードから一律に―0.05~0.1%程度のスリップを考慮して、あらかじめ手数料として差し引いておくと、実際の運用パフォーマンスとの乖離が少なくなります。

また当然、スキャルなどの利幅の小さいトレードほど、APIの遅延やサーバーエラーによるスリップの損失を受けやすくなります。今までの章では、練習のしやすさから全て1分足で勉強してきましたが、実際のテストや運用では、なるべく時間軸は5分以上の足で考えます。

エントリーだけ指値にする

こちらは各々の戦略次第ですが、エントリー注文は指値にして「刺さらなければ諦める」という選択肢もあります。

利確や損切などの手仕舞いのための注文を諦めることはできませんが、エントリー注文に関しては「希望する価格で執行されないなら見送る」ということが、戦略上は可能です。(もちろん機会損失になる可能性はあります)

エントリー注文を指値にすれば、エントリーも手仕舞いも成行注文をする場合に比べて、テスト検証とのズレを半分に抑えることができます。

今回の章で検証する売買ロジック

さて、では実際にテスト検証の方法を解説していきましょう!

今回の章では、前回までの記事で作った自動売買BOTが「実際のところどの程度の勝率なのか?」気になる方もいる(かもしれない)ので、前回のBOTをそのまま題材にします!

・前回作成した自動売買BOTのロジック
・実際にBitflyerで動くBOTコード

バックテストの検証では、以下のpythonコードを前提の教材とし、こちらをカスタマイズしていきます。

検証に使う元のpythonコード

このコードは以前にこちらの記事でテスト用として紹介したソースコード(Bitflyerへの実際の発注処理を行わないコード)を、売り・買いの両方のシグナルに対応させただけのものです。以下のコードの意味は改めて解説しないので、わからない部分がある方は、前の章などを軽く復習しておいてください!


import requests
from datetime import datetime
import time


# 最初に1度だけCryptowatchから価格データ取得
response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : 60 })

# 指定されたローソク足の価格データ(OHLC)を返す関数
def get_price(min,i):
	data = response.json()
	return { "close_time" : data["result"][str(min)][i][0],
		"open_price" : data["result"][str(min)][i][1],
		"high_price" : data["result"][str(min)][i][2],
		"low_price" : data["result"][str(min)][i][3],
		"close_price": data["result"][str(min)][i][4] }


# 時間と始値・終値を表示する関数
def print_price( data ):
	print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )


# 各ローソク足が陽線・陰線の基準を満たしているか確認する関数
def check_candle( data,side ):
	realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) 
	increase_rate = data["close_price"] / data["open_price"] - 1
	
	if side == "buy":
		if data["close_price"] < data["open_price"] : return False
		elif increase_rate < 0.0003 : return False
		elif realbody_rate < 0.5 : return False
		else : return True
		
	if side == "sell":
		if data["close_price"] > data["open_price"] : return False
		elif increase_rate > -0.0003 : return False
		elif realbody_rate < 0.5 : return False
		else : return True


# ローソク足が連続で上昇しているか確認する関数
def check_ascend( data,last_data ):
	if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]:
		return True
	else:
		return False

# ローソク足が連続で下落しているか確認する関数
def check_descend( data,last_data ):
	if data["open_price"] < last_data["open_price"] and data["close_price"] < last_data["close_price"]:
		return True
	else:
		return False


# 買いシグナルが出たら指値で買い注文を出す関数
def buy_signal( data,last_data,flag ):
	if flag["buy_signal"] == 0 and check_candle( data,"buy" ):
		flag["buy_signal"] = 1

	elif flag["buy_signal"] == 1 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		flag["buy_signal"] = 2

	elif flag["buy_signal"] == 2 and check_candle( data,"buy" )  and check_ascend( data,last_data ):
		print("3本連続で陽線 なので" + str(data["close_price"]) + "で買い指値を入れます")
		flag["buy_signal"] = 3
		
		# ここに買い指値注文のコードを入れる
		flag["order"]["exist"] = True
		flag["order"]["side"] = "BUY"
	
	else:
		flag["buy_signal"] = 0
	return flag


# 売りシグナルが出たら指値で売り注文を出す関数
def sell_signal( data,last_data,flag ):
	if flag["sell_signal"] == 0 and check_candle( data,"sell" ):
		flag["sell_signal"] = 1

	elif flag["sell_signal"] == 1 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		flag["sell_signal"] = 2

	elif flag["sell_signal"] == 2 and check_candle( data,"sell" )  and check_descend( data,last_data ):
		print("3本連続で陰線 なので" + str(data["close_price"]) + "で売り指値を入れます")
		flag["sell_signal"] = 3
		
		# ここに売り指値注文のコードを入れる
		flag["order"]["exist"] = True
		flag["order"]["side"] = "SELL"
		
	else:
		flag["sell_signal"] = 0
	return flag


# 手仕舞いのシグナルが出たら決済の成行注文を出す関数
def close_position( data,last_data,flag ):
	if flag["position"]["side"] == "BUY":
		if data["close_price"] < last_data["close_price"]:
			print("前回の終値を下回ったので" + str(data["close_price"]) + "あたりで成行で決済します")
			# 決済の成行注文のコードを入れる
			flag["position"]["exist"] = False
			
	if flag["position"]["side"] == "SELL":
		if data["close_price"] > last_data["close_price"]:
			print("前回の終値を上回ったので" + str(data["close_price"]) + "あたりで成行で決済します")
			# 決済の成行注文のコードを入れる
			flag["position"]["exist"] = False
	return flag


# サーバーに出した注文が約定したかどうかチェックする関数
def check_order( flag ):
	
	# 注文状況を確認して通っていたら以下を実行
	# 一定時間で注文が通っていなければキャンセルする
	
	flag["order"]["exist"] = False
	flag["order"]["count"] = 0
	flag["position"]["exist"] = True
	flag["position"]["side"] = flag["order"]["side"]
	return flag


# ここからメイン処理の部分
# まず初期値の取得
last_data = get_price(60,0)
print_price( last_data )


# 注文管理用のフラッグを準備
flag = {
	"buy_signal":0,
	"sell_signal":0,
	"order":{
		"exist" : False,
		"side" : "",
		"count" : 0
	},
	"position":{
		"exist" : False,
		"side" : ""
	}
}

# 500回ループ処理
i = 1
while i < 500:
	if flag["order"]["exist"]:
		flag = check_order( flag )
	
	data = get_price(60,i)
	print_price( data )
	
	if flag["position"]["exist"]:
		flag = close_position( data,last_data,flag )			
	else:
		flag = buy_signal( data,last_data,flag )
		flag = sell_signal( data,last_data,flag )
	last_data["close_time"] = data["close_time"]
	last_data["open_price"] = data["open_price"]
	last_data["close_price"] = data["close_price"]
	i+=1
	

実行結果

ちなみに、この時点で実行すると以下のようになります。
カンの良い方はわかると思いますが、もう実はテストのベースはできています。

ですが、既にこの記事を読んで勉強していただいた方なら1つ気付いたことがあるはずです。

そう!検証するためには、明らかにデータ数・トレード数ともに全然足りないですね。こちらはCryptowatchで取得できる直近500件の1分足データを参照していますが、500件のデータでは5回ほどしかトレードが成立していません。最低でも10倍程度の量の価格データが必要そうです。

次回の記事では、まず十分な量の元データ(ローソク足の価格データ)を取得する方法から解説していきます。

次回:バックテストに必要なBitflyerFXの価格データを集めよう!

条件を緩和してテスト回数を増やす方法

※ ちなみにどうしてもデータが手に入らない場合は、エントリー条件を緩和してテストする方法があります。例えば、上記のロジックだと、実体の長さやヒゲの割合などの追加条件を外すだけで、同じ500件分のデータから合計52回のトレード機会を得られます。

▽ 実体の長さ・ヒゲの割合の条件あり

▽ 実体の長さ・ヒゲの割合の条件なし

基本的に、無料でデータ(API)を提供して貰っている私たちの立場からすれば、データ数が少ないことに文句は言えません。現実的に得られるデータの範囲で有益な売買ロジックを考えるしかありません。この辺りのトレードオフの考え方も、今後説明していきますね!