• ベストアンサー
※ ChatGPTを利用し、要約された質問です(原文:openMPにしたいのですがお手伝いください。)

openMPによる並列化についての質問

このQ&Aのポイント
  • openMPにしたいのですが、内側のfor文はreductionでできると考えたのですが、外側のfor文は依存があり上手くいきません。
  • openMPを使用する際に、2重for文の書き方がわからず困っています。
  • 質問者はopenMPのコツや知恵を教えていただきたいと思っています。

質問者が選んだベストアンサー

  • ベストアンサー
  • ki073
  • ベストアンサー率77% (491/634)
回答No.9

No.8の補足について テストプログラムは、 indiv1=list->next_indiv; ではなく indiv1=list; にしています。質問者さんの実際のプログラムの全容が分かりませんでしたので、前回の質問に貼付けられているものから推測してテストプログラムを作成しました。 引数listが指しているINDIVIDUALが空の(x,yがなくてnext_indivしか設定されていない)変数ならそれで良いのですが、テストプログラムの方は、listが指している変数にもデータが入っているのでindiv1=list;でないとデータ1個分が読み込まれなくなります。 そのために結果が違ってきます。テストプログラムの様なやり方が無駄がなくて良いのではないかと思います。 テストプログラムをよく見てください。 (先のも書きましたが、Cについては全くの素人なのでポインタの使い方がかなりおかしいかもしれませんので、そのつもりで) 最適化ということでは、 外側のforがN回だとすると、内側がN*N回になりますので、内側だけに集中して最適化すれば速くなる可能性があります。openMPの効果にプラスされますのでぱっと見ですが、更に1.5倍程度は期待できるのではないかと想像します。 計算の相対的な速度は http://download.intel.com/jp/developer/jpdoc/ia32.pdf でおまかなことが分かります。 表B6を見ると、スループットでは、加減乗算で2、割り算と平方根で35ですので、可能な限り加減乗算だけで済ますのが得策です。 if (sqrt(x*x +y*y )<=SD_INTERACTION) tmp = 1; else tmp = 0; のような式になっていますので、 このifを if (x*x +y*y <=????????) のように変えられれば、平方根計算が省けるわけです。 余談ですが、昔のコンピュータはかけ算も遅くて、割り算がもっと遅くて、平方根などとんでもなく遅い状況でしたので、 if (fabs(x)<=SD_INTERACTION && fabs(y)<=SD_INTERACTION && x*x +y*y <=????????) のようにしていたように思います。 >申し訳ないのですが、もう少し知恵をお貸しくださらないでしょうか? openMPだけではなくまだまだ速くできる余地はありますので、頭の体操のつもりでやっております。 スレッドが長くなっていましたので、できれば「補足」ではなくて、「お礼」欄でお願いできますか? 「お礼」だと、質問者さんが書き込まれると直ぐにメールで届くので回答する側としては便利なのです。「補足」だとメールがこないので探しにいかなといけないので、(設定で変わるのかな??)

asuka_1026
質問者

お礼

ki073さん本当にありがとうございます♪ openMPを試してみたところ スレッド数に比例して速度が速くなることができました!!!! この掲示板のスレッドが長くなるまでお付き合い頂き感謝です。 まだまだ並列ができそうな部分がありそうなので また質問させていただきます!! そのときはよろしくお願いいたします♪ 次回からは「お礼の欄」で書かせていただきます!! 本当に感謝しております♪ asuka

その他の回答 (8)

  • ki073
  • ベストアンサー率77% (491/634)
回答No.8

