sqlstore.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/grafana/grafana/pkg/bus"
  13. "github.com/grafana/grafana/pkg/log"
  14. m "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/registry"
  16. "github.com/grafana/grafana/pkg/services/annotations"
  17. "github.com/grafana/grafana/pkg/services/cache"
  18. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  21. "github.com/grafana/grafana/pkg/setting"
  22. "github.com/grafana/grafana/pkg/util"
  23. "github.com/go-sql-driver/mysql"
  24. "github.com/go-xorm/xorm"
  25. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  26. _ "github.com/lib/pq"
  27. sqlite3 "github.com/mattn/go-sqlite3"
  28. )
  29. var (
  30. x *xorm.Engine
  31. dialect migrator.Dialect
  32. sqlog log.Logger = log.New("sqlstore")
  33. )
  34. const ContextSessionName = "db-session"
  35. func init() {
  36. registry.Register(&registry.Descriptor{
  37. Name: "SqlStore",
  38. Instance: &SqlStore{},
  39. InitPriority: registry.High,
  40. })
  41. }
  42. type SqlStore struct {
  43. Cfg *setting.Cfg `inject:""`
  44. Bus bus.Bus `inject:""`
  45. CacheService *cache.CacheService `inject:""`
  46. dbCfg DatabaseConfig
  47. engine *xorm.Engine
  48. log log.Logger
  49. Dialect migrator.Dialect
  50. skipEnsureAdmin bool
  51. }
  52. // NewSession returns a new DBSession
  53. func (ss *SqlStore) NewSession() *DBSession {
  54. return &DBSession{Session: ss.engine.NewSession()}
  55. }
  56. // WithDbSession calls the callback with an session attached to the context.
  57. func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
  58. sess, err := startSession(ctx, ss.engine, false)
  59. if err != nil {
  60. return err
  61. }
  62. return callback(sess)
  63. }
  64. // WithTransactionalDbSession calls the callback with an session within a transaction
  65. func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
  66. return ss.inTransactionWithRetryCtx(ctx, callback, 0)
  67. }
  68. func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  69. sess, err := startSession(ctx, ss.engine, true)
  70. if err != nil {
  71. return err
  72. }
  73. defer sess.Close()
  74. err = callback(sess)
  75. // special handling of database locked errors for sqlite, then we can retry 3 times
  76. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  77. if sqlError.Code == sqlite3.ErrLocked {
  78. sess.Rollback()
  79. time.Sleep(time.Millisecond * time.Duration(10))
  80. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  81. return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
  82. }
  83. }
  84. if err != nil {
  85. sess.Rollback()
  86. return err
  87. } else if err = sess.Commit(); err != nil {
  88. return err
  89. }
  90. if len(sess.events) > 0 {
  91. for _, e := range sess.events {
  92. if err = bus.Publish(e); err != nil {
  93. log.Error(3, "Failed to publish event after commit. error: %v", err)
  94. }
  95. }
  96. }
  97. return nil
  98. }
  99. func (ss *SqlStore) Init() error {
  100. ss.log = log.New("sqlstore")
  101. ss.readConfig()
  102. engine, err := ss.getEngine()
  103. if err != nil {
  104. return fmt.Errorf("Fail to connect to database: %v", err)
  105. }
  106. ss.engine = engine
  107. ss.Dialect = migrator.NewDialect(ss.engine)
  108. // temporarily still set global var
  109. x = engine
  110. dialect = ss.Dialect
  111. migrator := migrator.NewMigrator(x)
  112. migrations.AddMigrations(migrator)
  113. for _, descriptor := range registry.GetServices() {
  114. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  115. if ok {
  116. sc.AddMigration(migrator)
  117. }
  118. }
  119. if err := migrator.Start(); err != nil {
  120. return fmt.Errorf("Migration failed err: %v", err)
  121. }
  122. // Init repo instances
  123. annotations.SetRepository(&SqlAnnotationRepo{})
  124. ss.Bus.SetTransactionManager(ss)
  125. // Register handlers
  126. ss.addUserQueryAndCommandHandlers()
  127. // ensure admin user
  128. if ss.skipEnsureAdmin {
  129. return nil
  130. }
  131. return ss.ensureAdminUser()
  132. }
  133. func (ss *SqlStore) ensureAdminUser() error {
  134. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  135. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  136. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  137. if err != nil {
  138. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  139. }
  140. if systemUserCountQuery.Result.Count > 0 {
  141. return nil
  142. }
  143. cmd := m.CreateUserCommand{}
  144. cmd.Login = setting.AdminUser
  145. cmd.Email = setting.AdminUser + "@localhost"
  146. cmd.Password = setting.AdminPassword
  147. cmd.IsAdmin = true
  148. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  149. return fmt.Errorf("Failed to create admin user: %v", err)
  150. }
  151. ss.log.Info("Created default admin", "user", setting.AdminUser)
  152. return nil
  153. })
  154. return err
  155. }
  156. func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
  157. if ss.dbCfg.UrlQueryParams == nil {
  158. return ""
  159. }
  160. var sb strings.Builder
  161. for key, values := range ss.dbCfg.UrlQueryParams {
  162. for _, value := range values {
  163. sb.WriteRune(sep)
  164. sb.WriteString(key)
  165. sb.WriteRune('=')
  166. sb.WriteString(value)
  167. }
  168. }
  169. return sb.String()
  170. }
  171. func (ss *SqlStore) buildConnectionString() (string, error) {
  172. cnnstr := ss.dbCfg.ConnectionString
  173. // special case used by integration tests
  174. if cnnstr != "" {
  175. return cnnstr, nil
  176. }
  177. switch ss.dbCfg.Type {
  178. case migrator.MYSQL:
  179. protocol := "tcp"
  180. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  181. protocol = "unix"
  182. }
  183. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  184. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  185. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  186. tlsCert, err := makeCert("custom", ss.dbCfg)
  187. if err != nil {
  188. return "", err
  189. }
  190. mysql.RegisterTLSConfig("custom", tlsCert)
  191. cnnstr += "&tls=custom"
  192. }
  193. cnnstr += ss.buildExtraConnectionString('&')
  194. case migrator.POSTGRES:
  195. host, port, err := util.SplitIpPort(ss.dbCfg.Host, "5432")
  196. if err != nil {
  197. return "", err
  198. }
  199. if ss.dbCfg.Pwd == "" {
  200. ss.dbCfg.Pwd = "''"
  201. }
  202. if ss.dbCfg.User == "" {
  203. ss.dbCfg.User = "''"
  204. }
  205. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  206. cnnstr += ss.buildExtraConnectionString(' ')
  207. case migrator.SQLITE:
  208. // special case for tests
  209. if !filepath.IsAbs(ss.dbCfg.Path) {
  210. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  211. }
  212. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  213. cnnstr += ss.buildExtraConnectionString('&')
  214. default:
  215. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  216. }
  217. return cnnstr, nil
  218. }
  219. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  220. connectionString, err := ss.buildConnectionString()
  221. if err != nil {
  222. return nil, err
  223. }
  224. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  225. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  226. if err != nil {
  227. return nil, err
  228. }
  229. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  230. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  231. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  232. // configure sql logging
  233. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  234. if !debugSql {
  235. engine.SetLogger(&xorm.DiscardLogger{})
  236. } else {
  237. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  238. engine.ShowSQL(true)
  239. engine.ShowExecTime(true)
  240. }
  241. return engine, nil
  242. }
  243. func (ss *SqlStore) readConfig() {
  244. sec := ss.Cfg.Raw.Section("database")
  245. cfgURL := sec.Key("url").String()
  246. if len(cfgURL) != 0 {
  247. dbURL, _ := url.Parse(cfgURL)
  248. ss.dbCfg.Type = dbURL.Scheme
  249. ss.dbCfg.Host = dbURL.Host
  250. pathSplit := strings.Split(dbURL.Path, "/")
  251. if len(pathSplit) > 1 {
  252. ss.dbCfg.Name = pathSplit[1]
  253. }
  254. userInfo := dbURL.User
  255. if userInfo != nil {
  256. ss.dbCfg.User = userInfo.Username()
  257. ss.dbCfg.Pwd, _ = userInfo.Password()
  258. }
  259. ss.dbCfg.UrlQueryParams = dbURL.Query()
  260. } else {
  261. ss.dbCfg.Type = sec.Key("type").String()
  262. ss.dbCfg.Host = sec.Key("host").String()
  263. ss.dbCfg.Name = sec.Key("name").String()
  264. ss.dbCfg.User = sec.Key("user").String()
  265. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  266. ss.dbCfg.Pwd = sec.Key("password").String()
  267. }
  268. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  269. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  270. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  271. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  272. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  273. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  274. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  275. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  276. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  277. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  278. }
  279. func InitTestDB(t *testing.T) *SqlStore {
  280. t.Helper()
  281. sqlstore := &SqlStore{}
  282. sqlstore.skipEnsureAdmin = true
  283. sqlstore.Bus = bus.New()
  284. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  285. dbType := migrator.SQLITE
  286. // environment variable present for test db?
  287. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  288. dbType = db
  289. }
  290. // set test db config
  291. sqlstore.Cfg = setting.NewCfg()
  292. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  293. sec.NewKey("type", dbType)
  294. switch dbType {
  295. case "mysql":
  296. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  297. case "postgres":
  298. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  299. default:
  300. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  301. }
  302. // need to get engine to clean db before we init
  303. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  304. if err != nil {
  305. t.Fatalf("Failed to init test database: %v", err)
  306. }
  307. sqlstore.Dialect = migrator.NewDialect(engine)
  308. // temp global var until we get rid of global vars
  309. dialect = sqlstore.Dialect
  310. if err := dialect.CleanDB(); err != nil {
  311. t.Fatalf("Failed to clean test db %v", err)
  312. }
  313. if err := sqlstore.Init(); err != nil {
  314. t.Fatalf("Failed to init test database: %v", err)
  315. }
  316. sqlstore.engine.DatabaseTZ = time.UTC
  317. sqlstore.engine.TZLocation = time.UTC
  318. return sqlstore
  319. }
  320. func IsTestDbMySql() bool {
  321. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  322. return db == migrator.MYSQL
  323. }
  324. return false
  325. }
  326. func IsTestDbPostgres() bool {
  327. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  328. return db == migrator.POSTGRES
  329. }
  330. return false
  331. }
  332. type DatabaseConfig struct {
  333. Type string
  334. Host string
  335. Name string
  336. User string
  337. Pwd string
  338. Path string
  339. SslMode string
  340. CaCertPath string
  341. ClientKeyPath string
  342. ClientCertPath string
  343. ServerCertName string
  344. ConnectionString string
  345. MaxOpenConn int
  346. MaxIdleConn int
  347. ConnMaxLifetime int
  348. CacheMode string
  349. UrlQueryParams map[string][]string
  350. }