こちらの記事で説明した酒田五法の「赤三兵」「黒三兵」の売買シグナルのpythonコードの解説です。
pythonコード
import requests from datetime import datetime import time def get_price(min): response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : min }) data = response.json() last_data = data["result"][str(min)][-2] return { "close_time" : last_data[0], "open_price" : last_data[1], "high_price" : last_data[2], "low_price" : last_data[3], "close_price":last_data[4] } def print_price( data ): print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) ) def check_candle( data ): realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) increase_rate = data["close_price"] / data["open_price"] - 1 if data["close_price"] < data["open_price"] : return False elif increase_rate < 0.0005 : return False elif realbody_rate < 0.5 : return False else : return True def check_ascend( data,last_data ): if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]: return True else: return False last_data = get_price(60) print_price( last_data ) flag = 0 while True: data = get_price(60) if data["close_time"] != last_data["close_time"]: print_price( data ) if flag == 0 and check_candle( data ): flag = 1 elif flag == 1 and check_candle( data ) and check_ascend( data,last_data ): print("2本連続で陽線") flag = 2 elif flag == 2 and check_candle( data ) and check_ascend( data,last_data ): print("3本連続で陽線 なので 買い!") flag = 3 else: flag = 0 last_data["close_time"] = data["close_time"] last_data["open_price"] = data["open_price"] last_data["close_price"] = data["close_price"] time.sleep(10)
かなり長いので部分的に順番に説明してきます。
解説パート1
import requests from datetime import datetime import time
CryptowatchのAPIを使うために、「requests」ライブラリをインポートしています。
またAPIで取得したタイムスタンプ(UNIX時間)を人間の読める時間に変換するために、「datetime」ライブラリをインポートします。「time」ライブラリは、sleep()という関数を使って、ループ処理の間の待機をするために使います。
解説パート2
def get_price(min): response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params = { "periods" : min }) data = response.json() last_data = data["result"][str(min)][-2] return { "close_time" : last_data[0], "open_price" : last_data[1], "high_price" : last_data[2], "low_price" : last_data[3], "close_price":last_data[4] }
CryptowatchのAPIで最新のBitflyerFXの最新のローソク足の価格データを取得して、それを配列(辞書リスト)で返すための、自作の関数(def get_price())を作ります。
これで外部から、get_price(60)などと指定するだけで、最新の(終値の固まった)1分足のローソク足の価格データを取得できます。5分足を取得したい場合は、get_price(300)とするだけでOKです。
この関数を使って取得したデータは、data[“close_time”]で日時、data[“open_price”]で始値、data[“close_price”]で終値を取り出せるようにしておきます。
解説パート3
def print_price( data ): print( "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 始値: " + str(data["open_price"]) + " 終値: " + str(data["close_price"]) )
APIで取得したデータを黒い画面(Anacondaプロンプト)に表示するためのコードも、何度も使うので関数にまとめています。ここでは、日時、始値、終値の3つだけを表示するようにしておきます。
解説パート4
def check_candle( data ): realbody_rate = abs(data["close_price"] - data["open_price"]) / (data["high_price"]-data["low_price"]) increase_rate = data["close_price"] / data["open_price"] - 1 if data["close_price"] < data["open_price"] : return False elif increase_rate < 0.0005 : return False elif realbody_rate < 0.5 : return False else : return True
取得した各ローソク足(単体)が条件を満たしているかどうかをチェックするために、check_candle() という関数を自作します。
この関数では、実体の割合(ローソク足の値幅のうちの実体部分の割合)と、上昇率の2つを計算しています。前のロジック定義の記事で説明したように、今回はテストとして、実体の大きさが価格の0.05%以上、ヒゲを含めた全値幅のうちの実体の割合が50%以上、というのを条件にしています。ここは自分の好みで調整してください。
条件を1つでも満たしていなければFalseを返し、すべてを満たしていればTrueを返します。
解説パート5
def check_ascend( data,last_data ): if data["open_price"] > last_data["open_price"] and data["close_price"] > last_data["close_price"]: return True else: return False
次に2本目、3本目のローソク足が上昇しているかどうか(終値・始値の両方が切りあがっているかどうか)を検証するために、check_ascend() という関数を自作します。これはロジック定義の記事のルール2に該当する部分です。
今回の始値と前回の始値、今回の終値と前回の終値を比較し、両方とも上昇していればTrueを返します。片方しか上昇していない、またはどちらも上昇していない場合は、Falseを返します。
解説パート6
last_data = get_price(60) print_price( last_data ) flag = 0
ここはWhile文(無限ループ処理)に入る前の準備です。
今回のロジックでは、「前回の終値」と「今回の終値」を比較する必要があるため、ループ処理に入る前に、1度、「前回の終値」を取得しておきます。この価格データは、last_data という配列の変数に格納されます。
また「何本連続で陽線が続いているか?」を判定するために、flagという変数を用意しておきます。条件を満たす陽線が1本現れたらflag=1、2本連続で現れたらflag=2、3本連続で現れたらflag=3 として、買いシグナルを点灯させます。
解説パート6
while True: data = get_price(60) if data["close_time"] != last_data["close_time"]: #if文の中身 time.sleep(10)
ここからはWhile文の中身の解説です。
今回のサンプルコードで使っているのは1分足ですが、なるべく価格の取得に遅れが出ないようにAPIは10秒おきに利用しています。そのため、「更新があった場合だけ次の処理をする」ために、3行目のif文を書いています。
全体の構成としては、「APIで価格を取得 → 更新があったかチェック → 更新があればif文の中身を実行 → 10秒待機する」を無限ループしています。このWhileの部分は、前の記事でも説明済です。
解説パート7
if data["close_time"] != last_data["close_time"]: print_price( data ) if flag == 0 and check_candle( data ): flag = 1 elif flag == 1 and check_candle( data ) and check_ascend( data,last_data ): print("2本連続で陽線") flag = 2 elif flag == 2 and check_candle( data ) and check_ascend( data,last_data ): print("3本連続で陽線 なので 買い!") flag = 3 else: flag = 0
この部分はWhile文の中のif文の中身の解説です。
ローソク足の1分足データが更新された場合に、まず先ほど作っておいた flag という変数をチェックします。もしflag=0 の場合(まだ1本も陽線のシグナルが出ていない場合)は、check_candle()関数で、今回のローソク足が条件を満たしているか確認します。
条件を満たしていれば、flagの値を1にします。条件を満たしていなければ、flag=0 のまま次のループに入ります。
もしflag=1の場合(前回の足が陽線の場合)は、今回の足が2つの条件(前回のロジック定義の記事で定義したルール1とルール2)を満たしているかどうかを、check_candle()とcheck_ascend()の両方でチェックします。両方とも満たしていれば、flag = 2 にします。どちらか一方でも満たしていなければ、flag = 0 に戻します。
もしflag=2 の場合(前の足が2本連続で陽線の場合)は、今回の足も2つの条件(ルール1とルール2)と満たしているかどうか、check_candle()とcheck_ascend()で確認します。両方とも満たしていれば、買いシグナルを出します。どちらか一方でも満たしていなければ、flag=0に戻します。
解説パート8
last_data["close_time"] = data["close_time"] last_data["open_price"] = data["open_price"] last_data["close_price"] = data["close_price"]
if文の最後の部分です。
ローソク足の更新があったので、「前回の価格データ」を「今回の価格データ」で上書きします。次のループからは、「今回の価格データ(の日時)」と次回の価格データの日時を比較して、更新があったかどうかを判断します。