多分gccの記述全部に-gを加えれば良いのではないかと思います。 sqrt()はかけ算ではなく平方根ですので、かけ算よりもずっとずっと時間がかかります。 テストプログラムを書いてみました。Cは全然使っていませんので、かなり怪しいので真似をしないように。ポインタは特に怪しいかも。 まず元のプログラムに近いものです。 listの渡し方が分からなかったので、 indiv1=list->next_indiv; ではなく indiv1=list; に変更しています。 #include <stdio.h> #include <stdlib.h> typedef struct individual { double x, y, local_density; struct individual *next_indiv; }INDIVIDUAL; INDIVIDUAL *add_next_indiv(INDIVIDUAL*indiv){ INDIVIDUAL *new_indiv; if ((new_indiv=(INDIVIDUAL *)malloc(sizeof(INDIVIDUAL)))==NULL){ printf("malloc error\n"); exit(EXIT_FAILURE); } new_indiv->next_indiv=indiv; return new_indiv; } void calc_local_density(INDIVIDUAL *list){ INDIVIDUAL *indiv1, *indiv2; double sum; for(indiv1=list; indiv1; indiv1=indiv1->next_indiv){ sum = 0.0; for(indiv2=list; indiv2; indiv2=indiv2->next_indiv){ sum += indiv1->x+indiv1->y+indiv2->x+indiv2->y; } indiv1->local_density = sum; } } int main(){ INDIVIDUAL *list_head, *indiv, *indiv1; int i; indiv = NULL; for(i=1; i<=10; i++){ indiv = add_next_indiv(indiv); indiv->x = i; indiv->y = i; indiv->local_density = 0.0; } calc_local_density(indiv); for(indiv1=indiv; indiv1; indiv1=indiv1->next_indiv){ printf("%f\n", indiv1->local_density); } printf("Finished\n"); } 次に、calc_local_density()を書き換えたものです。両方との同じ結果がでます。 void calc_local_density(INDIVIDUAL *list){ INDIVIDUAL *indiv1, *indiv[99]; double sum; int h,i,j,count; count=0; indiv1=list; for(h=0; indiv1; h++){ indiv[h]=indiv1; indiv1=indiv1->next_indiv; count++; } for(i=0; i<count; i++){ sum = 0.0; for(j=0; j<count; j++){ sum += indiv[i]->x+indiv[i]->y+indiv[j]->x+indiv[j]->y; } indiv[i]->local_density = sum; } } >どうすればki073さんみたいになれますか? 私自身はプログラマでもありませんので、特にCはプログラムを見ても驚かない程度で、ほとんど書けません。 とても時間のかかる既成のプログラムを、自分で使うために仕方なくスピードを上げるように最適化をすることをやる程度です。

asuka_1026
質問者

補足

サンプルプログラムまで作っていただいて・・・ お手数とお時間をかけてしまって申し訳ございません・・・ ありがとうございます!!! プログラム動きました!!! 助かります♪ ただ、 void calc_local_density(INDIVIDUAL *list){ INDIVIDUAL *indiv1, *indiv[1000000]; double sum; int h,i,j,count; count=0; indiv1=list->next_indiv; for(h=0; indiv1; h++){ indiv[h]=indiv1; indiv1=indiv1->next_indiv; count++; } for(i=0; i<count; i++){ sum = 0.0; for(j=0; j<count; j++){ //sum += weight( distance(indiv[i], indiv[j]) ); sum += indiv[i]->x+indiv[i]->y+indiv[j]->x+indiv[j]->y; } indiv[i]->local_density = sum; } } では値が違いました・・・ あれれ・・・と・・・泣 でも、一様これでopenMPに挑戦できます♪ 申し訳ないのですが、もう少し知恵をお貸しくださらないでしょうか? openmpが上手くいくかどうかが不安なので・・・ PGではないのですか!!!! ずっとPGだと思っていました・・・ それなのにこんなに理解しているなんて驚愕です!!!

  • Tacosan
  • ベストアンサー率23% (3656/15482)
回答No.7

-g でデバッガ用の情報を出してくれるね. あと, gcc だと hypot って関数が使えると思うから, 「ユークリッド距離を求める」方法の 1つとして選択肢に入れていいかも. 速度はともかく, sqrt(x*x+y*y) より確実です. おっと, Makefile ももっときれいに書いた方がいいと思う. 特に LFLAG の意味がわからない. 「リンカに渡すオプション」なら LDFLAGS だし, 「コンパイラに渡すオプション」なら CFLAGS が標準で用意されているのでこいつらを使ってあげるのがいいでしょう. 個人的には LD := gcc LIBRARIES := -lm CFLAGS := -O3 -g main:main.o dSFMT.o $(LD) $(LDFLAGS) -o $@ $^ $(LIBRARIES) dSFMT.o:dSFMT.c dSFMT.h dSFMT-params.h $(CC) -c $(CFLAGS) -o $@ $< main.o:main.c $(CC) -c $(CFLAGS) -o $@ $< clean: $(RM) *.o main くらいにはしたい. サフィックスルール使えば, dSFMT.o と main.o を作るところは %.o: %.c $(CC) -c $(CFLAGS) -o $@ $< dSMFT.c: dSFMT.h dSFMT-params.h まで短くできるかな.... やりすぎのような感じもしないでもないけど, 自分が使ってる Makefile ではもっと極端. このところは omake の訓練中.

asuka_1026
質問者

補足

hypot関数というものがあるのですね!!! これは新しく関数を設定してあげればいいのでしょうか? double hypot ( double x, double y ); のようにしたほうが早いってことでしょうか? makefileに関しては全く何も知らない状態で使っていて 動けばいいと思っていたのでこのような常識のようなものがあり また、短くなることまで知りませんでした・・・ ありがとうございます♪また1つ学ぶことができました!!!!

  • ki073
  • ベストアンサー率77% (491/634)
回答No.6

No.5の補足について >for文では0,1,2の値まで回っているのに >indiv[]の配列では indiv[0]とindiv[1]までしか値が >設定されていないということですね!!(合っていますか?) ちょっと違います。 ゆっくり考えてみましょう。はじめのforが2回とします。 1回目 hが0で indiv[0]に値が入る。countは0から1に変化、最後にhが0から1に変化 2回目 hが1で indiv[1]に値が入る。countは1から2に変化、最後にhが1から2に変化 3回目に入る時にindiv1が0でforは終了。 となります。よって、hとcountは2で、indiv[0]とindiv[1]だけに値が入っています。 よく使う例では for(i=0; i<2; i++){ } を同じようにみると(2回ですよね) 1回目: i=0 、最後にiが0から1に変化 i<2を満たすために2回目に 2回目: i=1 、最後にiが1から2に変化 i<2を満たさないので終了 ということで、i<countが正解です。 それでもエラーがでますか。もとのプログラムは正常に動作しますか? そうだとするとデバッガで原因を調べるしかないかな。 gccで-gをつけてコンパイルし、デバッガ順番に実行してエラーになるところを探してください。 CPUは8coreあるようですね。openMPにすることにより8倍近く効果がでる可能性があります。 ちょっと話が変わりますが、 double distance()でsqrt(...)がありますよね。その後で、double weight()で if(x <= SD_INTERACTION)で判断している。 sqrt(...)は無しでいけます。どうすればよいか考えてみてください。sqrtは時間が結構かかるので、これを省けるならかなりの速度アップになります。 また、fabs()も必要あるか考えてみてください。

asuka_1026
質問者

補足

i<countが正解なのですね!!! わかりやすい説明ありがとうございます♪ デバッガしたいのですがmakefileを使っていまして、 どうすればよいのかわからないのでお教えくださいませんか? # LFLAG = -lm main: main.o dSFMT.o gcc -O3 -o main $(LFLAG) main.o dSFMT.o dSFMT.o: dSFMT.c dSFMT.h dSFMT-params.h gcc -c -O3 -o dSFMT.o $(LFLAG) dSFMT.c main.o: main.c gcc -c -O3 -o main.o $(LFLAG) main.c clean: rm *.o main 一様最適化をしているんですけど・・・ dSFMT.cというのはメンルンヌツイスターの乱数使用のために使っています。 8倍期待できるのですか!!! もしできたらすっごくうれしいんです♪ sqrt()なしでもいけるんですか・・・ 掛け算って時間が掛かるんですよね・・・ 難しい・・・です。。 fabs()は絶対値なのですがこれも省略できるんですか!!!! ki073さんのプログラムではないのに、私よりわかっていて・・・ どうすればki073さんみたいになれますか?

  • ki073
  • ベストアンサー率77% (491/634)
回答No.5

No.4の補足について エラーの原因は、落ち着いて考えてみてください。単純化してみましょう。 例えば最初のforは2回で終了するとします。 forを抜けたときには、iが2, indiv[0]とindiv[1]の2つに値が入っています。 countは2になっているはずです。 ということは、indiv[2]は設定されていませんよね。 >外側のfor文は少なくとも3000回程度回ると思います。 内側が9,000,000回ですか。これくらい回ってくれると、いろいろな最適化を試すことができます。 openMPで毎回threadにすると遅くなるかはわかりません。比較してみてください。 これくらいあると、CUDAだとそれなりに効果が出そうですね。 ところで、CPUは何を使っていますか? cat /proc/cpuinfo で出力されます。

asuka_1026
質問者

補足

ki073さん回答有難うございます!! for文では0,1,2の値まで回っているのに indiv[]の配列では indiv[0]とindiv[1]までしか値が設定されていないということですね!!(合っていますか?) i<=count ⇒ i<countにしてみました! void calc_local_density(INDIVIDUAL *list){ INDIVIDUAL *indiv1, *indiv2, *indiv[9999999]; double sum; int h,i,j,count; count=0; indiv1=list->next_indiv; for(h=0; indiv1; h++){ indiv[h]=indiv1; indiv1=indiv1->next_indiv; count++; } for(i=0; i<count; i++){ sum = 0.0; for(j=0; j<count; j++){ sum += weight( distance(indiv[i], indiv[j]) ); } indiv[i]->local_density = sum; //ここを変えています。 } } ですが、セグメンテーション違反です (コアダンプ)らしいです・・・泣 なんでかな?と思いcountの部分を単なる整数(10000など)の値にしてみたのですが これもセグメンテーション違反です (コアダンプ)らしいです・・・ for文の書き方が間違っているのでしょうか? cat /proc/cpuinfoをコマンド入力してみたところ processor : 0 vendor_id : GenuineIntel cpu family : 6 model : 15 model name : Intel(R) Xeon(R) CPU E5310 @ 1.60GHz stepping : 7 cpu MHz : 1600.002 cache size : 4096 KB physical id : 0 siblings : 4 core id : 0 cpu cores : 4 apicid : 0 initial apicid : 0 fpu : yes fpu_exception : yes cpuid level : 10 wp : yes flags : fpu vme de pse tsc msr pae mce cx8 apic mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx lm constant_tsc arch_perfmon pebs bts rep_good aperfmperf pni dtes64 monitor ds_cpl vmx tm2 ssse3 cx16 xtpr pdcm dca lahf_lm tpr_shadow bogomips : 3191.99 clflush size : 64 cache_alignment : 64 address sizes : 36 bits physical, 48 bits virtual power management: processor数が7までの上と同じ内容が出力されました!

  • ki073
  • ベストアンサー率77% (491/634)
回答No.4

No.3の補足について 二重になっているforで i<=countとなっていますよね。 ということは、indiv[count]へもメモリアクセスされています。しかし、indiv[count]の値は設定されていません。 weight( distance(indiv1, indiv2) ) の中身が分かりませんが、書き換えなどはしていないとして、これで並列化できるはずです。 それとエラーとは直接関係ありませんが、countはintの方が良いです。 i<=countでfloatとint型の比較になりますので、計算速度で損します。それと有効数字はintの方が大きいですので。 ところで外側のforは何回くらい回るのでしょうか? 仮に1000回とすると、openMPにした時に1000回threadが作られます。threadはCPUのcore数しか同時に実行できませんので多すぎるのはかえって速度低下になります。schedule 指示節の有る無しで比較するのもおもしろいと思います。

asuka_1026
質問者

補足

for(i=0; indiv1; i++){ indiv[i]=indiv1; indiv1=indiv1->next_indiv; count++; } でindiv1が作られた(for文が回った)回数だけの値を 2重for文のところでまわしたかったのですがそれがだめだったのですね・・・ i<=countをどうすればいいのか・・・ 外側のfor文は少なくとも3000回程度回ると思います。 for文の回数が多いので並列にできればと思っていたのですが かえって逆効果だったのですか。。。 ひとつ学べてよかったです!! openMPで並列ができたらschedule 指示節を試してみて結果を比べてみたいとおもいます!! weight( distance(indiv1, indiv2) ) の中身なのですが double weight(double x){ double tmp; if(x <= SD_INTERACTION) tmp = 1; else tmp = 0; return tmp; } double distance(INDIVIDUAL *indiv1, INDIVIDUAL *indiv2){ double distx, disty; distx = fabs(indiv1->x - indiv2->x); disty = fabs(indiv1->y - indiv2->y); return (sqrt( distx*distx + disty*disty ) ); } ki073さん親切にありがとうございます!!!!

  • ki073
  • ベストアンサー率77% (491/634)
回答No.3

No.1の補足について まず外側のforのindiv1の値がどう変わっていくか見てみます。 indiv1=list->next_indiv; 最初の値 indiv1=indiv1->next_indiv; 二回目の値 indiv1=indiv1->next_indiv; 三回目の値 (略) の様にindiv1の値が変わっていきますよね。 それ例えばindiv[]という配列を用意しておき(No.1ではindiv0と書きましたが気が変わったのでindivにしました) indiv1=list->next_indiv; 最初の値 indv[0]=indiv1; indiv1=indiv1->next_indiv; 二回目の値 indv[1]=indiv1; indiv1=indiv1->next_indiv; 三回目の値 indv[2]=indiv1; の様にindvに代入をしておきます。これは質問欄の外側のforの前にやっておきます。ここではindivに代入する以外のことはやりません。 (indivの領域を予め確保しておく必要がありますが) その後で、 質問欄の外側のforに入ります。その時にはindiv1の値が全て決っていますので、 for(i=0; i<n ; i++){ sum = 0.0; for(j=0; j<n; j++){ sum += weight( distance(indiv[i], indiv[j]) ); } indiv[i]->local_density = sum; } のような感じになるように思いますが。 (indiv2もindiv1と同じ値をとりますよね) nはindivの有効なデータの大きさですが。

asuka_1026
質問者

補足

ki073さんありがとうございます!! 理解することができました♪ 時間が掛かってしまい申し訳ございません。・。 で、書いてみたのですが void calc_local_density(INDIVIDUAL *list){ INDIVIDUAL *indiv1, *indiv2, *indiv[9990000]; double sum; int i,j; float count; count=0; indiv1=list->next_indiv; for(i=0; indiv1; i++){ indiv[i]=indiv1; indiv1=indiv1->next_indiv; count++; } for(i=0; i<=count; i++){ sum = 0.0; for(j=0; j<=count; j++){ sum += weight( distance(indiv[i], indiv[j]) ); } indiv1->local_density = sum; } } 「セグメンテーション違反です」といわれてしまいました(~_~;) 配列がいけなかったのでしょうか??

  • ki073
  • ベストアンサー率77% (491/634)
回答No.2

No.1の修正です。 for(i; i<n ; i++){ は変ですね。 for(i=0; i<n ; i++){ に修正。 openMPとは関係なのですが、高速化という意味では、内側のforをベクトル化できればかなり速くなります。 ベクトル化するにはいくつか条件がありますが、 例を挙げると #include <stdio.h> #include <math.h> int main(){ int i, j; float x[1000], y[1000], sum; for (i = 0 ; i<1000 ; i++) { x[i] = i; y[i] = i; } sum=0.0; for (i = 0 ; i<1000 ; i++) { sum=sum+sqrtf(x[i]*x[i]+y[i]*y[i]); } printf("sum = %f\n",sum); } の場合には後ろのforがベクトル化されます。 1) ループの依存関係がないこと 2) ループ間で連続したメモリ領域をアクセスすること(x,yが連続してアクセスされる) その他などです。 要するにインテルCPUの場合はSSEを使えるコードを作れるようにすれば良いのですが。

  • ki073
  • ベストアンサー率77% (491/634)
回答No.1

まずは http://www.cc.kyushu-u.ac.jp/scp/system/library/OpenMP/OpenMP.html のOpenMP入門(1)と(2)を読んでみてください。 結構複雑な構造ですので、書かれているようにそのままでは依存関係がでてしまいます。 (1) まず、内側のforで並列化するか、外側のforで並列化するか考えてみます。 手順としては、外側のforで並列化が可能か?、それが駄目なら内側のforで並列化が可能か? それと並列化できたときに効果がでるのか(並列化にともなうオーバヘッドが効果を上回るか) 内側のforで並列化したときに本当に効果があるのか、profilerの結果から実際の実行時間をみて判断します。 あくまでも予感ですが、 sum += weight( distance(indiv1, indiv2) ); の文一回の実行時間は非常に短いと思いますので、外側で考えてみます。 for(; indiv1; indiv1=indiv1->next_indiv){ これはループを回るごとにindiv1に書き換えていますので、そのままでは駄目です。 しかし、indiv1=の値は、forに入る前に決められるように思うのですが、どうでしょうか? indiv0のような配列を作り、その中にforで使うindiv1の値を前もって全部入れとおく。 その後で、外側のforを実行する。 for(i; i<n ; i++){ indiv1=indiv0[i]; のような感じです。 それと indiv1->local_density = sum; のindiv1->local_densityには処理中に一回しか書き込まれないですよねえ。 もし複数回上書きされるのでしたら、実行順によって結果が変わってきますので考え直さないといけないのです。 (2) linkしている「4.3 共有変数とプライベート変数」 を見てください。 外側で並列化できそうですので、内側はreductionする必要がないです。プライベート変数に注意。 どこか大きな見逃しがあるかもしれませんが、ご参考に。

asuka_1026
質問者

補足

ki073さんありがとうございます。 indiv1,indiv2ともにポインタなのですが 全部いれておくとはどういうことでしょうか? for(i; i<n ; i++){ indiv1=indiv0[i]; } ポインタであるindiv1に配列のindiv0を代入していくのですか? またこのときのnとはどのようなものでしょうか? indiv1->local_densityは外側のfor文で1回まわることに一度しか使われてません。 ポインタを使っているので何か処理をしている間にlistがおかしくならないですかね・・・? よくわかっていないのでわけのわからないことを言っていましたら申し訳ございません。

関連するQ&A